-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
cass.py
199 lines (167 loc) · 6.48 KB
/
cass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import logging
import os
import ssl
from base64 import b64decode
from tempfile import NamedTemporaryFile
from redash.query_runner import BaseQueryRunner, register
from redash.utils import JSONEncoder, json_dumps, json_loads
logger = logging.getLogger(__name__)
try:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.util import sortedset
enabled = True
except ImportError:
enabled = False
def generate_ssl_options_dict(protocol, cert_path=None):
ssl_options = {
'ssl_version': getattr(ssl, protocol)
}
if cert_path is not None:
ssl_options['ca_certs'] = cert_path
ssl_options['cert_reqs'] = ssl.CERT_REQUIRED
return ssl_options
class CassandraJSONEncoder(JSONEncoder):
def default(self, o):
if isinstance(o, sortedset):
return list(o)
return super(CassandraJSONEncoder, self).default(o)
class Cassandra(BaseQueryRunner):
noop_query = "SELECT dateof(now()) FROM system.local"
@classmethod
def enabled(cls):
return enabled
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"host": {"type": "string"},
"port": {"type": "number", "default": 9042},
"keyspace": {"type": "string", "title": "Keyspace name"},
"username": {"type": "string", "title": "Username"},
"password": {"type": "string", "title": "Password"},
"protocol": {
"type": "number",
"title": "Protocol Version",
"default": 3,
},
"timeout": {"type": "number", "title": "Timeout", "default": 10},
"useSsl": {"type": "boolean", "title": "Use SSL", "default": False},
"sslCertificateFile": {
"type": "string",
"title": "SSL Certificate File"
},
"sslProtocol": {
"type": "string",
"title": "SSL Protocol",
"enum": [
"PROTOCOL_SSLv23",
"PROTOCOL_TLS",
"PROTOCOL_TLS_CLIENT",
"PROTOCOL_TLS_SERVER",
"PROTOCOL_TLSv1",
"PROTOCOL_TLSv1_1",
"PROTOCOL_TLSv1_2",
],
},
},
"required": ["keyspace", "host", "useSsl"],
"secret": ["sslCertificateFile"],
}
@classmethod
def type(cls):
return "Cassandra"
def get_schema(self, get_stats=False):
query = """
select release_version from system.local;
"""
results, error = self.run_query(query, None)
results = json_loads(results)
release_version = results["rows"][0]["release_version"]
query = """
SELECT table_name, column_name
FROM system_schema.columns
WHERE keyspace_name ='{}';
""".format(
self.configuration["keyspace"]
)
if release_version.startswith("2"):
query = """
SELECT columnfamily_name AS table_name, column_name
FROM system.schema_columns
WHERE keyspace_name ='{}';
""".format(
self.configuration["keyspace"]
)
results, error = self.run_query(query, None)
results = json_loads(results)
schema = {}
for row in results["rows"]:
table_name = row["table_name"]
column_name = row["column_name"]
if table_name not in schema:
schema[table_name] = {"name": table_name, "columns": []}
schema[table_name]["columns"].append(column_name)
return list(schema.values())
def run_query(self, query, user):
connection = None
cert_path = self._generate_cert_file()
if self.configuration.get("username", "") and self.configuration.get(
"password", ""
):
auth_provider = PlainTextAuthProvider(
username="{}".format(self.configuration.get("username", "")),
password="{}".format(self.configuration.get("password", "")),
)
connection = Cluster(
[self.configuration.get("host", "")],
auth_provider=auth_provider,
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
ssl_options=self._get_ssl_options(cert_path),
)
else:
connection = Cluster(
[self.configuration.get("host", "")],
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
ssl_options=self._get_ssl_options(cert_path),
)
session = connection.connect()
session.set_keyspace(self.configuration["keyspace"])
session.default_timeout = self.configuration.get("timeout", 10)
logger.debug("Cassandra running query: %s", query)
result = session.execute(query)
self._cleanup_cert_file(cert_path)
column_names = result.column_names
columns = self.fetch_columns([(c, "string") for c in column_names])
rows = [dict(zip(column_names, row)) for row in result]
data = {"columns": columns, "rows": rows}
json_data = json_dumps(data, cls=CassandraJSONEncoder)
return json_data, None
def _generate_cert_file(self):
cert_encoded_bytes = self.configuration.get("sslCertificateFile", None)
if cert_encoded_bytes:
with NamedTemporaryFile(mode='w', delete=False) as cert_file:
cert_bytes = b64decode(cert_encoded_bytes)
cert_file.write(cert_bytes.decode("utf-8"))
return cert_file.name
return None
def _cleanup_cert_file(self, cert_path):
if cert_path:
os.remove(cert_path)
def _get_ssl_options(self, cert_path):
ssl_options = None
if self.configuration.get("useSsl", False):
ssl_options = generate_ssl_options_dict(
protocol=self.configuration["sslProtocol"],
cert_path=cert_path
)
return ssl_options
class ScyllaDB(Cassandra):
@classmethod
def type(cls):
return "scylla"
register(Cassandra)
register(ScyllaDB)