In [1]:
import readtextfile
import writeavrofile
import writetextfile
import writeparquetfile
import utils
import dask.dataframe as dd
import pandas as pd
import ray
import time
from timeit import default_timer as timer
import io
import uuid

In [2]:
# Connect to the already-running Ray cluster, but only once per kernel:
# ray.is_initialized must be *called* (the bare function object is always
# truthy), and ray.init must be skipped when a connection already exists so
# that re-running this cell does not attempt a second ray.init.
if not ray.is_initialized():
    ray.init(address="auto")

2021-01-12 21:52:54,063	INFO worker.py:654 -- Connecting to existing Ray cluster at address: 192.168.29.242:6379


In [3]:
# Build the project's text-file reader for the sales CSV and its schema file,
# then load it through the reader's Dask code path. `df` is consumed below via
# df.npartitions / df.get_partition(...).
df = (
    readtextfile.ReadTextFile(
        ipfile='/Users/sriyan/Downloads/sales_50mb_1500000.csv',
        ipschemafile='/Users/sriyan/Documents/dataprocessor/schema/sample_csv_file.schema',
        delimiter=',',
        skiprows=1,
        parallel=4,
    )
    .read_using_dask()
)

Time taken : 0.034280551999998465 seconds for reading file '/Users/sriyan/Downloads/sales_50mb_1500000.csv'


In [23]:
@ray.remote
class Test():
    '''Ray actor exposing the pipeline steps: transform, concat, repartition/sort and write.

    Every method prints its own wall-clock duration before returning.
    '''

    def transform(self, df, partition_no):
        '''All transformations before repartition.

        Computes partition `partition_no` of `df` (via get_partition(...).compute())
        and replaces the 'Order_Date' and 'Ship_Date' columns with the output of
        utils.Utils().parse_dates_using_lookup.

        Returns the transformed partition.
        '''
        start = timer()

        df = df.get_partition(partition_no).compute()
        df['Order_Date'] = utils.Utils().parse_dates_using_lookup(df['Order_Date'])
        df['Ship_Date'] = utils.Utils().parse_dates_using_lookup(df['Ship_Date'])
        #df = df[df['Region'] == 'Europe']

        print("duration =", timer() - start, " seconds for transform")
        return df

    def concat_dfs(self, dflist):
        '''Combine all dataframes to repartition them based on key.

        dflist: list of frames accepted by pd.concat.
        Returns the concatenated frame (original indexes are kept).
        '''
        start = timer()
        dfs = pd.concat(dflist)
        #print(dfs.index)
        print("duration =", timer() - start, " seconds for combining data for repartition")
        return dfs

    def re_partition_sort_data(self, df, partition_metadata, partition_keys, sort_keys):
        '''Prepare one partition based on partition metadata, optionally sorted.

        Keeps the rows of `df` whose df[partition_keys] value is contained in
        `partition_metadata`. When `sort_keys` is not None the selected rows
        are sorted by `sort_keys`.

        Returns the filtered (and possibly sorted) frame.
        '''
        start = timer()
        df = df[df[partition_keys].isin(partition_metadata)]
        if sort_keys is not None:
            # sort_values returns a new frame (it is not in-place by default),
            # so the result has to be rebound or the sort is silently lost.
            df = df.sort_values(by=sort_keys)
        print("duration =", timer() - start, " seconds for preparing partition based on keys")
        return df

    def write_file(self, df, partition_no):
        '''Write `df` as a '|'-separated text file without a header row.

        The file is /tmp/data/sample_textfile.<partition_no>.txt; `partition_no`
        is only used (through str()) to build that name.

        Returns 1 once the file has been written.
        '''
        start = timer()
        # Alternative writers kept for reference (disabled):
        # df = dd.from_pandas(df, npartitions=1)
        # writeparquetfile.WriteParquetFile(ipdf=df, opfile="/tmp/data/sample_with_partitions",
        #                               compression='snappy', engine='pyarrow', append=False, overwrite=True, write_metadata_file=False).write_using_dask()
        filename = "/tmp/data/sample_textfile." + str(partition_no) + ".txt"
        df.to_csv(filename, sep='|', header=None, encoding='utf-8')
        #writeavrofile.WriteAvroFile(df, partition_no, '/Users/sriyan/Downloads/sample_avro_file').write_using_fastavro()
        print("duration =", timer() - start, " seconds for writing")
        return 1


# Driver for the four-step pipeline (transform -> concat -> repartition/sort -> write).
# Each step runs on freshly created `Test` actors; the total wall time is printed at the end.
start = timer()
# Created before `try` so the cleanup in `finally` always has a dict to walk.
actors = {}
try:
    result_ids_step1 = []
    completed_dfs_step1 = []
    completed_ids_step1 = []

    result_ids_step2 = []
    completed_ids_step2 = []

    result_ids_step3 = []
    completed_dfs_step3 = []
    completed_ids_step3 = []

    result_ids_step4 = []
    completed_ids_step4 = []

    npartitions = df.npartitions

    ################################STEP1##################################

    # Workers for executing step1 tasks: one actor per input partition.
    for i in range(npartitions):
        actors[i] = Test.remote()
        result_ids_step1.append(actors[i].transform.remote(df, i))

    # Remember the submission order: ray.wait hands refs back in completion
    # order, which would make the concatenated row order vary between runs.
    submitted_ids_step1 = list(result_ids_step1)

    # Get step1 status (ray.wait always returns two lists).
    while result_ids_step1:
        done_ids, result_ids_step1 = ray.wait(result_ids_step1)
        completed_ids_step1.extend(done_ids)

    # Prepare list of data frames to be merged, in partition order.
    dflist = ray.get(submitted_ids_step1)

    # Close all step1 workers and forget their handles so that later cleanup
    # loops do not try to kill them a second time.
    for actor in actors.values():
        ray.kill(actor)
    actors.clear()

    ################################STEP2##################################

    # Worker for the single step2 task.
    actors[completed_ids_step1[0]] = Test.remote()
    result_ids_step2.append(actors[completed_ids_step1[0]].concat_dfs.remote(dflist))

    # Get step2 status.
    while result_ids_step2:
        done_ids, result_ids_step2 = ray.wait(result_ids_step2)
        completed_ids_step2.extend(done_ids)

    #print(completed_ids_step2)

    # Get the merged data frame.
    df_concat = ray.get(completed_ids_step2[0])
    #print(df_concat.info)

    # Close all step2 workers.
    for actor in actors.values():
        ray.kill(actor)
    actors.clear()

    ################################STEP3##################################

    # Get the partitions metadata based on keys.
    partition_keys = 'Region'
    sort_keys = ['Order_Date']
    partition_metadata = utils.Utils().define_partitions(seq = list(df_concat[partition_keys].value_counts(dropna=False).keys()),
                                                         num_of_partitions = npartitions)

    # Repartition data: partition using metadata >> sort >> send to workers.
    for i, x in enumerate(partition_metadata):
        actors[i] = Test.remote()
        result_ids_step3.append(actors[i].re_partition_sort_data.remote(df_concat, x, partition_keys, sort_keys))

    # Get step3 status; every finished partition is handed to a new writer
    # actor straight away, so writing overlaps with the remaining step3 tasks.
    while result_ids_step3:
        done_ids, result_ids_step3 = ray.wait(result_ids_step3)
        for done_id in done_ids:
            actors[done_id] = Test.remote()
            # Pass the ObjectRef itself: Ray resolves top-level ref arguments
            # on the worker, so the frame is not pulled into the driver and
            # re-serialised just to be forwarded.
            result_ids_step4.append(actors[done_id].write_file.remote(done_id, uuid.uuid4()))
        completed_ids_step3.extend(done_ids)

    ################################STEP4##################################

    # Get step4 status.
    while result_ids_step4:
        done_ids_step4, result_ids_step4 = ray.wait(result_ids_step4)
        completed_ids_step4.extend(done_ids_step4)

    # ray.wait reports failed tasks as "ready" too; fetching the (tiny) write
    # results is what actually raises if a repartition or write task failed.
    ray.get(completed_ids_step4)
finally:
    # Always tear down whatever actors are still registered, even on failure.
    for actor in actors.values():
        ray.kill(actor)
print("duration =", timer() - start, " seconds")

[2m[36m(pid=21916)[0m duration = 9.02068488  seconds for transform
[2m[36m(pid=21917)[0m duration = 9.098256571  seconds for transform
[2m[36m(pid=21915)[0m duration = 9.081197728000001  seconds for transform
[2m[36m(pid=21918)[0m duration = 9.173062334  seconds for transform
[2m[36m(pid=21926)[0m duration = 0.31037634800000014  seconds for combining data for repartition
[2m[36m(pid=21938)[0m duration = 0.5297652750000001  seconds for preparing partition based on keys
[2m[36m(pid=21942)[0m duration = 0.519340717  seconds for preparing partition based on keys
[2m[36m(pid=21941)[0m duration = 0.8600921370000001  seconds for preparing partition based on keys
[2m[36m(pid=21951)[0m duration = 0.47566074999999985  seconds for preparing partition based on keys
[2m[36m(pid=21957)[0m duration = 7.444226695  seconds for writing
[2m[36m(pid=21953)[0m duration = 16.010869579  seconds for writing
[2m[36m(pid=21948)[0m duration = 18.764007976000002  seconds for wr