In [1]:
from scan import scan
import mapper as mp
from reducer import reduce
from io_util import read_groups
import pandas

In [2]:
def q6_filter(df: pandas.DataFrame) -> pandas.DataFrame:
    after_1994=df['L_SHIPDATE']>='1994-01-01'
    before_1995=df['L_SHIPDATE']<'1995-01-01'
    greater_d=df['L_DISCOUNT']>0.05
    lower_d=df['L_DISCOUNT']<0.07
    lower_q=df['L_QUANTITY']<24
    return df[greater_d & lower_d &lower_q & before_1995 &after_1994]


In [7]:
import random
def q6_rand_filter(df):
    date=random.randrange(1993, 1998, 1)
    discount=round(random.uniform(0.02,0.09), 2)
    quantity=random.randrange(24, 26, 1)
    after_date=df['L_SHIPDATE']>='{}-01-01'.format(date)
    before_date=df['L_SHIPDATE']<'{}-01-01'.format(date+1)
    greater_d=df['L_DISCOUNT']>discount-0.01
    lower_d=df['L_DISCOUNT']<discount+0.01
    lower_q=df['L_QUANTITY']<quantity
    print("selected date: {}-01-01\nselected discount: {}\nselected quantity: {}".format(date,discount,quantity))
    df=df[greater_d & lower_d &lower_q & before_date &after_date]
    print("resulting in {} selected rows\n-----------------------".format(df.shape[0]))
    df=df[['L_EXTENDEDPRICE', 'L_DISCOUNT']]
    return df


In [4]:
def q6_mapper(df):
    out=pandas.DataFrame(columns=['group','value'])
    for index,row in df.iterrows():
        out=out.append({'group':1,'value':row['L_DISCOUNT']*row['L_EXTENDEDPRICE']}, ignore_index=True)
    return out

In [5]:
def q6_reducer(df):
    sum=0
    for index,row in df.iterrows():
        sum+=row['value']
    return pandas.DataFrame([['result',sum]],columns=['result', 'value'])

In [8]:
import timeit
import os
import shutil
runs = 2
def run(func,*args):
    def test():
        func(*args)
    return test

def scan_map_reduce(in_folder,out_folder,groups_in,groups_out):
    scan(in_folder,"{}/test_scan.parquet".format(out_folder),q6_rand_filter,groups_in,groups_out)
    mp.map("{}/test_scan.parquet".format(out_folder), "{}/test_map.parquet".format(out_folder),q6_mapper,groups_in,groups_out )
    reduce("{}/test_map.parquet".format(out_folder),"{}/test_reduce.parquet".format(out_folder),q6_reducer,groups_in,1)

out_folder="./test"
in_folder="../resources/groups.parquet"
shutil.rmtree(out_folder)
os.mkdir(out_folder)
t = timeit.Timer(run(scan_map_reduce,in_folder,out_folder,[0,1],2))
duration = t.timeit(runs)
print("{} runs took {}s ".format(runs,duration))

selected date: 1993-01-01
selected discount: 0.07
selected quantity: 25
resulting in 132672 selected rows
-----------------------
selected date: 1996-01-01
selected discount: 0.08
selected quantity: 24
resulting in 127287 selected rows
-----------------------
2 runs took 359.59758381199936s 
