Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplifications and performance improvements #8

Merged
merged 10 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spyql/output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def write(self, row):
self.rows_written = self.rows_written + 1

def finish(self):
#self.writer.writerow([self.rows_written])
dcmoura marked this conversation as resolved.
Show resolved Hide resolved
self.writer.flush()

class LineInLineOut(OutputHandler):
Expand Down
259 changes: 118 additions & 141 deletions spyql/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from math import *
import logging
from collections.abc import Iterable
from itertools import islice
from itertools import islice, chain

from spyql.writer import Writer
from spyql.output_handler import OutputHandler
Expand All @@ -17,26 +17,27 @@

# extra... need to find some way to add user imports...
# e.g. ~/.spyql.py file with python code to run at startup
import requests




class Processor:

@staticmethod
def make_processor(prs):
def make_processor(prs, strings):
processor_name = prs['from']
if not processor_name:
return Processor(prs)
return Processor(prs, strings)

processor_name = processor_name.upper()

if processor_name == 'JSON':
return JSONProcessor(prs)
return JSONProcessor(prs, strings)
if processor_name == 'CSV':
return CSVProcessor(prs)
return CSVProcessor(prs, strings)
if processor_name == 'TEXT': #single col
return TextProcessor(prs)
return TextProcessor(prs, strings)

return PythonExprProcessor(prs)
return PythonExprProcessor(prs, strings)
# if not reader_name or reader_name == 'CSV':
# return CSVWriter(inputfile, options)

Expand All @@ -48,10 +49,12 @@ def make_processor(prs):



def __init__(self, prs):
def __init__(self, prs, strings):
self.prs = prs #parsed query
self.row_instantiation_script = None
self.input_col_names = []
self.strings = strings #quoted strings
self.input_col_names = [] #column names of the input data
self.colnames2idx = {} #map from column names to indexes


# True after header, metadata, etc in input file
def reading_data(self):
Expand All @@ -60,16 +63,17 @@ def reading_data(self):
# Action for header row (e.g. column name definition)
# Default is a no-op; format-specific subclasses (e.g. CSVProcessor) override
# this to consume header rows before data rows are processed.
def handle_header_row(self, row):
    pass

# Makes sure a row is always a list of columns (even when there is a single input col)
def wrap_row(self, row):
    # A non-iterable value (e.g. a single number) is wrapped in a list so
    # that downstream code can always index columns positionally.
    if not isinstance(row, Iterable):
        return [row]
    return row


# Action for handling the first row of data
def handle_1st_data_row(self, row):
self.n_input_cols = len(row) if row else 0
self.n_input_cols = len(row) if row else 0

#dictionary to translate col names to indexes in `_values`
self.colnames2idx.update({self.default_col_name(_i): _i for _i in range(self.n_input_cols)})
if self.input_col_names:
#TODO check if len(input_col_names) == self.n_input_cols
self.colnames2idx.update({self.input_col_names[_i]: _i for _i in range(self.n_input_cols)})


# Create list of output column names
def make_out_cols_names(self, out_cols_names):
Expand All @@ -80,44 +84,31 @@ def make_out_cols_names(self, out_cols_names):
out_cols_names = [name for sublist in out_cols_names for name in sublist] #flatten
return out_cols_names

# Returns iterator over input
# Input iterator should be a list of lists of rows for convenience
# Each row can be a list (in case of multiple columns) or a literal (single column)
# Returns iterator over input (e.g. list of rows)
# Each row is list with one value per column
# e.g.
# [[1],[2],[3]] is the same as
# [[1,2],[3]] and is the same as
# [[1,2,3]]: 3 rows with a single col
#
# [[[1,'a']], [[2,'b']], [[3,'c']]] is the same as
# [[[1,'a']], [[2,'b'], [3,'c']]] and is the same as
# [[[1,'a'], [2,'b'], [3,'c']]]: 3 rows with 2 cols
def get_input_iterators(self):
# [[1] ,[2], [3]]: 3 rows with a single col
# [[1,'a'], [2,'b'], [3,'c']]: 3 rows with 2 cols
def get_input_iterator(self):
return [[None]] #default: returns a single line with a 'null' column

dcmoura marked this conversation as resolved.
Show resolved Hide resolved
# Default column names, e.g. col1 for the first column
# `idx` is the zero-based column index; generated names are 1-based ("col1", "col2", ...).
def default_col_name(self, idx):
    return f"col{idx+1}"

# replace identifiers (column names) in sql expressions by references to `_values`
# and put (quoted) strings back
def prepare_expression(self, expr):
dcmoura marked this conversation as resolved.
Show resolved Hide resolved
if expr == '*':
return [f"_values[{idx}]" for idx in range(self.n_input_cols)]

def make_row_instantiation_script(self):
# script for instantiating input variables
# should return a list of string with assignment statements
# has access to the `_values` variable, which has a complete input row of values
# this should only be called by `get_row_instantiation_script`
# can be overridden (e.g. json processor overrides this)

vars_script = [f"{self.default_col_name(_i)} = _values[{_i}]" for _i in range(self.n_input_cols)]
if self.input_col_names:
#TODO check if len(input_col_names) == self.n_input_cols
vars_script = [f"{self.input_col_names[_i]} = {vars_script[_i]}" for _i in range(self.n_input_cols)]
return vars_script
for id, idx in self.colnames2idx.items():
pattern = rf"\b({id})\b"
replacement = f"_values[{idx}]"
expr = re.compile(pattern).sub(replacement, expr)

return [self.strings.put_strings_back(expr)]

# lazy initialization of the row instantiation script
def get_row_instantiation_script(self):
if not self.row_instantiation_script:
vars_script = '\n'.join(self.make_row_instantiation_script())
#print(vars_script)
self.row_instantiation_script = compile(vars_script, '', 'exec')
return self.row_instantiation_script

# main
def go(self):
Expand All @@ -128,17 +119,16 @@ def go(self):
output_handler.finish()

def _go(self, output_handler):
_values = [[]]
row_number = 0
vars_script = None
#json = {}
vars = globals() # to do: filter out not useful/internal vars

select_expr = []
where_expr = None
_values = []
row_number = 0
explode_its = [None] # 1 element by default (no explosion)

# gets user-defined output cols names (with AS alias)
out_cols_names = [c[0] for c in self.prs['select']]

# compiles expressions for calculating outputs
cmds = [c[1] for c in self.prs['select']] #todo: rename cmds to out_expressions
cmds = [compile(cmd, '', 'eval') for cmd in cmds]
out_cols_names = [c['name'] for c in self.prs['select']]

explode_it_cmd = None
explode_inst_cmd = None
Expand All @@ -147,108 +137,96 @@ def _go(self, output_handler):
explode_it_cmd = compile(explode_path, '', 'eval')
explode_inst_cmd = compile(f'{explode_path} = explode_it', '', 'exec')


# should not accept more than 1 source, joins, etc (at least for now)
# input iterator is a list of lists for convenience
# an input iterator [[1],[2],[3]] is the same as [[1,2,3]]
its_list = self.get_input_iterators()

where = self.prs['where']
if (where):
where = compile(where, '', 'eval')

logging.info("-- RESULT --")

for its in its_list:
for it in its:
_values = it

if not self.reading_data():
self.handle_header_row(_values)
continue

_values = self.wrap_row(_values)

# print header
if row_number == 0:
self.handle_1st_data_row(_values)
output_handler.writer.writeheader(self.make_out_cols_names(out_cols_names))
if output_handler.is_done():
return # in case of `limit 0`

#make input variables (uses `_values`)
exec(self.get_row_instantiation_script())
# should not accept more than 1 source, joins, etc (at least for now)
for _values in self.get_input_iterator():

if not self.reading_data():
self.handle_header_row(_values)
continue

if row_number == 0:
self.handle_1st_data_row(_values)
output_handler.writer.writeheader(self.make_out_cols_names(out_cols_names))
if output_handler.is_done():
return # in case of `limit 0`

# TODO: move to function(s)
# compiles expressions for calculating outputs
select_expr = [self.prepare_expression(c['expr']) for c in self.prs['select']]
select_expr = [item for sublist in select_expr for item in sublist] #flatten (because of '*')
select_expr = compile('[' + ','.join(select_expr) + ']', '<select>', 'eval')
where_expr = self.prs['where']
if (where_expr):
#TODO: check if * is not used in where... or pass argument
where_expr = compile(self.prepare_expression(where_expr)[0], '<where>', 'eval')

if explode_path:
explode_its = eval(explode_it_cmd)

explode_its = [None] # 1 element by default (no explosion)
for explode_it in explode_its:
if explode_path:
explode_its = eval(explode_it_cmd)

for explode_it in explode_its:
if explode_path:
exec(explode_inst_cmd)
exec(explode_inst_cmd)

row_number = row_number + 1

row_number = row_number + 1
vars["_values"] = _values

if not where_expr or eval(where_expr,{},vars): #filter (opt: eventually could be done before exploding)
# input line is eligible

if not where or eval(where): #filter
# input line is eligible
the_globals = globals()
the_locals = locals() # to do: filter out internal vars
# calculate outputs
_res = [eval(cmd, the_globals, the_locals) for cmd in cmds]
_res = [item for sublist in _res for item in sublist] #flatten

output_handler.handle_result(_res) #deal with output
if output_handler.is_done():
return #e.g. when reached limit
# calculate outputs
_res = eval(select_expr,{},vars)

output_handler.handle_result(_res) #deal with output
if output_handler.is_done():
return #e.g. when reached limit


class PythonExprProcessor(Processor):
def __init__(self, prs):
super().__init__(prs)
def __init__(self, prs, strings):
super().__init__(prs, strings)

# input is a Python expression
def get_input_iterators(self):
return [eval(self.prs['from'])]
def get_input_iterator(self):
e = eval(self.strings.put_strings_back(self.prs['from']))
if e:
if not isinstance(e, Iterable):
e = [e]
if not isinstance(e[0], Iterable):
e = [[el] for el in e]
return e


class TextProcessor(Processor):
def __init__(self, prs):
super().__init__(prs)
def __init__(self, prs, strings):
super().__init__(prs, strings)

# reads a text row as a row with 1 column
def get_input_iterators(self):
#return [sys.stdin] #to do: support files
return [[line.rstrip("\n\r") for line in sys.stdin]]

# since it's a single col, always wrap it
def wrap_row(self, row):
return [row]
def get_input_iterator(self):
#to do: support files
return [[line.rstrip("\n\r")] for line in sys.stdin]


class JSONProcessor(Processor):
def __init__(self, prs):
super().__init__(prs)

def get_input_iterators(self):
return [sys.stdin] #to do: suport files
def __init__(self, prs, strings):
super().__init__(prs, strings)
self.colnames2idx.update({"json": 0}) # first column alias as json

# since it's a single col, always wrap it
def wrap_row(self, row):
return [row]

def make_row_instantiation_script(self):
# overriding default: json input is considered a single col
return [
"json = jsonlib.loads(_values[0])",
self.default_col_name(0) + " = _values[0] = json"
]
# 1 row = 1 json
def get_input_iterator(self):
#to do: support files
return [[jsonlib.loads(line)] for line in sys.stdin]


## CSV
class CSVProcessor(Processor):
def __init__(self, prs):
super().__init__(prs)
def __init__(self, prs, strings):
super().__init__(prs, strings)
self.has_header = False

def get_input_iterators(self):
def get_input_iterator(self):
# Part 1 reads sample to detect dialect and if has header
# TODO: infer data type
sample_size = 10 #make a input parameter
Expand All @@ -262,9 +240,9 @@ def get_input_iterators(self):
#print(self.has_header)
#print(dialect)
sample.seek(0) #rewinds the sample
return [
return chain(
csv.reader(sample, dialect), #goes through sample again (for reading input data)
csv.reader(sys.stdin, dialect)] #continues to the rest of the file
csv.reader(sys.stdin, dialect)) #continues to the rest of the file
#TODO: support files

def reading_data(self):
Expand All @@ -288,4 +266,3 @@ def make_str_valid_varname(self, s):
s = re.sub(r'\s+', '_', s)

return s

Loading