From ee0cb2dae07281430b784880284999b9c806f7e7 Mon Sep 17 00:00:00 2001 From: kindly Date: Wed, 25 Jul 2012 00:38:24 +0100 Subject: [PATCH] [2733] add engines and basic transaction scaffolding for create --- ckanext/datastore/db.py | 76 +++++++++++++++++++++++- ckanext/datastore/logic/action/create.py | 17 +++--- 2 files changed, 84 insertions(+), 9 deletions(-) diff --git a/ckanext/datastore/db.py b/ckanext/datastore/db.py index 19845d05361..4951987e660 100644 --- a/ckanext/datastore/db.py +++ b/ckanext/datastore/db.py @@ -1,6 +1,57 @@ +import sqlalchemy +from pylons import config +_pg_types = {} +_type_names = set() +_engines = {} -def create(resource_id, fields, rows): +def _get_engine(context, data_dict): + ''' Get either read or write engine''' + connection_type = data_dict.get('connection_type', 'write') + engine = _engines.get(connection_type) + + if not engine: + config_option = 'ckan.datastore_{}_url'.format(connection_type) + url = config.get(config_option) + assert url, 'Config option ' + config_option + ' not defined' + engine = sqlalchemy.create_engine(url) + _engines[connection_type] = engine + return engine + +def _cache_types(context, data_dict=None): + if not _pg_types: + connection = context['connection'] + results = connection.execute( + 'select oid, typname from pg_type;' + ) + for result in results: + _pg_types[result[0]] = result[1] + _type_names.add(result[1]) + +def _get_type(context, oid): + _cache_types(context) + return _pg_types[oid] + +def check_fields(context, fields): + _cache_types(context) + ## check if fieds are in in _type_names + pass + +def create_table(context, data_dict): + '''create table from combination of fields and first row of data''' + check_fields(context, data_dict.get('fields')) + pass + +def alter_table(context, data_dict): + '''alter table from combination of fields and first row of data''' + check_fields(context, data_dict.get('fields')) + pass + +def insert_data(context, data_dict): + '''insert all data from records''' + pass + +def create(context, data_dict): ''' The first row will be used to guess types not in the fields and the guessed types will be added to the headers permanently. @@ -22,4 +73,25 @@ def create(resource_id, fields, rows): Any error results in total failure! For now pass back the actual error. Should be transactional. ''' - pass + + engine = _get_engine(context, {'connection_type': 'write'}) + context['connection'] = engine.connect() + ## close connection at all cost. + try: + ## check if table already existes + trans = context['connection'].begin() + result = context['connection'].execute( + 'select * from pg_tables where tablename = %s', + data_dict['resource_id'] + ).fetchone() + if not result: + create_table(context, data_dict) + else: + alter_table(context, data_dict) + insert_data(context, data_dict) + except: + trans.rollback() + raise + finally: + context['connection'].close() + diff --git a/ckanext/datastore/logic/action/create.py b/ckanext/datastore/logic/action/create.py index c7e02d70237..ca0321bf8fb 100644 --- a/ckanext/datastore/logic/action/create.py +++ b/ckanext/datastore/logic/action/create.py @@ -30,14 +30,17 @@ def datastore_create(context, data_dict): model = _get_or_bust(context, 'model') resource_id = _get_or_bust(data_dict, 'resource_id') fields = data_dict.get('fields') - records = _get_or_bust(data_dict, 'records') + records = data_dict.get('records') _check_access('datastore_create', context, data_dict) - schema = ckanext.datastore.logic.schema.default_datastore_create_schema() - data, errors = _validate(data_dict, schema, context) - if errors: - model.Session.rollback() - raise p.toolkit.ValidationError(errors) + # Not sure need schema as will be too dificulut to make + # as records could be deeply nested.. - return db.create(resource_id, fields, records) + #schema = ckanext.datastore.logic.schema.default_datastore_create_schema() + #data, errors = _validate(data_dict, schema, context) + #if errors: + # model.Session.rollback() + # raise p.toolkit.ValidationError(errors) + + return db.create(context, data_dict)