Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preload data when loading and discard if aborted due a violation in a… #26

Merged
merged 1 commit into from
Sep 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions src/Load/DatabaseLoader/BatchInsUpdStmt.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
/**
* Author: Courtney Miles
* Date: 26/08/18
* Time: 9:07 AM
* Time: 8:53 AM
*/

namespace MilesAsylum\Slurp\Load\DatabaseLoader;

use MilesAsylum\Slurp\Load\Exception\MissingValueException;

abstract class AbstractBatchStmt implements BatchStmtInterface
class BatchInsertManager implements BatchManagerInterface
{
/**
* @var \PDO
Expand All @@ -26,11 +26,50 @@ abstract class AbstractBatchStmt implements BatchStmtInterface
*/
protected $columns;

public function __construct(\PDO $pdo, string $table, array $columns)
/**
* @var QueryFactory
*/
private $queryFactory;

/**
* @var \PDOStatement[]
*/
private $preparedBatchStmts = [];

public function __construct(\PDO $pdo, string $table, array $columns, QueryFactory $queryFactory)
{
$this->pdo = $pdo;
$this->table = $table;
$this->columns = $columns;
$this->queryFactory = $queryFactory;
}

/**
* @param array[] $rows
*/
public function write(array $rows)
{
if (!empty($rows)) {
$this->getPreparedBatchStmt(count($rows))
->execute(
$this->convertRowCollectionToParams($rows)
);
}
}

protected function getPreparedBatchStmt($count): \PDOStatement
{
if (!isset($this->preparedBatchStmts[$count])) {
$this->preparedBatchStmts[$count] = $this->pdo->prepare(
$this->queryFactory->createInsertQuery(
$this->table,
$this->columns,
$count
)
);
}

return $this->preparedBatchStmts[$count];
}

protected function ensureColumnMatch($rowId, array $rowValues): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace MilesAsylum\Slurp\Load\DatabaseLoader;

interface BatchStmtInterface
interface BatchManagerInterface
{
public function write(array $rows);
}
71 changes: 64 additions & 7 deletions src/Load/DatabaseLoader/DatabaseLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

namespace MilesAsylum\Slurp\Load\DatabaseLoader;

use MilesAsylum\Slurp\Load\DatabaseLoader\Exception\DatabaseLoaderException;
use MilesAsylum\Slurp\Load\LoaderInterface;

class DatabaseLoader implements LoaderInterface
{
/**
* @var \PDOStatement
* @var BatchManagerInterface
*/
protected $batchStmt;

Expand All @@ -30,35 +31,91 @@ class DatabaseLoader implements LoaderInterface
* @var array
*/
private $columnMapping;
/**
* @var string
*/
private $table;

/**
* @var StagedLoad
*/
private $stagedLoad;

/**
* @var LoaderFactory
*/
private $loaderFactory;

/**
* DatabaseLoader constructor.
* @param BatchStmtInterface $batchStmt
* @param int $batchSize
* @param string $table
* @param array $columnMapping Array key is the destination column and the array value is the source column.
* @param LoaderFactory $dmlFactory
* @param int $batchSize
*/
public function __construct(
BatchStmtInterface $batchStmt,
int $batchSize = 100,
array $columnMapping = []
string $table,
array $columnMapping,
LoaderFactory $dmlFactory,
int $batchSize = 100
) {
$this->loaderFactory = $dmlFactory;
$this->table = $table;
$this->batchSize = $batchSize;
$this->batchStmt = $batchStmt;
$this->columnMapping = $columnMapping;
}

/**
* @param array $values
* @throws DatabaseLoaderException
*/
public function loadValues(array $values): void
{
if (!$this->hasBegun()) {
throw new DatabaseLoaderException(
sprintf(
'Data cannot be loaded until %s has been called.',
__CLASS__ . '::begin()'
)
);
}

$this->rowCollection[] = $this->mapColumnNames($values);

if (count($this->rowCollection) >= $this->batchSize) {
$this->flush();
}
}

public function begin(): void
{
$this->stagedLoad = $this->loaderFactory->createStagedLoad(
$this->table,
array_keys($this->columnMapping)
);
$stagedTable = $this->stagedLoad->begin();
$this->batchStmt = $this->loaderFactory->createBatchInsStmt(
$stagedTable,
array_keys($this->columnMapping)
);
}

public function hasBegun(): bool
{
return isset($this->stagedLoad);
}

public function finalise(): void
{
$this->flush();
$this->stagedLoad->commit();
}

public function abort(): void
{
$this->stagedLoad->discard();
$this->stagedLoad = null;
$this->batchStmt = null;
}

protected function flush(): void
Expand Down
15 changes: 15 additions & 0 deletions src/Load/DatabaseLoader/Exception/DatabaseLoaderException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php
/**
* Author: Courtney Miles
* Date: 16/09/18
* Time: 3:54 PM
*/

namespace MilesAsylum\Slurp\Load\DatabaseLoader\Exception;

use MilesAsylum\Slurp\Exception\ExceptionInterface;

class DatabaseLoaderException extends \Exception implements ExceptionInterface
{

}
36 changes: 36 additions & 0 deletions src/Load/DatabaseLoader/LoaderFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
/**
* Author: Courtney Miles
* Date: 16/09/18
* Time: 3:34 PM
*/

namespace MilesAsylum\Slurp\Load\DatabaseLoader;

class LoaderFactory
{
/**
* @var \PDO
*/
private $pdo;

public function __construct(\PDO $pdo)
{
$this->pdo = $pdo;
}

public function createBatchInsStmt(string $table, array $columns)
{
return new BatchInsertManager($this->pdo, $table, $columns, $this->createBatchInsQueryFactory());
}

public function createStagedLoad(string $table, array $columns)
{
return new StagedLoad($this->pdo, $table, $columns);
}

protected function createBatchInsQueryFactory()
{
return new QueryFactory();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace MilesAsylum\Slurp\Load\DatabaseLoader;

class BatchInsUpdQueryFactory
class QueryFactory
{
/**
* @param $table
Expand All @@ -16,7 +16,7 @@ class BatchInsUpdQueryFactory
* @return string
* @throws \Exception
*/
public function createQuery($table, array $columns, $batchSize = 1): string
public function createInsertQuery($table, array $columns, $batchSize = 1): string
{
if (empty($columns)) {
throw new \InvalidArgumentException('One or more columns must be supplied.');
Expand All @@ -29,18 +29,10 @@ public function createQuery($table, array $columns, $batchSize = 1): string
$colsStr = '`' . implode('`, `', $columns) . '`';
$valueStr = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';
$batchValueStr = implode(",\n ", array_fill(0, $batchSize, $valueStr));
$updateValues = [];

foreach ($columns as $column) {
$updateValues[] = "`{$column}` = VALUES(`{$column}`)";
}

$updateValuesStr = implode(', ', $updateValues);

return <<<SQL
INSERT INTO `{$table}` ({$colsStr})
VALUES $batchValueStr
ON DUPLICATE KEY UPDATE {$updateValuesStr}
SQL;
}
}
Loading