Skip to content

Commit

Permalink
Merge branch 'v1.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
jmikola committed Apr 18, 2018
2 parents dadf1e6 + b27017d commit 480bc58
Show file tree
Hide file tree
Showing 11 changed files with 623 additions and 30 deletions.
104 changes: 79 additions & 25 deletions php_phongo.c
Expand Up @@ -719,7 +719,7 @@ bool phongo_cursor_advance_and_check_for_error(mongoc_cursor_t* cursor TSRMLS_DC
return true;
} /* }}} */

int phongo_execute_query(mongoc_client_t* client, const char* namespace, zval* zquery, zval* options, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC) /* {{{ */
bool phongo_execute_query(mongoc_client_t* client, const char* namespace, zval* zquery, zval* options, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC) /* {{{ */
{
const php_phongo_query_t* query;
mongoc_cursor_t* cursor;
Expand Down Expand Up @@ -797,7 +797,29 @@ static bson_t* create_wrapped_command_envelope(const char* db, bson_t* reply)
return tmp;
}

int phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t type, const char* db, zval* zcommand, zval* options, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC) /* {{{ */
static zval *phongo_create_implicit_session(mongoc_client_t *client TSRMLS_DC) /* {{{ */
{
mongoc_client_session_t *cs;
zval *zsession;

cs = mongoc_client_start_session(client, NULL, NULL);

if (!cs) {
return NULL;
}

#if PHP_VERSION_ID >= 70000
zsession = ecalloc(sizeof(zval), 1);
#else
ALLOC_INIT_ZVAL(zsession);
#endif

phongo_session_init(zsession, cs TSRMLS_CC);

return zsession;
} /* }}} */

bool phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t type, const char* db, zval* zcommand, zval* options, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC) /* {{{ */
{
const php_phongo_command_t* command;
bson_iter_t iter;
Expand All @@ -807,38 +829,50 @@ int phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t ty
mongoc_cursor_t* cmd_cursor;
zval* zreadPreference = NULL;
zval* zsession = NULL;
int result;
bool result = false;
bool free_reply = false;
bool free_zsession = false;

command = Z_COMMAND_OBJ_P(zcommand);

if ((type & PHONGO_OPTION_READ_CONCERN) && !phongo_parse_read_concern(options, &opts TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

if ((type & PHONGO_OPTION_READ_PREFERENCE) && !phongo_parse_read_preference(options, &zreadPreference TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

if (!phongo_parse_session(options, client, &opts, &zsession TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

/* If an explicit session was not provided, attempt to create an implicit
* client session (ignoring any errors). */
if (!zsession) {
zsession = phongo_create_implicit_session(client TSRMLS_CC);

if (zsession) {
free_zsession = true;

if (!mongoc_client_session_append(Z_SESSION_OBJ_P(zsession)->client_session, &opts, NULL)) {
phongo_throw_exception(PHONGO_ERROR_INVALID_ARGUMENT TSRMLS_CC, "Error appending implicit \"sessionId\" option");
goto cleanup;
}
}
}

if ((type & PHONGO_OPTION_WRITE_CONCERN) && !phongo_parse_write_concern(options, &opts, NULL TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

if (!BSON_APPEND_INT32(&opts, "serverId", server_id)) {
phongo_throw_exception(PHONGO_ERROR_INVALID_ARGUMENT TSRMLS_CC, "Error appending \"serverId\" option");
bson_destroy(&opts);
return false;
goto cleanup;
}

/* Although "opts" already always includes the serverId option, the read
Expand All @@ -861,9 +895,11 @@ int phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t ty
default:
/* Should never happen, but if it does: exception */
phongo_throw_exception(PHONGO_ERROR_LOGIC TSRMLS_CC, "Type '%d' should never have been passed to phongo_execute_command, please file a bug report", type);
bson_destroy(&opts);
return false;
goto cleanup;
}

free_reply = true;

if (!result) {
/* Server errors (other than ExceededTimeLimit) and write concern errors
* may use CommandException and report the result document for the
Expand All @@ -888,22 +924,18 @@ int phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t ty
} else {
phongo_throw_exception_from_bson_error_t(&error TSRMLS_CC);
}
bson_destroy(&reply);
bson_destroy(&opts);
return false;
goto cleanup;
}

bson_destroy(&opts);

if (!return_value_used) {
bson_destroy(&reply);
return true;
goto cleanup;
}

/* According to mongoc_cursor_new_from_command_reply(), the reply bson_t
* is ultimately destroyed on both success and failure. */
if (bson_iter_init_find(&iter, &reply, "cursor") && BSON_ITER_HOLDS_DOCUMENT(&iter)) {
bson_t initial_reply = BSON_INITIALIZER;
bson_error_t error = {0};

bson_copy_to(&reply, &initial_reply);

Expand All @@ -917,17 +949,39 @@ int phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t ty
bson_append_int64(&initial_reply, "batchSize", -1, command->batch_size);
}

if (zsession && !mongoc_client_session_append(Z_SESSION_OBJ_P(zsession)->client_session, &initial_reply, &error)) {
phongo_throw_exception_from_bson_error_t(&error TSRMLS_CC);
bson_destroy(&initial_reply);
result = false;
goto cleanup;
}

cmd_cursor = mongoc_cursor_new_from_command_reply(client, &initial_reply, server_id);
bson_destroy(&reply);
} else {
bson_t* wrapped_reply = create_wrapped_command_envelope(db, &reply);

cmd_cursor = mongoc_cursor_new_from_command_reply(client, wrapped_reply, server_id);
bson_destroy(&reply);
}

phongo_cursor_init_for_command(return_value, client, cmd_cursor, db, zcommand, zreadPreference, zsession TSRMLS_CC);
return true;

cleanup:
bson_destroy(&opts);

if (free_reply) {
bson_destroy(&reply);
}

if (free_zsession) {
#if PHP_VERSION_ID >= 70000
zval_ptr_dtor(zsession);
efree(zsession);
#else
zval_ptr_dtor(&zsession);
#endif
}

return result;
} /* }}} */
/* }}} */

Expand Down
4 changes: 2 additions & 2 deletions php_phongo.h
Expand Up @@ -134,8 +134,8 @@ void phongo_readconcern_init(zval* return_value, const mongoc_read_concern_t* re
void phongo_readpreference_init(zval* return_value, const mongoc_read_prefs_t* read_prefs TSRMLS_DC);
void phongo_writeconcern_init(zval* return_value, const mongoc_write_concern_t* write_concern TSRMLS_DC);
bool phongo_execute_bulk_write(mongoc_client_t* client, const char* namespace, php_phongo_bulkwrite_t* bulk_write, zval* zwriteConcern, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC);
int phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t type, const char* db, zval* zcommand, zval* zreadPreference, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC);
int phongo_execute_query(mongoc_client_t* client, const char* namespace, zval* zquery, zval* zreadPreference, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC);
bool phongo_execute_command(mongoc_client_t* client, php_phongo_command_type_t type, const char* db, zval* zcommand, zval* zreadPreference, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC);
bool phongo_execute_query(mongoc_client_t* client, const char* namespace, zval* zquery, zval* zreadPreference, uint32_t server_id, zval* return_value, int return_value_used TSRMLS_DC);

bool phongo_cursor_advance_and_check_for_error(mongoc_cursor_t* cursor TSRMLS_DC);

Expand Down
22 changes: 21 additions & 1 deletion src/MongoDB/Cursor.c
Expand Up @@ -28,6 +28,22 @@

zend_class_entry* php_phongo_cursor_ce;

/* Check if the cursor is exhausted (i.e. ID is zero) and free any reference to
* the session. Calling this function during iteration will allow an implicit
* session to return to the pool immediately after a getMore indicates that the
* server has no more results to return. */
static void php_phongo_cursor_free_session_if_exhausted(php_phongo_cursor_t *cursor) /* {{{ */
{
if (mongoc_cursor_get_id(cursor->cursor)) {
return;
}

if (!Z_ISUNDEF(cursor->session)) {
zval_ptr_dtor(&cursor->session);
ZVAL_UNDEF(&cursor->session);
}
} /* }}} */

static void php_phongo_cursor_free_current(php_phongo_cursor_t* cursor) /* {{{ */
{
if (!Z_ISUNDEF(cursor->visitor_data.zchild)) {
Expand Down Expand Up @@ -117,6 +133,8 @@ static void php_phongo_cursor_iterator_move_forward(zend_object_iterator* iter T
phongo_throw_exception_from_bson_error_t(&error TSRMLS_CC);
}
}

php_phongo_cursor_free_session_if_exhausted(cursor);
} /* }}} */

static void php_phongo_cursor_iterator_rewind(zend_object_iterator* iter TSRMLS_DC) /* {{{ */
Expand Down Expand Up @@ -147,6 +165,8 @@ static void php_phongo_cursor_iterator_rewind(zend_object_iterator* iter TSRMLS_
if (doc) {
php_phongo_bson_to_zval_ex(bson_get_data(doc), doc->len, &cursor->visitor_data);
}

php_phongo_cursor_free_session_if_exhausted(cursor);
} /* }}} */

static zend_object_iterator_funcs php_phongo_cursor_iterator_funcs = {
Expand Down Expand Up @@ -441,7 +461,7 @@ static HashTable* php_phongo_cursor_get_debug_info(zval* object, int* is_temp TS
*is_temp = 1;
intern = Z_CURSOR_OBJ_P(object);

array_init_size(&retval, 9);
array_init_size(&retval, 10);

if (intern->database) {
ADD_ASSOC_STRING(&retval, "database", intern->database);
Expand Down
133 changes: 133 additions & 0 deletions tests/cursor/bug1152-001.phpt
@@ -0,0 +1,133 @@
--TEST--
PHPC-1152: Command cursors should use the same session for getMore and killCursors (implicit)
--SKIPIF--
<?php if (PHP_INT_SIZE !== 8) { die("skip Can't represent 64-bit ints on a 32-bit platform"); } ?>
<?php require __DIR__ . "/../utils/basic-skipif.inc"; ?>
<?php NEEDS_CRYPTO(); ?>
<?php NEEDS('STANDALONE'); ?>
<?php NEEDS_ATLEAST_MONGODB_VERSION(STANDALONE, "3.6"); ?>
<?php CLEANUP(STANDALONE); ?>
--FILE--
<?php
require_once __DIR__ . "/../utils/basic.inc";

class Test implements MongoDB\Driver\Monitoring\CommandSubscriber
{
private $lsidByCursorId = [];
private $lsidByRequestId = [];

public function executeCommand()
{
$manager = new MongoDB\Driver\Manager(STANDALONE);

$bulk = new MongoDB\Driver\BulkWrite;
$bulk->insert(['_id' => 1]);
$bulk->insert(['_id' => 2]);
$bulk->insert(['_id' => 3]);
$manager->executeBulkWrite(NS, $bulk);

$command = new MongoDB\Driver\Command([
'aggregate' => COLLECTION_NAME,
'pipeline' => [['$match' => new stdClass]],
'cursor' => ['batchSize' => 2],
]);

MongoDB\Driver\Monitoring\addSubscriber($this);

/* By creating two cursors with the same name, PHP's reference counting
* will destroy the first after the second is created. Note that
* mongoc_cursor_destroy also destroys implicit sessions and returns
* them to the LIFO pool. This sequencing allows us to test that getMore
* and killCursors use the session ID corresponding to the original
* aggregate command. */
$cursor = $manager->executeCommand(DATABASE_NAME, $command);
$cursor->toArray();

$cursor = $manager->executeCommand(DATABASE_NAME, $command);
$cursor->toArray();

$cursor = $manager->executeCommand(DATABASE_NAME, $command);
$cursor = $manager->executeCommand(DATABASE_NAME, $command);
unset($cursor);

MongoDB\Driver\Monitoring\removeSubscriber($this);

/* We should expect two unique session IDs over the course of the test,
* since at most two implicit sessions would have been in use at any
* given time. */
printf("Unique session IDs used: %d\n", count(array_unique($this->lsidByRequestId)));
}

public function commandStarted(MongoDB\Driver\Monitoring\CommandStartedEvent $event)
{
$requestId = $event->getRequestId();
$sessionId = bin2hex((string) $event->getCommand()->lsid->id);

printf("%s session ID: %s\n", $event->getCommandName(), $sessionId);

if ($event->getCommandName() === 'aggregate') {
if (isset($this->lsidByRequestId[$requestId])) {
throw new UnexpectedValueException('Previous command observed for request ID: ' . $requestId);
}

$this->lsidByRequestId[$requestId] = $sessionId;
}

if ($event->getCommandName() === 'getMore') {
$cursorId = $event->getCommand()->getMore;

if ( ! isset($this->lsidByCursorId[$cursorId])) {
throw new UnexpectedValueException('No previous command observed for cursor ID: ' . $cursorId);
}

printf("getMore used same session as aggregate: %s\n", $sessionId === $this->lsidByCursorId[$cursorId] ? 'yes' : 'no');
}

if ($event->getCommandName() === 'killCursors') {
$cursorId = $event->getCommand()->cursors[0];

if ( ! isset($this->lsidByCursorId[$cursorId])) {
throw new UnexpectedValueException('No previous command observed for cursor ID: ' . $cursorId);
}

printf("killCursors used same session as aggregate: %s\n", $sessionId === $this->lsidByCursorId[$cursorId] ? 'yes' : 'no');
}
}

public function commandSucceeded(MongoDB\Driver\Monitoring\CommandSucceededEvent $event)
{
/* Associate the aggregate's session ID with its cursor ID so it can be
* looked up by the subsequent getMore or killCursors */
if ($event->getCommandName() === 'aggregate') {
$cursorId = $event->getReply()->cursor->id;
$requestId = $event->getRequestId();

$this->lsidByCursorId[$cursorId] = $this->lsidByRequestId[$requestId];
}
}

public function commandFailed(MongoDB\Driver\Monitoring\CommandFailedEvent $event)
{
}
}

(new Test)->executeCommand();

?>
===DONE===
<?php exit(0); ?>
--EXPECTF--
aggregate session ID: %x
getMore session ID: %x
getMore used same session as aggregate: yes
aggregate session ID: %x
getMore session ID: %x
getMore used same session as aggregate: yes
aggregate session ID: %x
aggregate session ID: %x
killCursors session ID: %x
killCursors used same session as aggregate: yes
killCursors session ID: %x
killCursors used same session as aggregate: yes
Unique session IDs used: 2
===DONE===

0 comments on commit 480bc58

Please sign in to comment.