Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplifications and performance improvements #8

Merged
merged 10 commits into from
Jul 27, 2021
125 changes: 61 additions & 64 deletions spyql/processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# TODO: optimizations
# [x]: single eval
# [ ]: try to eliminate nested list
# [ ]: try to eliminate wrap row
# [ ]: try to eliminate is instance of
# [ ]: try to eliminate execute (replace vars - needs heads + keywords)
# [x]: try to eliminate wrap row
# [x]: try to eliminate is instance of
# [x]: try to eliminate execute (replace vars - needs heads + keywords)


import csv
Expand Down Expand Up @@ -59,9 +59,13 @@ def make_processor(prs, strings):

# Sets up the processor state from the parsed query (`prs`) and the `strings` helper.
# NOTE(review): this listing looks like a merged diff view; `self.strings` is assigned twice below — confirm which line is live.
def __init__(self, prs, strings):
self.prs = prs #parsed query
self.strings = strings
# by default, a row does not need to be wrapped (only single cols need)
self.wrap_row = False
# compiled lazily by `get_row_instantiation_script`
self.row_instantiation_script = None
# input column names; when non-empty they are used in addition to the default `colN` names
self.input_col_names = []
self.strings = strings
# maps column names to indexes in `_values` (filled in by `handle_1st_data_row`)
self.colnames2idx = {}


# True after header, metadata, etc in input file
def reading_data(self):
Expand All @@ -71,15 +75,17 @@ def reading_data(self):
def handle_header_row(self, row):
    """Hook called with each row read while `reading_data()` is false.

    The base implementation ignores the row.
    """
    return None

# Makes sure a row is always a list of columns (even when there is a single input col)
# A `row` that is not an Iterable is returned as a one-element list; anything else is returned unchanged.
# NOTE(review): `__init__` also assigns `self.wrap_row = False`; an instance attribute of that
# name would shadow this method on instances — confirm which of the two is live.
def wrap_row(self, row):
if not isinstance(row, Iterable): #TO DO: change this takes a lot
return [row]
return row


# Action for handling the first row of data
# Records the number of input columns (0 for an empty/None row) and registers
# column-name -> index entries in `self.colnames2idx`.
def handle_1st_data_row(self, row):
self.n_input_cols = len(row) if row else 0
self.n_input_cols = len(row) if row else 0

#dictionary to translate col names to indexes in `_values`
# the default names (`col1`, `col2`, ...) are always registered
self.colnames2idx.update({self.default_col_name(_i): _i for _i in range(self.n_input_cols)})
if self.input_col_names:
#TODO check if len(input_col_names) == self.n_input_cols
# explicit input names are registered on top of the defaults and map to the same indexes;
# indexing `input_col_names` by position assumes it has at least `n_input_cols` entries
self.colnames2idx.update({self.input_col_names[_i]: _i for _i in range(self.n_input_cols)})


# Create list of output column names
def make_out_cols_names(self, out_cols_names):
Expand Down Expand Up @@ -108,26 +114,17 @@ def get_input_iterators(self):
def default_col_name(self, idx):
    """Return the default name of the input column at 0-based position `idx`.

    Names are 1-based: position 0 is `col1`, position 1 is `col2`, and so on.
    """
    return "col{}".format(idx + 1)

def make_row_instantiation_script(self):
    """Build the assignment statements that instantiate the input variables.

    Returns a list of strings, one statement per input column. Each statement
    reads from the `_values` variable, which holds a complete input row.
    When `input_col_names` is set, the named variable is chained in front of
    the default one (e.g. `name = col1 = _values[0]`).

    Only meant to be called by `get_row_instantiation_script`; subclasses may
    override it (e.g. the json processor does).
    """
    statements = []
    for pos in range(self.n_input_cols):
        statement = f"{self.default_col_name(pos)} = _values[{pos}]"
        if self.input_col_names:
            # TODO check if len(input_col_names) == self.n_input_cols
            statement = f"{self.input_col_names[pos]} = {statement}"
        statements.append(statement)
    return statements

# replace identifiers (column names) in sql expressions by references to `_values`
# and put (quoted) strings back
def prepare_expression(self, expr):
    """Rewrite column names in `expr` as `_values[idx]` and restore quoted strings.

    Every whole-word occurrence of a key of `self.colnames2idx` is replaced by
    a reference to the matching index of `_values`. The rewritten expression
    is then passed through `self.strings.put_strings_back`.

    All names are substituted in a single pass, so text produced by one
    replacement is never rewritten again by another name (e.g. a column that
    is literally called `_values`). Names are regex-escaped, and a name that
    directly follows a dot is left alone because it is an attribute access,
    not a column reference.

    Returns the prepared expression string.
    """
    name_to_idx = self.colnames2idx
    if name_to_idx:  # an empty alternation would match at every position
        # longest names first so that a name is never cut short by a prefix of itself
        names = sorted(name_to_idx, key=len, reverse=True)
        pattern = re.compile(
            r"(?<![\w.])(?:" + "|".join(map(re.escape, names)) + r")(?!\w)"
        )
        expr = pattern.sub(
            lambda match: f"_values[{name_to_idx[match.group(0)]}]", expr
        )

    return self.strings.put_strings_back(expr)

# lazy initialization of the row instantiation script
def get_row_instantiation_script(self):
    """Return the compiled row instantiation script, compiling it on first use.

    The statements from `make_row_instantiation_script` are joined with
    newlines and compiled in 'exec' mode; the code object is cached in
    `self.row_instantiation_script`.
    """
    if self.row_instantiation_script:
        return self.row_instantiation_script

    source = '\n'.join(self.make_row_instantiation_script())
    self.row_instantiation_script = compile(source, '', 'exec')
    return self.row_instantiation_script

# main
def go(self):
Expand All @@ -138,17 +135,18 @@ def go(self):
output_handler.finish()

def _go(self, output_handler):
vars = globals() # to do: filter out not useful/internal vars

_values = [[]]
row_number = 0
vars_script = None
#json = {}

# gets user-defined output cols names (with AS alias)
out_cols_names = [c[0] for c in self.prs['select']]

# compiles expressions for calculating outputs
cmds = [self.strings.put_strings_back(c[1]) for c in self.prs['select']] #todo: rename cmds to out_expressions
cmds = compile('[' + ','.join(cmds) + ']', '', 'eval')

cmds = []


explode_it_cmd = None
explode_inst_cmd = None
Expand All @@ -163,32 +161,36 @@ def _go(self, output_handler):
# an input iterator [[1],[2],[3]] is the same as [[1,2,3]]
its_list = self.get_input_iterators()

where = self.prs['where']
if (where):
where = compile(self.strings.put_strings_back(where), '', 'eval')
where = None

logging.info("-- RESULT --")

for its in its_list:
for it in its:
_values = it
_values = it
dcmoura marked this conversation as resolved.
Show resolved Hide resolved

if not self.reading_data():
self.handle_header_row(_values)
continue

_values = self.wrap_row(_values)
if self.wrap_row:
_values = [_values]

# print header
if row_number == 0:
self.handle_1st_data_row(_values)
output_handler.writer.writeheader(self.make_out_cols_names(out_cols_names))
if output_handler.is_done():
return # in case of `limit 0`

# compiles expressions for calculating outputs
cmds = [self.prepare_expression(c[1]) for c in self.prs['select']] #todo: rename cmds to out_expressions
cmds = compile('[' + ','.join(cmds) + ']', '<select>', 'eval')
where = self.prs['where']
if (where):
where = compile(self.prepare_expression(where), '<where>', 'eval')

#make input variables (uses `_values`)
exec(self.get_row_instantiation_script())


explode_its = [None] # 1 element by default (no explosion)
if explode_path:
explode_its = eval(explode_it_cmd)
Expand All @@ -198,13 +200,14 @@ def _go(self, output_handler):
exec(explode_inst_cmd)

row_number = row_number + 1

if not where or eval(where): #filter (opt: eventually could be done before exploding)

vars["_values"] = _values

if not where or eval(where,{},vars): #filter (opt: eventually could be done before exploding)
# input line is eligible
the_globals = globals()
the_locals = locals() # to do: filter out internal vars

# calculate outputs
_res = [item for sublist in eval(cmds, the_globals, the_locals) for item in sublist]
_res = [item for sublist in eval(cmds,{},vars) for item in sublist]

output_handler.handle_result(_res) #deal with output
if output_handler.is_done():
Expand All @@ -216,46 +219,40 @@ def __init__(self, prs, strings):

# input is a Python expression
# Evaluates the query's `from` clause (after putting quoted strings back) and
# returns the result wrapped in a one-element list.
def get_input_iterators(self):
# NOTE(review): as listed, this return runs first and leaves the lines below unreachable —
# likely a superseded line from a merged diff view; confirm which version is live
return [eval(self.strings.put_strings_back(self.prs['from']))]
e = eval(self.strings.put_strings_back(self.prs['from']))
if e:
# a result that is not Iterable is treated as a single-element input
if not isinstance(e, Iterable):
e = [e]
# rows need wrapping when the first element is not itself Iterable
self.wrap_row = not isinstance(e[0], Iterable)
return [e]

# Processor for text input: every line read from stdin is a row with a single column.
class TextProcessor(Processor):
def __init__(self, prs, strings):
super().__init__(prs, strings)
self.wrap_row = True # since it's a single col, always wrap it

# reads a text row as a row with 1 column
# returns a single iterator: the list of stdin lines with trailing "\n"/"\r" characters stripped
def get_input_iterators(self):
#return [sys.stdin] #to do: support files
return [[line.rstrip("\n\r") for line in sys.stdin]]

# since it's a single col, always wrap it
# NOTE(review): `__init__` above assigns `self.wrap_row = True`, which would shadow this
# method on instances — likely old and new diff lines shown together; confirm
def wrap_row(self, row):
return [row]



# Processor for JSON input read from stdin; each input row is handled as a single column.
class JSONProcessor(Processor):
def __init__(self, prs, strings):
super().__init__(prs, strings)
self.wrap_row = True # since it's a single col, always wrap it
self.colnames2idx.update({"json": 0}) # first column alias as json

# returns stdin itself as the only input iterator
def get_input_iterators(self):
return [sys.stdin] #to do: support files

# since it's a single col, always wrap it
# NOTE(review): `__init__` above assigns `self.wrap_row = True`, which would shadow this
# method on instances — likely old and new diff lines shown together; confirm
def wrap_row(self, row):
return [row]

# returns two statements: the first parses `_values[0]` with `jsonlib.loads` into `json`,
# the second stores the parsed value back into `_values[0]` and into the default first column name
# (presumably `jsonlib` is an alias of a JSON module imported elsewhere in the file — confirm)
def make_row_instantiation_script(self):
# overriding default: json input is considered a single col
return [
"json = jsonlib.loads(_values[0])",
self.default_col_name(0) + " = _values[0] = json"
]


## CSV
class CSVProcessor(Processor):
def __init__(self, prs, strings):
super().__init__(prs, strings)
self.has_header = False
self.wrap_row = False # since it's a multi col, it is already wraped

def get_input_iterators(self):
# Part 1 reads sample to detect dialect and if has header
Expand Down