-
Notifications
You must be signed in to change notification settings - Fork 2k
/
plugin.py
140 lines (119 loc) · 5.13 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import logging
import pylons
import sqlalchemy
from sqlalchemy.exc import ProgrammingError
import ckan.plugins as p
import ckanext.datastore.logic.action as action
import ckanext.datastore.logic.auth as auth
import ckanext.datastore.db as db
import ckan.logic as logic
log = logging.getLogger(__name__)
_get_or_bust = logic.get_or_bust
class DatastoreException(Exception):
pass
class DatastorePlugin(p.SingletonPlugin):
'''
Datastore plugin.
'''
p.implements(p.IConfigurable, inherit=True)
p.implements(p.IActions)
p.implements(p.IAuthFunctions)
def configure(self, config):
self.config = config
# check for ckan.datastore_write_url
if (not 'ckan.datastore_write_url' in config):
error_msg = 'ckan.datastore_write_url not found in config'
raise DatastoreException(error_msg)
## Do light wrapping around action function to add datastore_active
## to resource dict. Not using IAction extension as this prevents other plugins
## from having a custom resource_read.
if not config['debug']:
self._check_separate_db()
self._check_read_permissions()
# Make sure actions are cached
resource_show = p.toolkit.get_action('resource_show')
# TODO: move to package.py or better: have a think about it
def new_resource_show(context, data_dict):
engine = db._get_engine(
context,
{'connection_url': config['ckan.datastore_write_url']}
)
new_data_dict = resource_show(context, data_dict)
try:
connection = engine.connect()
result = connection.execute(
'select 1 from pg_tables where tablename = %s',
new_data_dict['id']
).fetchone()
if result:
new_data_dict['datastore_active'] = True
else:
new_data_dict['datastore_active'] = False
finally:
connection.close()
return new_data_dict
## Make sure do not run many times if configure is called repeatedly
## as in tests.
if not hasattr(resource_show, '_datastore_wrapped'):
new_resource_show._datastore_wrapped = True
logic._actions['resource_show'] = new_resource_show
def _check_separate_db(self):
'''
Make sure the datastore is on a separate db. Otherwise one could access
all internal tables via the api.
'''
ckan_url = pylons.config['sqlalchemy.url']
write_url = pylons.config['ckan.datastore_write_url']
read_url = pylons.config['ckan.datastore_read_url']
if write_url == read_url:
raise Exception("The write and read-only database connection url are the same.")
if self._get_db_from_url(ckan_url) == self._get_db_from_url(read_url):
raise Exception("The CKAN and datastore database are the same.")
def _get_db_from_url(self, url):
return url[url.rindex("@"):]
def _check_read_permissions(self):
'''
Check whether the right permissions are set for the read only user.
A table is created by the write user to test the read only user.
'''
write_url = pylons.config['ckan.datastore_write_url']
read_url = pylons.config['ckan.datastore_read_url']
write_connection = db._get_engine(None,
{'connection_url': write_url}).connect()
write_connection.execute("CREATE TABLE public.foo (id INTEGER NOT NULL, name VARCHAR)")
read_connection = db._get_engine(None,
{'connection_url': read_url}).connect()
read_trans = read_connection.begin()
statements = [
"CREATE TABLE public.bar (id INTEGER NOT NULL, name VARCHAR)",
"INSERT INTO public.foo VALUES (1, 'okfn')"
]
try:
for sql in statements:
read_trans = read_connection.begin()
try:
read_connection.execute(sql)
except ProgrammingError, e:
if 'permission denied' not in str(e):
raise
else:
log.info("Connection url {}"
.format(read_url))
raise Exception("We have write permissions on the read-only database.")
finally:
read_trans.rollback()
except Exception:
raise
finally:
write_connection.execute("DROP TABLE foo")
def get_actions(self):
available_actions = {'datastore_create': action.datastore_create,
'datastore_delete': action.datastore_delete,
'datastore_search': action.datastore_search}
if 'ckan.datastore_read_url' in self.config:
available_actions['data_search_sql'] = action.data_search_sql
return available_actions
def get_auth_functions(self):
return {'datastore_create': auth.datastore_create,
'datastore_delete': auth.datastore_delete,
'datastore_search': auth.datastore_search}