# datahelp.py (forked from thepaul/cassandra-dtest)
import re

from cassandra.concurrent import execute_concurrent_with_args


def strip(val):
    # remove spaces and pipes from beginning/end
    return val.strip().strip('|')


def parse_headers_into_list(data):
    # throw out leading/trailing space and pipes
    # so we can split on the data without getting
    # extra empty fields
    rows = list(map(strip, data.split('\n')))

    # remove any remaining empty lines (i.e. '') from data
    rows = list(filter(None, rows))

    # separate headers from actual data and remove extra spaces from them
    headers = [h.strip() for h in rows.pop(0).split('|')]
    return headers


def get_row_multiplier(row):
    # find prefix like *1234 meaning create 1,234 rows
    row_cells = [cell.strip() for cell in row.split('|')]

    m = re.findall(r'\*(\d+)$', row_cells[0])
    if m:
        return int(m[0])

    return None


def row_has_multiplier(row):
    return get_row_multiplier(row) is not None


def parse_row_into_dict(row, headers, format_funcs=None):
    row_cells = [cell.strip() for cell in row.split('|')]

    if row_has_multiplier(row):
        row_multiplier = get_row_multiplier(row)
        row = '|'.join(row_cells[1:])  # cram remainder of row back into foo|bar format
        multirows = []

        for _ in range(row_multiplier):
            multirows.append(
                parse_row_into_dict(row, headers, format_funcs=format_funcs)
            )
        return multirows

    row_map = dict(zip(headers, row_cells))

    if format_funcs:
        for colname, value in row_map.items():
            func = format_funcs.get(colname)

            if func is not None:
                row_map[colname] = func(value)

    return row_map
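

# Illustrative sketch, not part of the original module: how a multiplier row
# expands. The headers and cell values below are hypothetical.
def _example_multiplier_row():
    # '*3' in the first cell asks for three identical rows built from the rest
    rows = parse_row_into_dict('*3 | foo | bar', ['name', 'value'])
    assert rows == [{'name': 'foo', 'value': 'bar'}] * 3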


def row_describes_data(row):
    """
    Returns True if this appears to be a row describing data, otherwise False.

    Meant to be used in conjunction with filter to prune out those rows
    that don't actually describe data, such as empty strings or decorations
    that delimit headers from actual data (i.e. '+----|----|-----+')
    """
    if row:
        if row.startswith('+') and row.endswith('+'):
            return False

        return True

    return False
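

# Illustrative sketch, not part of the original module: the filter keeps
# pipe-delimited data rows and drops decorations and blank lines.
def _example_row_filter():
    assert row_describes_data('| a | b |')
    assert not row_describes_data('+---+---+')
    assert not row_describes_data('')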


def parse_data_into_dicts(data, format_funcs=None):
    # throw out leading/trailing space and pipes
    # so we can split on the data without getting
    # extra empty fields
    rows = list(map(strip, data.split('\n')))

    # remove any remaining empty/decoration lines (i.e. '') from data
    rows = list(filter(row_describes_data, rows))

    # pull the header row off the front
    headers = parse_headers_into_list(rows.pop(0))

    values = []

    for row in rows:
        if row_has_multiplier(row):
            values.extend(parse_row_into_dict(row, headers, format_funcs=format_funcs))
        else:
            values.append(parse_row_into_dict(row, headers, format_funcs=format_funcs))

    return values
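

# Illustrative sketch, not part of the original module: parsing a small table
# literal into dicts. The column names and values are hypothetical.
def _example_parse_data():
    data = """
        | name | value |
        +------+-------+
        | a    | 1     |
        | b    | 2     |
    """
    assert parse_data_into_dicts(data) == [
        {'name': 'a', 'value': '1'},
        {'name': 'b', 'value': '2'},
    ]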


def create_rows(data, session, table_name, cl=None, format_funcs=None, prefix='', postfix=''):
    """
    Creates db rows using given session, with table name provided,
    using data formatted like:

    |colname1|colname2|
    +--------+--------+
    |value1  |value2  |

    format_funcs should be a dictionary of {columnname: function} if data needs to be
    formatted before being included in CQL.

    Returns a list of maps describing the data created.
    """
    values = []
    dicts = parse_data_into_dicts(data, format_funcs=format_funcs)

    # use the first dictionary to build a prepared statement for all
    prepared = session.prepare(
        "{prefix} INSERT INTO {table} ({cols}) values ({vals}) {postfix}".format(
            prefix=prefix, table=table_name, cols=', '.join(dicts[0].keys()),
            vals=', '.join('?' for _ in dicts[0].keys()), postfix=postfix)
    )
    if cl is not None:
        prepared.consistency_level = cl

    query_results = execute_concurrent_with_args(session, prepared, [list(d.values()) for d in dicts])

    for i, (status, result_or_exc) in enumerate(query_results):
        # should maybe check status here before appending to expected values
        values.append(dicts[i])

    return values
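

# Illustrative sketch, not part of the original module: typical use from a test,
# assuming `session` is a live cassandra-driver session and a table such as
# `ks.users (id int PRIMARY KEY, name text)` already exists. The keyspace, table
# and column names here are hypothetical.
def _example_create_rows(session):
    data = """
        | id | name  |
        +----+-------+
        | 1  | alice |
        | 2  | bob   |
    """
    # id must be bound as an int, so pass a format_func for that column
    return create_rows(data, session, 'ks.users', format_funcs={'id': int})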


def flatten_into_set(iterable):
    # use flatten() then convert to a set for set comparisons
    return set(flatten(iterable))


def flatten(list_of_dicts):
    # flatten list of dicts into list of strings for easier comparison
    # and easier set membership testing (e.g. foo is subset of bar)
    flattened = []

    for _dict in list_of_dicts:
        sorted_keys = sorted(_dict)
        items = ['{}__{}'.format(k, _dict[k]) for k in sorted_keys]
        flattened.append('__'.join(items))

    return flattened
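

# Illustrative sketch, not part of the original module: comparing created rows
# against query results with set semantics. The dicts below are hypothetical.
def _example_flatten_comparison():
    expected = [{'id': '1', 'name': 'alice'}]
    actual = [{'name': 'alice', 'id': '1'}, {'id': '2', 'name': 'bob'}]
    # keys are sorted before joining, so column order in the dicts does not matter
    assert flatten_into_set(expected).issubset(flatten_into_set(actual))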