Skip to content

Commit

Permalink
[#3428] replace db._get_engine with db.get_(read|write)_engine
Browse files Browse the repository at this point in the history
  • Loading branch information
wardi committed Feb 28, 2017
1 parent 77417d0 commit 9fc4507
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 75 deletions.
40 changes: 21 additions & 19 deletions ckanext/datastore/db.py
Expand Up @@ -10,6 +10,7 @@
import hashlib

import pylons
from pylons import config
import distutils.version
import sqlalchemy
from sqlalchemy.exc import (ProgrammingError, IntegrityError,
Expand Down Expand Up @@ -96,9 +97,16 @@ def _is_valid_table_name(name):
return _is_valid_field_name(name)


def _get_engine(data_dict):
def get_read_engine():
return _get_engine_from_url(config['ckan.datastore.read_url'])


def get_write_engine():
return _get_engine_from_url(config['ckan.datastore.write_url'])


def _get_engine_from_url(connection_url):
'''Get either read or write engine.'''
connection_url = data_dict['connection_url']
engine = _engines.get(connection_url)

if not engine:
Expand Down Expand Up @@ -126,9 +134,7 @@ def _cache_types(context):
log.info("Create nested type. Native JSON: {0!r}".format(
native_json))

data_dict = {
'connection_url': pylons.config['ckan.datastore.write_url']}
engine = _get_engine(data_dict)
engine = get_write_engine()
with engine.begin() as connection:
connection.execute(
'CREATE TYPE "nested" AS (json {0}, extra text)'.format(
Expand Down Expand Up @@ -962,7 +968,6 @@ def validate(context, data_dict):
fields_types)

# Remove default elements in data_dict
del data_dict_copy['connection_url']
del data_dict_copy['resource_id']
data_dict_copy.pop('id', None)

Expand Down Expand Up @@ -1101,7 +1106,7 @@ def create(context, data_dict):
:raises InvalidDataError: if there is an invalid value in the given data
'''
engine = _get_engine(data_dict)
engine = get_write_engine()
context['connection'] = engine.connect()
timeout = context.get('query_timeout', _TIMEOUT)
_cache_types(context)
Expand Down Expand Up @@ -1166,7 +1171,7 @@ def upsert(context, data_dict):
Any error results in total failure! For now pass back the actual error.
Should be transactional.
'''
engine = _get_engine(data_dict)
engine = get_write_engine()
context['connection'] = engine.connect()
timeout = context.get('query_timeout', _TIMEOUT)

Expand Down Expand Up @@ -1209,7 +1214,7 @@ def upsert(context, data_dict):


def delete(context, data_dict):
engine = _get_engine(data_dict)
engine = get_write_engine()
context['connection'] = engine.connect()
_cache_types(context)

Expand All @@ -1233,7 +1238,7 @@ def delete(context, data_dict):


def search(context, data_dict):
engine = _get_engine(data_dict)
engine = get_read_engine()
context['connection'] = engine.connect()
timeout = context.get('query_timeout', _TIMEOUT)
_cache_types(context)
Expand All @@ -1260,7 +1265,7 @@ def search(context, data_dict):


def search_sql(context, data_dict):
engine = _get_engine(data_dict)
engine = get_read_engine()
context['connection'] = engine.connect()
timeout = context.get('query_timeout', _TIMEOUT)
_cache_types(context)
Expand Down Expand Up @@ -1352,7 +1357,7 @@ def _change_privilege(context, data_dict, what):

def make_private(context, data_dict):
log.info('Making resource {resource_id!r} private'.format(**data_dict))
engine = _get_engine(data_dict)
engine = get_write_engine()
context['connection'] = engine.connect()
trans = context['connection'].begin()
try:
Expand All @@ -1364,7 +1369,7 @@ def make_private(context, data_dict):

def make_public(context, data_dict):
log.info('Making resource {resource_id!r} public'.format(**data_dict))
engine = _get_engine(data_dict)
engine = get_write_engine()
context['connection'] = engine.connect()
trans = context['connection'].begin()
try:
Expand All @@ -1375,12 +1380,9 @@ def make_public(context, data_dict):


def get_all_resources_ids_in_datastore():
read_url = pylons.config.get('ckan.datastore.read_url')
write_url = pylons.config.get('ckan.datastore.write_url')
data_dict = {
'connection_url': read_url or write_url
}
read_url = config.get('ckan.datastore.read_url')
write_url = config.get('ckan.datastore.write_url')
resources_sql = sqlalchemy.text(u'''SELECT name FROM "_table_metadata"
WHERE alias_of IS NULL''')
query = _get_engine(data_dict).execute(resources_sql)
query = get_read_engine().execute(resources_sql)
return [q[0] for q in query.fetchall()]
37 changes: 10 additions & 27 deletions ckanext/datastore/logic/action.py
Expand Up @@ -120,8 +120,6 @@ def datastore_create(context, data_dict):
resource_id = data_dict['resource_id']
_check_read_only(context, resource_id)

data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']

# validate aliases
aliases = datastore_helpers.get_list(data_dict.get('aliases', []))
for alias in aliases:
Expand Down Expand Up @@ -153,7 +151,6 @@ def datastore_create(context, data_dict):

result.pop('id', None)
result.pop('private', None)
result.pop('connection_url')
return result


Expand Down Expand Up @@ -207,12 +204,10 @@ def datastore_upsert(context, data_dict):
resource_id = data_dict['resource_id']
_check_read_only(context, resource_id)

data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']

res_id = data_dict['resource_id']
resources_sql = sqlalchemy.text(u'''SELECT 1 FROM "_table_metadata"
WHERE name = :id AND alias_of IS NULL''')
results = db._get_engine(data_dict).execute(resources_sql, id=res_id)
results = db.get_write_engine().execute(resources_sql, id=res_id)
res_exists = results.rowcount > 0

if not res_exists:
Expand All @@ -222,7 +217,6 @@ def datastore_upsert(context, data_dict):

result = db.upsert(context, data_dict)
result.pop('id', None)
result.pop('connection_url')
return result


Expand All @@ -249,11 +243,9 @@ def _type_lookup(t):
resource_id = _get_or_bust(data_dict, 'id')
resource = p.toolkit.get_action('resource_show')(context, {'id':resource_id})

data_dict['connection_url'] = pylons.config['ckan.datastore.read_url']

resources_sql = sqlalchemy.text(u'''SELECT 1 FROM "_table_metadata"
WHERE name = :id AND alias_of IS NULL''')
results = db._get_engine(data_dict).execute(resources_sql, id=resource_id)
results = db.get_read_engine().execute(resources_sql, id=resource_id)
res_exists = results.rowcount > 0
if not res_exists:
raise p.toolkit.ObjectNotFound(p.toolkit._(
Expand All @@ -269,7 +261,7 @@ def _type_lookup(t):
SELECT column_name, data_type
FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = :resource_id;
''')
schema_results = db._get_engine(data_dict).execute(schema_sql, resource_id=resource_id)
schema_results = db.get_read_engine().execute(schema_sql, resource_id=resource_id)
for row in schema_results.fetchall():
k = row[0]
v = row[1]
Expand All @@ -282,7 +274,7 @@ def _type_lookup(t):
meta_sql = sqlalchemy.text(u'''
SELECT count(_id) FROM "{0}";
'''.format(resource_id))
meta_results = db._get_engine(data_dict).execute(meta_sql, resource_id=resource_id)
meta_results = db.get_read_engine().execute(meta_sql, resource_id=resource_id)
info['meta']['count'] = meta_results.fetchone()[0]
finally:
if schema_results:
Expand Down Expand Up @@ -334,12 +326,10 @@ def datastore_delete(context, data_dict):
resource_id = data_dict['resource_id']
_check_read_only(context, resource_id)

data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']

res_id = data_dict['resource_id']
resources_sql = sqlalchemy.text(u'''SELECT 1 FROM "_table_metadata"
WHERE name = :id AND alias_of IS NULL''')
results = db._get_engine(data_dict).execute(resources_sql, id=res_id)
results = db.get_read_engine().execute(resources_sql, id=res_id)
res_exists = results.rowcount > 0

if not res_exists:
Expand All @@ -363,7 +353,6 @@ def datastore_delete(context, data_dict):
'datastore_active': False})

result.pop('id', None)
result.pop('connection_url')
return result


Expand Down Expand Up @@ -433,11 +422,13 @@ def datastore_search(context, data_dict):
raise p.toolkit.ValidationError(errors)

res_id = data_dict['resource_id']
data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']

resources_sql = sqlalchemy.text(u'''SELECT alias_of FROM "_table_metadata"
WHERE name = :id''')
results = db._get_engine(data_dict).execute(resources_sql, id=res_id)
# XXX: write connection because of private tables, we
# should be able to make this read once we stop using pg
# permissions enforcement
results = db.get_write_engine().execute(resources_sql, id=res_id)

# Resource only has to exist in the datastore (because it could be an alias)
if not results.rowcount > 0:
Expand All @@ -455,7 +446,6 @@ def datastore_search(context, data_dict):

result = db.search(context, data_dict)
result.pop('id', None)
result.pop('connection_url')
return result


Expand Down Expand Up @@ -497,11 +487,8 @@ def datastore_search_sql(context, data_dict):

p.toolkit.check_access('datastore_search_sql', context, data_dict)

data_dict['connection_url'] = pylons.config['ckan.datastore.read_url']

result = db.search_sql(context, data_dict)
result.pop('id', None)
result.pop('connection_url')
return result


Expand All @@ -520,8 +507,6 @@ def datastore_make_private(context, data_dict):
data_dict['resource_id'] = data_dict['id']
res_id = _get_or_bust(data_dict, 'resource_id')

data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']

if not _resource_exists(context, data_dict):
raise p.toolkit.ObjectNotFound(p.toolkit._(
u'Resource "{0}" was not found.'.format(res_id)
Expand All @@ -546,8 +531,6 @@ def datastore_make_public(context, data_dict):
data_dict['resource_id'] = data_dict['id']
res_id = _get_or_bust(data_dict, 'resource_id')

data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']

if not _resource_exists(context, data_dict):
raise p.toolkit.ObjectNotFound(p.toolkit._(
u'Resource "{0}" was not found.'.format(res_id)
Expand All @@ -567,7 +550,7 @@ def _resource_exists(context, data_dict):

resources_sql = sqlalchemy.text(u'''SELECT 1 FROM "_table_metadata"
WHERE name = :id AND alias_of IS NULL''')
results = db._get_engine(data_dict).execute(resources_sql, id=res_id)
results = db.get_read_engine().execute(resources_sql, id=res_id)
return results.rowcount > 0


Expand Down
13 changes: 4 additions & 9 deletions ckanext/datastore/plugin.py
Expand Up @@ -31,9 +31,7 @@ def _is_legacy_mode(config):
Returns True if `ckan.datastore.read_url` is not set in the provided
config object or CKAN is running on Postgres < 9.x
'''
write_url = config.get('ckan.datastore.write_url')

engine = db._get_engine({'connection_url': write_url})
engine = db.get_write_engine()
connection = engine.connect()

return (not config.get('ckan.datastore.read_url') or
Expand Down Expand Up @@ -104,8 +102,7 @@ def configure(self, config):
else:
self.read_url = self.config['ckan.datastore.read_url']

self.read_engine = db._get_engine(
{'connection_url': self.read_url})
self.read_engine = db.get_read_engine()
if not model.engine_is_pg(self.read_engine):
log.warn('We detected that you do not use a PostgreSQL '
'database. The DataStore will NOT work and DataStore '
Expand Down Expand Up @@ -133,7 +130,6 @@ def notify(self, entity, operation=None):
for resource in entity.resources:
try:
func(context, {
'connection_url': self.write_url,
'resource_id': resource.id})
except p.toolkit.ObjectNotFound:
pass
Expand Down Expand Up @@ -167,7 +163,7 @@ def _is_read_only_database(self):
''' Returns True if no connection has CREATE privileges on the public
schema. This is the case if replication is enabled.'''
for url in [self.ckan_url, self.write_url, self.read_url]:
connection = db._get_engine({'connection_url': url}).connect()
connection = db._get_engine_from_url(url).connect()
try:
sql = u"SELECT has_schema_privilege('public', 'CREATE')"
is_writable = connection.execute(sql).first()[0]
Expand All @@ -193,8 +189,7 @@ def _read_connection_has_correct_privileges(self):
only user. A table is created by the write user to test the
read only user.
'''
write_connection = db._get_engine(
{'connection_url': self.write_url}).connect()
write_connection = db.get_write_engine().connect()
read_connection_user = sa_url.make_url(self.read_url).username

drop_foo_sql = u'DROP TABLE IF EXISTS _foo'
Expand Down
6 changes: 2 additions & 4 deletions ckanext/datastore/tests/test_create.py
Expand Up @@ -184,8 +184,7 @@ def _get_index_names(self, resource_id):
return [result[0] for result in results]

def _execute_sql(self, sql, *args):
engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']})
engine = db.get_write_engine()
session = orm.scoped_session(orm.sessionmaker(bind=engine))
return session.connection().execute(sql, *args)

Expand Down Expand Up @@ -246,8 +245,7 @@ def setup_class(cls):
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']})
engine = db.get_write_engine()
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
Expand Down
3 changes: 1 addition & 2 deletions ckanext/datastore/tests/test_delete.py
Expand Up @@ -40,8 +40,7 @@ def setup_class(cls):
'rating with %': '42%'}]
}

engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']})
engine = db.get_write_engine()
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
Expand Down
3 changes: 1 addition & 2 deletions ckanext/datastore/tests/test_dump.py
Expand Up @@ -53,8 +53,7 @@ def setup_class(cls):
res_dict = json.loads(res.body)
assert res_dict['success'] is True

engine = db._get_engine({
'connection_url': config['ckan.datastore.write_url']})
engine = db.get_write_engine()
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))

@classmethod
Expand Down
4 changes: 1 addition & 3 deletions ckanext/datastore/tests/test_helpers.py
Expand Up @@ -70,9 +70,7 @@ def setup_class(cls):
if not pylons.config.get('ckan.datastore.read_url'):
raise nose.SkipTest('Datastore runs on legacy mode, skipping...')

engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']}
)
engine = db.get_write_engine()
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))

datastore_test_helpers.clear_db(cls.Session)
Expand Down

0 comments on commit 9fc4507

Please sign in to comment.