Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplifications and performance improvements #8

Merged
merged 10 commits into from
Jul 27, 2021
156 changes: 64 additions & 92 deletions spyql/processor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
# TODO: optimizations
# [x]: single eval
# [ ]: try to eliminate nested list
# [x]: try to eliminate wrap row
# [x]: try to eliminate is instance of
# [x]: try to eliminate execute (replace vars - needs heads + keywords)


import csv
import json as jsonlib
import sys
Expand All @@ -14,7 +6,7 @@
from math import *
import logging
from collections.abc import Iterable
from itertools import islice
from itertools import islice, chain

from spyql.writer import Writer
from spyql.output_handler import OutputHandler
Expand Down Expand Up @@ -61,8 +53,6 @@ def __init__(self, prs, strings):
self.prs = prs #parsed query
self.strings = strings
# by default, a row does not need to be wrapped (only single cols need)
self.wrap_row = False
self.row_instantiation_script = None
self.input_col_names = []
self.colnames2idx = {}

Expand Down Expand Up @@ -96,28 +86,22 @@ def make_out_cols_names(self, out_cols_names):
out_cols_names = [name for sublist in out_cols_names for name in sublist] #flatten
return out_cols_names

# Returns iterator over input
# Input iterator should be a list of lists of rows for convenience
# Each row can be a list (in case of multiple columns) or a literal (single column)
# Returns iterator over input (e.g. a list of rows)
dcmoura marked this conversation as resolved.
Show resolved Hide resolved
# Each row is a list with one value per column
# e.g.
# [[1],[2],[3]] is the same as
# [[1,2],[3]] and is the same as
# [[1,2,3]]: 3 rows with a single col
#
# [[[1,'a']], [[2,'b']], [[3,'c']]] is the same as
# [[[1,'a']], [[2,'b'], [3,'c']]] and is the same as
# [[[1,'a'], [2,'b'], [3,'c']]]: 3 rows with 2 cols
def get_input_iterator(self):
    """Return an iterator over the input (e.g. a list of rows).

    Each row is a list with one value per column, e.g.:
      [[1], [2], [3]]              -> 3 rows with a single column
      [[1,'a'], [2,'b'], [3,'c']]  -> 3 rows with 2 columns

    Base implementation: a single row holding a single NULL column, so
    queries without an input source still produce one output row.
    """
    null_row = [None]
    return [null_row]

dcmoura marked this conversation as resolved.
Show resolved Hide resolved
def default_col_name(self, idx):
    """Return the default name for the column at 0-based index *idx* (e.g. "col1" for index 0)."""
    return "col{}".format(idx + 1)


# replace identifiers (column names) in sql expressions by references to `_values`
# and put (quoted) strings back
def prepare_expression(self, expr):
def prepare_expression(self, expr):
dcmoura marked this conversation as resolved.
Show resolved Hide resolved
if expr == '*':
return [f"_values[{idx}]" for idx in range(self.n_input_cols)]

Expand All @@ -140,127 +124,115 @@ def go(self):
def _go(self, output_handler):
vars = globals() # to do: filter out not useful/internal vars

_values = [[]]
_values = []
row_number = 0
vars_script = None
#json = {}

# gets user-defined output cols names (with AS alias)
out_cols_names = [c[0] for c in self.prs['select']]

cmds = []
cmds = []


explode_it_cmd = None
explode_inst_cmd = None
explode_path = self.prs['explode']
if (explode_path):
explode_it_cmd = compile(explode_path, '', 'eval')
explode_inst_cmd = compile(f'{explode_path} = explode_it', '', 'exec')


# should not accept more than 1 source, joins, etc (at least for now)
# input iterator is a list of lists for convenience
# an input iterator [[1],[2],[3]] is the same as [[1,2,3]]
its_list = self.get_input_iterators()
explode_inst_cmd = compile(f'{explode_path} = explode_it', '', 'exec')

where = None
explode_its = [None] # 1 element by default (no explosion)

logging.info("-- RESULT --")

for its in its_list:
for it in its:
_values = it

if not self.reading_data():
self.handle_header_row(_values)
continue

# should not accept more than 1 source, joins, etc (at least for now)
for _values in self.get_input_iterator():

if not self.reading_data():
self.handle_header_row(_values)
continue

# print header
dcmoura marked this conversation as resolved.
Show resolved Hide resolved
if row_number == 0:
self.handle_1st_data_row(_values)
output_handler.writer.writeheader(self.make_out_cols_names(out_cols_names))
if output_handler.is_done():
return # in case of `limit 0`

# TODO: move to function(s)
# compiles expressions for calculating outputs
cmds = [self.prepare_expression(c[1]) for c in self.prs['select']] #todo: rename cmds to out_expressions
cmds = [item for sublist in cmds for item in sublist] #flatten (because of '*')
cmds = compile('[' + ','.join(cmds) + ']', '<select>', 'eval')
where = self.prs['where']
if (where):
#TODO: check if * is not used in where... or pass argument
where = compile(self.prepare_expression(where)[0], '<where>', 'eval')

if explode_path:
explode_its = eval(explode_it_cmd)

if self.wrap_row:
_values = [_values]

# print header
if row_number == 0:
self.handle_1st_data_row(_values)
output_handler.writer.writeheader(self.make_out_cols_names(out_cols_names))
if output_handler.is_done():
return # in case of `limit 0`

# TODO: move to function(s)
# compiles expressions for calculating outputs
cmds = [self.prepare_expression(c[1]) for c in self.prs['select']] #todo: rename cmds to out_expressions
cmds = [item for sublist in cmds for item in sublist] #flatten (because of '*')
cmds = compile('[' + ','.join(cmds) + ']', '<select>', 'eval')
where = self.prs['where']
if (where):
#TODO: check if * is not used in where... or pass argument
where = compile(self.prepare_expression(where)[0], '<where>', 'eval')

for explode_it in explode_its:
if explode_path:
explode_its = eval(explode_it_cmd)

for explode_it in explode_its:
if explode_path:
exec(explode_inst_cmd)
exec(explode_inst_cmd)

row_number = row_number + 1
row_number = row_number + 1

vars["_values"] = _values
vars["_values"] = _values

if not where or eval(where,{},vars): #filter (opt: eventually could be done before exploding)
# input line is eligible
# calculate outputs
_res = eval(cmds,{},vars)
if not where or eval(where,{},vars): #filter (opt: eventually could be done before exploding)
# input line is eligible

# calculate outputs
_res = eval(cmds,{},vars)

output_handler.handle_result(_res) #deal with output
if output_handler.is_done():
return #e.g. when reached limit
output_handler.handle_result(_res) #deal with output
if output_handler.is_done():
return #e.g. when reached limit


class PythonExprProcessor(Processor):
    """Processor whose input comes from evaluating a Python expression."""

    def __init__(self, prs, strings):
        super().__init__(prs, strings)

    def get_input_iterator(self):
        """Evaluate the input expression and normalize it into a list of rows.

        A non-iterable result becomes a single row; an iterable of scalar
        values is wrapped so that every row is a list with one value per
        column.
        NOTE(review): string elements are themselves Iterable, so they are
        not wrapped here — confirm that is the intended behavior.
        """
        expr = self.strings.put_strings_back(self.prs['from'])
        result = eval(expr)
        if result:
            if not isinstance(result, Iterable):
                result = [result]
            if not isinstance(result[0], Iterable):
                result = [[value] for value in result]
        return result

class TextProcessor(Processor):
    """Processor that reads plain text lines, one row per line, single column."""

    def __init__(self, prs, strings):
        super().__init__(prs, strings)

    def get_input_iterator(self):
        """Read each stdin line as a row with 1 column (trailing newline chars stripped)."""
        # TODO: support files (currently stdin only)
        trailing = "\n\r"
        return [[raw_line.rstrip(trailing)] for raw_line in sys.stdin]



class JSONProcessor(Processor):
    """Processor that reads one JSON document per input line (single column)."""

    def __init__(self, prs, strings):
        super().__init__(prs, strings)
        # alias the first (and only) column as "json"
        self.colnames2idx.update({"json": 0})

    def get_input_iterator(self):
        """Read stdin line by line; each line is parsed as one JSON document (1 row = 1 json)."""
        # TODO: support files (currently stdin only)
        return [[jsonlib.loads(doc)] for doc in sys.stdin]

## CSV
class CSVProcessor(Processor):
def __init__(self, prs, strings):
super().__init__(prs, strings)
self.has_header = False
self.wrap_row = False # since it's a multi col, it is already wraped

def get_input_iterators(self):
def get_input_iterator(self):
# Part 1 reads sample to detect dialect and if has header
# TODO: infer data type
sample_size = 10 #make a input parameter
Expand All @@ -274,9 +246,9 @@ def get_input_iterators(self):
#print(self.has_header)
#print(dialect)
sample.seek(0) #rewinds the sample
return [
return chain(
csv.reader(sample, dialect), #goes through sample again (for reading input data)
csv.reader(sys.stdin, dialect)] #continues to the rest of the file
csv.reader(sys.stdin, dialect)) #continues to the rest of the file
#TODO: support files

def reading_data(self):
Expand Down