Skip to content

Commit

Permalink
Merge pull request #8014 from ckan/7971-data-dictionary-form
Browse files Browse the repository at this point in the history
IDataDictionaryForm for extending and validating datastore data dictionary fields
  • Loading branch information
amercader committed Feb 7, 2024
2 parents f255092 + 95556e3 commit f2aa96e
Show file tree
Hide file tree
Showing 22 changed files with 817 additions and 84 deletions.
8 changes: 8 additions & 0 deletions changes/7971.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
IDataDictionaryForm for extending and validating new keys in the `fields`
dicts. Unlike the free-form `info` dict, these new keys can be tightly
controlled with a schema. The schema is built by combining the schemas
from all plugins implementing this interface, so plugins implementing
different features may all contribute to the same schema.

The underlying storage for data dictionary fields has changed. Run
`ckan datastore upgrade` after upgrading to this release.
1 change: 1 addition & 0 deletions ckan/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class Context(TypedDict, total=False):
with_capacity: bool

table_names: list[str]
plugin_data: dict[Any, Any]


class AuthResult(TypedDict, total=False):
Expand Down
9 changes: 8 additions & 1 deletion ckanext/datastore/backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ def configure(self, config: CKANConfig):

return config

def create(self, context: Context, data_dict: dict[str, Any]) -> Any:
def create(
self,
context: Context,
data_dict: dict[str, Any],
plugin_data: dict[int, dict[str, Any]]) -> Any:
"""Create new resourct inside datastore.
Called by `datastore_create`.
Expand Down Expand Up @@ -226,3 +230,6 @@ def drop_function(self, *args: Any, **kwargs: Any) -> Any:
"""Called by `datastore_function_delete` action.
"""
raise NotImplementedError()

def resource_plugin_data(self, resource_id: str) -> dict[str, Any]:
    """Return the plugin data stored for the fields of a resource.

    This base implementation is only a placeholder: it always raises
    and is expected to be overridden by concrete backends.

    :param resource_id: id of the datastore resource to look up
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError()
110 changes: 82 additions & 28 deletions ckanext/datastore/backend/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,24 +277,51 @@ def _get_unique_key(context: Context, data_dict: dict[str, Any]) -> list[str]:
return [x[0] for x in key_parts]


def _get_field_info(connection: Any, resource_id: str) -> dict[str, Any]:
u'''return a dictionary mapping column names to their info data,
def _get_field_info(
connection: Any,
resource_id: str,
) -> dict[str, Any]:
'''return a dictionary mapping column names to their info data,
when present'''
qtext = sa.text(u'''
select pa.attname as name, pd.description as info
qtext = sa.text(
'''select pa.attname as name, pd.description::json -> '_info' as info
from pg_class pc, pg_attribute pa, pg_description pd
where pa.attrelid = pc.oid and pd.objoid = pc.oid
and pd.objsubid = pa.attnum and pc.relname = :res_id
and pa.attnum > 0
''')
and pa.attnum > 0'''
)
try:
return dict(
(n, json.loads(v)) for (n, v) in
connection.execute(qtext, {"res_id": resource_id}).fetchall())
except ValueError: # don't die on non-json comments
except (TypeError, ValueError): # don't die on non-json comments
return {}


def _get_raw_field_info(
    connection: Any,
    resource_id: str,
) -> tuple[dict[str, Any], bool]:
    '''Read the raw JSON column comments of a datastore table.

    :param connection: open database connection used to run the query
    :param resource_id: resource id, matched against the table name
    :returns: a ``(raw_info, old_schema)`` pair. ``raw_info`` maps each
        commented column name to its decoded JSON comment.
        ``old_schema`` is True when at least one comment starts with
        ``{``, i.e. was stored in the old data schema (for upgrade).

    If any comment cannot be decoded as JSON the whole lookup is
    abandoned and ``({}, False)`` is returned.
    '''
    query = sa.text(
        "select pa.attname as name, pd.description as info,\n"
        "substring(pd.description for 1) = '{' as old_schema\n"
        "from pg_class pc, pg_attribute pa, pg_description pd\n"
        "where pa.attrelid = pc.oid and pd.objoid = pc.oid\n"
        "and pd.objsubid = pa.attnum and pc.relname = :res_id\n"
        "and pa.attnum > 0"
    )
    try:
        rows = connection.execute(query, {"res_id": resource_id}).fetchall()
        raw_info: dict[str, Any] = {}
        found_old_schema = False
        for column_name, comment, is_old in rows:
            raw_info[column_name] = json.loads(comment)
            if is_old:
                found_old_schema = True
        return raw_info, found_old_schema
    except (TypeError, ValueError):  # don't die on non-json comments
        return {}, False


def _get_fields(connection: Any, resource_id: str):
u'''
return a list of {'id': column_name, 'type': column_type} dicts
Expand Down Expand Up @@ -965,7 +992,10 @@ def create_indexes(context: Context, data_dict: dict[str, Any]):
connection.execute(sa.text(sql_index_string))


def create_table(context: Context, data_dict: dict[str, Any]):
def create_table(
context: Context,
data_dict: dict[str, Any],
plugin_data: dict[int, dict[str, Any]]):
'''Creates table, columns and column info (stored as comments).
:param resource_id: The resource ID (i.e. postgres table name)
Expand Down Expand Up @@ -1047,20 +1077,28 @@ def create_table(context: Context, data_dict: dict[str, Any]):
)

info_sql = []
for f in supplied_fields:
for i, f in enumerate(supplied_fields):
column_comment = plugin_data.get(i, {})
info = f.get(u'info')
if isinstance(info, dict):
column_comment['_info'] = info
if column_comment:
info_sql.append(u'COMMENT ON COLUMN {0}.{1} is {2}'.format(
identifier(data_dict['resource_id']),
identifier(f['id']),
literal_string(
json.dumps(info, ensure_ascii=False))))
literal_string(' ' + json.dumps( # ' ' prefix for data version
column_comment, ensure_ascii=False, separators=(',', ':')))
))

context['connection'].execute(sa.text(
sql_string + u';'.join(info_sql)))
sql_string + u';'.join(info_sql).replace(':', r'\:') # no bind params
))


def alter_table(context: Context, data_dict: dict[str, Any]):
def alter_table(
context: Context,
data_dict: dict[str, Any],
plugin_data: dict[int, dict[str, Any]]):
'''Add/remove columns and updates column info (stored as comments).
:param resource_id: The resource ID (i.e. postgres table name)
Expand Down Expand Up @@ -1127,18 +1165,27 @@ def alter_table(context: Context, data_dict: dict[str, Any]):
identifier(f['id']),
f['type']))

for f in supplied_fields:
if u'info' in f:
info = f.get(u'info')
if isinstance(info, dict):
info_sql = literal_string(
json.dumps(info, ensure_ascii=False))
else:
info_sql = 'NULL'
if plugin_data or any('info' in f for f in supplied_fields):
raw_field_info, _old = _get_raw_field_info(
context['connection'],
data_dict['resource_id'],
)

for i, f in enumerate(supplied_fields):
raw = raw_field_info.get(f['id'], {})

if 'info' in f and isinstance(f['info'], dict):
raw['_info'] = f['info']
if i in plugin_data:
raw.update(plugin_data[i])

# ' ' prefix for data version
column_comment = literal_string(' ' + json.dumps(
raw, ensure_ascii=False, separators=(',', ':')))
alter_sql.append(u'COMMENT ON COLUMN {0}.{1} is {2}'.format(
identifier(data_dict['resource_id']),
identifier(f['id']),
info_sql))
column_comment))

if data_dict['delete_fields']:
for id_ in current_ids - field_ids - set(f['id'] for f in new_fields):
Expand All @@ -1148,7 +1195,7 @@ def alter_table(context: Context, data_dict: dict[str, Any]):

if alter_sql:
context['connection'].execute(sa.text(
';'.join(alter_sql)
';'.join(alter_sql).replace(':', r'\:') # no bind params
))


Expand Down Expand Up @@ -2069,7 +2116,11 @@ def delete(self, context: Context, data_dict: dict[str, Any]):

return _unrename_json_field(data_dict)

def create(self, context: Context, data_dict: dict[str, Any]):
def create(
self,
context: Context,
data_dict: dict[str, Any],
plugin_data: dict[int, dict[str, Any]]):
'''
The first row will be used to guess types not in the fields and the
guessed types will be added to the headers permanently.
Expand Down Expand Up @@ -2103,12 +2154,12 @@ def create(self, context: Context, data_dict: dict[str, Any]):
'SELECT * FROM pg_tables WHERE tablename = :table'
), {"table": data_dict['resource_id']}).fetchone()
if not result:
create_table(context, data_dict)
create_table(context, data_dict, plugin_data)
_create_fulltext_trigger(
context['connection'],
data_dict['resource_id'])
else:
alter_table(context, data_dict)
alter_table(context, data_dict, plugin_data)
if 'triggers' in data_dict:
_create_triggers(
context['connection'],
Expand Down Expand Up @@ -2187,8 +2238,11 @@ def resource_id_from_alias(self, alias: str) -> tuple[bool, Optional[str]]:
real_id = row[0]
return res_exists, real_id

# def resource_info(self, id):
# pass
def resource_plugin_data(self, id: str) -> dict[str, Any]:
    '''Return the raw per-column info stored for a resource.

    Opens a connection on the read engine and looks the data up with
    ``_get_raw_field_info``; the old-schema flag it also returns is
    discarded here.

    :param id: resource id whose column data is wanted
    :returns: dict keyed by column name
    '''
    with self._get_read_engine().connect() as connection:
        raw_info, _has_old_schema = _get_raw_field_info(connection, id)
    return raw_info

def resource_fields(self, id: str) -> dict[str, Any]:

Expand Down
70 changes: 45 additions & 25 deletions ckanext/datastore/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)
from ckan.plugins.toolkit import (
ObjectNotFound, NotAuthorized, get_action, get_validator, _, request,
abort, render, g, h
abort, render, g, h, ValidationError
)
from ckan.types import Schema, ValidatorFactory
from ckanext.datastore.logic.schema import (
Expand Down Expand Up @@ -137,14 +137,9 @@ class DictionaryView(MethodView):
def _prepare(self, id: str, resource_id: str) -> dict[str, Any]:
try:
# resource_edit_base template uses these
pkg_dict = get_action(u'package_show')({}, {u'id': id})
resource = get_action(u'resource_show')({}, {u'id': resource_id})
rec = get_action(u'datastore_search')(
{}, {
u'resource_id': resource_id,
u'limit': 0
}
)
pkg_dict = get_action(u'package_show')({}, {'id': id})
resource = get_action(u'resource_show')({}, {'id': resource_id})
rec = get_action(u'datastore_info')({}, {'id': resource_id})
return {
u'pkg_dict': pkg_dict,
u'resource': resource,
Expand All @@ -156,16 +151,25 @@ def _prepare(self, id: str, resource_id: str) -> dict[str, Any]:
except (ObjectNotFound, NotAuthorized):
abort(404, _(u'Resource not found'))

def get(self, id: str, resource_id: str):
def get(self,
id: str,
resource_id: str,
data: Optional[dict[str, Any]] = None,
errors: Optional[dict[str, Any]] = None,
error_summary: Optional[dict[str, Any]] = None,
):
u'''Data dictionary view: show field labels and descriptions'''

data_dict = self._prepare(id, resource_id)
template_vars = self._prepare(id, resource_id)
template_vars['data'] = data or {}
template_vars['errors'] = errors or {}
template_vars['error_summary'] = error_summary

# global variables for backward compatibility
g.pkg_dict = data_dict[u'pkg_dict']
g.resource = data_dict[u'resource']
g.pkg_dict = template_vars['pkg_dict']
g.resource = template_vars['resource']

return render(u'datastore/dictionary.html', data_dict)
return render('datastore/dictionary.html', template_vars)

def post(self, id: str, resource_id: str):
u'''Data dictionary view: edit field labels and descriptions'''
Expand All @@ -176,18 +180,34 @@ def post(self, id: str, resource_id: str):
if not isinstance(info, list):
info = []
info = info[:len(fields)]
custom = data.get('fields')
if not isinstance(custom, list):
custom = []

get_action(u'datastore_create')(
{}, {
u'resource_id': resource_id,
u'force': True,
u'fields': [{
u'id': f[u'id'],
u'type': f[u'type'],
u'info': fi if isinstance(fi, dict) else {}
} for f, fi in zip_longest(fields, info)]
}
)
try:
get_action('datastore_create')(
{}, {
'resource_id': resource_id,
'force': True,
'fields': [dict(
cu or {},
id=f['id'],
type=f['type'],
info=fi if isinstance(fi, dict) else {}
) for f, fi, cu in zip_longest(fields, info, custom)]
}
)
except ValidationError as e:
errors = e.error_dict
# flatten field errors for summary
error_summary = {}
field_errors = errors.get('fields', [])
if isinstance(field_errors, list):
for i, f in enumerate(field_errors, 1):
if isinstance(f, dict) and f:
error_summary[_('Field %d') % i] = ', '.join(
v for vals in f.values() for v in vals)
return self.get(id, resource_id, data, errors, error_summary)

h.flash_success(
_(
Expand Down

0 comments on commit f2aa96e

Please sign in to comment.