Fix Octet Streaming of Datastore Dump #7839

Merged · 26 commits · Oct 17, 2023
Changes shown are from 11 of the 26 commits.
Commits (26)
d4d997a
fix(views): datastore ext dump octet streaming;
JVickery-TBS Sep 27, 2023
c4387a6
fix(views): changelog, syntax, typing;
JVickery-TBS Sep 27, 2023
3c1ac4e
fix(views): member dump, typing;
JVickery-TBS Sep 28, 2023
233946d
fix(views): typing;
JVickery-TBS Sep 28, 2023
36b44ec
fix(typing): more typing fixes;
JVickery-TBS Sep 28, 2023
f25d487
fix(typing): more typing;
JVickery-TBS Sep 28, 2023
f54720c
feat(views): removed u strings, new 404 condition;
JVickery-TBS Oct 5, 2023
bca4462
feat(views): reusable code;
JVickery-TBS Oct 5, 2023
9705110
fix(syntax): removed unused import;
JVickery-TBS Oct 5, 2023
07535d7
feat(views): dump_to method for cli and view;
JVickery-TBS Oct 5, 2023
4fef35d
fix(syntax): type syntax for unbound;
JVickery-TBS Oct 5, 2023
ca5c643
feat(views): dump_to always return generator;
JVickery-TBS Oct 6, 2023
50b9aa6
fix(tests): basestring for StringIO;
JVickery-TBS Oct 11, 2023
53baace
fix(views): bytes buffer;
JVickery-TBS Oct 12, 2023
17df4fa
fix(encoding): byte encoding for writers;
JVickery-TBS Oct 12, 2023
10a3ce6
fix(encoding): byte encoding for writers;
JVickery-TBS Oct 12, 2023
8f3c866
fix(encoding): byte encoding for writers;
JVickery-TBS Oct 12, 2023
fd77b4f
fix(encoding): byte encoding for writers;
JVickery-TBS Oct 12, 2023
e3ab5c4
revert(encoding): changed bytes back to str;
JVickery-TBS Oct 12, 2023
6c1ea9f
fix(syntax): misc syntax;
JVickery-TBS Oct 12, 2023
7c8cfd0
revert(encoding): changed bytes back to str;
JVickery-TBS Oct 13, 2023
c3e44ac
feat(dev): another attempt;
JVickery-TBS Oct 13, 2023
06b654c
feat(dev): another attempt;
JVickery-TBS Oct 13, 2023
0f3ea9a
feat(dev): moved output buffers into writer methods;
JVickery-TBS Oct 13, 2023
38ef3d0
fix(syntax): lint and typing;
JVickery-TBS Oct 16, 2023
514c6b5
removal(misc): removed method param;
JVickery-TBS Oct 16, 2023
1 change: 1 addition & 0 deletions changes/7839.bugfix
@@ -0,0 +1 @@
Fixed Octet Streaming for Datastore Dump requests.
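
For context on the shape of the fix: the PR replaces a fully buffered response body with a Flask Response wrapping a generator, so the dump is streamed to the client chunk by chunk. A minimal sketch of that pattern outside CKAN follows; the route, helper name, and chunk contents are illustrative, not from the PR.

from flask import Flask, Response

app = Flask(__name__)

def generate_rows():
    # Yield the dump piece by piece instead of buffering it whole.
    for i in range(3):
        yield "row-{0}\n".format(i)

@app.route("/dump")
def dump_view():
    # Handing Flask a generator streams each yielded chunk to the
    # client as it is produced.
    return Response(
        generate_rows(),
        mimetype="application/octet-stream",
        headers={"Content-Disposition": 'attachment; filename="dump.csv"'},
    )
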
57 changes: 28 additions & 29 deletions ckan/views/group.py
@@ -8,6 +8,9 @@
from typing_extensions import Literal

from urllib.parse import urlencode
import csv
from io import StringIO
from codecs import BOM_UTF8

import ckan.lib.base as base
import ckan.lib.helpers as h
@@ -21,7 +24,6 @@
from ckan.common import g, config, request, current_user, _
from ckan.views.home import CACHE_PARAMETERS
from ckan.views.dataset import _get_search_details
from ckanext.datastore.writer import csv_writer

from flask import Blueprint, make_response
from flask.views import MethodView
@@ -566,12 +568,6 @@ def manage_members(id: str, group_type: str, is_organization: bool) -> str:


def member_dump(id: str, group_type: str, is_organization: bool):
response = make_response()
response.headers[u'content-type'] = u'application/octet-stream'

writer_factory = csv_writer
records_format = u'csv'

group_obj = model.Group.get(id)
if not group_obj:
base.abort(404,
@@ -593,37 +589,40 @@ def member_dump(id: str, group_type: str, is_organization: bool):
members = get_action(u'member_list')(context, {
u'id': id,
u'object_type': u'user',
u'records_format': records_format,
u'records_format': u'csv',
u'include_total': False,
})
except NotFound:
base.abort(404, _('Members not found'))

results = ''
results = [[_('Username'), _('Email'), _('Name'), _('Role')]]
for uid, _user, role in members:
user_obj = model.User.get(uid)
if not user_obj:
continue
results += '{name},{email},{fullname},{role}\n'.format(
name=user_obj.name,
email=user_obj.email,
fullname=user_obj.fullname if user_obj.fullname else _('N/A'),
role=role)

fields = [
{'id': _('Username')},
{'id': _('Email')},
{'id': _('Name')},
{'id': _('Role')}]

def start_writer(fields: Any):
file_name = u'{group_id}-{members}'.format(
group_id=group_obj.name,
members=_(u'members'))
return writer_factory(response, fields, file_name, bom=True)

with start_writer(fields) as wr:
wr.write_records(results) # type: ignore
results.append([
user_obj.name,
user_obj.email, # type: ignore
user_obj.fullname if user_obj.fullname else _('N/A'),
role,
])

output_stream = StringIO()
output_stream.write(BOM_UTF8) # type: ignore
csv.writer(output_stream).writerows(results)

file_name = u'{org_id}-{members}'.format(
org_id=group_obj.name,
members=_(u'members'))

output_stream.seek(0)
response = make_response(output_stream.read())
output_stream.close()
content_disposition = u'attachment; filename="{name}.csv"'.format(
name=file_name)
content_type = b'text/csv; charset=utf-8'
response.headers['Content-Type'] = content_type # type: ignore
response.headers['Content-Disposition'] = content_disposition

return response
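
The rewritten member_dump builds the CSV in memory with the standard library instead of ckanext-datastore's csv_writer. The same pattern in isolation, as a runnable sketch: the rows are made up, and note that StringIO.write() expects str, so the BOM bytes are decoded here, whereas the diff writes BOM_UTF8 directly under a `# type: ignore`.

import csv
from io import StringIO
from codecs import BOM_UTF8

rows = [
    ["Username", "Email", "Name", "Role"],
    ["alice", "alice@example.com", "Alice", "admin"],
]

buffer = StringIO()
# Decode the UTF-8 BOM bytes before writing them to a text buffer.
buffer.write(BOM_UTF8.decode("utf-8"))
csv.writer(buffer).writerows(rows)

buffer.seek(0)
payload = buffer.read()
buffer.close()
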

2 changes: 1 addition & 1 deletion ckanext/datastore/backend/postgres.py
@@ -1410,7 +1410,7 @@ def search_data(context: Context, data_dict: dict[str, Any]):
else:
v = list(_execute_single_statement(
context, sql_string, where_values))[0][0]
if v is None:
if v is None or v == '[]':
records = []
else:
records = LazyJSONObject(v)
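
The extra check covers the case where the backend hands back the literal string '[]' (an empty JSON array) rather than SQL NULL, so it is treated like an empty result. A minimal illustration of the same guard, with json.loads standing in for CKAN's LazyJSONObject:

import json

def normalize_records(v):
    # None (no rows) and the literal string '[]' (an empty JSON array)
    # both mean "no records", so neither should be wrapped for lazy
    # JSON parsing.
    if v is None or v == '[]':
        return []
    return json.loads(v)

print(normalize_records(None))          # []
print(normalize_records('[]'))          # []
print(normalize_records('[{"a": 1}]'))  # [{'a': 1}]
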
203 changes: 143 additions & 60 deletions ckanext/datastore/blueprint.py
@@ -1,10 +1,11 @@
# encoding: utf-8
from __future__ import annotations

from typing import Any, Optional, cast
from typing import Any, Optional, cast, Union
from itertools import zip_longest
from io import StringIO

from flask import Blueprint, make_response
from flask import Blueprint, Response
from flask.views import MethodView

import ckan.lib.navl.dictization_functions as dict_fns
@@ -59,38 +60,80 @@ def dump_schema() -> Schema:


def dump(resource_id: str):
try:
get_action('datastore_search')({}, {'resource_id': resource_id,
'limit': 0})
except ObjectNotFound:
abort(404, _('DataStore resource not found'))

data, errors = dict_fns.validate(request.args.to_dict(), dump_schema())
if errors:
abort(
400, u'\n'.join(
u'{0}: {1}'.format(k, u' '.join(e)) for k, e in errors.items()
400, '\n'.join(
'{0}: {1}'.format(k, ' '.join(e)) for k, e in errors.items()
)
)

response = make_response()
response.headers[u'content-type'] = u'application/octet-stream'
fmt = data['format']
offset = data['offset']
limit = data.get('limit')
options = {'bom': data['bom']}
sort = data['sort']
search_params = {
k: v
for k, v in data.items()
if k in [
'filters', 'q', 'distinct', 'plain', 'language',
'fields'
]
}

user_context = g.user

content_type = None
content_disposition = None

if fmt == 'csv':
content_disposition = 'attachment; filename="{name}.csv"'.format(
name=resource_id)
content_type = b'text/csv; charset=utf-8'
elif fmt == 'tsv':
content_disposition = 'attachment; filename="{name}.tsv"'.format(
name=resource_id)
content_type = b'text/tab-separated-values; charset=utf-8'
elif fmt == 'json':
content_disposition = 'attachment; filename="{name}.json"'.format(
name=resource_id)
content_type = b'application/json; charset=utf-8'
elif fmt == 'xml':
content_disposition = 'attachment; filename="{name}.xml"'.format(
name=resource_id)
content_type = b'text/xml; charset=utf-8'
else:
abort(404, _('Unsupported format'))

output_stream = StringIO()

headers = {}
if content_type:
headers['Content-Type'] = content_type
if content_disposition:
headers['Content-disposition'] = content_disposition

try:
dump_to(
resource_id,
response,
fmt=data[u'format'],
offset=data[u'offset'],
limit=data.get(u'limit'),
options={u'bom': data[u'bom']},
sort=data[u'sort'],
search_params={
k: v
for k, v in data.items()
if k in [
u'filters', u'q', u'distinct', u'plain', u'language',
u'fields'
]
},
)
return Response(dump_to(resource_id,
output_stream,
fmt=fmt,
offset=offset,
limit=limit,
options=options,
sort=sort,
search_params=search_params,
user=user_context),
mimetype='application/octet-stream',
headers=headers)
except ObjectNotFound:
abort(404, _(u'DataStore resource not found'))
return response
abort(404, _('DataStore resource not found'))
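
The if/elif chain above that picks Content-Type and Content-Disposition per format could equally be table-driven. A sketch of the same mapping under that assumption; the DUMP_FORMATS table and headers_for helper are illustrative, not part of the PR.

# Illustrative alternative: map each supported format to its MIME
# type and file extension in one place.
DUMP_FORMATS = {
    'csv':  ('text/csv; charset=utf-8', 'csv'),
    'tsv':  ('text/tab-separated-values; charset=utf-8', 'tsv'),
    'json': ('application/json; charset=utf-8', 'json'),
    'xml':  ('text/xml; charset=utf-8', 'xml'),
}

def headers_for(fmt: str, resource_id: str) -> dict:
    # Unknown formats fall out of the table; the view aborts with a
    # 404 at the equivalent point.
    content_type, ext = DUMP_FORMATS[fmt]
    return {
        'Content-Type': content_type,
        'Content-Disposition':
            'attachment; filename="{0}.{1}"'.format(resource_id, ext),
    }

Centralizing the mapping this way avoids repeating the filename template once per format.
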


class DictionaryView(MethodView):
@@ -163,63 +206,103 @@ def post(self, id: str, resource_id: str):


def dump_to(
resource_id: str, output: Any, fmt: str, offset: int, limit: Optional[int],
options: dict[str, Any], sort: str, search_params: dict[str, Any]
resource_id: str, output_stream: Any, fmt: str, offset: int,
limit: Optional[int], options: dict[str, Any], sort: str,
search_params: dict[str, Any], user: str, is_generator: bool = True
):
if fmt == u'csv':
if fmt == 'csv':
writer_factory = csv_writer
records_format = u'csv'
elif fmt == u'tsv':
records_format = 'csv'
elif fmt == 'tsv':
writer_factory = tsv_writer
records_format = u'tsv'
elif fmt == u'json':
records_format = 'tsv'
elif fmt == 'json':
writer_factory = json_writer
records_format = u'lists'
elif fmt == u'xml':
records_format = 'lists'
elif fmt == 'xml':
writer_factory = xml_writer
records_format = u'objects'
records_format = 'objects'
else:
assert False, u'Unsupported format'
assert False, 'Unsupported format'

bom = options.get('bom', False)

def start_writer(fields: Any):
bom = options.get(u'bom', False)
return writer_factory(output, fields, resource_id, bom)
def start_stream_writer(output_stream: StringIO,
fields: list[dict[str, Any]]):
return writer_factory(output_stream, fields, bom=bom)

def result_page(offs: int, lim: Optional[int]):
return get_action(u'datastore_search')(
{},
def stream_result_page(offs: int, lim: Union[None, int]):
return get_action('datastore_search')(
{'user': user},
dict({
u'resource_id': resource_id,
u'limit': PAGINATE_BY
if lim is None else min(PAGINATE_BY, lim),
u'offset': offs,
u'sort': sort,
u'records_format': records_format,
u'include_total': False,
'resource_id': resource_id,
'limit': PAGINATE_BY
if limit is None else min(PAGINATE_BY, lim), # type: ignore
'offset': offs,
'sort': sort,
'records_format': records_format,
'include_total': False,
}, **search_params)
)

result = result_page(offset, limit)
def stream_dump(offset: int, limit: Union[None, int],
paginate_by: int, result: dict[str, Any]):
with start_stream_writer(output_stream, result['fields']) as output:
while True:
if limit is not None and limit <= 0:
break

records = result['records']

output.write_records(records)
output_stream.seek(0)
yield output_stream.read()
output_stream.truncate(0)
output_stream.seek(0)

if records_format == 'objects' or records_format == 'lists':
if len(records) < paginate_by:
break
elif not records:
break

offset += paginate_by
if limit is not None:
limit -= paginate_by
if limit <= 0:
break

result = stream_result_page(offset, limit)
output_stream.seek(0)
yield output_stream.read()

if result[u'limit'] != limit:
result = stream_result_page(offset, limit)

if result['limit'] != limit:
# `limit` (from PAGINATE_BY) must have been more than
# ckan.datastore.search.rows_max, so datastore_search responded with a
# limit matching ckan.datastore.search.rows_max. So we need to paginate
# by that amount instead, otherwise we'll have gaps in the records.
paginate_by = result[u'limit']
# ckan.datastore.search.rows_max, so datastore_search responded
# with a limit matching ckan.datastore.search.rows_max.
# So we need to paginate by that amount instead, otherwise
# we'll have gaps in the records.
paginate_by = result['limit']
else:
paginate_by = PAGINATE_BY

with start_writer(result[u'fields']) as wr:
# return generator method to yield data
if is_generator:
return stream_dump(offset, limit, paginate_by, result)

# do not yield any data, just write it to the output stream
with start_stream_writer(output_stream, result['fields']) as output:
while True:
if limit is not None and limit <= 0:
break

records = result[u'records']
records = result['records']

wr.write_records(records)
output.write_records(records)

if records_format == u'objects' or records_format == u'lists':
if records_format == 'objects' or records_format == 'lists':
if len(records) < paginate_by:
break
elif not records:
@@ -231,7 +314,7 @@ def result_page(offs: int, lim: Optional[int]):
if limit <= 0:
break

result = result_page(offset, limit)
result = stream_result_page(offset, limit)
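
The buffer handling inside stream_dump (write a page, rewind, yield the contents, truncate) is worth seeing on its own. A self-contained sketch of that recycling pattern, with made-up names and data:

from io import StringIO

def stream_chunks(pages):
    # One reusable buffer: write a page, rewind, yield its contents,
    # then truncate and rewind again so the next page starts clean.
    buffer = StringIO()
    for page in pages:
        buffer.write(page)
        buffer.seek(0)
        yield buffer.read()
        buffer.truncate(0)
        buffer.seek(0)

for chunk in stream_chunks(["a,b\n", "1,2\n"]):
    print(chunk, end="")
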


datastore.add_url_rule(u'/datastore/dump/<resource_id>', view_func=dump)