From d4d997a4f12e8cf7cdb268ad8d9c498194615700 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Wed, 27 Sep 2023 19:58:28 +0000 Subject: [PATCH 01/26] fix(views): datastore ext dump octet streaming; - Fixed view method for datastore dump to actually stream data in Flask response. - Fixed possible issue in sql return. --- ckanext/datastore/backend/postgres.py | 2 +- ckanext/datastore/blueprint.py | 141 +++++++++++++++++++++----- ckanext/datastore/writer.py | 123 ++++++++-------------- 3 files changed, 164 insertions(+), 102 deletions(-) diff --git a/ckanext/datastore/backend/postgres.py b/ckanext/datastore/backend/postgres.py index 5413176e31b..7d3918ddb82 100644 --- a/ckanext/datastore/backend/postgres.py +++ b/ckanext/datastore/backend/postgres.py @@ -1410,7 +1410,7 @@ def search_data(context: Context, data_dict: dict[str, Any]): else: v = list(_execute_single_statement( context, sql_string, where_values))[0][0] - if v is None: + if v is None or v == '[]': records = [] else: records = LazyJSONObject(v) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index f7d787a30f3..dfca38502bd 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -3,8 +3,9 @@ from typing import Any, Optional, cast from itertools import zip_longest +from io import StringIO -from flask import Blueprint, make_response +from flask import Blueprint, Response from flask.views import MethodView import ckan.lib.navl.dictization_functions as dict_fns @@ -14,7 +15,7 @@ ) from ckan.plugins.toolkit import ( ObjectNotFound, NotAuthorized, get_action, get_validator, _, request, - abort, render, g, h + abort, render, g, h, Invalid ) from ckan.types import Schema, ValidatorFactory from ckanext.datastore.logic.schema import ( @@ -35,6 +36,7 @@ one_of = cast(ValidatorFactory, get_validator(u'one_of')) default = cast(ValidatorFactory, get_validator(u'default')) unicode_only = get_validator(u'unicode_only') +resource_id_validator = get_validator(u'resource_id_validator') DUMP_FORMATS = u'csv', u'tsv', u'json', u'xml' PAGINATE_BY = 32000 @@ -59,6 +61,11 @@ def dump_schema() -> Schema: def dump(resource_id: str): + try: + resource_id = resource_id_validator(resource_id) + except Invalid: + abort(404, _(u'DataStore resource not found')) + data, errors = dict_fns.validate(request.args.to_dict(), dump_schema()) if errors: abort( @@ -67,30 +74,118 @@ def dump(resource_id: str): ) ) - response = make_response() - response.headers[u'content-type'] = u'application/octet-stream' + fmt = data[u'format'] + offset = data[u'offset'] + limit = data.get(u'limit') + options = {u'bom': data[u'bom']} + sort = data[u'sort'] + search_params = { + k: v + for k, v in data.items() + if k in [ + u'filters', u'q', u'distinct', u'plain', u'language', + u'fields' + ] + } - try: - dump_to( - resource_id, - response, - fmt=data[u'format'], - offset=data[u'offset'], - limit=data.get(u'limit'), - options={u'bom': data[u'bom']}, - sort=data[u'sort'], - search_params={ - k: v - for k, v in data.items() - if k in [ - u'filters', u'q', u'distinct', u'plain', u'language', - u'fields' - ] - }, + if fmt == u'csv': + writer_factory = csv_writer + records_format = u'csv' + content_disposition = u'attachment; filename="{name}.csv"'.format( + name=resource_id) + content_type = b'text/csv; charset=utf-8' + elif fmt == u'tsv': + writer_factory = tsv_writer + records_format = u'tsv' + content_disposition = u'attachment; filename="{name}.tsv"'.format( + name=resource_id) + content_type = b'text/tab-separated-values; 
charset=utf-8' + elif fmt == u'json': + writer_factory = json_writer + records_format = u'lists' + content_disposition = u'attachment; filename="{name}.json"'.format( + name=resource_id) + content_type = b'application/json; charset=utf-8' + elif fmt == u'xml': + writer_factory = xml_writer + records_format = u'objects' + content_disposition = u'attachment; filename="{name}.xml"'.format( + name=resource_id) + content_type = b'text/xml; charset=utf-8' + + bom = options.get(u'bom', False) + + output_stream = StringIO() + + user_context = g.user + + def start_stream_writer(output_stream: StringIO, + fields: list[dict[str, Any]]): + return writer_factory(output_stream, fields, bom=bom) + + def stream_result_page(offs: int, lim: int): + return get_action(u'datastore_search')( + {u'user': user_context}, + dict({ + u'resource_id': resource_id, + u'limit': PAGINATE_BY + if limit is None else min(PAGINATE_BY, lim), + u'offset': offs, + u'sort': sort, + u'records_format': records_format, + u'include_total': False, + }, **search_params) ) + + def stream_dump(offset: int, limit: int, + paginate_by: int, result: dict[str, Any]): + with start_stream_writer(output_stream, result[u'fields']) as output: + while True: + if limit is not None and limit <= 0: + break + + records = result[u'records'] + + output.write_records(records) + output_stream.seek(0) + yield output_stream.read() + output_stream.truncate(0) + output_stream.seek(0) + + if records_format == u'objects' or records_format == u'lists': + if len(records) < paginate_by: + break + elif not records: + break + + offset += paginate_by + if limit is not None: + limit -= paginate_by + if limit <= 0: + break + + result = stream_result_page(offset, limit) + output_stream.seek(0) + yield output_stream.read() + + try: + result = stream_result_page(offset, limit) + + if result[u'limit'] != limit: + # `limit` (from PAGINATE_BY) must have been more than + # ckan.datastore.search.rows_max, so datastore_search responded with a + # limit matching ckan.datastore.search.rows_max. So we need to paginate + # by that amount instead, otherwise we'll have gaps in the records. 
+ paginate_by = result[u'limit'] + else: + paginate_by = PAGINATE_BY + + return Response(stream_dump(offset, limit, paginate_by, result), + mimetype=u'application/octet-stream', + headers={'Content-Type': content_type, + 'Content-disposition': content_disposition,}) except ObjectNotFound: abort(404, _(u'DataStore resource not found')) - return response class DictionaryView(MethodView): @@ -183,7 +278,7 @@ def dump_to( def start_writer(fields: Any): bom = options.get(u'bom', False) - return writer_factory(output, fields, resource_id, bom) + return writer_factory(output, fields, bom) def result_page(offs: int, lim: Optional[int]): return get_action(u'datastore_search')( diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py index c2a2b90bd80..2e7f3285130 100644 --- a/ckanext/datastore/writer.py +++ b/ckanext/datastore/writer.py @@ -14,146 +14,113 @@ @contextmanager -def csv_writer(response: Any, fields: list[dict[str, Any]], - name: Optional[str] = None, bom: bool = False): - u'''Context manager for writing UTF-8 CSV data to response +def csv_writer(output: Any, fields: list[dict[str, Any]], + bom: bool = False): + u'''Context manager for writing UTF-8 CSV data to file - :param response: file-like or response-like object for writing - data and headers (response-like objects only) + :param response: file-like object for writing data :param fields: list of datastore fields - :param name: file name (for headers, response-like objects only) :param bom: True to include a UTF-8 BOM at the start of the file ''' - if hasattr(response, u'headers'): - response.headers['Content-Type'] = b'text/csv; charset=utf-8' - if name: - response.headers['Content-disposition'] = ( - u'attachment; filename="{name}.csv"'.format( - name=encode_rfc2231(name))) if bom: - response.stream.write(BOM_UTF8) + output.write(BOM_UTF8) - csv.writer(response.stream).writerow( + csv.writer(output, encoding=u'utf-8').writerow( f['id'] for f in fields) - yield TextWriter(response.stream) + yield TextWriter(output) @contextmanager -def tsv_writer(response: Any, fields: list[dict[str, Any]], - name: Optional[str] = None, bom: bool = False): - u'''Context manager for writing UTF-8 TSV data to response +def tsv_writer(output: Any, fields: list[dict[str, Any]], + bom: bool = False): + u'''Context manager for writing UTF-8 TSV data to file - :param response: file-like or response-like object for writing - data and headers (response-like objects only) + :param response: file-like object for writing data :param fields: list of datastore fields - :param name: file name (for headers, response-like objects only) :param bom: True to include a UTF-8 BOM at the start of the file ''' - if hasattr(response, u'headers'): - response.headers['Content-Type'] = ( - b'text/tab-separated-values; charset=utf-8') - if name: - response.headers['Content-disposition'] = ( - u'attachment; filename="{name}.tsv"'.format( - name=encode_rfc2231(name))) if bom: - response.stream.write(BOM_UTF8) + output.write(BOM_UTF8) csv.writer( - response.stream, + output, + encoding=u'utf-8', dialect='excel-tab').writerow( f['id'] for f in fields) - yield TextWriter(response.stream) + yield TextWriter(output) class TextWriter(object): u'text in, text out' - def __init__(self, response: Any): - self.response = response + def __init__(self, output: Any): + self.output = output def write_records(self, records: list[Any]): - self.response.write(records) + self.output.write(records) @contextmanager -def json_writer(response: Any, fields: list[dict[str, Any]], - name: 
Optional[str] = None, bom: bool = False): - u'''Context manager for writing UTF-8 JSON data to response +def json_writer(output: Any, fields: list[dict[str, Any]], + bom: bool = False): + u'''Context manager for writing UTF-8 JSON data to file - :param response: file-like or response-like object for writing - data and headers (response-like objects only) + :param response: file-like object for writing data :param fields: list of datastore fields - :param name: file name (for headers, response-like objects only) :param bom: True to include a UTF-8 BOM at the start of the file ''' - if hasattr(response, u'headers'): - response.headers['Content-Type'] = ( - b'application/json; charset=utf-8') - if name: - response.headers['Content-disposition'] = ( - u'attachment; filename="{name}.json"'.format( - name=encode_rfc2231(name))) if bom: - response.stream.write(BOM_UTF8) - response.stream.write( + output.write(BOM_UTF8) + output.write( '{\n "fields": %s,\n "records": [' % dumps( fields, ensure_ascii=False, separators=(',', ':'))) - yield JSONWriter(response.stream) - response.stream.write(b'\n]}\n') + yield JSONWriter(output) + output.write(b'\n]}\n') class JSONWriter(object): - def __init__(self, response: Any): - self.response = response + def __init__(self, output: Any): + self.output = output self.first = True def write_records(self, records: list[Any]): for r in records: if self.first: self.first = False - self.response.write(b'\n ') + self.output.write(b'\n ') else: - self.response.write(b',\n ') + self.output.write(b',\n ') - self.response.write(dumps( - r, ensure_ascii=False, separators=(u',', u':'))) + self.output.write(dumps( + r, ensure_ascii=False, separators=(u',', u':')) + .encode('utf-8')) @contextmanager -def xml_writer(response: Any, fields: list[dict[str, Any]], - name: Optional[str] = None, bom: bool = False): - u'''Context manager for writing UTF-8 XML data to response +def xml_writer(output: Any, fields: list[dict[str, Any]], + bom: bool = False): + u'''Context manager for writing UTF-8 XML data to file - :param response: file-like or response-like object for writing - data and headers (response-like objects only) + :param response: file-like object for writing data :param fields: list of datastore fields - :param name: file name (for headers, response-like objects only) :param bom: True to include a UTF-8 BOM at the start of the file ''' - if hasattr(response, u'headers'): - response.headers['Content-Type'] = ( - b'text/xml; charset=utf-8') - if name: - response.headers['Content-disposition'] = ( - u'attachment; filename="{name}.xml"'.format( - name=encode_rfc2231(name))) if bom: - response.stream.write(BOM_UTF8) - response.stream.write(b'\n') - yield XMLWriter(response.stream, [f[u'id'] for f in fields]) - response.stream.write(b'\n') + output.write(BOM_UTF8) + output.write(b'\n') + yield XMLWriter(output, [f[u'id'] for f in fields]) + output.write(b'\n') class XMLWriter(object): _key_attr = u'key' _value_tag = u'value' - def __init__(self, response: Any, columns: list[str]): - self.response = response + def __init__(self, output: Any, columns: list[str]): + self.output = output self.id_col = columns[0] == u'_id' if self.id_col: columns = columns[1:] @@ -184,5 +151,5 @@ def write_records(self, records: list[Any]): root.attrib[u'_id'] = str(r[u'_id']) for c in self.columns: self._insert_node(root, c, r[c]) - ElementTree(root).write(self.response, encoding=u'utf-8') - self.response.write(b'\n') + ElementTree(root).write(self.output, encoding=u'utf-8') + 
self.output.write(b'\n') From c4387a6f33db42da7d3c8528af18a64239e99be4 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Wed, 27 Sep 2023 20:10:19 +0000 Subject: [PATCH 02/26] fix(views): changelog, syntax, typing; - Added change log file. - Fixed flake8 syntax reporting. - Fixed typing reporting. --- changes/7839.bugfix | 1 + ckanext/datastore/blueprint.py | 19 +++++++++---------- ckanext/datastore/writer.py | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) create mode 100644 changes/7839.bugfix diff --git a/changes/7839.bugfix b/changes/7839.bugfix new file mode 100644 index 00000000000..6720052049a --- /dev/null +++ b/changes/7839.bugfix @@ -0,0 +1 @@ +Fixed Octet Streaming for Datastore Dump requests. diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index dfca38502bd..df99c6d3bfc 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -62,7 +62,7 @@ def dump_schema() -> Schema: def dump(resource_id: str): try: - resource_id = resource_id_validator(resource_id) + resource_id = resource_id_validator(resource_id) # type: ignore except Invalid: abort(404, _(u'DataStore resource not found')) @@ -119,11 +119,10 @@ def dump(resource_id: str): user_context = g.user - def start_stream_writer(output_stream: StringIO, - fields: list[dict[str, Any]]): + def start_stream_writer(output_stream, fields): return writer_factory(output_stream, fields, bom=bom) - def stream_result_page(offs: int, lim: int): + def stream_result_page(offs, lim): return get_action(u'datastore_search')( {u'user': user_context}, dict({ @@ -137,8 +136,7 @@ def stream_result_page(offs: int, lim: int): }, **search_params) ) - def stream_dump(offset: int, limit: int, - paginate_by: int, result: dict[str, Any]): + def stream_dump(offset, limit, paginate_by, result): with start_stream_writer(output_stream, result[u'fields']) as output: while True: if limit is not None and limit <= 0: @@ -173,9 +171,10 @@ def stream_dump(offset: int, limit: int, if result[u'limit'] != limit: # `limit` (from PAGINATE_BY) must have been more than - # ckan.datastore.search.rows_max, so datastore_search responded with a - # limit matching ckan.datastore.search.rows_max. So we need to paginate - # by that amount instead, otherwise we'll have gaps in the records. + # ckan.datastore.search.rows_max, so datastore_search responded + # with a limit matching ckan.datastore.search.rows_max. + # So we need to paginate by that amount instead, otherwise + # we'll have gaps in the records. 
paginate_by = result[u'limit'] else: paginate_by = PAGINATE_BY @@ -183,7 +182,7 @@ def stream_dump(offset: int, limit: int, return Response(stream_dump(offset, limit, paginate_by, result), mimetype=u'application/octet-stream', headers={'Content-Type': content_type, - 'Content-disposition': content_disposition,}) + 'Content-disposition': content_disposition}) except ObjectNotFound: abort(404, _(u'DataStore resource not found')) diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py index 2e7f3285130..8234c30263b 100644 --- a/ckanext/datastore/writer.py +++ b/ckanext/datastore/writer.py @@ -2,7 +2,6 @@ from __future__ import annotations from contextlib import contextmanager -from email.utils import encode_rfc2231 from typing import Any, Optional from simplejson import dumps @@ -110,7 +109,8 @@ def xml_writer(output: Any, fields: list[dict[str, Any]], if bom: output.write(BOM_UTF8) - output.write(b'\n') + output.write( + b'\n') yield XMLWriter(output, [f[u'id'] for f in fields]) output.write(b'\n') From 3c1ac4e3e4996b0657bd8c8c547e8bc21d1f7670 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 28 Sep 2023 14:29:40 +0000 Subject: [PATCH 03/26] fix(views): member dump, typing; - Changed member dump view to not use datastore writer. - Updated typing for datastore dump inner methods. --- ckan/views/group.py | 57 +++++++++++++++++----------------- ckanext/datastore/blueprint.py | 13 ++++---- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/ckan/views/group.py b/ckan/views/group.py index b0b7021228c..37f13cfd1ad 100644 --- a/ckan/views/group.py +++ b/ckan/views/group.py @@ -8,6 +8,9 @@ from typing_extensions import Literal from urllib.parse import urlencode +import unicodecsv +from io import StringIO +from codecs import BOM_UTF8 import ckan.lib.base as base import ckan.lib.helpers as h @@ -21,7 +24,6 @@ from ckan.common import g, config, request, current_user, _ from ckan.views.home import CACHE_PARAMETERS from ckan.views.dataset import _get_search_details -from ckanext.datastore.writer import csv_writer from flask import Blueprint, make_response from flask.views import MethodView @@ -566,12 +568,6 @@ def manage_members(id: str, group_type: str, is_organization: bool) -> str: def member_dump(id: str, group_type: str, is_organization: bool): - response = make_response() - response.headers[u'content-type'] = u'application/octet-stream' - - writer_factory = csv_writer - records_format = u'csv' - group_obj = model.Group.get(id) if not group_obj: base.abort(404, @@ -593,37 +589,40 @@ def member_dump(id: str, group_type: str, is_organization: bool): members = get_action(u'member_list')(context, { u'id': id, u'object_type': u'user', - u'records_format': records_format, + u'records_format': u'csv', u'include_total': False, }) except NotFound: base.abort(404, _('Members not found')) - results = '' + results = [[_('Username'), _('Email'), _('Name'), _('Role')]] for uid, _user, role in members: user_obj = model.User.get(uid) if not user_obj: continue - results += '{name},{email},{fullname},{role}\n'.format( - name=user_obj.name, - email=user_obj.email, - fullname=user_obj.fullname if user_obj.fullname else _('N/A'), - role=role) - - fields = [ - {'id': _('Username')}, - {'id': _('Email')}, - {'id': _('Name')}, - {'id': _('Role')}] - - def start_writer(fields: Any): - file_name = u'{group_id}-{members}'.format( - group_id=group_obj.name, - members=_(u'members')) - return writer_factory(response, fields, file_name, bom=True) - - with start_writer(fields) as wr: - 
wr.write_records(results) # type: ignore + results.append([ + user_obj.name, + user_obj.email, + user_obj.fullname if user_obj.fullname else _('N/A'), + role, + ]) + + output_stream = StringIO() + output_stream.write(BOM_UTF8) + unicodecsv.writer(output_stream, encoding=u'utf-8').writerows(results) + + file_name = u'{org_id}-{members}'.format( + org_id=group_obj.name, + members=_(u'members')) + + output_stream.seek(0) + response = make_response(output_stream.read()) + output_stream.close() + content_disposition = u'attachment; filename="{name}.csv"'.format( + name=file_name) + content_type = b'text/csv; charset=utf-8' + response.headers['Content-Type'] = content_type + response.headers['Content-Disposition'] = content_disposition return response diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index df99c6d3bfc..cfef3c10222 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -1,7 +1,7 @@ # encoding: utf-8 from __future__ import annotations -from typing import Any, Optional, cast +from typing import Any, Optional, cast, Union from itertools import zip_longest from io import StringIO @@ -119,10 +119,10 @@ def dump(resource_id: str): user_context = g.user - def start_stream_writer(output_stream, fields): + def start_stream_writer(output_stream: StringIO, fields: dict[str, Any]): return writer_factory(output_stream, fields, bom=bom) - def stream_result_page(offs, lim): + def stream_result_page(offs: int, lim: Union[None, int]): return get_action(u'datastore_search')( {u'user': user_context}, dict({ @@ -136,7 +136,8 @@ def stream_result_page(offs, lim): }, **search_params) ) - def stream_dump(offset, limit, paginate_by, result): + def stream_dump(offset: int, limit: Union[None, int], + paginate_by: int, result: dict[str, Any]): with start_stream_writer(output_stream, result[u'fields']) as output: while True: if limit is not None and limit <= 0: @@ -181,8 +182,8 @@ def stream_dump(offset, limit, paginate_by, result): return Response(stream_dump(offset, limit, paginate_by, result), mimetype=u'application/octet-stream', - headers={'Content-Type': content_type, - 'Content-disposition': content_disposition}) + headers={'Content-Type': content_type, # type: ignore + 'Content-disposition': content_disposition}) # type: ignore except ObjectNotFound: abort(404, _(u'DataStore resource not found')) From 233946dd9c363ce4c529001c91eb147472f5d886 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 28 Sep 2023 14:41:10 +0000 Subject: [PATCH 04/26] fix(views): typing; - Fix more typing issues. 
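Note on the group.py hunk below: it swaps unicodecsv for the stdlib csv module, which writes str rather than bytes. A minimal sketch of the resulting pattern, assuming the BOM is wanted in its text form ('\ufeff', i.e. BOM_UTF8 decoded), since a StringIO buffer rejects raw bytes; render_member_csv is a hypothetical helper name, not part of the diff:

    import csv
    from io import StringIO

    def render_member_csv(rows):
        # rows: header row first, then one list of str per member
        buf = StringIO()
        buf.write('\ufeff')  # text-form BOM; StringIO cannot take BOM_UTF8 bytes
        csv.writer(buf).writerows(rows)
        return buf.getvalue()

    print(render_member_csv([['Username', 'Email', 'Name', 'Role'],
                             ['alice', 'alice@example.com', 'Alice', 'admin']]))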
--- ckan/views/group.py | 4 ++-- ckanext/datastore/blueprint.py | 7 ++++--- ckanext/datastore/writer.py | 3 +-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ckan/views/group.py b/ckan/views/group.py index 37f13cfd1ad..ead4bd18d12 100644 --- a/ckan/views/group.py +++ b/ckan/views/group.py @@ -8,7 +8,7 @@ from typing_extensions import Literal from urllib.parse import urlencode -import unicodecsv +import csv from io import StringIO from codecs import BOM_UTF8 @@ -609,7 +609,7 @@ def member_dump(id: str, group_type: str, is_organization: bool): output_stream = StringIO() output_stream.write(BOM_UTF8) - unicodecsv.writer(output_stream, encoding=u'utf-8').writerows(results) + csv.writer(output_stream).writerows(results) file_name = u'{org_id}-{members}'.format( org_id=group_obj.name, diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index cfef3c10222..c72179cfb94 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -119,7 +119,8 @@ def dump(resource_id: str): user_context = g.user - def start_stream_writer(output_stream: StringIO, fields: dict[str, Any]): + def start_stream_writer(output_stream: StringIO, + fields: list[dict[str, Any]]): return writer_factory(output_stream, fields, bom=bom) def stream_result_page(offs: int, lim: Union[None, int]): @@ -128,7 +129,7 @@ def stream_result_page(offs: int, lim: Union[None, int]): dict({ u'resource_id': resource_id, u'limit': PAGINATE_BY - if limit is None else min(PAGINATE_BY, lim), + if limit is None else min(PAGINATE_BY, lim), # type: ignore u'offset': offs, u'sort': sort, u'records_format': records_format, @@ -183,7 +184,7 @@ def stream_dump(offset: int, limit: Union[None, int], return Response(stream_dump(offset, limit, paginate_by, result), mimetype=u'application/octet-stream', headers={'Content-Type': content_type, # type: ignore - 'Content-disposition': content_disposition}) # type: ignore + 'Content-disposition': content_disposition}) except ObjectNotFound: abort(404, _(u'DataStore resource not found')) diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py index 8234c30263b..455fa6a4535 100644 --- a/ckanext/datastore/writer.py +++ b/ckanext/datastore/writer.py @@ -25,7 +25,7 @@ def csv_writer(output: Any, fields: list[dict[str, Any]], if bom: output.write(BOM_UTF8) - csv.writer(output, encoding=u'utf-8').writerow( + csv.writer(output).writerow( f['id'] for f in fields) yield TextWriter(output) @@ -45,7 +45,6 @@ def tsv_writer(output: Any, fields: list[dict[str, Any]], csv.writer( output, - encoding=u'utf-8', dialect='excel-tab').writerow( f['id'] for f in fields) yield TextWriter(output) From 36b44ec9f47d2a5a6fb6709bd890b0ef2fa86f4b Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 28 Sep 2023 15:00:10 +0000 Subject: [PATCH 05/26] fix(typing): more typing fixes; - Even more typing fixes. 
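The blueprint.py hunk below also stops passing possibly-unset header values straight into the Response by building the mapping conditionally. The guard pattern in isolation (a sketch mirroring the diff, not a drop-in):

    from typing import Optional

    def build_headers(content_type: Optional[str],
                      content_disposition: Optional[str]) -> dict:
        # Only headers with a real value make it into the mapping,
        # so Flask never sees a None header value.
        headers = {}
        if content_type:
            headers['Content-Type'] = content_type
        if content_disposition:
            headers['Content-disposition'] = content_disposition
        return headers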
--- ckan/views/group.py | 6 +++--- ckanext/datastore/blueprint.py | 9 +++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/ckan/views/group.py b/ckan/views/group.py index ead4bd18d12..5858bcbd209 100644 --- a/ckan/views/group.py +++ b/ckan/views/group.py @@ -602,13 +602,13 @@ def member_dump(id: str, group_type: str, is_organization: bool): continue results.append([ user_obj.name, - user_obj.email, + user_obj.email, # type: ignore user_obj.fullname if user_obj.fullname else _('N/A'), role, ]) output_stream = StringIO() - output_stream.write(BOM_UTF8) + output_stream.write(BOM_UTF8) # type: ignore csv.writer(output_stream).writerows(results) file_name = u'{org_id}-{members}'.format( @@ -621,7 +621,7 @@ def member_dump(id: str, group_type: str, is_organization: bool): content_disposition = u'attachment; filename="{name}.csv"'.format( name=file_name) content_type = b'text/csv; charset=utf-8' - response.headers['Content-Type'] = content_type + response.headers['Content-Type'] = content_type # type: ignore response.headers['Content-Disposition'] = content_disposition return response diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index c72179cfb94..ce55f1cdcb3 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -181,10 +181,15 @@ def stream_dump(offset: int, limit: Union[None, int], else: paginate_by = PAGINATE_BY + headers = {} + if content_type: + headers['Content-Type'] = content_type + if content_disposition: + headers['Content-disposition'] = content_disposition + return Response(stream_dump(offset, limit, paginate_by, result), mimetype=u'application/octet-stream', - headers={'Content-Type': content_type, # type: ignore - 'Content-disposition': content_disposition}) + headers=headers) # type: ignore except ObjectNotFound: abort(404, _(u'DataStore resource not found')) From f25d487aabd187c75ecce4e731e099e7530f1b18 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 28 Sep 2023 15:04:39 +0000 Subject: [PATCH 06/26] fix(typing): more typing; - Please let this be the last fix. --- ckanext/datastore/blueprint.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index ce55f1cdcb3..e9090e9b136 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -88,6 +88,9 @@ def dump(resource_id: str): ] } + content_type = None + content_disposition = None + if fmt == u'csv': writer_factory = csv_writer records_format = u'csv' @@ -189,7 +192,7 @@ def stream_dump(offset: int, limit: Union[None, int], return Response(stream_dump(offset, limit, paginate_by, result), mimetype=u'application/octet-stream', - headers=headers) # type: ignore + headers=headers) except ObjectNotFound: abort(404, _(u'DataStore resource not found')) From f54720ceb0ab1333199c38f40c26021ac6ec3f6d Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 5 Oct 2023 13:48:30 +0000 Subject: [PATCH 07/26] feat(views): removed u strings, new 404 condition; - Removed `u` strings in new code. - Changed resource id validator to datastore search. 
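A sketch of the new 404 condition in the blueprint.py hunk below: instead of validating the shape of the resource id, the view issues a zero-row datastore_search, which raises ObjectNotFound for anything not in the DataStore while fetching no records; assert_datastore_resource is a hypothetical wrapper name for the inline try/except:

    from ckan.plugins.toolkit import ObjectNotFound, abort, get_action, _

    def assert_datastore_resource(resource_id: str) -> None:
        # A zero-row search either succeeds (the resource exists in the
        # DataStore) or raises ObjectNotFound.
        try:
            get_action('datastore_search')({}, {'resource_id': resource_id,
                                                'limit': 0})
        except ObjectNotFound:
            abort(404, _('DataStore resource not found'))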
--- ckanext/datastore/blueprint.py | 82 +++++++++++++++++----------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index e9090e9b136..3ec43071007 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -36,7 +36,6 @@ one_of = cast(ValidatorFactory, get_validator(u'one_of')) default = cast(ValidatorFactory, get_validator(u'default')) unicode_only = get_validator(u'unicode_only') -resource_id_validator = get_validator(u'resource_id_validator') DUMP_FORMATS = u'csv', u'tsv', u'json', u'xml' PAGINATE_BY = 32000 @@ -62,61 +61,62 @@ def dump_schema() -> Schema: def dump(resource_id: str): try: - resource_id = resource_id_validator(resource_id) # type: ignore - except Invalid: - abort(404, _(u'DataStore resource not found')) + get_action('datastore_search')({}, {'resource_id': resource_id, + 'limit': 0}) + except ObjectNotFound: + abort(404, _('DataStore resource not found')) data, errors = dict_fns.validate(request.args.to_dict(), dump_schema()) if errors: abort( - 400, u'\n'.join( - u'{0}: {1}'.format(k, u' '.join(e)) for k, e in errors.items() + 400, '\n'.join( + '{0}: {1}'.format(k, ' '.join(e)) for k, e in errors.items() ) ) - fmt = data[u'format'] - offset = data[u'offset'] - limit = data.get(u'limit') - options = {u'bom': data[u'bom']} - sort = data[u'sort'] + fmt = data['format'] + offset = data['offset'] + limit = data.get('limit') + options = {'bom': data['bom']} + sort = data['sort'] search_params = { k: v for k, v in data.items() if k in [ - u'filters', u'q', u'distinct', u'plain', u'language', - u'fields' + 'filters', 'q', 'distinct', 'plain', 'language', + 'fields' ] } content_type = None content_disposition = None - if fmt == u'csv': + if fmt == 'csv': writer_factory = csv_writer - records_format = u'csv' - content_disposition = u'attachment; filename="{name}.csv"'.format( + records_format = 'csv' + content_disposition = 'attachment; filename="{name}.csv"'.format( name=resource_id) content_type = b'text/csv; charset=utf-8' - elif fmt == u'tsv': + elif fmt == 'tsv': writer_factory = tsv_writer - records_format = u'tsv' - content_disposition = u'attachment; filename="{name}.tsv"'.format( + records_format = 'tsv' + content_disposition = 'attachment; filename="{name}.tsv"'.format( name=resource_id) content_type = b'text/tab-separated-values; charset=utf-8' - elif fmt == u'json': + elif fmt == 'json': writer_factory = json_writer - records_format = u'lists' - content_disposition = u'attachment; filename="{name}.json"'.format( + records_format = 'lists' + content_disposition = 'attachment; filename="{name}.json"'.format( name=resource_id) content_type = b'application/json; charset=utf-8' - elif fmt == u'xml': + elif fmt == 'xml': writer_factory = xml_writer - records_format = u'objects' - content_disposition = u'attachment; filename="{name}.xml"'.format( + records_format = 'objects' + content_disposition = 'attachment; filename="{name}.xml"'.format( name=resource_id) content_type = b'text/xml; charset=utf-8' - bom = options.get(u'bom', False) + bom = options.get('bom', False) output_stream = StringIO() @@ -127,27 +127,27 @@ def start_stream_writer(output_stream: StringIO, return writer_factory(output_stream, fields, bom=bom) def stream_result_page(offs: int, lim: Union[None, int]): - return get_action(u'datastore_search')( - {u'user': user_context}, + return get_action('datastore_search')( + {'user': user_context}, dict({ - u'resource_id': resource_id, - u'limit': 
PAGINATE_BY + 'resource_id': resource_id, + 'limit': PAGINATE_BY if limit is None else min(PAGINATE_BY, lim), # type: ignore - u'offset': offs, - u'sort': sort, - u'records_format': records_format, - u'include_total': False, + 'offset': offs, + 'sort': sort, + 'records_format': records_format, + 'include_total': False, }, **search_params) ) def stream_dump(offset: int, limit: Union[None, int], paginate_by: int, result: dict[str, Any]): - with start_stream_writer(output_stream, result[u'fields']) as output: + with start_stream_writer(output_stream, result['fields']) as output: while True: if limit is not None and limit <= 0: break - records = result[u'records'] + records = result['records'] output.write_records(records) output_stream.seek(0) @@ -155,7 +155,7 @@ def stream_dump(offset: int, limit: Union[None, int], output_stream.truncate(0) output_stream.seek(0) - if records_format == u'objects' or records_format == u'lists': + if records_format == 'objects' or records_format == 'lists': if len(records) < paginate_by: break elif not records: @@ -174,13 +174,13 @@ def stream_dump(offset: int, limit: Union[None, int], try: result = stream_result_page(offset, limit) - if result[u'limit'] != limit: + if result['limit'] != limit: # `limit` (from PAGINATE_BY) must have been more than # ckan.datastore.search.rows_max, so datastore_search responded # with a limit matching ckan.datastore.search.rows_max. # So we need to paginate by that amount instead, otherwise # we'll have gaps in the records. - paginate_by = result[u'limit'] + paginate_by = result['limit'] else: paginate_by = PAGINATE_BY @@ -191,10 +191,10 @@ def stream_dump(offset: int, limit: Union[None, int], headers['Content-disposition'] = content_disposition return Response(stream_dump(offset, limit, paginate_by, result), - mimetype=u'application/octet-stream', + mimetype='application/octet-stream', headers=headers) except ObjectNotFound: - abort(404, _(u'DataStore resource not found')) + abort(404, _('DataStore resource not found')) class DictionaryView(MethodView): From bca446270de11eea0bf024df09c9dda41693173c Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 5 Oct 2023 14:21:25 +0000 Subject: [PATCH 08/26] feat(views): reusable code; - Reworked new code into `dump_to`. 
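Two things the rework below leans on. First, Flask streams any iterable handed to Response, so yielding buffered pages keeps memory flat; a reduced sketch of the seek/read/truncate cycle, where dump_chunks stands in for the real paginated generator:

    from io import StringIO
    from flask import Response

    def dump_chunks():
        # Write a page into the buffer, hand the text to Flask,
        # then rewind and truncate before the next page.
        buf = StringIO()
        for page in (['a,b\n'], ['1,2\n', '3,4\n']):
            buf.writelines(page)
            buf.seek(0)
            yield buf.read()
            buf.truncate(0)
            buf.seek(0)

    response = Response(dump_chunks(), mimetype='application/octet-stream')

Second, the rows_max re-pagination: with PAGINATE_BY = 32000 and, say, ckan.datastore.search.rows_max = 10000, the first datastore_search comes back clamped to 10000 rows, so later offsets must also advance by 10000; stepping by 32000 would silently skip 22000 rows per page.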
--- ckanext/datastore/blueprint.py | 210 +++++++++++++-------------------- 1 file changed, 81 insertions(+), 129 deletions(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 3ec43071007..6bb35d6f168 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -92,105 +92,41 @@ def dump(resource_id: str): content_disposition = None if fmt == 'csv': - writer_factory = csv_writer - records_format = 'csv' content_disposition = 'attachment; filename="{name}.csv"'.format( name=resource_id) content_type = b'text/csv; charset=utf-8' elif fmt == 'tsv': - writer_factory = tsv_writer - records_format = 'tsv' content_disposition = 'attachment; filename="{name}.tsv"'.format( name=resource_id) content_type = b'text/tab-separated-values; charset=utf-8' elif fmt == 'json': - writer_factory = json_writer - records_format = 'lists' content_disposition = 'attachment; filename="{name}.json"'.format( name=resource_id) content_type = b'application/json; charset=utf-8' elif fmt == 'xml': - writer_factory = xml_writer - records_format = 'objects' content_disposition = 'attachment; filename="{name}.xml"'.format( name=resource_id) content_type = b'text/xml; charset=utf-8' - - bom = options.get('bom', False) + else: + abort(404, _('Unsupported format')) output_stream = StringIO() - user_context = g.user - - def start_stream_writer(output_stream: StringIO, - fields: list[dict[str, Any]]): - return writer_factory(output_stream, fields, bom=bom) - - def stream_result_page(offs: int, lim: Union[None, int]): - return get_action('datastore_search')( - {'user': user_context}, - dict({ - 'resource_id': resource_id, - 'limit': PAGINATE_BY - if limit is None else min(PAGINATE_BY, lim), # type: ignore - 'offset': offs, - 'sort': sort, - 'records_format': records_format, - 'include_total': False, - }, **search_params) - ) - - def stream_dump(offset: int, limit: Union[None, int], - paginate_by: int, result: dict[str, Any]): - with start_stream_writer(output_stream, result['fields']) as output: - while True: - if limit is not None and limit <= 0: - break - - records = result['records'] - - output.write_records(records) - output_stream.seek(0) - yield output_stream.read() - output_stream.truncate(0) - output_stream.seek(0) - - if records_format == 'objects' or records_format == 'lists': - if len(records) < paginate_by: - break - elif not records: - break - - offset += paginate_by - if limit is not None: - limit -= paginate_by - if limit <= 0: - break - - result = stream_result_page(offset, limit) - output_stream.seek(0) - yield output_stream.read() + headers = {} + if content_type: + headers['Content-Type'] = content_type + if content_disposition: + headers['Content-disposition'] = content_disposition try: - result = stream_result_page(offset, limit) - - if result['limit'] != limit: - # `limit` (from PAGINATE_BY) must have been more than - # ckan.datastore.search.rows_max, so datastore_search responded - # with a limit matching ckan.datastore.search.rows_max. - # So we need to paginate by that amount instead, otherwise - # we'll have gaps in the records. 
- paginate_by = result['limit'] - else: - paginate_by = PAGINATE_BY - - headers = {} - if content_type: - headers['Content-Type'] = content_type - if content_disposition: - headers['Content-disposition'] = content_disposition - - return Response(stream_dump(offset, limit, paginate_by, result), + return Response(dump_to(resource_id, + output_stream, + fmt=fmt, + offset=offset, + limit=limit, + options=options, + sort=sort, + search_params=search_params), mimetype='application/octet-stream', headers=headers) except ObjectNotFound: @@ -267,75 +203,91 @@ def post(self, id: str, resource_id: str): def dump_to( - resource_id: str, output: Any, fmt: str, offset: int, limit: Optional[int], - options: dict[str, Any], sort: str, search_params: dict[str, Any] + resource_id: str, output_stream: Any, fmt: str, offset: int, + limit: Optional[int], options: dict[str, Any], sort: str, + search_params: dict[str, Any] ): - if fmt == u'csv': + if fmt == 'csv': writer_factory = csv_writer - records_format = u'csv' - elif fmt == u'tsv': + records_format = 'csv' + elif fmt == 'tsv': writer_factory = tsv_writer - records_format = u'tsv' - elif fmt == u'json': + records_format = 'tsv' + elif fmt == 'json': writer_factory = json_writer - records_format = u'lists' - elif fmt == u'xml': + records_format = 'lists' + elif fmt == 'xml': writer_factory = xml_writer - records_format = u'objects' + records_format = 'objects' else: - assert False, u'Unsupported format' + assert False, 'Unsupported format' - def start_writer(fields: Any): - bom = options.get(u'bom', False) - return writer_factory(output, fields, bom) + bom = options.get('bom', False) + + user_context = g.user + + def start_stream_writer(output_stream: StringIO, + fields: list[dict[str, Any]]): + return writer_factory(output_stream, fields, bom=bom) - def result_page(offs: int, lim: Optional[int]): - return get_action(u'datastore_search')( - {}, + def stream_result_page(offs: int, lim: Union[None, int]): + return get_action('datastore_search')( + {'user': user_context}, dict({ - u'resource_id': resource_id, - u'limit': PAGINATE_BY - if lim is None else min(PAGINATE_BY, lim), - u'offset': offs, - u'sort': sort, - u'records_format': records_format, - u'include_total': False, + 'resource_id': resource_id, + 'limit': PAGINATE_BY + if limit is None else min(PAGINATE_BY, lim), # type: ignore + 'offset': offs, + 'sort': sort, + 'records_format': records_format, + 'include_total': False, }, **search_params) ) - result = result_page(offset, limit) + def stream_dump(offset: int, limit: Union[None, int], + paginate_by: int, result: dict[str, Any]): + with start_stream_writer(output_stream, result['fields']) as output: + while True: + if limit is not None and limit <= 0: + break + + records = result['records'] - if result[u'limit'] != limit: - # `limit` (from PAGINATE_BY) must have been more than - # ckan.datastore.search.rows_max, so datastore_search responded with a - # limit matching ckan.datastore.search.rows_max. So we need to paginate - # by that amount instead, otherwise we'll have gaps in the records. 
- paginate_by = result[u'limit'] - else: - paginate_by = PAGINATE_BY + output.write_records(records) + output_stream.seek(0) + yield output_stream.read() + output_stream.truncate(0) + output_stream.seek(0) - with start_writer(result[u'fields']) as wr: - while True: - if limit is not None and limit <= 0: - break + if records_format == 'objects' or records_format == 'lists': + if len(records) < paginate_by: + break + elif not records: + break - records = result[u'records'] + offset += paginate_by + if limit is not None: + limit -= paginate_by + if limit <= 0: + break - wr.write_records(records) + result = stream_result_page(offset, limit) + output_stream.seek(0) + yield output_stream.read() - if records_format == u'objects' or records_format == u'lists': - if len(records) < paginate_by: - break - elif not records: - break + result = stream_result_page(offset, limit) - offset += paginate_by - if limit is not None: - limit -= paginate_by - if limit <= 0: - break + if result['limit'] != limit: + # `limit` (from PAGINATE_BY) must have been more than + # ckan.datastore.search.rows_max, so datastore_search responded + # with a limit matching ckan.datastore.search.rows_max. + # So we need to paginate by that amount instead, otherwise + # we'll have gaps in the records. + paginate_by = result['limit'] + else: + paginate_by = PAGINATE_BY - result = result_page(offset, limit) + return stream_dump(offset, limit, paginate_by, result) datastore.add_url_rule(u'/datastore/dump/', view_func=dump) From 970511060887fe12fd6b25ec4b549b7f486b857c Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 5 Oct 2023 14:26:46 +0000 Subject: [PATCH 09/26] fix(syntax): removed unused import; - Removed unused `Invalid` import. --- ckanext/datastore/blueprint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 6bb35d6f168..9fb0303c005 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -15,7 +15,7 @@ ) from ckan.plugins.toolkit import ( ObjectNotFound, NotAuthorized, get_action, get_validator, _, request, - abort, render, g, h, Invalid + abort, render, g, h ) from ckan.types import Schema, ValidatorFactory from ckanext.datastore.logic.schema import ( From 07535d78c6f5825e0795d0520f41fb2d4a9f796b Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 5 Oct 2023 17:19:53 +0000 Subject: [PATCH 10/26] feat(views): dump_to method for cli and view; - Reworked the `dump_to` method to work for the view method and the cli method. - Removed `bom` flag from cli command and handled condition based on format. 
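After this patch dump_to serves both entry points: the view consumes the returned generator, while the CLI passes is_generator=False so dump_to drains every page into the file itself, with bom derived from the format (True for csv/tsv). The two call shapes, assuming a CKAN context; the resource id and site user name are placeholders:

    from io import StringIO

    buf = StringIO()
    # View side: keep the default is_generator=True and stream the result.
    gen = dump_to('RESOURCE-ID', buf, fmt='csv', offset=0, limit=None,
                  options={'bom': True}, sort='_id', search_params={},
                  user=site_user_name)

    # CLI side: no yielding; dump_to writes directly into the open file.
    with open('dump.csv', 'w') as out:
        dump_to('RESOURCE-ID', out, fmt='csv', offset=0, limit=None,
                options={'bom': True}, sort='_id', search_params={},
                user=site_user_name, is_generator=False)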
--- ckanext/datastore/blueprint.py | 39 ++++++++++++++++++++++++++++------ ckanext/datastore/cli.py | 13 +++++++++--- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 9fb0303c005..9940bcc2cdc 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -88,6 +88,8 @@ def dump(resource_id: str): ] } + user_context = g.user + content_type = None content_disposition = None @@ -126,7 +128,8 @@ def dump(resource_id: str): limit=limit, options=options, sort=sort, - search_params=search_params), + search_params=search_params, + user=user_context), mimetype='application/octet-stream', headers=headers) except ObjectNotFound: @@ -205,7 +208,7 @@ def post(self, id: str, resource_id: str): def dump_to( resource_id: str, output_stream: Any, fmt: str, offset: int, limit: Optional[int], options: dict[str, Any], sort: str, - search_params: dict[str, Any] + search_params: dict[str, Any], user: str, is_generator: bool = True ): if fmt == 'csv': writer_factory = csv_writer @@ -224,15 +227,13 @@ def dump_to( bom = options.get('bom', False) - user_context = g.user - def start_stream_writer(output_stream: StringIO, fields: list[dict[str, Any]]): return writer_factory(output_stream, fields, bom=bom) def stream_result_page(offs: int, lim: Union[None, int]): return get_action('datastore_search')( - {'user': user_context}, + {'user': user}, dict({ 'resource_id': resource_id, 'limit': PAGINATE_BY @@ -287,7 +288,33 @@ def stream_dump(offset: int, limit: Union[None, int], else: paginate_by = PAGINATE_BY - return stream_dump(offset, limit, paginate_by, result) + # return generator method to yield data + if is_generator: + return stream_dump(offset, limit, paginate_by, result) + + # do not yield any data, just write it to the output stream + with start_stream_writer(output_stream, result['fields']) as output: + while True: + if limit is not None and limit <= 0: + break + + records = result['records'] + + output.write_records(records) + + if records_format == 'objects' or records_format == 'lists': + if len(records) < paginate_by: + break + elif not records: + break + + offset += paginate_by + if limit is not None: + limit -= paginate_by + if limit <= 0: + break + + result = stream_result_page(offset, limit) datastore.add_url_rule(u'/datastore/dump/', view_func=dump) diff --git a/ckanext/datastore/cli.py b/ckanext/datastore/cli.py index 7eeea7705b9..3380d688678 100644 --- a/ckanext/datastore/cli.py +++ b/ckanext/datastore/cli.py @@ -86,13 +86,18 @@ def permissions_sql(maindb: str, datastoredb: str, mainuser: str, @click.option(u'--format', default=u'csv', type=click.Choice(DUMP_FORMATS)) @click.option(u'--offset', type=click.IntRange(0, None), default=0) @click.option(u'--limit', type=click.IntRange(0)) -@click.option(u'--bom', is_flag=True) # FIXME: options based on format @click.pass_context def dump(ctx: Any, resource_id: str, output_file: Any, format: str, - offset: int, limit: int, bom: bool): + offset: int, limit: int): u'''Dump a datastore resource. 
''' flask_app = ctx.meta['flask_app'] + user = logic.get_action('get_site_user')( + {'ignore_auth': True}, {}) + if format == 'csv' or format == 'tsv': + bom = True + elif format == 'json' or format == 'xml': + bom = False with flask_app.test_request_context(): dump_to( resource_id, @@ -102,7 +107,9 @@ def dump(ctx: Any, resource_id: str, output_file: Any, format: str, limit=limit, options={u'bom': bom}, sort=u'_id', - search_params={} + search_params={}, + user=user['name'], + is_generator=False ) From 4fef35d4cd95ba839920ae95356d94caacedc875 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 5 Oct 2023 17:24:34 +0000 Subject: [PATCH 11/26] fix(syntax): type syntax for unbound; - Fixed unbound for `bom`. --- ckanext/datastore/cli.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ckanext/datastore/cli.py b/ckanext/datastore/cli.py index 3380d688678..69c01a5788f 100644 --- a/ckanext/datastore/cli.py +++ b/ckanext/datastore/cli.py @@ -94,10 +94,9 @@ def dump(ctx: Any, resource_id: str, output_file: Any, format: str, flask_app = ctx.meta['flask_app'] user = logic.get_action('get_site_user')( {'ignore_auth': True}, {}) + bom = False if format == 'csv' or format == 'tsv': bom = True - elif format == 'json' or format == 'xml': - bom = False with flask_app.test_request_context(): dump_to( resource_id, From ca5c6430fa302b2eb046cd0ecebde83a2b346cb8 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 6 Oct 2023 13:23:06 +0000 Subject: [PATCH 12/26] feat(views): dump_to always retur generator; - Made `dump_to` always return a generator. - Removed unnecessary params from `dump_to`. - Brought back BOM flag in cli dump command. --- ckanext/datastore/blueprint.py | 55 +++++++++------------------------- ckanext/datastore/cli.py | 27 +++++++---------- 2 files changed, 25 insertions(+), 57 deletions(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 9940bcc2cdc..5160ee06b18 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -112,8 +112,6 @@ def dump(resource_id: str): else: abort(404, _('Unsupported format')) - output_stream = StringIO() - headers = {} if content_type: headers['Content-Type'] = content_type @@ -122,7 +120,6 @@ def dump(resource_id: str): try: return Response(dump_to(resource_id, - output_stream, fmt=fmt, offset=offset, limit=limit, @@ -206,10 +203,12 @@ def post(self, id: str, resource_id: str): def dump_to( - resource_id: str, output_stream: Any, fmt: str, offset: int, + resource_id: str, fmt: str, offset: int, limit: Optional[int], options: dict[str, Any], sort: str, - search_params: dict[str, Any], user: str, is_generator: bool = True + search_params: dict[str, Any], user: str ): + output_buffer = StringIO() + if fmt == 'csv': writer_factory = csv_writer records_format = 'csv' @@ -227,9 +226,9 @@ def dump_to( bom = options.get('bom', False) - def start_stream_writer(output_stream: StringIO, + def start_stream_writer(output_buffer: StringIO, fields: list[dict[str, Any]]): - return writer_factory(output_stream, fields, bom=bom) + return writer_factory(output_buffer, fields, bom=bom) def stream_result_page(offs: int, lim: Union[None, int]): return get_action('datastore_search')( @@ -247,7 +246,7 @@ def stream_result_page(offs: int, lim: Union[None, int]): def stream_dump(offset: int, limit: Union[None, int], paginate_by: int, result: dict[str, Any]): - with start_stream_writer(output_stream, result['fields']) as output: + with start_stream_writer(output_buffer, result['fields']) as output: 
while True: if limit is not None and limit <= 0: break @@ -255,10 +254,10 @@ def stream_dump(offset: int, limit: Union[None, int], records = result['records'] output.write_records(records) - output_stream.seek(0) - yield output_stream.read() - output_stream.truncate(0) - output_stream.seek(0) + output_buffer.seek(0) + yield output_buffer.read() + output_buffer.truncate(0) + output_buffer.seek(0) if records_format == 'objects' or records_format == 'lists': if len(records) < paginate_by: @@ -273,8 +272,8 @@ def stream_dump(offset: int, limit: Union[None, int], break result = stream_result_page(offset, limit) - output_stream.seek(0) - yield output_stream.read() + output_buffer.seek(0) + yield output_buffer.read() result = stream_result_page(offset, limit) @@ -288,33 +287,7 @@ def stream_dump(offset: int, limit: Union[None, int], else: paginate_by = PAGINATE_BY - # return generator method to yield data - if is_generator: - return stream_dump(offset, limit, paginate_by, result) - - # do not yield any data, just write it to the output stream - with start_stream_writer(output_stream, result['fields']) as output: - while True: - if limit is not None and limit <= 0: - break - - records = result['records'] - - output.write_records(records) - - if records_format == 'objects' or records_format == 'lists': - if len(records) < paginate_by: - break - elif not records: - break - - offset += paginate_by - if limit is not None: - limit -= paginate_by - if limit <= 0: - break - - result = stream_result_page(offset, limit) + return stream_dump(offset, limit, paginate_by, result) datastore.add_url_rule(u'/datastore/dump/', view_func=dump) diff --git a/ckanext/datastore/cli.py b/ckanext/datastore/cli.py index 69c01a5788f..8091cba93d8 100644 --- a/ckanext/datastore/cli.py +++ b/ckanext/datastore/cli.py @@ -86,30 +86,25 @@ def permissions_sql(maindb: str, datastoredb: str, mainuser: str, @click.option(u'--format', default=u'csv', type=click.Choice(DUMP_FORMATS)) @click.option(u'--offset', type=click.IntRange(0, None), default=0) @click.option(u'--limit', type=click.IntRange(0)) +@click.option(u'--bom', is_flag=True) @click.pass_context def dump(ctx: Any, resource_id: str, output_file: Any, format: str, - offset: int, limit: int): + offset: int, limit: int, bom: bool): u'''Dump a datastore resource. ''' flask_app = ctx.meta['flask_app'] user = logic.get_action('get_site_user')( {'ignore_auth': True}, {}) - bom = False - if format == 'csv' or format == 'tsv': - bom = True with flask_app.test_request_context(): - dump_to( - resource_id, - output_file, - fmt=format, - offset=offset, - limit=limit, - options={u'bom': bom}, - sort=u'_id', - search_params={}, - user=user['name'], - is_generator=False - ) + for block in dump_to(resource_id, + fmt=format, + offset=offset, + limit=limit, + options={u'bom': bom}, + sort=u'_id', + search_params={}, + user=user['name']): + output_file.write(block) def _parse_db_config(config_key: str = u'sqlalchemy.url'): From 50b9aa67853b3f3a45c9af30f4d217c17859b51c Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Wed, 11 Oct 2023 12:41:41 +0000 Subject: [PATCH 13/26] fix(tests): basestring for StringIO; - Removed byte strings in StringIO buffer usage. 
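The one-line motivation for the writer.py hunk below: StringIO is a text buffer, so the byte literals the JSON and XML writers were emitting fail at runtime. A quick illustration:

    from io import StringIO

    buf = StringIO()
    buf.write('\n]}\n')           # fine: StringIO accepts str
    try:
        buf.write(b'\n]}\n')      # bytes are rejected by a text buffer
    except TypeError as err:
        print(err)                # string argument expected, got 'bytes'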
--- ckanext/datastore/writer.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py index 455fa6a4535..647eae29cd6 100644 --- a/ckanext/datastore/writer.py +++ b/ckanext/datastore/writer.py @@ -75,7 +75,7 @@ def json_writer(output: Any, fields: list[dict[str, Any]], '{\n "fields": %s,\n "records": [' % dumps( fields, ensure_ascii=False, separators=(',', ':'))) yield JSONWriter(output) - output.write(b'\n]}\n') + output.write('\n]}\n') class JSONWriter(object): @@ -87,9 +87,9 @@ def write_records(self, records: list[Any]): for r in records: if self.first: self.first = False - self.output.write(b'\n ') + self.output.write('\n ') else: - self.output.write(b',\n ') + self.output.write(',\n ') self.output.write(dumps( r, ensure_ascii=False, separators=(u',', u':')) @@ -109,9 +109,9 @@ def xml_writer(output: Any, fields: list[dict[str, Any]], if bom: output.write(BOM_UTF8) output.write( - b'\n') + '\n') yield XMLWriter(output, [f[u'id'] for f in fields]) - output.write(b'\n') + output.write('\n') class XMLWriter(object): @@ -151,4 +151,4 @@ def write_records(self, records: list[Any]): for c in self.columns: self._insert_node(root, c, r[c]) ElementTree(root).write(self.output, encoding=u'utf-8') - self.output.write(b'\n') + self.output.write('\n') From 53baace45b7159d890b925953d64442771a4b674 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 12 Oct 2023 14:28:31 +0000 Subject: [PATCH 14/26] fix(views): bytes buffer; - Use `BytesIO` instead of `StringIO`. --- ckanext/datastore/blueprint.py | 6 +++--- ckanext/datastore/writer.py | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 5160ee06b18..abbd3b31232 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -3,7 +3,7 @@ from typing import Any, Optional, cast, Union from itertools import zip_longest -from io import StringIO +from io import BytesIO from flask import Blueprint, Response from flask.views import MethodView @@ -207,7 +207,7 @@ def dump_to( limit: Optional[int], options: dict[str, Any], sort: str, search_params: dict[str, Any], user: str ): - output_buffer = StringIO() + output_buffer = BytesIO() if fmt == 'csv': writer_factory = csv_writer @@ -226,7 +226,7 @@ def dump_to( bom = options.get('bom', False) - def start_stream_writer(output_buffer: StringIO, + def start_stream_writer(output_buffer: BytesIO, fields: list[dict[str, Any]]): return writer_factory(output_buffer, fields, bom=bom) diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py index 647eae29cd6..e43cf936fb3 100644 --- a/ckanext/datastore/writer.py +++ b/ckanext/datastore/writer.py @@ -72,10 +72,10 @@ def json_writer(output: Any, fields: list[dict[str, Any]], if bom: output.write(BOM_UTF8) output.write( - '{\n "fields": %s,\n "records": [' % dumps( + b'{\n "fields": %s,\n "records": [' % dumps( fields, ensure_ascii=False, separators=(',', ':'))) yield JSONWriter(output) - output.write('\n]}\n') + output.write(b'\n]}\n') class JSONWriter(object): @@ -87,9 +87,9 @@ def write_records(self, records: list[Any]): for r in records: if self.first: self.first = False - self.output.write('\n ') + self.output.write(b'\n ') else: - self.output.write(',\n ') + self.output.write(b',\n ') self.output.write(dumps( r, ensure_ascii=False, separators=(u',', u':')) @@ -109,9 +109,9 @@ def xml_writer(output: Any, fields: list[dict[str, Any]], if bom: 
output.write(BOM_UTF8) output.write( - '\n') + b'\n') yield XMLWriter(output, [f[u'id'] for f in fields]) - output.write('\n') + output.write(b'\n') class XMLWriter(object): @@ -151,4 +151,4 @@ def write_records(self, records: list[Any]): for c in self.columns: self._insert_node(root, c, r[c]) ElementTree(root).write(self.output, encoding=u'utf-8') - self.output.write('\n') + self.output.write(b'\n') From 17df4fa4d68e7d64838793288d6bf8f234b77faf Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 12 Oct 2023 17:30:27 +0000 Subject: [PATCH 15/26] fix(encoding): byte encoding for writers; - Added `utf-8` encoding for byte writing. --- ckanext/datastore/writer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py index e43cf936fb3..4e335226968 100644 --- a/ckanext/datastore/writer.py +++ b/ckanext/datastore/writer.py @@ -25,7 +25,7 @@ def csv_writer(output: Any, fields: list[dict[str, Any]], if bom: output.write(BOM_UTF8) - csv.writer(output).writerow( + csv.writer(output, encoding=u'utf-8').writerow( # type: ignore f['id'] for f in fields) yield TextWriter(output) @@ -45,6 +45,7 @@ def tsv_writer(output: Any, fields: list[dict[str, Any]], csv.writer( output, + encoding=u'utf-8', # type: ignore dialect='excel-tab').writerow( f['id'] for f in fields) yield TextWriter(output) @@ -73,7 +74,7 @@ def json_writer(output: Any, fields: list[dict[str, Any]], output.write(BOM_UTF8) output.write( b'{\n "fields": %s,\n "records": [' % dumps( - fields, ensure_ascii=False, separators=(',', ':'))) + fields, ensure_ascii=False, separators=(',', ':')).encode('utf8')) yield JSONWriter(output) output.write(b'\n]}\n') From 10a3ce60c78d0fcdd0a06ee037b6368c8dd5a2c7 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 12 Oct 2023 19:01:09 +0000 Subject: [PATCH 16/26] fix(encoding): byte encoding for writers; - Added `utf-8` encoding for byte writing. --- ckanext/datastore/writer.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py index 4e335226968..6cc5ccd3547 100644 --- a/ckanext/datastore/writer.py +++ b/ckanext/datastore/writer.py @@ -25,8 +25,8 @@ def csv_writer(output: Any, fields: list[dict[str, Any]], if bom: output.write(BOM_UTF8) - csv.writer(output, encoding=u'utf-8').writerow( # type: ignore - f['id'] for f in fields) + csv.writer(output).writerow( + f['id'].encode('utf8') for f in fields) yield TextWriter(output) @@ -45,9 +45,8 @@ def tsv_writer(output: Any, fields: list[dict[str, Any]], csv.writer( output, - encoding=u'utf-8', # type: ignore dialect='excel-tab').writerow( - f['id'] for f in fields) + f['id'].encode('utf8') for f in fields) yield TextWriter(output) @@ -57,7 +56,7 @@ def __init__(self, output: Any): self.output = output def write_records(self, records: list[Any]): - self.output.write(records) + self.output.write([r.encode('utf-8') for r in records]) @contextmanager @@ -111,7 +110,7 @@ def xml_writer(output: Any, fields: list[dict[str, Any]], output.write(BOM_UTF8) output.write( b'\n') - yield XMLWriter(output, [f[u'id'] for f in fields]) + yield XMLWriter(output, [f['id'].encode('utf8') for f in fields]) output.write(b'\n') From 8f3c8666a875070b50dc8f3e500bcf24b58226cd Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 12 Oct 2023 19:25:33 +0000 Subject: [PATCH 17/26] fix(encoding): byte encoding for writers; - Added `utf-8` encoding for byte writing. 
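With the buffer switched to BytesIO in patch 14, every str the writers produce has to be encoded on the way in; the hunks below normalise the spelling to 'utf-8'. The round trip, using a field id with a non-ASCII character:

    from io import BytesIO

    buf = BytesIO()
    field_id = 'población'
    buf.write(field_id.encode('utf-8'))   # str -> bytes for the byte buffer
    assert buf.getvalue().decode('utf-8') == field_id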
From 8f3c8666a875070b50dc8f3e500bcf24b58226cd Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Thu, 12 Oct 2023 19:25:33 +0000
Subject: [PATCH 17/26] fix(encoding): byte encoding for writers;

- Added `utf-8` encoding for byte writing.
---
 ckanext/datastore/writer.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index 6cc5ccd3547..1c6f298bac7 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -26,7 +26,7 @@ def csv_writer(output: Any, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)

     csv.writer(output).writerow(
-        f['id'].encode('utf8') for f in fields)
+        f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)
@@ -46,7 +46,7 @@ def tsv_writer(output: Any, fields: list[dict[str, Any]],
     csv.writer(
         output,
         dialect='excel-tab').writerow(
-            f['id'].encode('utf8') for f in fields)
+            f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)
@@ -73,7 +73,7 @@ def json_writer(output: Any, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)
     output.write(
         b'{\n "fields": %s,\n "records": [' % dumps(
-            fields, ensure_ascii=False, separators=(',', ':')).encode('utf8'))
+            fields, ensure_ascii=False, separators=(',', ':')).encode('utf-8'))
     yield JSONWriter(output)
     output.write(b'\n]}\n')
@@ -110,7 +110,7 @@ def xml_writer(output: Any, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)
     output.write(
         b'<data>\n')
-    yield XMLWriter(output, [f['id'].encode('utf8') for f in fields])
+    yield XMLWriter(output, [f['id'].encode('utf-8') for f in fields])
     output.write(b'</data>\n')
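Patch 17 is purely cosmetic: 'utf8' and 'utf-8' resolve to the same codec in Python, so normalizing the spelling changes no behaviour.

import codecs

assert codecs.lookup('utf8').name == codecs.lookup('utf-8').name == 'utf-8'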
From fd77b4f70c452e2fc2d47675cd1b884dcb1f678d Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Thu, 12 Oct 2023 19:52:10 +0000
Subject: [PATCH 18/26] fix(encoding): byte encoding for writers;

- Added `utf-8` encoding for byte writing.
---
 ckanext/datastore/writer.py | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index 1c6f298bac7..71053585115 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -1,6 +1,8 @@
 # encoding: utf-8
 from __future__ import annotations

+from io import BytesIO
+
 from contextlib import contextmanager
 from typing import Any, Optional
 from simplejson import dumps
@@ -13,7 +15,7 @@

 @contextmanager
-def csv_writer(output: Any, fields: list[dict[str, Any]],
+def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
                bom: bool = False):
     u'''Context manager for writing UTF-8 CSV data to file
@@ -26,12 +28,12 @@ def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)

     csv.writer(output).writerow(
-        f['id'].encode('utf-8') for f in fields)
+        (f['id']).encode('utf-8') for f in fields)
     yield TextWriter(output)

 @contextmanager
-def tsv_writer(output: Any, fields: list[dict[str, Any]],
+def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
                bom: bool = False):
     u'''Context manager for writing UTF-8 TSV data to file
@@ -46,13 +48,13 @@ def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
     csv.writer(
         output,
         dialect='excel-tab').writerow(
-            f['id'].encode('utf-8') for f in fields)
+            (f['id']).encode('utf-8') for f in fields)
     yield TextWriter(output)

 class TextWriter(object):
     u'text in, text out'
-    def __init__(self, output: Any):
+    def __init__(self, output: BytesIO):
         self.output = output

     def write_records(self, records: list[Any]):
@@ -60,7 +62,7 @@ def write_records(self, records: list[Any]):

 @contextmanager
-def json_writer(output: Any, fields: list[dict[str, Any]],
+def json_writer(output: BytesIO, fields: list[dict[str, Any]],

 class JSONWriter(object):
-    def __init__(self, output: Any):
+    def __init__(self, output: BytesIO):
         self.output = output
         self.first = True
@@ -97,7 +99,7 @@ def write_records(self, records: list[Any]):

 @contextmanager
-def xml_writer(output: Any, fields: list[dict[str, Any]],
+def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 XML data to file
@@ -110,7 +112,7 @@ def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)
     output.write(
         b'<data>\n')
-    yield XMLWriter(output, [f['id'].encode('utf-8') for f in fields])
+    yield XMLWriter(output, [(f['id']).encode('utf-8') for f in fields])
     output.write(b'</data>\n')
@@ -118,7 +120,7 @@ class XMLWriter(object):
     _key_attr = u'key'
     _value_tag = u'value'

-    def __init__(self, output: Any, columns: list[str]):
+    def __init__(self, output: BytesIO, columns: list[str]):
         self.output = output
         self.id_col = columns[0] == u'_id'
         if self.id_col:
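Much of the churn in patches 14 through 18 is the annotations chasing the buffer type back and forth between BytesIO and StringIO. One way to sidestep that, sketched here as an aside rather than as anything the series adopts, is a structural type that only promises a write() method:

from __future__ import annotations

from typing import Protocol


class SupportsWrite(Protocol):
    def write(self, data: str) -> int: ...


def write_header(output: SupportsWrite, field_ids: list[str]) -> None:
    # Accepts StringIO, an open text file, or any other duck-typed text sink.
    output.write(','.join(field_ids) + '\n')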
From e3ab5c4f652152c78a47b9d7263ccfd73c659222 Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Thu, 12 Oct 2023 20:00:57 +0000
Subject: [PATCH 19/26] revert(encoding): changed bytes back to str;

- Reverted some `str` to `bytes` changes.
---
 ckanext/datastore/blueprint.py |  6 +++---
 ckanext/datastore/writer.py    | 37 +++++++++++++++++-----------------
 2 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py
index abbd3b31232..5160ee06b18 100644
--- a/ckanext/datastore/blueprint.py
+++ b/ckanext/datastore/blueprint.py
@@ -3,7 +3,7 @@
 from typing import Any, Optional, cast, Union
 from itertools import zip_longest
-from io import BytesIO
+from io import StringIO

 from flask import Blueprint, Response
 from flask.views import MethodView
@@ -207,7 +207,7 @@ def dump_to(
     limit: Optional[int], options: dict[str, Any], sort: str,
     search_params: dict[str, Any], user: str
 ):
-    output_buffer = BytesIO()
+    output_buffer = StringIO()

     if fmt == 'csv':
         writer_factory = csv_writer
@@ -226,7 +226,7 @@ def dump_to(
     bom = options.get('bom', False)

-    def start_stream_writer(output_buffer: BytesIO,
+    def start_stream_writer(output_buffer: StringIO,
                             fields: list[dict[str, Any]]):
         return writer_factory(output_buffer, fields, bom=bom)
diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index 71053585115..e2dd838e0d3 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -1,7 +1,7 @@
 # encoding: utf-8
 from __future__ import annotations

-from io import BytesIO
+from io import StringIO

 from contextlib import contextmanager
 from typing import Any, Optional
@@ -15,7 +15,7 @@

 @contextmanager
-def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
+def csv_writer(output: StringIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 CSV data to file
@@ -28,12 +28,12 @@ def csv_writer(output: StringIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)

     csv.writer(output).writerow(
-        (f['id']).encode('utf-8') for f in fields)
+        f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)

 @contextmanager
-def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
+def tsv_writer(output: StringIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 TSV data to file
@@ -48,13 +48,13 @@ def tsv_writer(output: StringIO, fields: list[dict[str, Any]],
     csv.writer(
         output,
         dialect='excel-tab').writerow(
-            (f['id']).encode('utf-8') for f in fields)
+            f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)

 class TextWriter(object):
     u'text in, text out'
-    def __init__(self, output: BytesIO):
+    def __init__(self, output: StringIO):
         self.output = output

     def write_records(self, records: list[Any]):
@@ -62,7 +62,7 @@ def write_records(self, records: list[Any]):

 @contextmanager
-def json_writer(output: BytesIO, fields: list[dict[str, Any]],
+def json_writer(output: StringIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 JSON data to file
@@ -74,14 +74,15 @@ def json_writer(output: StringIO, fields: list[dict[str, Any]],
     if bom:
         output.write(BOM_UTF8)
     output.write(
-        b'{\n "fields": %s,\n "records": [' % dumps(
-            fields, ensure_ascii=False, separators=(',', ':')).encode('utf-8'))
+        '{\n "fields": %s,\n "records": [' % dumps(
+            fields, ensure_ascii=False, separators=(',', ':'))
+        .encode('utf-8'))
     yield JSONWriter(output)
-    output.write(b'\n]}\n')
+    output.write('\n]}\n')

 class JSONWriter(object):
-    def __init__(self, output: BytesIO):
+    def __init__(self, output: StringIO):
         self.output = output
         self.first = True
@@ -89,9 +90,9 @@ def write_records(self, records: list[Any]):
         for r in records:
             if self.first:
                 self.first = False
-                self.output.write(b'\n ')
+                self.output.write('\n ')
             else:
-                self.output.write(b',\n ')
+                self.output.write(',\n ')
             self.output.write(dumps(
                 r, ensure_ascii=False, separators=(u',', u':'))
@@ -100,7 +100,7 @@

 @contextmanager
-def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
+def xml_writer(output: StringIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 XML data to file
@@ -112,16 +112,16 @@ def xml_writer(output: StringIO, fields: list[dict[str, Any]],
     if bom:
         output.write(BOM_UTF8)
     output.write(
-        b'<data>\n')
+        '<data>\n')
-    yield XMLWriter(output, [(f['id']).encode('utf-8') for f in fields])
+    yield XMLWriter(output, [f['id'].encode('utf-8') for f in fields])
-    output.write(b'</data>\n')
+    output.write('</data>\n')

 class XMLWriter(object):
     _key_attr = u'key'
     _value_tag = u'value'

-    def __init__(self, output: BytesIO, columns: list[str]):
+    def __init__(self, output: StringIO, columns: list[str]):
         self.output = output
         self.id_col = columns[0] == u'_id'
         if self.id_col:
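Patch 19 reintroduces a latent problem: BOM_UTF8 from codecs is a bytes constant, so writing it to a StringIO raises TypeError, which is what the .decode() in the next patch addresses. For reference, these all spell the same character:

from codecs import BOM_UTF8

assert BOM_UTF8 == b'\xef\xbb\xbf'
assert BOM_UTF8.decode('utf-8') == '\ufeff' == '\N{BYTE ORDER MARK}'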
From 6c1ea9f277745cf72cea13684a0da06493bf25ad Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Thu, 12 Oct 2023 20:07:32 +0000
Subject: [PATCH 20/26] fix(syntax): misc syntax;

- Typing,
- Indents.
---
 ckanext/datastore/writer.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index e2dd838e0d3..5e1a9782e4d 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -25,7 +25,7 @@ def csv_writer(output: StringIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8)
+        output.write(BOM_UTF8.decode())

     csv.writer(output).writerow(
         f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)
@@ -43,7 +43,7 @@ def tsv_writer(output: StringIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8)
+        output.write(BOM_UTF8.decode())

     csv.writer(
         output,
         dialect='excel-tab').writerow(
             f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)
@@ -72,11 +72,10 @@ def json_writer(output: StringIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8)
+        output.write(BOM_UTF8.decode())
     output.write(
         '{\n "fields": %s,\n "records": [' % dumps(
-            fields, ensure_ascii=False, separators=(',', ':'))
-        .encode('utf-8'))
+            fields, ensure_ascii=False, separators=(',', ':')).encode('utf-8'))
     yield JSONWriter(output)
     output.write('\n]}\n')
@@ -110,7 +109,7 @@ def xml_writer(output: StringIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8)
+        output.write(BOM_UTF8.decode())
     output.write(
         '<data>\n')
     yield XMLWriter(output, [f['id'].encode('utf-8') for f in fields])
From 7c8cfd04b9d89109a55a53fffec552c05ec139a0 Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Fri, 13 Oct 2023 13:27:07 +0000
Subject: [PATCH 21/26] revert(encoding): changed bytes back to str;

- Reverted some `str` to `bytes` changes.
---
 ckanext/datastore/writer.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index 5e1a9782e4d..2b7b50fa75a 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -28,7 +28,7 @@ def csv_writer(output: StringIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8.decode())

     csv.writer(output).writerow(
-        f['id'].encode('utf-8') for f in fields)
+        f['id'] for f in fields)
     yield TextWriter(output)
@@ -48,7 +48,7 @@ def tsv_writer(output: StringIO, fields: list[dict[str, Any]],
     csv.writer(
         output,
         dialect='excel-tab').writerow(
-            f['id'].encode('utf-8') for f in fields)
+            f['id'] for f in fields)
     yield TextWriter(output)
@@ -58,7 +58,7 @@ def __init__(self, output: StringIO):
         self.output = output

     def write_records(self, records: list[Any]):
-        self.output.write([r.encode('utf-8') for r in records])
+        self.output.write(r for r in records)

 @contextmanager
@@ -75,7 +75,7 @@ def json_writer(output: StringIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8.decode())
     output.write(
         '{\n "fields": %s,\n "records": [' % dumps(
-            fields, ensure_ascii=False, separators=(',', ':')).encode('utf-8'))
+            fields, ensure_ascii=False, separators=(',', ':')))
     yield JSONWriter(output)
     output.write('\n]}\n')
@@ -94,8 +94,7 @@ def write_records(self, records: list[Any]):
                 self.output.write(',\n ')
             self.output.write(dumps(
-                r, ensure_ascii=False, separators=(u',', u':'))
-                .encode('utf-8'))
+                r, ensure_ascii=False, separators=(u',', u':')))

 @contextmanager
@@ -112,7 +111,7 @@ def xml_writer(output: StringIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8.decode())
     output.write(
         '<data>\n')
-    yield XMLWriter(output, [f['id'].encode('utf-8') for f in fields])
+    yield XMLWriter(output, [f['id'] for f in fields])
     output.write('</data>\n')
@@ -153,4 +152,4 @@ def write_records(self, records: list[Any]):
         for c in self.columns:
             self._insert_node(root, c, r[c])
         ElementTree(root).write(self.output, encoding=u'utf-8')
-        self.output.write(b'\n')
+        self.output.write('\n')
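The str/bytes flip-flopping across patches 14 through 21 comes down to one rule: each buffer type accepts exactly one kind of data, and mixing them fails immediately rather than producing garbled output.

from io import BytesIO, StringIO

BytesIO().write(b'ok')   # bytes into a byte buffer
StringIO().write('ok')   # str into a text buffer
try:
    BytesIO().write('text into a byte buffer')
except TypeError as e:
    print(e)  # a bytes-like object is required, not 'str'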
From c3e44ac8b785008e2ad02b917dce49153377ff56 Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Fri, 13 Oct 2023 13:57:21 +0000
Subject: [PATCH 22/26] feat(dev): another attempt;

- More attempts at byte writing for datastore dump.
---
 ckanext/datastore/blueprint.py |  6 ++--
 ckanext/datastore/writer.py    | 51 +++++++++++++++++-----------------
 2 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py
index 5160ee06b18..abbd3b31232 100644
--- a/ckanext/datastore/blueprint.py
+++ b/ckanext/datastore/blueprint.py
@@ -3,7 +3,7 @@
 from typing import Any, Optional, cast, Union
 from itertools import zip_longest
-from io import StringIO
+from io import BytesIO

 from flask import Blueprint, Response
 from flask.views import MethodView
@@ -207,7 +207,7 @@ def dump_to(
     limit: Optional[int], options: dict[str, Any], sort: str,
     search_params: dict[str, Any], user: str
 ):
-    output_buffer = StringIO()
+    output_buffer = BytesIO()

     if fmt == 'csv':
         writer_factory = csv_writer
@@ -226,7 +226,7 @@ def dump_to(
     bom = options.get('bom', False)

-    def start_stream_writer(output_buffer: StringIO,
+    def start_stream_writer(output_buffer: BytesIO,
                             fields: list[dict[str, Any]]):
         return writer_factory(output_buffer, fields, bom=bom)
diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index 2b7b50fa75a..8686397d93c 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -1,7 +1,7 @@
 # encoding: utf-8
 from __future__ import annotations

-from io import StringIO
+from io import BytesIO

 from contextlib import contextmanager
 from typing import Any, Optional
@@ -15,7 +15,7 @@

 @contextmanager
-def csv_writer(output: StringIO, fields: list[dict[str, Any]],
+def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 CSV data to file
@@ -25,15 +25,15 @@ def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8.decode())
+        output.write(BOM_UTF8)

     csv.writer(output).writerow(
-        f['id'] for f in fields)
+        f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)

 @contextmanager
-def tsv_writer(output: StringIO, fields: list[dict[str, Any]],
+def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 TSV data to file
@@ -43,26 +43,26 @@ def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8.decode())
+        output.write(BOM_UTF8)

     csv.writer(
         output,
         dialect='excel-tab').writerow(
-            f['id'] for f in fields)
+            f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)

 class TextWriter(object):
     u'text in, text out'
-    def __init__(self, output: StringIO):
+    def __init__(self, output: BytesIO, dialect):
         self.output = output

     def write_records(self, records: list[Any]):
-        self.output.write(r for r in records)
+        self.output.write(records)  # type: ignore

 @contextmanager
-def json_writer(output: StringIO, fields: list[dict[str, Any]],
+def json_writer(output: BytesIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 JSON data to file
@@ -72,16 +72,16 @@ def json_writer(output: BytesIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8.decode())
+        output.write(BOM_UTF8)
     output.write(
-        '{\n "fields": %s,\n "records": [' % dumps(
-            fields, ensure_ascii=False, separators=(',', ':')))
+        b'{\n "fields": %s,\n "records": [' % dumps(
+            fields, ensure_ascii=False, separators=(',', ':')).encode('utf-8'))
     yield JSONWriter(output)
-    output.write('\n]}\n')
+    output.write(b'\n]}\n')

 class JSONWriter(object):
-    def __init__(self, output: StringIO):
+    def __init__(self, output: BytesIO):
         self.output = output
         self.first = True
@@ -89,16 +89,17 @@ def write_records(self, records: list[Any]):
         for r in records:
             if self.first:
                 self.first = False
-                self.output.write('\n ')
+                self.output.write(b'\n ')
             else:
-                self.output.write(',\n ')
+                self.output.write(b',\n ')
             self.output.write(dumps(
-                r, ensure_ascii=False, separators=(u',', u':')))
+                r, ensure_ascii=False, separators=(u',', u':'))
+                .encode('utf-8'))

 @contextmanager
-def xml_writer(output: StringIO, fields: list[dict[str, Any]],
+def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
               bom: bool = False):
     u'''Context manager for writing UTF-8 XML data to file
@@ -108,18 +109,18 @@ def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
     '''
     if bom:
-        output.write(BOM_UTF8.decode())
+        output.write(BOM_UTF8)
     output.write(
-        '<data>\n')
+        b'<data>\n')
-    yield XMLWriter(output, [f['id'] for f in fields])
+    yield XMLWriter(output, [f['id'].encode('utf-8') for f in fields])
-    output.write('</data>\n')
+    output.write(b'</data>\n')

 class XMLWriter(object):
     _key_attr = u'key'
     _value_tag = u'value'

-    def __init__(self, output: StringIO, columns: list[str]):
+    def __init__(self, output: BytesIO, columns: list[str]):
         self.output = output
         self.id_col = columns[0] == u'_id'
         if self.id_col:
@@ -152,4 +153,4 @@ def write_records(self, records: list[Any]):
         for c in self.columns:
             self._insert_node(root, c, r[c])
         ElementTree(root).write(self.output, encoding=u'utf-8')
-        self.output.write('\n')
+        self.output.write(b'\n')
From 06b654c4ea6765b0447a0342d3140fb08731acaf Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Fri, 13 Oct 2023 14:32:01 +0000
Subject: [PATCH 23/26] feat(dev): another attempt;

- More attempts at byte writing for datastore dump.
---
 ckanext/datastore/writer.py | 34 +++++++++++++++++-----------------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index 8686397d93c..5be9c225b09 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -17,7 +17,7 @@
 @contextmanager
 def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
                bom: bool = False):
-    u'''Context manager for writing UTF-8 CSV data to file
+    '''Context manager for writing UTF-8 CSV data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
@@ -27,7 +27,7 @@ def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
     if bom:
         output.write(BOM_UTF8)

-    csv.writer(output).writerow(
+    csv.writer(output).writerow(  # type: ignore
         f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)
@@ -35,7 +35,7 @@ def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
                bom: bool = False):
-    u'''Context manager for writing UTF-8 TSV data to file
+    '''Context manager for writing UTF-8 TSV data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
@@ -46,15 +46,15 @@ def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)

     csv.writer(
-        output,
+        output,  # type: ignore
         dialect='excel-tab').writerow(
             f['id'].encode('utf-8') for f in fields)
     yield TextWriter(output)

 class TextWriter(object):
-    u'text in, text out'
-    def __init__(self, output: BytesIO, dialect):
+    'text in, text out'
+    def __init__(self, output: BytesIO):
         self.output = output

     def write_records(self, records: list[Any]):
@@ -64,7 +64,7 @@ def write_records(self, records: list[Any]):
 @contextmanager
 def json_writer(output: BytesIO, fields: list[dict[str, Any]],
                bom: bool = False):
-    u'''Context manager for writing UTF-8 JSON data to file
+    '''Context manager for writing UTF-8 JSON data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
@@ -94,14 +94,14 @@ def write_records(self, records: list[Any]):
                 self.output.write(b',\n ')
             self.output.write(dumps(
-                r, ensure_ascii=False, separators=(u',', u':'))
+                r, ensure_ascii=False, separators=(',', ':'))
                 .encode('utf-8'))

 @contextmanager
 def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
               bom: bool = False):
-    u'''Context manager for writing UTF-8 XML data to file
+    '''Context manager for writing UTF-8 XML data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
@@ -112,17 +112,17 @@ def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
         output.write(BOM_UTF8)
     output.write(
         b'<data>\n')
-    yield XMLWriter(output, [f['id'].encode('utf-8') for f in fields])
+    yield XMLWriter(output, [f['id'] for f in fields])
     output.write(b'</data>\n')

 class XMLWriter(object):
-    _key_attr = u'key'
-    _value_tag = u'value'
+    _key_attr = 'key'
+    _value_tag = 'value'

     def __init__(self, output: BytesIO, columns: list[str]):
         self.output = output
-        self.id_col = columns[0] == u'_id'
+        self.id_col = columns[0] == '_id'
         if self.id_col:
             columns = columns[1:]
         self.columns = columns
@@ -131,7 +131,7 @@ def _insert_node(self, root: Any, k: str, v: Any,
                      key_attr: Optional[Any] = None):
         element = SubElement(root, k)
         if v is None:
-            element.attrib[u'xsi:nil'] = u'true'
+            element.attrib['xsi:nil'] = 'true'
         elif not isinstance(v, (list, dict)):
             element.text = str(v)
         else:
@@ -147,10 +147,10 @@ def _insert_node(self, root: Any, k: str, v: Any,
     def write_records(self, records: list[Any]):
         for r in records:
-            root = Element(u'row')
+            root = Element('row')
             if self.id_col:
-                root.attrib[u'_id'] = str(r[u'_id'])
+                root.attrib['_id'] = str(r['_id'])
             for c in self.columns:
                 self._insert_node(root, c, r[c])
-            ElementTree(root).write(self.output, encoding=u'utf-8')
+            ElementTree(root).write(self.output, encoding='utf-8')
             self.output.write(b'\n')
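Patch 24 below is the structural fix the earlier attempts were circling: each writer owns its buffer and drains it on every page, returning ready-to-stream bytes to the view. The drain step, isolated here as a sketch for the text-buffer (CSV/TSV/JSON) case:

from io import StringIO


def drain(buffer: StringIO) -> bytes:
    buffer.seek(0)
    chunk = buffer.read().encode('utf-8')  # everything written since last drain
    buffer.truncate(0)
    buffer.seek(0)
    return chunk


buf = StringIO()
buf.write('a,b\n1,2\n')
assert drain(buf) == b'a,b\n1,2\n'
assert drain(buf) == b''  # empty until the next page is written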
From 0f3ea9a710829a2fe356ade3da3c3c3cf458d2aa Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Fri, 13 Oct 2023 18:45:52 +0000
Subject: [PATCH 24/26] feat(dev): moved output buffers into writer methods;

- Moved output buffers into the datastore writer methods.
- Made write record methods return bytes.
- Added end file method for returning final bytes of the files.
---
 ckanext/datastore/blueprint.py       | 20 ++-----
 ckanext/datastore/tests/test_dump.py |  2 +-
 ckanext/datastore/writer.py          | 84 ++++++++++++++++++----------
 3 files changed, 63 insertions(+), 43 deletions(-)

diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py
index abbd3b31232..765aeecf4c1 100644
--- a/ckanext/datastore/blueprint.py
+++ b/ckanext/datastore/blueprint.py
@@ -3,7 +3,6 @@
 from typing import Any, Optional, cast, Union
 from itertools import zip_longest
-from io import BytesIO

 from flask import Blueprint, Response
 from flask.views import MethodView
@@ -207,8 +206,6 @@ def dump_to(
     limit: Optional[int], options: dict[str, Any], sort: str,
     search_params: dict[str, Any], user: str
 ):
-    output_buffer = BytesIO()
-
     if fmt == 'csv':
         writer_factory = csv_writer
         records_format = 'csv'
@@ -226,9 +223,8 @@ def dump_to(
     bom = options.get('bom', False)

-    def start_stream_writer(output_buffer: BytesIO,
-                            fields: list[dict[str, Any]]):
-        return writer_factory(output_buffer, fields, bom=bom)
+    def start_stream_writer(fields: list[dict[str, Any]]):
+        return writer_factory(fields, bom=bom)

     def stream_result_page(offs: int, lim: Union[None, int]):
         return get_action('datastore_search')(
@@ -246,18 +242,14 @@ def stream_result_page(offs: int, lim: Union[None, int]):
     def stream_dump(offset: int, limit: Union[None, int],
                     paginate_by: int, result: dict[str, Any]):
-        with start_stream_writer(output_buffer, result['fields']) as output:
+        with start_stream_writer(result['fields']) as writer:
             while True:
                 if limit is not None and limit <= 0:
                     break

                 records = result['records']

-                output.write_records(records)
-                output_buffer.seek(0)
-                yield output_buffer.read()
-                output_buffer.truncate(0)
-                output_buffer.seek(0)
+                yield writer.write_records(records)

                 if records_format == 'objects' or records_format == 'lists':
                     if len(records) < paginate_by:
                         break
@@ -272,8 +264,8 @@ def stream_dump(offset: int, limit: Union[None, int],
                     break

                 result = stream_result_page(offset, limit)
-        output_buffer.seek(0)
-        yield output_buffer.read()
+
+            yield writer.end_file()

     result = stream_result_page(offset, limit)
diff --git a/ckanext/datastore/tests/test_dump.py b/ckanext/datastore/tests/test_dump.py
index ec1c0416569..bdd1ce3723e 100644
--- a/ckanext/datastore/tests/test_dump.py
+++ b/ckanext/datastore/tests/test_dump.py
@@ -426,7 +426,7 @@ def test_dump_xml(self, app):
     res = app.get(f"/datastore/dump/{resource['id']}?limit=1&format=xml")
     expected_content = (
-        u"<data>\n"
+        u'<data>\n'
         r''
         u"annakarenina"
         u"tolstoy"
diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index 5be9c225b09..acd1ba05b0d 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -1,7 +1,7 @@
 # encoding: utf-8
 from __future__ import annotations
-from io import BytesIO
+from io import StringIO, BytesIO

 from contextlib import contextmanager
 from typing import Any, Optional
@@ -14,113 +14,132 @@
 from codecs import BOM_UTF8

+BOM = "\N{bom}"
+
+
 @contextmanager
-def csv_writer(output: BytesIO, fields: list[dict[str, Any]],
-               bom: bool = False):
+def csv_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 CSV data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
+    output = StringIO()
     if bom:
-        output.write(BOM_UTF8)
+        output.write(BOM)

-    csv.writer(output).writerow(  # type: ignore
-        f['id'].encode('utf-8') for f in fields)
+    csv.writer(output).writerow(  # type: ignore
+        f['id'] for f in fields)
     yield TextWriter(output)

 @contextmanager
-def tsv_writer(output: BytesIO, fields: list[dict[str, Any]],
-               bom: bool = False):
+def tsv_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 TSV data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
+    output = StringIO()
     if bom:
-        output.write(BOM_UTF8)
+        output.write(BOM)

     csv.writer(
-        output,  # type: ignore
+        output,  # type: ignore
         dialect='excel-tab').writerow(
-            f['id'].encode('utf-8') for f in fields)
+            f['id'] for f in fields)
     yield TextWriter(output)

 class TextWriter(object):
     'text in, text out'
-    def __init__(self, output: BytesIO):
+    def __init__(self, output: StringIO):
         self.output = output

-    def write_records(self, records: list[Any]):
+    def write_records(self, records: list[Any]) -> bytes:
         self.output.write(records)  # type: ignore
+        self.output.seek(0)
+        output = self.output.read().encode('utf-8')
+        self.output.truncate(0)
+        self.output.seek(0)
+        return output
+
+    def end_file(self) -> bytes:
+        return b''

 @contextmanager
-def json_writer(output: BytesIO, fields: list[dict[str, Any]],
-                bom: bool = False):
+def json_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 JSON data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
+    output = StringIO()
     if bom:
-        output.write(BOM_UTF8)
+        output.write(BOM)
+
     output.write(
-        b'{\n "fields": %s,\n "records": [' % dumps(
-            fields, ensure_ascii=False, separators=(',', ':')).encode('utf-8'))
+        '{\n "fields": %s,\n "records": [' % dumps(
+            fields, ensure_ascii=False, separators=(',', ':')))
     yield JSONWriter(output)
-    output.write(b'\n]}\n')

 class JSONWriter(object):
-    def __init__(self, output: BytesIO):
+    def __init__(self, output: StringIO):
         self.output = output
         self.first = True

-    def write_records(self, records: list[Any]):
+    def write_records(self, records: list[Any]) -> bytes:
         for r in records:
             if self.first:
                 self.first = False
-                self.output.write(b'\n ')
+                self.output.write('\n ')
             else:
-                self.output.write(b',\n ')
+                self.output.write(',\n ')

             self.output.write(dumps(
-                r, ensure_ascii=False, separators=(',', ':'))
-                .encode('utf-8'))
+                r, ensure_ascii=False, separators=(',', ':')))
+
+        self.output.seek(0)
+        output = self.output.read().encode('utf-8')
+        self.output.truncate(0)
+        self.output.seek(0)
+        return output
+
+    def end_file(self) -> bytes:
+        return b'\n]}\n'

 @contextmanager
-def xml_writer(output: BytesIO, fields: list[dict[str, Any]],
-               bom: bool = False):
+def xml_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 XML data to file

     :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
+    output = BytesIO()
     if bom:
         output.write(BOM_UTF8)
+
     output.write(
         b'<data>\n')
     yield XMLWriter(output, [f['id'] for f in fields])
-    output.write(b'</data>\n')

 class XMLWriter(object):
     _key_attr = 'key'
     _value_tag = 'value'

-    def __init__(self, output: BytesIO, columns: list[str]):
+    def __init__(self, output: StringIO, columns: list[str]):
         self.output = output
         self.id_col = columns[0] == '_id'
         if self.id_col:
@@ -145,7 +164,7 @@ def _insert_node(self, root: Any, k: str, v: Any,
         if key_attr is not None:
             element.attrib[self._key_attr] = str(key_attr)

-    def write_records(self, records: list[Any]):
+    def write_records(self, records: list[Any]) -> bytes:
         for r in records:
             root = Element('row')
             if self.id_col:
@@ -154,3 +173,12 @@ def write_records(self, records: list[Any]):
             self._insert_node(root, c, r[c])
             ElementTree(root).write(self.output, encoding='utf-8')
             self.output.write(b'\n')
+        self.output.seek(0)
+        output = self.output.read()
+        self.output.truncate(0)
+        self.output.seek(0)
+        return output
+
+    def end_file(self) -> bytes:
+        return b'</data>\n'
+

From 38ef3d0681f2335f7cfa14bf1eceb8038c50b42c Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Mon, 16 Oct 2023 13:11:25 +0000
Subject: [PATCH 25/26] fix(syntax): lint and typing;

- Lint fixes.
- Typing fixes.
---
 ckanext/datastore/writer.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index acd1ba05b0d..f54ba46e912 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -30,7 +30,7 @@ def csv_writer(fields: list[dict[str, Any]], bom: bool = False):
     if bom:
         output.write(BOM)

-    csv.writer(output).writerow(  # type: ignore
+    csv.writer(output).writerow(
         f['id'] for f in fields)
     yield TextWriter(output)
@@ -49,7 +49,7 @@ def tsv_writer(fields: list[dict[str, Any]], bom: bool = False):
         output.write(BOM)

     csv.writer(
-        output,  # type: ignore
+        output,
         dialect='excel-tab').writerow(
             f['id'] for f in fields)
     yield TextWriter(output)
@@ -139,7 +139,7 @@ class XMLWriter(object):
     _key_attr = 'key'
     _value_tag = 'value'

-    def __init__(self, output: StringIO, columns: list[str]):
+    def __init__(self, output: BytesIO, columns: list[str]):
         self.output = output
         self.id_col = columns[0] == '_id'
         if self.id_col:
@@ -181,4 +181,3 @@ def write_records(self, records: list[Any]) -> bytes:

     def end_file(self) -> bytes:
         return b'</data>\n'
-
From 514c6b554408e9b692333962302f71f4142e4b76 Mon Sep 17 00:00:00 2001
From: Jesse Vickery
Date: Mon, 16 Oct 2023 14:32:04 +0000
Subject: [PATCH 26/26] removal(misc): removed method param;

- Removed method param comments.
---
 ckanext/datastore/writer.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/ckanext/datastore/writer.py b/ckanext/datastore/writer.py
index f54ba46e912..e37c0b833cb 100644
--- a/ckanext/datastore/writer.py
+++ b/ckanext/datastore/writer.py
@@ -21,7 +21,6 @@ def csv_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 CSV data to file

-    :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
@@ -39,7 +38,6 @@ def tsv_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 TSV data to file

-    :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
@@ -76,7 +74,6 @@ def end_file(self) -> bytes:
 def json_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 JSON data to file

-    :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
@@ -121,7 +118,6 @@ def end_file(self) -> bytes:
 def xml_writer(fields: list[dict[str, Any]], bom: bool = False):
     '''Context manager for writing UTF-8 XML data to file

-    :param response: file-like object for writing data
     :param fields: list of datastore fields
     :param bom: True to include a UTF-8 BOM at the start of the file
     '''
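With patch 26 applied, the writer contract the series converges on is: the factory takes only fields and bom, write_records() returns the bytes accumulated for that page, and end_file() returns the closing bytes (empty for CSV/TSV). A rough usage sketch; the field names and the preformatted CSV page are illustrative:

from ckanext.datastore.writer import csv_writer

fields = [{'id': '_id'}, {'id': 'title'}]
with csv_writer(fields, bom=False) as writer:
    # datastore_search returns CSV pages as preformatted text, which
    # TextWriter passes through and hands back as encoded bytes.
    chunk = writer.write_records('1,annakarenina\n')
    tail = writer.end_file()  # b'' for CSV; JSON/XML return their closers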