/
test_helpers.py
123 lines (98 loc) · 4.46 KB
/
test_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# -*- encoding: utf-8 -*-
import pylons
import sqlalchemy.orm as orm
import nose
import ckanext.datastore.helpers as datastore_helpers
import ckanext.datastore.tests.helpers as datastore_test_helpers
import ckanext.datastore.db as db
# Module-level alias for nose.tools.eq_, used by the tests in this module.
eq_ = nose.tools.eq_
class TestTypeGetters(object):
def test_get_list(self):
get_list = datastore_helpers.get_list
assert get_list(None) is None
assert get_list([]) == []
assert get_list('') == []
assert get_list('foo') == ['foo']
assert get_list('foo, bar') == ['foo', 'bar']
assert get_list('foo_"bar, baz') == ['foo_"bar', 'baz']
assert get_list('"foo", "bar"') == ['foo', 'bar']
assert get_list(u'foo, bar') == ['foo', 'bar']
assert get_list(['foo', 'bar']) == ['foo', 'bar']
assert get_list([u'foo', u'bar']) == ['foo', 'bar']
assert get_list(['foo', ['bar', 'baz']]) == ['foo', ['bar', 'baz']]
def test_is_single_statement(self):
singles = ['SELECT * FROM footable',
'SELECT * FROM "bartable"',
'SELECT * FROM "bartable";',
'SELECT * FROM "bart;able";',
"select 'foo'||chr(59)||'bar'"]
multiples = ['SELECT * FROM abc; SET LOCAL statement_timeout to'
'SET LOCAL statement_timeout to; SELECT * FROM abc',
'SELECT * FROM "foo"; SELECT * FROM "abc"']
for single in singles:
assert datastore_helpers.is_single_statement(single) is True
for multiple in multiples:
assert datastore_helpers.is_single_statement(multiple) is False
def test_should_fts_index_field_type(self):
indexable_field_types = ['tsvector',
'text',
'number']
non_indexable_field_types = ['nested',
'timestamp',
'date',
'_text',
'text[]']
for indexable in indexable_field_types:
assert datastore_helpers.should_fts_index_field_type(indexable) is True
for non_indexable in non_indexable_field_types:
assert datastore_helpers.should_fts_index_field_type(non_indexable) is False
class TestGetTables(object):
    """Checks ``get_table_names_from_sql`` against tables created through
    the engine returned by ``db.get_write_engine()``.
    """

    @classmethod
    def setup_class(cls):
        """Create the fixture tables, or skip when no read URL is configured."""
        read_url = pylons.config.get('ckan.datastore.read_url')
        if not read_url:
            raise nose.SkipTest('Datastore runs on legacy mode, skipping...')

        write_engine = db.get_write_engine()
        session_factory = orm.sessionmaker(bind=write_engine)
        cls.Session = orm.scoped_session(session_factory)

        # Start from a clean database before creating the fixture tables.
        datastore_test_helpers.clear_db(cls.Session)

        # Fixture tables: lower-case names, a quoted upper-case name and a
        # table with a non-ASCII column name.
        ddl_statements = (
            u'CREATE TABLE test_a (id_a text)',
            u'CREATE TABLE test_b (id_b text)',
            u'CREATE TABLE "TEST_C" (id_c text)',
            u'CREATE TABLE test_d ("α/α" integer)',
        )
        for ddl in ddl_statements:
            cls.Session.execute(ddl)

    @classmethod
    def teardown_class(cls):
        """Clear the database again once the class has run."""
        datastore_test_helpers.clear_db(cls.Session)

    def test_get_table_names(self):
        """Each SQL statement must yield exactly the expected table names.

        Every case is a ``(sql, expected_table_names)`` pair; the comparison
        ignores ordering. The expectations show ``pg_roles`` being reported
        as ``pg_authid``.
        """
        test_cases = [
            (u'SELECT * FROM test_a', ['test_a']),
            (u'SELECT * FROM public.test_a', ['test_a']),
            (u'SELECT * FROM "TEST_C"', ['TEST_C']),
            (u'SELECT * FROM public."TEST_C"', ['TEST_C']),
            (u'SELECT * FROM pg_catalog.pg_database', ['pg_database']),
            (u'SELECT rolpassword FROM pg_roles', ['pg_authid']),
            (u'''SELECT p.rolpassword
FROM pg_roles p
JOIN test_b b
ON p.rolpassword = b.id_b''', ['pg_authid', 'test_b']),
            (u'''SELECT id_a, id_b, id_c
FROM (
SELECT *
FROM (
SELECT *
FROM "TEST_C") AS c,
test_b) AS b,
test_a AS a''', ['test_a', 'test_b', 'TEST_C']),
            (u'INSERT INTO test_a VALUES (\'a\')', ['test_a']),
            (u'SELECT "α/α" FROM test_d', ['test_d']),
            (u'SELECT "α/α" FROM test_d WHERE "α/α" > 1000', ['test_d']),
        ]

        connection = self.Session.connection()
        context = {'connection': connection}

        for sql, expected_tables in test_cases:
            found_tables = datastore_helpers.get_table_names_from_sql(
                context, sql)
            eq_(sorted(found_tables), sorted(expected_tables))