Skip to content

Commit

Permalink
Add BatchWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
Baachi authored and ddeboer committed May 30, 2015
1 parent a53efc4 commit 5726169
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 94 deletions.
57 changes: 57 additions & 0 deletions src/Writer/BatchWriter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

namespace Ddeboer\DataImport\Writer;

use Ddeboer\DataImport\Writer;

/**
* @author Markus Bachmann <markus.bachmann@bachi.biz>
*/
class BatchWriter implements Writer
{
private $delegate;

private $size;

private $queue;

public function __construct(Writer $delegate, $size = 20)
{
$this->delegate = $delegate;
$this->size = $size;
}

public function prepare()
{
$this->delegate->prepare();

$this->queue = new \SplQueue();
}

public function writeItem(array $item)
{
$this->queue->push($item);

if (count($this->queue) >= $this->size) {
$this->flush();
}
}

public function finish()
{
$this->flush();

$this->delegate->finish();
}

private function flush()
{
foreach ($this->queue as $item) {
$this->delegate->writeItem($item);
}

if ($this->delegate instanceof FlushableWriter) {
$this->delegate->flush();
}
}
}
51 changes: 5 additions & 46 deletions src/Writer/DoctrineWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
* @author David de Boer <david@ddeboer.nl>
*/
class DoctrineWriter implements Writer
class DoctrineWriter implements Writer, FlushableWriter
{
/**
* @var EntityManagerInterface
Expand All @@ -38,18 +38,6 @@ class DoctrineWriter implements Writer
*/
protected $entityMetadata;

/**
* Number of entities to be persisted per flush
*
* @var integer
*/
protected $batchSize = 20;

/**
* @var integer
*/
protected $counter = 0;

/**
* Original Doctrine logger
*
Expand Down Expand Up @@ -83,37 +71,15 @@ public function __construct(EntityManagerInterface $entityManager, $entityName,
$this->entityMetadata = $entityManager->getClassMetadata($entityName);
//translate entityName in case a namespace alias is used
$this->entityName = $this->entityMetadata->getName();
if($index) {
if(is_array($index)) {
if ($index) {
if (is_array($index)) {
$this->lookupFields = $index;
} else {
$this->lookupFields = [$index];
}
}
}

/**
* @return integer
*/
public function getBatchSize()
{
return $this->batchSize;
}

/**
* Set number of entities that may be persisted before a new flush
*
* @param integer $batchSize
*
* @return $this
*/
public function setBatchSize($batchSize)
{
$this->batchSize = $batchSize;

return $this;
}

/**
* @return boolean
*/
Expand Down Expand Up @@ -199,8 +165,7 @@ protected function setValue($entity, $value, $setter)
*/
public function finish()
{
$this->entityManager->flush();
$this->entityManager->clear($this->entityName);
$this->flush();
$this->reEnableLogging();
}

Expand All @@ -209,17 +174,12 @@ public function finish()
*/
public function writeItem(array $item)
{
$this->counter++;
$entity = $this->findOrCreateItem($item);

$this->loadAssociationObjectsToEntity($item, $entity);
$this->updateEntity($item, $entity);

$this->entityManager->persist($entity);

if (($this->counter % $this->batchSize) == 0) {
$this->flushAndClear();
}
}

/**
Expand All @@ -230,7 +190,6 @@ protected function updateEntity(array $item, $entity)
{
$fieldNames = array_merge($this->entityMetadata->getFieldNames(), $this->entityMetadata->getAssociationNames());
foreach ($fieldNames as $fieldName) {

$value = null;
if (isset($item[$fieldName])) {
$value = $item[$fieldName];
Expand Down Expand Up @@ -339,7 +298,7 @@ protected function findOrCreateItem(array $item)
/**
* Flush and clear the entity manager
*/
protected function flushAndClear()
public function flush()
{
$this->entityManager->flush();
$this->entityManager->clear($this->entityName);
Expand Down
8 changes: 8 additions & 0 deletions src/Writer/FlushableWriter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Ddeboer\DataImport\Writer;

interface FlushableWriter
{
public function flush();
}
62 changes: 43 additions & 19 deletions src/Writer/PdoWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*
* @author Stefan Warman
*/
class PdoWriter implements Writer
class PdoWriter implements Writer, FlushableWriter
{
use WriterTemplate;

/**
* @var \PDO
*/
Expand All @@ -34,6 +32,11 @@ class PdoWriter implements Writer
*/
protected $statement;

/**
* @var array
*/
private $stack;

/**
* Note if your table name is a reserved word for your target DB you should quote it in the appropriate way e.g.
* for MySQL enclose the name in `backticks`.
Expand All @@ -45,38 +48,59 @@ public function __construct(\PDO $pdo, $tableName)
{
$this->pdo = $pdo;
$this->tableName = $tableName;

if (\PDO::ERRMODE_EXCEPTION !== $this->pdo->getAttribute(\PDO::ATTR_ERRMODE)) {
throw new WriterException('Please set the pdo error mode to PDO::ERRMODE_EXCEPTION');
}
}

public function prepare()
{
$this->stack = [];
$this->statement = null;
}

/**
* {@inheritdoc}
*/
public function writeItem(array $item)
{
try {
//prepare the statment as soon as we know how many values there are
if (!$this->statement) {

if (null === $this->statement) {
try {
$this->statement = $this->pdo->prepare(sprintf(
'INSERT INTO %s(%s) VALUES (%s)',
'INSERT INTO %s (%s) VALUES (%s)',
$this->tableName,
implode(',', array_keys($item)),
substr(str_repeat('?,', count($item)), 0, -1)
));

//for PDO objects that do not have exceptions enabled
if (!$this->statement) {
throw new WriterException('Failed to prepare write statement for item: '.implode(',', $item));
}
} catch (\PDOException $e) {
throw new WriterException('Failed to send query', null, $e);
}
}

$this->stack[] = array_values($item);
}

public function finish()
{
$this->flush();

//do the insert
if (!$this->statement->execute(array_values($item))) {
throw new WriterException('Failed to write item: '.implode(',', $item));
return $this;
}

public function flush()
{
$this->pdo->beginTransaction();

try {
foreach ($this->stack as $data) {
$this->statement->execute($data);
}
$this->stack = [];

} catch (\Exception $e) {
//convert exception so the abstracton doesn't leak
throw new WriterException(sprintf('Write failed (%s)', $e->getMessage()), null, $e);
$this->pdo->commit();
} catch (\PDOException $e) {
throw new WriterException('Failed to write to database', null, $e);
}
}
}
38 changes: 38 additions & 0 deletions tests/Writer/BatchWriterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Ddeboer\DataImport\Tests\Writer;

use Ddeboer\DataImport\Writer\BatchWriter;

class BatchWriterTest extends \PHPUnit_Framework_TestCase
{
public function testWriteItem()
{
$delegate = $this->getMock('Ddeboer\DataImport\Writer');
$writer = new BatchWriter($delegate);

$delegate->expects($this->once())
->method('prepare');

$delegate->expects($this->never())
->method('writeItem');

$writer->prepare();
$writer->writeItem(['Test']);
}

public function testFlush()
{
$delegate = $this->getMock('Ddeboer\DataImport\Writer');
$writer = new BatchWriter($delegate);

$delegate->expects($this->exactly(20))
->method('writeItem');

$writer->prepare();

for ($i = 0; $i < 20; $i++) {
$writer->writeItem(['Test']);
}
}
}
29 changes: 0 additions & 29 deletions tests/Writer/DoctrineWriterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,6 @@ public function testWriteItem()
$writer->writeItem($item);
}

public function testBatches()
{
$em = $this->getEntityManager();
$em->expects($this->exactly(11))
->method('persist');

$em->expects($this->exactly(4))
->method('flush');

$writer = new DoctrineWriter($em, 'DdeboerDataImport:TestEntity');
$writer->prepare();

$writer->setBatchSize(3);
$this->assertEquals(3, $writer->getBatchSize());

$association = new TestEntity();
$item = array(
'firstProperty' => 'some value',
'secondProperty' => 'some other value',
'firstAssociation'=> $association
);

for ($i = 0; $i < 11; $i++) {
$writer->writeItem($item);
}

$writer->finish();
}

protected function getEntityManager()
{
$em = $this->getMockBuilder('Doctrine\ORM\EntityManager')
Expand Down

0 comments on commit 5726169

Please sign in to comment.