Browse files

added command for daemons on tailable cursors

  • Loading branch information...
1 parent afa2b16 commit 23e72f6930bddd1783b8f6d2518ff30388bf741c @bamarni bamarni committed May 24, 2012
Showing with 113 additions and 0 deletions.
  1. +88 −0 Command/TailCursorDoctrineODMCommand.php
  2. +25 −0 Cursor/TailableCursorProcessorInterface.php
View
88 Command/TailCursorDoctrineODMCommand.php
@@ -0,0 +1,88 @@
+<?php
+
+/*
+ * This file is part of the Doctrine MongoDBBundle
+ *
+ * The code was originally distributed inside the Symfony framework.
+ *
+ * (c) Fabien Potencier <fabien@symfony.com>
+ * (c) Doctrine Project
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Doctrine\Bundle\MongoDBBundle\Command;
+
+use Doctrine\Bundle\MongoDBBundle\Cursor\TailableCursorProcessorInterface;
+use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
+use Symfony\Component\Console\Input\InputArgument;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+/**
+ * Command helping to configure a daemon listening to a tailable cursor. Works only with capped collections.
+ *
+ * @author Jonathan H. Wage <jonwage@gmail.com>
+ * @author Bilal Amarni <bilal.amarni@gmail.com>
+ */
+class TailCursorDoctrineODMCommand extends ContainerAwareCommand
+{
+ protected function configure()
+ {
+ $this
+ ->setName('doctrine:mongodb:tail-cursor')
+ ->setDescription('Tails a mongodb cursor and processes the documents that come through')
+ ->addArgument('document', InputArgument::REQUIRED, 'The document we are going to tail the cursor for.')
+ ->addArgument('finder', InputArgument::REQUIRED, 'The repository finder method which returns the cursor to tail.')
+ ->addArgument('processor', InputArgument::REQUIRED, 'The service id to use to process the documents.')
+ ->addOption('sleep-time', null, InputOption::VALUE_REQUIRED, 'The number of seconds to wait between two checks.', 10)
+ ;
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $dm = $this->getContainer()->get('doctrine.odm.mongodb.document_manager');
+ $repository = $dm->getRepository($input->getArgument('document'));
+ $repositoryReflection = new \ReflectionClass(get_class($repository));
+ $documentReflection = $repository->getDocumentManager()->getMetadataFactory()->getMetadataFor($input->getArgument('document'))->getReflectionClass();
+ $processor = $this->getContainer()->get($input->getArgument('processor'));
+ $sleepTime = $input->getOption('sleep-time');
+
+ if (!$processor instanceof TailableCursorProcessorInterface) {
+ throw new \InvalidArgumentException('A tailable cursor processor must implement the ProcessorInterface.');
+ }
+
+ $processorReflection = new \ReflectionClass(get_class($processor));
+ $method = $input->getArgument('finder');
+
+ $output->writeln(sprintf('Getting cursor for <info>%s</info> from <info>%s#%s</info>', $input->getArgument('document'), $repositoryReflection->getShortName(), $method));
+
+ $cursor = $repository->$method();
+
+ while (true) {
+ while (!$cursor->hasNext()) {
+ $output->writeln('<comment>Nothing found, waiting to try again</comment>');
+ // read all results so far, wait for more
+ sleep($sleepTime);
+ }
+ $cursor->next();
+ $document = $cursor->current();
+ $id = $document->getId();
+
+ $output->writeln(sprintf('Processing <info>%s</info> with id of <info>%s</info>', $documentReflection->getShortName(), (string) $id));
+ $output->writeln(sprintf(' <info>%s</info><comment>#</comment><info>process</info>(<info>%s</info> <comment>$document</comment>)', $processorReflection->getShortName(), $documentReflection->getShortName()));
+
+ try {
+ $processor->process($document);
+ } catch (\Exception $e) {
+ $output->writeln(sprintf('Error occurred while processing document: <error>%s</error>', $e->getMessage()));
+ continue;
+ }
+
+ $dm->flush();
+ $dm->clear();
+ }
+ }
+}
View
25 Cursor/TailableCursorProcessorInterface.php
@@ -0,0 +1,25 @@
+<?php
+
+/*
+ * This file is part of the Doctrine MongoDBBundle
+ *
+ * The code was originally distributed inside the Symfony framework.
+ *
+ * (c) Fabien Potencier <fabien@symfony.com>
+ * (c) Doctrine Project
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Doctrine\Bundle\MongoDBBundle\Cursor;
+
+/**
+ * Contract for tailable cursor processors.
+ *
+ * @author Jonathan H. Wage <jonwage@gmail.com>
+ */
+interface TailableCursorProcessorInterface
+{
+ function process($document);
+}

0 comments on commit 23e72f6

Please sign in to comment.