Skip to content

Commit

Permalink
Simplify statement preparing and storage
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Mar 25, 2019
1 parent 6791ece commit 1269216
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 95 deletions.
9 changes: 0 additions & 9 deletions src/Internal/PqStatementStorage.php

This file was deleted.

16 changes: 0 additions & 16 deletions src/Internal/StatementStorage.php

This file was deleted.

43 changes: 12 additions & 31 deletions src/PgSqlHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ final class PgSqlHandle implements Handle
/** @var callable */
private $unlisten;

/** @var Internal\StatementStorage[] */
/** @var Promise[] */
private $statements = [];

/** @var int */
Expand Down Expand Up @@ -312,12 +312,6 @@ public function statementDeallocate(string $name): Promise

\assert(isset($this->statements[$name]), "Named statement not found when deallocating");

$storage = $this->statements[$name];

if (--$storage->count) {
return new Success;
}

unset($this->statements[$name]);

return $this->query(\sprintf("DEALLOCATE %s", $name));
Expand Down Expand Up @@ -368,21 +362,17 @@ public function prepare(string $sql): Promise
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);

if (isset($this->statements[$name])) {
$storage = $this->statements[$name];
++$storage->count;

if ($storage->promise) {
return $storage->promise;
}

return new Success(new PgSqlStatement($this, $name, $sql, $names));
return $this->statements[$name];
}

$this->statements[$name] = $storage = new Internal\StatementStorage;

$promise = $storage->promise = call(function () use ($name, $names, $sql, $modifiedSql) {
/** @var resource $result PostgreSQL result resource. */
$result = yield from $this->send("pg_send_prepare", $name, $modifiedSql);
return $this->statements[$name] = call(function () use ($name, $names, $sql, $modifiedSql) {
try {
/** @var resource $result PostgreSQL result resource. */
$result = yield from $this->send("pg_send_prepare", $name, $modifiedSql);
} catch (\Throwable $exception) {
unset($this->statements[$name]);
throw $exception;
}

switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {
case \PGSQL_COMMAND_OK:
Expand All @@ -391,8 +381,8 @@ public function prepare(string $sql): Promise
case \PGSQL_NONFATAL_ERROR:
case \PGSQL_FATAL_ERROR:
$diagnostics = [];
foreach (self::DIAGNOSTIC_CODES as $fieldCode => $desciption) {
$diagnostics[$desciption] = \pg_result_error_field($result, $fieldCode);
foreach (self::DIAGNOSTIC_CODES as $fieldCode => $description) {
$diagnostics[$description] = \pg_result_error_field($result, $fieldCode);
}
throw new QueryExecutionError(\pg_result_error($result), $diagnostics);

Expand All @@ -405,15 +395,6 @@ public function prepare(string $sql): Promise
// @codeCoverageIgnoreEnd
}
});
$promise->onResolve(function ($exception) use ($storage, $name) {
if ($exception) {
unset($this->statements[$name]);
return;
}

$storage->promise = null;
});
return $promise;
}

/**
Expand Down
64 changes: 26 additions & 38 deletions src/PqHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ final class PqHandle implements Handle
/** @var \Amp\Emitter[] */
private $listeners;

/** @var @return PromiseInternal\PqStatementStorage[] */
private $statements = [];
/** @var Promise[] */
private $statementPromises = [];

/** @var \pq\Statement[] */
private $statementHandles = [];

/** @var callable */
private $fetch;
Expand Down Expand Up @@ -202,10 +205,10 @@ private function send(callable $method, ...$args): \Generator
}

try {
$handle = $method(...$args);

$this->deferred = $this->busy = new Deferred;

$handle = $method(...$args);

Loop::enable($this->poll);
if (!$this->handle->flush()) {
Loop::enable($this->await);
Expand Down Expand Up @@ -311,11 +314,9 @@ private function release()
*/
public function statementExecute(string $name, array $params): Promise
{
\assert(isset($this->statements[$name]), "Named statement not found when executing");

$statement = $this->statements[$name]->statement;
\assert(isset($this->statementHandles[$name]), "Named statement not found when executing");

return new Coroutine($this->send([$statement, "execAsync"], $params));
return new Coroutine($this->send([$this->statementHandles[$name], "execAsync"], $params));
}

/**
Expand All @@ -331,17 +332,15 @@ public function statementDeallocate(string $name): Promise
return new Success; // Connection dead.
}

\assert(isset($this->statements[$name]), "Named statement not found when deallocating");

$storage = $this->statements[$name];

if (--$storage->count) {
return new Success;
}
\assert(
isset($this->statementPromises[$name], $this->statementHandles[$name]),
"Named statement not found when deallocating"
);

unset($this->statements[$name]);
$statement = $this->statementHandles[$name];
unset($this->statementPromises[$name], $this->statementHandles[$name]);

return new Coroutine($this->send([$storage->statement, "deallocateAsync"]));
return new Coroutine($this->send([$statement, "deallocateAsync"]));
}

/**
Expand Down Expand Up @@ -384,33 +383,22 @@ public function prepare(string $sql): Promise

$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);

if (isset($this->statements[$name])) {
$storage = $this->statements[$name];
++$storage->count;
if (isset($this->statementPromises[$name])) {
return $this->statementPromises[$name];
}

if ($storage->promise) {
return $storage->promise;
return $this->statementPromises[$name] = call(function () use ($names, $name, $sql, $modifiedSql) {
try {
$statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $modifiedSql);
} catch (\Throwable $exception) {
unset($this->statementPromises[$name]);
throw $exception;
}

return new Success(new PqStatement($this, $name, $sql, $names));
}

$this->statements[$name] = $storage = new Internal\PqStatementStorage;
$this->statementHandles[$name] = $statement;

$promise = $storage->promise = call(function () use ($storage, $names, $name, $sql, $modifiedSql) {
$statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $modifiedSql);
$storage->statement = $statement;
return new PqStatement($this, $name, $sql, $names);
});
$promise->onResolve(function ($exception) use ($storage, $name) {
if ($exception) {
unset($this->statements[$name]);
return;
}

$storage->promise = null;
});
return $promise;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/PqStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

final class PqStatement implements Statement
{
/** @var @return PromisePqHandle */
/** @var PqHandle */
private $handle;

/** @var string */
Expand Down

0 comments on commit 1269216

Please sign in to comment.