Permalink
Fetching contributors…
Cannot retrieve contributors at this time
98 lines (82 sloc) 4.28 KB
<?php
/*
* This file is part of the Doctrine MongoDBBundle
*
* The code was originally distributed inside the Symfony framework.
*
* (c) Fabien Potencier <fabien@symfony.com>
* (c) Doctrine Project
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Doctrine\Bundle\MongoDBBundle\Command;
use Doctrine\Bundle\MongoDBBundle\Cursor\TailableCursorProcessorInterface;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
/**
* Command helping to configure a daemon listening to a tailable cursor. Works only with capped collections.
*
* @author Jonathan H. Wage <jonwage@gmail.com>
* @author Bilal Amarni <bilal.amarni@gmail.com>
*/
class TailCursorDoctrineODMCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('doctrine:mongodb:tail-cursor')
->setDescription('Tails a mongodb cursor and processes the documents that come through')
->addArgument('document', InputArgument::REQUIRED, 'The document we are going to tail the cursor for.')
->addArgument('finder', InputArgument::REQUIRED, 'The repository finder method which returns the cursor to tail.')
->addArgument('processor', InputArgument::REQUIRED, 'The service id to use to process the documents.')
->addOption('no-flush', null, InputOption::VALUE_NONE, 'If set, the document manager won\'t be flushed after each document processing')
->addOption('sleep-time', null, InputOption::VALUE_REQUIRED, 'The number of seconds to wait between two checks.', 10)
;
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$dm = $this->getContainer()->get('doctrine_mongodb.odm.document_manager');
$repository = $dm->getRepository($input->getArgument('document'));
$repositoryReflection = new \ReflectionClass($repository);
$documentReflection = $repository->getDocumentManager()->getMetadataFactory()->getMetadataFor($input->getArgument('document'))->getReflectionClass();
$processor = $this->getContainer()->get($input->getArgument('processor'));
$sleepTime = $input->getOption('sleep-time');
if (!$processor instanceof TailableCursorProcessorInterface) {
throw new \InvalidArgumentException('A tailable cursor processor must implement the ProcessorInterface.');
}
$processorReflection = new \ReflectionClass($processor);
$method = $input->getArgument('finder');
$output->writeln(sprintf('Getting cursor for <info>%s</info> from <info>%s#%s</info>', $input->getArgument('document'), $repositoryReflection->getShortName(), $method));
$cursor = $repository->$method();
while (true) {
while (!$cursor->hasNext()) {
if (!$cursor->valid()) {
$output->writeln('<error>Invalid cursor, requerying</error>');
$cursor = $repository->$method();
}
$output->writeln('<comment>Nothing found, waiting to try again</comment>');
// read all results so far, wait for more
sleep($sleepTime);
}
$cursor->next();
$document = $cursor->current();
$id = $document->getId();
$output->writeln(sprintf('Processing <info>%s</info> with id of <info>%s</info>', $documentReflection->getShortName(), (string) $id));
$output->writeln(sprintf(' <info>%s</info><comment>#</comment><info>process</info>(<info>%s</info> <comment>$document</comment>)', $processorReflection->getShortName(), $documentReflection->getShortName()));
try {
$processor->process($document);
} catch (\Exception $e) {
$output->writeln(sprintf('Error occurred while processing document: <error>%s</error>', $e->getMessage()));
continue;
}
if (!$input->getOption('no-flush')) {
$dm->flush();
}
$dm->clear();
}
}
}