-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
query_result.py
123 lines (90 loc) · 3.15 KB
/
query_result.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import io
import csv
import xlsxwriter
from funcy import rpartial, project
from dateutil.parser import isoparse as parse_date
from redash.utils import json_loads, UnicodeWriter
from redash.query_runner import TYPE_BOOLEAN, TYPE_DATE, TYPE_DATETIME
from redash.authentication.org_resolving import current_org
def _convert_format(fmt):
return (
fmt.replace("DD", "%d")
.replace("MM", "%m")
.replace("YYYY", "%Y")
.replace("YY", "%y")
.replace("HH", "%H")
.replace("mm", "%M")
.replace("ss", "%S")
.replace("SSS", "%f")
)
def _convert_bool(value):
if value is True:
return "true"
elif value is False:
return "false"
return value
def _convert_datetime(value, fmt):
if not value:
return value
try:
parsed = parse_date(value)
ret = parsed.strftime(fmt)
except Exception:
return value
return ret
def _get_column_lists(columns):
    """Split *columns* into CSV fieldnames and per-column value converters.

    :param columns: iterable of column dicts with "name" and "type" keys.
    :returns: ``(fieldnames, special_columns)`` — the ordered list of
        column names, and a dict mapping a column name to the conversion
        callable for its type (bool/date/datetime columns only).
    """
    # Build strftime formats from the org's Moment.js-style settings.
    date_format = _convert_format(current_org.get_setting("date_format"))
    datetime_format = _convert_format(
        "{} {}".format(
            current_org.get_setting("date_format"),
            current_org.get_setting("time_format"),
        )
    )
    special_types = {
        TYPE_BOOLEAN: _convert_bool,
        TYPE_DATE: rpartial(_convert_datetime, date_format),
        TYPE_DATETIME: rpartial(_convert_datetime, datetime_format),
    }
    fieldnames = []
    special_columns = {}
    for col in columns:
        fieldnames.append(col["name"])
        # Direct dict lookup instead of scanning every special type per column.
        converter = special_types.get(col["type"])
        if converter is not None:
            special_columns[col["name"]] = converter
    return fieldnames, special_columns
def serialize_query_result(query_result, is_api_user):
    """Serialize a query result dict, restricting fields for API users.

    API-key access only receives the result payload and retrieval
    timestamp; authenticated users get the full dict.
    """
    full_dict = query_result.to_dict()
    if not is_api_user:
        return full_dict
    # Expose only the public-safe subset to API-key access.
    return project(full_dict, ["data", "retrieved_at"])
def serialize_query_result_to_dsv(query_result, delimiter):
    """Render a query result as delimiter-separated values (CSV/TSV).

    Column order follows the result's column metadata; bool/date/datetime
    cells are converted to their display representation. Extra row keys
    not present in the column list are ignored.

    :param query_result: object whose ``.data`` dict has "columns"/"rows".
    :param delimiter: field separator, e.g. "," for CSV or a tab for TSV.
    :returns: the full DSV document as a string, header row included.
    """
    buffer = io.StringIO()
    query_data = query_result.data
    fieldnames, special_columns = _get_column_lists(query_data["columns"] or [])

    writer = csv.DictWriter(buffer, extrasaction="ignore", fieldnames=fieldnames, delimiter=delimiter)
    writer.writeheader()

    for row in query_data["rows"]:
        # Convert on a copy so converters never mutate the caller's row
        # dicts (the original wrote converted values back into
        # query_result.data in place, corrupting it for later consumers).
        out_row = dict(row)
        for col_name, converter in special_columns.items():
            if col_name in out_row:
                out_row[col_name] = converter(out_row[col_name])
        writer.writerow(out_row)

    return buffer.getvalue()
def serialize_query_result_to_xlsx(query_result):
    """Render a query result as an XLSX workbook and return its raw bytes.

    Writes a single sheet named "result": row 0 holds the column names and
    each subsequent row holds one data row, cells ordered by column.
    """
    output = io.BytesIO()
    query_data = query_result.data

    # constant_memory streams rows to disk as they are written, keeping
    # memory usage flat for large result sets.
    book = xlsxwriter.Workbook(output, {"constant_memory": True})
    sheet = book.add_worksheet("result")

    column_names = [col["name"] for col in query_data["columns"]]
    for col_idx, name in enumerate(column_names):
        sheet.write(0, col_idx, name)

    for row_idx, row in enumerate(query_data["rows"], start=1):
        for col_idx, name in enumerate(column_names):
            value = row.get(name)
            # Containers can't be written as cells; stringify them instead.
            if isinstance(value, (dict, list)):
                value = str(value)
            sheet.write(row_idx, col_idx, value)

    book.close()
    return output.getvalue()