Skip to content

Commit

Permalink
Merge 8b911e0 into 04b3e73
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Dec 11, 2017
2 parents 04b3e73 + 8b911e0 commit 538c7ff
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 18 deletions.
8 changes: 8 additions & 0 deletions Command/PopulateCommand.php
Expand Up @@ -18,6 +18,7 @@
use FOS\ElasticaBundle\Index\Resetter;
use FOS\ElasticaBundle\Persister\Event\Events;
use FOS\ElasticaBundle\Persister\Event\OnExceptionEvent;
use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent;
use FOS\ElasticaBundle\Persister\Event\PostInsertObjectsEvent;
use FOS\ElasticaBundle\Persister\PagerPersisterInterface;
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
Expand Down Expand Up @@ -221,6 +222,13 @@ function(PostInsertObjectsEvent $event) use ($loggerClosure) {
$loggerClosure(count($event->getObjects()), $event->getPager()->getNbResults());
}
);

$this->dispatcher->addListener(
Events::POST_ASYNC_INSERT_OBJECTS,
function(PostAsyncInsertObjectsEvent $event) use ($loggerClosure) {
$loggerClosure($event->getObjectsCount(), $event->getPager()->getNbResults(), $event->getErrorMessage());
}
);
}

if ($options['ignore_errors']) {
Expand Down
2 changes: 2 additions & 0 deletions Persister/Event/Events.php
Expand Up @@ -11,6 +11,8 @@ final class Events

const POST_INSERT_OBJECTS = 'elastica.pager_persister.post_insert_objects';

const POST_ASYNC_INSERT_OBJECTS = 'elastica.pager_persister.post_async_insert_objects';

const ON_EXCEPTION = 'elastica.pager_persister.on_exception';

const POST_PERSIST = 'elastica.pager_persister.post_persist';
Expand Down
83 changes: 83 additions & 0 deletions Persister/Event/PostAsyncInsertObjectsEvent.php
@@ -0,0 +1,83 @@
<?php
namespace FOS\ElasticaBundle\Persister\Event;

use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Provider\PagerInterface;
use Symfony\Component\EventDispatcher\Event;

final class PostAsyncInsertObjectsEvent extends Event implements PersistEvent
{
/**
* @var PagerInterface
*/
private $pager;

/**
* @var ObjectPersisterInterface
*/
private $objectPersister;

/**
* @var int
*/
private $objectsCount;

/**
* @var string|null
*/
private $errorMessage;

/**
* @var array
*/
private $options;

public function __construct(PagerInterface $pager, ObjectPersisterInterface $objectPersister, $objectsCount, $errorMessage, array $options)
{
$this->pager = $pager;
$this->objectPersister = $objectPersister;
$this->objectsCount = $objectsCount;
$this->errorMessage = $errorMessage;
$this->options = $options;
}

/**
* @return PagerInterface
*/
public function getPager()
{
return $this->pager;
}

/**
* @return array
*/
public function getOptions()
{
return $this->options;
}

/**
* @return ObjectPersisterInterface
*/
public function getObjectPersister()
{
return $this->objectPersister;
}

/**
* @return null|string
*/
public function getErrorMessage()
{
return $this->errorMessage;
}

/**
* @return int
*/
public function getObjectsCount()
{
return $this->objectsCount;
}
}
21 changes: 16 additions & 5 deletions Persister/InPlacePagerPersister.php
Expand Up @@ -39,6 +39,16 @@ public function __construct(PersisterRegistry $registry, EventDispatcherInterfac
*/
public function insert(PagerInterface $pager, array $options = array())
{
$pager->setMaxPerPage(empty($options['batch_size']) ? 100 : $options['batch_size']);

$options = array_replace([
'batch_size' => $pager->getMaxPerPage(),
'first_page' => $pager->getCurrentPage(),
'last_page' => $pager->getNbPages(),
], $options);

$pager->setCurrentPage($options['first_page']);

$objectPersister = $this->registry->getPersister($options['indexName'], $options['typeName']);

try {
Expand All @@ -47,14 +57,15 @@ public function insert(PagerInterface $pager, array $options = array())
$pager = $event->getPager();
$options = $event->getOptions();

$pager->setMaxPerPage($options['batch_size']);

$lastPage = $options['last_page'];
$page = $pager->getCurrentPage();
while ($page <= $pager->getNbPages()) {
do {
$pager->setCurrentPage($page);

$this->insertPage($page, $pager, $objectPersister, $options);

$pager->setCurrentPage($page++);
}
$page++;
} while ($page <= $lastPage);
} finally {
$event = new PostPersistEvent($pager, $objectPersister, $options);
$this->dispatcher->dispatch(Events::POST_PERSIST, $event);
Expand Down
104 changes: 104 additions & 0 deletions Tests/Persister/Event/PostAsyncInsertObjectsEventTest.php
@@ -0,0 +1,104 @@
<?php
namespace FOS\ElasticaBundle\Tests\Persister\Event;

use FOS\ElasticaBundle\Persister\Event\PersistEvent;
use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent;
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Provider\PagerInterface;
use Symfony\Component\EventDispatcher\Event;

final class PostAsyncInsertObjectsEventTest extends \PHPUnit_Framework_TestCase
{
public function testShouldBeSubClassOfEventClass()
{
$rc = new \ReflectionClass(PostAsyncInsertObjectsEvent::class);

$this->assertTrue($rc->isSubclassOf(Event::class));
}

public function testShouldImplementPersistEventInterface()
{
$rc = new \ReflectionClass(PostAsyncInsertObjectsEvent::class);

$this->assertTrue($rc->implementsInterface(PersistEvent::class));
}

public function testShouldFinal()
{
$rc = new \ReflectionClass(PostAsyncInsertObjectsEvent::class);

$this->assertTrue($rc->isFinal());
}

public function testCouldBeConstructedWithPagerAndObjectPersisterAndObjectsCountAndOptions()
{
new PostAsyncInsertObjectsEvent(
$this->createPagerMock(),
$this->createObjectPersisterMock(),
123,
$errorMessage = '',
$options = []
);
}

public function testShouldAllowGetPagerSetInConstructor()
{
$expectedPager = $this->createPagerMock();

$event = new PostAsyncInsertObjectsEvent($expectedPager, $this->createObjectPersisterMock(), 123, '', []);

$this->assertSame($expectedPager, $event->getPager());
}

public function testShouldAllowGetObjectPersisterSetInConstructor()
{
$expectedPersister = $this->createObjectPersisterMock();

$event = new PostAsyncInsertObjectsEvent($this->createPagerMock(), $expectedPersister, 123, '', []);

$this->assertSame($expectedPersister, $event->getObjectPersister());
}

public function testShouldAllowGetOptionsSetInConstructor()
{
$expectedOptions = ['foo' => 'fooVal', 'bar' => 'barVal'];

$event = new PostAsyncInsertObjectsEvent($this->createPagerMock(), $this->createObjectPersisterMock(), 123, '', $expectedOptions);

$this->assertSame($expectedOptions, $event->getOptions());
}

public function testShouldAllowGetObjectsSetInConstructor()
{
$expectedObjectsCount = 321;

$event = new PostAsyncInsertObjectsEvent($this->createPagerMock(), $this->createObjectPersisterMock(), $expectedObjectsCount, '', []);

$this->assertSame($expectedObjectsCount, $event->getObjectsCount());
}

public function testShouldAllowGetErrorMessageSetInConstructor()
{
$expectedErrorMessage = 'theErrorMessage';

$event = new PostAsyncInsertObjectsEvent($this->createPagerMock(), $this->createObjectPersisterMock(), [], 'theErrorMessage', []);

$this->assertSame($expectedErrorMessage, $event->getErrorMessage());
}

/**
* @return ObjectPersisterInterface|\PHPUnit_Framework_MockObject_MockObject
*/
private function createObjectPersisterMock()
{
return $this->getMock(ObjectPersisterInterface::class, [], [], '', false);
}

/**
* @return PagerInterface|\PHPUnit_Framework_MockObject_MockObject
*/
private function createPagerMock()
{
return $this->getMock(PagerInterface::class, [], [], '', false);
}
}

0 comments on commit 538c7ff

Please sign in to comment.