Skip to content
This repository has been archived by the owner on Nov 11, 2020. It is now read-only.

Commit

Permalink
Adding experimental retry functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
jwage committed Dec 17, 2011
1 parent e8e1e8e commit 5ccb182
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 39 deletions.
120 changes: 100 additions & 20 deletions lib/Doctrine/MongoDB/Collection.php
Expand Up @@ -66,6 +66,13 @@ class Collection
*/
protected $cmd;

/**
* Number of times to retry queries.
*
* @var mixed
*/
protected $numRetries;

/**
* Create a new MongoCollection instance that wraps a PHP MongoCollection instance
* for a given ClassMetadata instance.
Expand All @@ -74,13 +81,15 @@ class Collection
* @param Database $database The Database instance.
* @param EventManager $evm The EventManager instance.
* @param string $cmd Mongo cmd character.
* @param mixed $numRetries Number of times to retry queries.
*/
public function __construct(\MongoCollection $mongoCollection, Database $database, EventManager $evm, $cmd)
public function __construct(\MongoCollection $mongoCollection, Database $database, EventManager $evm, $cmd, $numRetries = false)
{
$this->mongoCollection = $mongoCollection;
$this->database = $database;
$this->eventManager = $evm;
$this->cmd = $cmd;
$this->numRetries = $numRetries;
}

/** @proxy */
Expand Down Expand Up @@ -194,7 +203,7 @@ protected function doFind(array $query, array $fields)

protected function wrapCursor(\MongoCursor $cursor, $query, $fields)
{
return new Cursor($cursor);
return new Cursor($cursor, $this->numRetries);
}

/** @override */
Expand All @@ -215,7 +224,10 @@ public function findOne(array $query = array(), array $fields = array())

protected function doFindOne(array $query, array $fields)
{
return $this->mongoCollection->findOne($query, $fields);
$mongoCollection = $this->mongoCollection;
return $this->retry(function() use ($mongoCollection, $query, $fields) {
return $mongoCollection->findOne($query, $fields);
});
}

public function findAndRemove(array $query, array $options = array())
Expand All @@ -242,12 +254,20 @@ protected function doFindAndRemove(array $query, array $options = array())
$command = array_merge($command, $options);

$document = null;
$result = $this->database->command($command);

$database = $this->database;
$result = $this->retry(function() use ($database, $command) {
return $database->command($command);
});

if (isset($result['value'])) {
$document = $result['value'];
if ($this->mongoCollection instanceof \MongoGridFS) {
// Remove the file data from the chunks collection
$this->mongoCollection->chunks->remove(array('files_id' => $document['_id']), $options);
$mongoCollection = $this->mongoCollection;
$this->retry(function() use ($mongoCollection, $document, $options) {
return $mongoCollection->chunks->remove(array('files_id' => $document['_id']), $options);
});
}
}
return $document;
Expand Down Expand Up @@ -275,7 +295,11 @@ protected function doFindAndUpdate(array $query, array $newObj, array $options)
$command['query'] = $query;
$command['update'] = $newObj;
$command = array_merge($command, $options);
$result = $this->database->command($command);

$database = $this->database;
$result = $this->retry(function() use ($database, $command) {
return $database->command($command);
});
return isset($result['value']) ? $result['value'] : null;
}

Expand All @@ -301,7 +325,11 @@ protected function doNear(array $near, array $query, array $options)
$command['near'] = $near;
$command['query'] = $query;
$command = array_merge($command, $options);
$result = $this->database->command($command);

$database = $this->database;
$result = $this->retry(function() use ($database, $command) {
return $database->command($command);
});
return new ArrayIterator(isset($result['results']) ? $result['results'] : array());
}

Expand All @@ -327,7 +355,11 @@ protected function doDistinct($field, array $query, array $options)
$command['key'] = $field;
$command['query'] = $query;
$command = array_merge($command, $options);
$result = $this->database->command($command);

$database = $this->database;
$result = $this->retry(function() use ($database, $command) {
return $database->command($command);
});
return new ArrayIterator(isset($result['values']) ? $result['values'] : array());
}

Expand Down Expand Up @@ -362,7 +394,10 @@ protected function doMapReduce($map, $reduce, array $out, array $query, array $o
$command['out'] = $out;
$command = array_merge($command, $options);

$result = $this->database->command($command);
$database = $this->database;
$result = $this->retry(function() use ($database, $command) {
return $database->command($command);
});

if (!$result['ok']) {
throw new \RuntimeException($result['errmsg']);
Expand All @@ -372,13 +407,18 @@ protected function doMapReduce($map, $reduce, array $out, array $query, array $o
return new ArrayIterator($result['results']);
}

return $this->database->selectCollection($result['result'])->find();
return $this->retry(function() use ($database, $command, $result) {
return $database->selectCollection($result['result'])->find();
});
}

/** @proxy */
public function count(array $query = array(), $limit = 0, $skip = 0)
{
return $this->mongoCollection->count($query, $limit, $skip);
$mongoCollection = $this->mongoCollection;
return $this->retry(function() use ($mongoCollection, $query, $limit, $skip) {
return $mongoCollection->count($query, $limit, $skip);
});
}

/** @proxy */
Expand Down Expand Up @@ -465,7 +505,10 @@ public function getDBRef(array $reference)

protected function doGetDBRef(array $reference)
{
return $this->mongoCollection->getDBRef($reference);
$mongoCollection = $this->mongoCollection;
return $this->retry(function() use ($mongoCollection, $reference) {
return $mongoCollection->getDBRef($reference);
});
}

/** @proxy */
Expand All @@ -486,7 +529,10 @@ public function group($keys, array $initial, $reduce, array $options = array())

protected function doGroup($keys, array $initial, $reduce, array $options)
{
$result = $this->mongoCollection->group($keys, $initial, $reduce, $options);
$mongoCollection = $this->mongoCollection;
$result = $this->retry(function() use ($mongoCollection, $keys, $initial, $reduce, $options) {
return $mongoCollection->group($keys, $initial, $reduce, $options);
});
return new ArrayIterator($result);
}

Expand All @@ -508,11 +554,20 @@ public function insert(array &$a, array $options = array())
protected function doInsert(array &$a, array $options)
{
$document = $a;
$result = $this->mongoCollection->insert($document, $options);
if ($result && isset($document['_id'])) {
$a['_id'] = $document['_id'];

$mongoCollection = $this->mongoCollection;
$result = $this->retry(function() use ($mongoCollection, $document, $options) {
$return = $mongoCollection->insert($document, $options);
return array(
'return' => $return,
'document' => $document
);
});

if ($result && isset($result['document']['_id'])) {
$a['_id'] = $result['document']['_id'];
}
return $result;
return $result['return'];
}

/** @proxy */
Expand Down Expand Up @@ -554,18 +609,43 @@ public function save(array &$a, array $options = array())

protected function doSave(array &$a, array $options)
{
return $this->mongoCollection->save($a, $options);
$mongoCollection = $this->mongoCollection;
return $this->retry(function() use ($mongoCollection, &$a, $options) {
return $mongoCollection->save($a, $options);
});
}

/** @proxy */
public function validate($scanData = false)
{
return $this->mongoCollection->validate($scanData);
$mongoCollection = $this->mongoCollection;
return $this->retry(function() use ($mongoCollection, $scanData) {
return $mongoCollection->validate($scanData);
});
}

/** @proxy */
public function __toString()
{
return $this->mongoCollection->__toString();
}
}

protected function retry(\Closure $retry)
{
if ($this->numRetries !== null && $this->numRetries !== false) {
for ($i = 1; $i <= $this->numRetries; $i++) {
try {
return $retry();
break;
} catch (\MongoException $e) {
if ($i === $this->numRetries) {
throw $e;
}
sleep(1);
}
}
} else {
return $retry();
}
}
}
46 changes: 45 additions & 1 deletion lib/Doctrine/MongoDB/Configuration.php
Expand Up @@ -34,7 +34,11 @@ class Configuration
*
* @var array $attributes
*/
protected $attributes = array('mongoCmd' => '$');
protected $attributes = array(
'mongoCmd' => '$',
'retryConnect' => 3,
'retryQuery' => 3
);

/**
* Set the logger callable.
Expand Down Expand Up @@ -74,4 +78,44 @@ public function setMongoCmd($cmd)
{
$this->attributes['mongoCmd'] = $cmd;
}

/**
* Get number of times to retry connect when errors occur.
*
* @return mixed True/False or number of times to retry.
*/
public function getRetryConnect()
{
return $this->attributes['retryConnect'];
}

/**
* Set number of times to retry connect when errors occur.
*
* @param string $retryConnect
*/
public function setRetryConnect($retryConnect)
{
$this->attributes['retryConnect'] = $retryConnect;
}

/**
* Get number of times to retry queries when
*
* @return mixed True/False or number of times to retry queries.
*/
public function getRetryQuery()
{
return $this->attributes['retryQuery'];
}

/**
* Set true/false whether or not to retry connect upon failure or number of times to retry.
*
* @param mixed $retryQuery True/false or number of times to retry queries.
*/
public function setRetryQuery($retryQuery)
{
$this->attributes['retryQuery'] = $retryQuery;
}
}
34 changes: 29 additions & 5 deletions lib/Doctrine/MongoDB/Connection.php
Expand Up @@ -91,10 +91,21 @@ public function initialize()
$this->eventManager->dispatchEvent(Events::preConnect, new EventArgs($this));
}

if ($this->server) {
$this->mongo = new \Mongo($this->server, $this->options);
$numRetries = $this->config->getRetryConnect();
if ($numRetries !== null && $numRetries !== false) {
for ($i = 1; $i <= $numRetries; $i++) {
try {
$this->initializeMongo();
break;
} catch (\MongoConnectionException $e) {
if ($i === $numRetries) {
throw $e;
}
sleep(1);
}
}
} else {
$this->mongo = new \Mongo();
$this->initializeMongo();
}

if ($this->eventManager->hasListeners(Events::postConnect)) {
Expand Down Expand Up @@ -261,16 +272,29 @@ public function selectDatabase($name)
*/
protected function wrapDatabase(\MongoDB $database)
{
$numRetries = $this->config->getRetryQuery();
if (null !== $this->config->getLoggerCallable()) {
return new LoggableDatabase(
$database, $this->eventManager, $this->cmd, $this->config->getLoggerCallable()
$database, $this->eventManager, $this->cmd, $numRetries, $this->config->getLoggerCallable()
);
}
return new Database(
$database, $this->eventManager, $this->cmd
$database, $this->eventManager, $this->cmd, $numRetries
);
}

/**
* Initialize new Mongo instance.
*/
protected function initializeMongo()
{
if ($this->server) {
$this->mongo = new \Mongo($this->server, $this->options);
} else {
$this->mongo = new \Mongo();
}
}

/** @proxy */
public function __toString()
{
Expand Down

0 comments on commit 5ccb182

Please sign in to comment.