-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
__init__.py
150 lines (116 loc) · 3.77 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import logging
import json
import jsonschema
from jsonschema import ValidationError
logger = logging.getLogger(__name__)
__all__ = [
'ValidationError',
'BaseQueryRunner',
'InterruptException',
'TYPE_DATETIME',
'TYPE_BOOLEAN',
'TYPE_INTEGER',
'TYPE_STRING',
'TYPE_DATE',
'TYPE_FLOAT',
'SUPPORTED_COLUMN_TYPES',
'register',
'get_query_runner',
'import_query_runners'
]
# Valid types of columns returned in results:
TYPE_INTEGER = 'integer'
TYPE_FLOAT = 'float'
TYPE_BOOLEAN = 'boolean'
TYPE_STRING = 'string'
TYPE_DATETIME = 'datetime'
TYPE_DATE = 'date'
SUPPORTED_COLUMN_TYPES = set([
TYPE_INTEGER,
TYPE_FLOAT,
TYPE_BOOLEAN,
TYPE_STRING,
TYPE_DATETIME,
TYPE_DATE
])
class InterruptException(Exception):
pass
class BaseQueryRunner(object):
def __init__(self, configuration):
jsonschema.validate(configuration, self.configuration_schema())
self.syntax = 'sql'
self.configuration = configuration
@classmethod
def name(cls):
return cls.__name__
@classmethod
def type(cls):
return cls.__name__.lower()
@classmethod
def enabled(cls):
return True
@classmethod
def annotate_query(cls):
return True
@classmethod
def configuration_schema(cls):
return {}
def run_query(self, query):
raise NotImplementedError()
def fetch_columns(self, columns):
column_names = []
duplicates_counter = 1
new_columns = []
for col in columns:
column_name = col[0]
if column_name in column_names:
column_name = "{}{}".format(column_name, duplicates_counter)
duplicates_counter += 1
column_names.append(column_name)
new_columns.append({'name': column_name,
'friendly_name': column_name,
'type': col[1]})
return new_columns
def get_schema(self):
return []
def _run_query_internal(self, query):
results, error = self.run_query(query)
if error is not None:
raise Exception("Failed running query [%s]." % query)
return json.loads(results)['rows']
@classmethod
def to_dict(cls):
return {
'name': cls.name(),
'type': cls.type(),
'configuration_schema': cls.configuration_schema()
}
query_runners = {}
def register(query_runner_class):
global query_runners
if query_runner_class.enabled():
logger.debug("Registering %s (%s) query runner.", query_runner_class.name(), query_runner_class.type())
query_runners[query_runner_class.type()] = query_runner_class
else:
logger.warning("%s query runner enabled but not supported, not registering. Either disable or install missing dependencies.", query_runner_class.name())
def get_query_runner(query_runner_type, configuration_json):
query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None:
return None
return query_runner_class(json.loads(configuration_json))
def validate_configuration(query_runner_type, configuration_json):
query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None:
return False
try:
if isinstance(configuration_json, basestring):
configuration = json.loads(configuration_json)
else:
configuration = configuration_json
jsonschema.validate(configuration, query_runner_class.configuration_schema())
except (ValidationError, ValueError):
return False
return True
def import_query_runners(query_runner_imports):
for runner_import in query_runner_imports:
__import__(runner_import)