Skip to content

Commit

Permalink
Unbuffered datastore queries for download (#3703)
Browse files Browse the repository at this point in the history
  • Loading branch information
dafeder committed Nov 8, 2021
1 parent 5d58599 commit 0eed854
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 230 deletions.
12 changes: 9 additions & 3 deletions modules/common/src/Storage/AbstractDatabaseTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,27 @@ public function count(): int {
* Query object.
* @param string $alias
* (Optional) alias for primary table.
* @param bool $fetch
* Fetch the rows if true, just return the result statement if not.
*
* @return array|\Drupal\Core\Database\StatementInterface
* Array of results if $fetch is true, otherwise result of
* Select::execute() (prepared Statement object or null).
*/
public function query(Query $query, string $alias = 't'): array {
public function query(Query $query, string $alias = 't', $fetch = TRUE) {
$this->setTable();
$query->collection = $this->getTableName();
$selectFactory = new SelectFactory($this->connection, $alias);
$db_query = $selectFactory->create($query);

try {
$result = $db_query->execute()->fetchAll();
$result = $db_query->execute();
}
catch (DatabaseExceptionWrapper $e) {
throw new \Exception($this->sanitizedErrorMessage($e->getMessage()));
}

return $result;
return $fetch ? $result->fetchAll() : $result;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion modules/common/src/Storage/Query.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class Query implements
*
* @var int|null
*/
public $limit = 500;
public $limit;

/**
* Number of records to offset by or skip before returning first record.
Expand Down
12 changes: 5 additions & 7 deletions modules/common/src/Storage/SelectFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,11 @@ private function setQueryDirectionOrderBy($sort, Select $db_query) {
* A DKAN query object.
*/
private function setQueryLimitAndOffset(Select $db_query, Query $query) {
if (isset($query->limit)) {
if (isset($query->offset)) {
$db_query->range($query->offset, $query->limit);
}
else {
$db_query->range(0, $query->limit);
}
if (isset($query->limit) && $query->limit !== NULL) {
$db_query->range(($query->offset ?? 0), ($query->limit));
}
elseif (isset($query->offset) && $query->offset) {
$db_query->range(($query->offset));
}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/common/tests/src/Unit/Storage/QueryDataProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static function noPropertiesQuery($return) {
return $query;

case self::SQL:
return "SELECT t.* FROM {table} t LIMIT 500 OFFSET 0";
return "SELECT t.* FROM {table} t";

case self::EXCEPTION:
return FALSE;
Expand Down Expand Up @@ -428,7 +428,7 @@ public static function offsetQuery($return) {
return $query;

case self::SQL:
return "LIMIT 500 OFFSET 5";
return "OFFSET 5";

case self::EXCEPTION:
return FALSE;
Expand Down
6 changes: 5 additions & 1 deletion modules/datastore/datastore.services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ services:
parent: logger.channel_base
arguments: ['datastore']

dkan.datastore.database:
class: \Drupal\Core\Database\Connection
factory: \Drupal\datastore\Storage\DatabaseConnectionFactory::getConnection

dkan.datastore.database_table_factory:
class: \Drupal\datastore\Storage\DatabaseTableFactory
arguments:
- '@database'
- '@dkan.datastore.database'
calls:
- [setIndexManager, ['@?indexer.indexmanager']]

Expand Down
3 changes: 1 addition & 2 deletions modules/datastore/docs/query.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@
},
"limit": {
"type": "integer",
"description": "Limit for maximum number of records returned. Pass zero for no limit (may be restricted by user permissions).",
"default": 500
"description": "Limit for maximum number of records returned. Pass zero for no limit (may be restricted by user permissions)."
},
"offset": {
"type": "integer",
Expand Down
85 changes: 61 additions & 24 deletions modules/datastore/src/Controller/AbstractQueryController.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,19 @@ public static function create(ContainerInterface $container) {
* The json or CSV response.
*/
public function query(Request $request) {
$payloadJson = static::getPayloadJson($request);

try {
$datastoreQuery = new DatastoreQuery($payloadJson, $this->getRowsLimit());
$datastoreQuery = $this->buildDatastoreQuery($request);
}
catch (\Exception $e) {
return $this->getResponseFromException($e, 400);
}

$result = $this->datastoreService->runQuery($datastoreQuery);
try {
$result = $this->datastoreService->runQuery($datastoreQuery);
}
catch (\Exception $e) {
$code = (strpos($e->getMessage(), "Error retrieving") !== FALSE) ? 404 : 400;
return $this->getResponseFromException($e, $code);
}

$dependencies = $this->extractMetastoreDependencies($datastoreQuery);
return $this->formatResponse($datastoreQuery, $result, $dependencies, $request->query);
Expand All @@ -115,19 +118,13 @@ public function query(Request $request) {
* The json response.
*/
public function queryResource(string $identifier, Request $request) {
$payloadJson = static::getPayloadJson($request);

try {
$this->prepareQueryResourcePayload($payloadJson, $identifier);
$datastoreQuery = $this->buildDatastoreQuery($request, $identifier);
}
catch (\Exception $e) {
return $this->getResponseFromException(
new \Exception("Invalid query JSON: {$e->getMessage()}"),
400
);
return $this->getResponseFromException($e, 400);
}
try {
$datastoreQuery = new DatastoreQuery($payloadJson, $this->getRowsLimit());
$result = $this->datastoreService->runQuery($datastoreQuery);
}
catch (\Exception $e) {
Expand Down Expand Up @@ -211,21 +208,61 @@ protected function extractMetastoreDependencies(DatastoreQuery $datastoreQuery):
* array. But one needs to be inferred from the request params and added
* before execution.
*
* @param string $json
* A JSON payload.
* @param \Symfony\Component\HttpFoundation\Request $request
* The client request.
* @param mixed $identifier
* Resource identifier to query against.
* Resource identifier to query against, if supplied via path.
*/
protected function prepareQueryResourcePayload(&$json, $identifier) {
protected function buildDatastoreQuery(Request $request, $identifier = NULL) {
$json = static::getPayloadJson($request);
$data = json_decode($json);
if (!empty($data->resources) || !empty($data->joins)) {
throw new \Exception("Joins are not available and "
. "resources should not be explicitly passed when using the resource "
. "query endpoint. Try /api/1/datastore/query.");
$this->additionalPayloadValidation($data);
if ($identifier) {
$resource = (object) ["id" => $identifier, "alias" => "t"];
$data->resources = [$resource];
}
return new DatastoreQuery(json_encode($data), $this->getRowsLimit());
}

/**
* Run some additional validation on incoming request.
*
* @param object $data
* The decoded request data.
* @param mixed $identifier
* Resource identifier.
*/
protected function additionalPayloadValidation($data, $identifier = NULL) {
$this->checkForRowIdProperty($data);
if (!empty($data->properties) && !empty($data->rowIds)) {
throw new \Exception('The rowIds property cannot be set to true if you are requesting specific properties.');
}
if ($identifier && (!empty($data->resources) || !empty($data->joins))) {
throw new \Exception('Joins are not available and resources should not be explicitly passed ' .
'when using the resource query endpoint. Try /api/1/datastore/query.');
}
}

/**
* Check if the record_number is being explicitly requested.
*
* @param object $data
* The query object.
*/
protected function checkForRowIdProperty($data) {
if (!isset($data->properties)) {
return;
}
$hasProperty = FALSE;
foreach ($data->properties as $property) {
$hasProperty = (is_string($property) && $property == 'record_number');
$hasProperty = $hasProperty ?: (isset($property->property) && $property->property == 'record_number');
if ($hasProperty) {
throw new \Exception('The record_number property is for internal use and cannot be requested ' .
'directly. Set rowIds to true and remove properties from your query to see the full table ' .
'with row IDs.');
}
}
$resource = (object) ["id" => $identifier, "alias" => "t"];
$data->resources = [$resource];
$json = json_encode($data);
}

/**
Expand Down
1 change: 0 additions & 1 deletion modules/datastore/src/Controller/QueryController.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public function formatResponse(
default:
return $this->metastoreApiResponse->cachedJsonResponse($result->{"$"}, 200, $dependencies, $params);
}

}

/**
Expand Down
96 changes: 32 additions & 64 deletions modules/datastore/src/Controller/QueryDownloadController.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,7 @@
class QueryDownloadController extends AbstractQueryController {

/**
* Stream a CSV response.
*
* @param \Drupal\datastore\Service\DatastoreQuery $datastoreQuery
* A datastore query object.
* @param \RootedData\RootedJsonData $result
* The result of the datastore query.
* @param array $dependencies
* A dependency array for use by \Drupal\metastore\MetastoreApiResponse.
* @param \Symfony\Component\HttpFoundation\ParameterBag|null $params
* The parameter object from the request.
*
* @return \Symfony\Component\HttpFoundation\Response
* The json response.
* {@inheritdoc}
*/
public function formatResponse(
DatastoreQuery $datastoreQuery,
Expand All @@ -48,6 +36,21 @@ public function formatResponse(
}
}

/**
* {@inheritdoc}
*/
protected function buildDatastoreQuery($request, $identifier = NULL) {
$json = static::getPayloadJson($request);
$data = json_decode($json);
$this->additionalPayloadValidation($data);
if ($identifier) {
$resource = (object) ["id" => $identifier, "alias" => "t"];
$data->resources = [$resource];
}
$data->results = FALSE;
return new DatastoreQuery(json_encode($data));
}

/**
* Set up the Streamed Response callback.
*
Expand All @@ -63,61 +66,23 @@ protected function streamCsvResponse(DatastoreQuery $datastoreQuery, RootedJsonD
$response = $this->initStreamedCsvResponse();

$response->setCallback(function () use ($result, $datastoreQuery) {
$count = $result->{'$.count'};
$rows = $result->{'$.results'};
array_unshift($rows, $this->getHeaderRow($result));

// Open the stream and send the header.
set_time_limit(0);
$handle = fopen('php://output', 'wb');
$this->sendRows($handle, $rows);
$this->sendRow($handle, $this->getHeaderRow($result));

// If we've already sent the full result set we can end now.
$progress = (count($rows) - 1);
if ($count <= $progress) {
fclose($handle);
return TRUE;
}
// Otherwise, we're going to redo as an iterator from the beginning.
$result = $this->datastoreService->runResultsQuery($datastoreQuery, FALSE);

// Otherwise, we iterate.
$this->streamIterate($result, $datastoreQuery, $handle);
while ($row = $result->fetchAssoc()) {
$this->sendRow($handle, array_values($row));
}

fclose($handle);
});
return $response;
}

/**
* Iterator for CSV streaming.
*
* @param \RootedData\RootedJsonData $result
* The result data from the initial query.
* @param \Drupal\datastore\Service\DatastoreQuery $datastoreQuery
* The unmodified datastore query object.
* @param resource $handle
* The PHP output stream.
*/
private function streamIterate(RootedJsonData $result, DatastoreQuery $datastoreQuery, $handle) {
$pageCount = $progress = count($result->{'$.results'});
$pageLimit = $this->getRowsLimit();
$iteratorQuery = clone $datastoreQuery;

// Disable extra information in response.
$iteratorQuery->{"$.count"} = FALSE;
$iteratorQuery->{"$.schema"} = FALSE;
$iteratorQuery->{"$.keys"} = FALSE;

$i = 1;
while ($pageCount >= $pageLimit) {
$iteratorQuery->{"$.offset"} = $pageLimit * $i;
$result = $this->datastoreService->runQuery($iteratorQuery);
$rows = $result->{"$.results"};
$this->sendRows($handle, $rows);
$pageCount = count($rows);
$progress += $pageCount;
$i++;
}
}

/**
* Create initial streamed response object.
*
Expand All @@ -137,13 +102,11 @@ private function initStreamedCsvResponse($filename = "data.csv") {
*
* @param resource $handle
* The file handler.
* @param array $rows
* Rows of data to send as CSV.
* @param array $row
* Row of data to send as CSV.
*/
private function sendRows($handle, array $rows) {
foreach ($rows as $row) {
fputcsv($handle, $row);
}
private function sendRow($handle, array $row) {
fputcsv($handle, $row);
ob_flush();
flush();
}
Expand All @@ -170,6 +133,11 @@ private function getHeaderRow(RootedJsonData &$result) {
if (empty($header_row) || !is_array($header_row)) {
throw new \Exception("Could not generate header for CSV.");
}
array_walk($header_row, function (&$header) {
if (is_array($header)) {
$header = $header['alias'] ?? $header['property'];
}
});
return $header_row;
}

Expand Down
Loading

0 comments on commit 0eed854

Please sign in to comment.