In [501]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
def read_config_file(filepath):
    '''
    Load a YAML configuration file.

    filepath -- path to the YAML file (e.g. "file.yaml").

    Returns the parsed document (a dict for a mapping such as file.yaml), or
    None when the file is not valid YAML; the parse error is logged with
    logging.error rather than raised.
    Raises OSError (e.g. FileNotFoundError) if the file cannot be opened, and
    UnicodeDecodeError if it is not UTF-8 text.
    '''
    # YAML files are normally UTF-8; don't depend on the locale's default
    # encoding (e.g. cp1252 on many Windows setups).
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            # explicit: callers must check for None on a malformed file
            return None
def replacer(string, char):
    '''
    Collapse every run of two or more consecutive `char` into a single `char`.

    string -- text to clean.
    char   -- character (or substring) whose repeats are collapsed. It is matched
              literally, so regex metacharacters such as '.' are safe.

    >>> replacer('a__b___c', '_')
    'a_b_c'
    '''
    # escape so `char` is literal, and group it so a multi-character `char`
    # is repeated as a whole rather than only its last character
    pattern = '(?:' + re.escape(char) + '){2,}'
    # a callable replacement is used verbatim (no backslash-escape processing)
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Standardise the column names of `df` and validate them against the YAML schema.

    Each column name is lower-cased, every non-word character (whitespace,
    punctuation, ...) is replaced by '_', leading/trailing '_' are stripped and
    runs of '_' are collapsed into one. The standardised names are assigned back
    to df.columns, i.e. the caller's dataframe is modified in place.

    df           -- pandas DataFrame whose header is validated (renamed in place).
    table_config -- parsed YAML config; its 'columns' list holds the expected
                    names, compared case-insensitively.

    Returns 1 if the standardised names equal the expected names (order ignored,
    duplicates counted); otherwise prints/logs the differences and returns 0.
    '''
    df.columns = (
        df.columns.astype(str)
        .str.lower()
        # raw strings: a backslash class like \W in a plain literal is an invalid escape
        .str.replace(r'\W', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    # Compare sorted name lists instead of reindexing the frame: no copy of the
    # data, and duplicate standardised names fail validation instead of raising.
    actual_col = sorted(df.columns)
    expected_col = sorted(str(col).lower() for col in table_config['columns'])
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1
    print("column name and column length validation failed")
    # sorted so the reported differences are deterministic
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


In [502]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: chunk0
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Client_id
    - age_start_observed
    - age_end
    - is_truncated
    - is_censored
    - is_dead
    - date_start_observed
    - date_end_observed

Overwriting file.yaml


In [503]:
# Read config file
import testutility as util
from testutility import *
import pandas as pd
import dask.dataframe as dd
import ray 
import modin.pandas as mpd
import time
import re
import os
import csv
import logging
import gzip
import shutil
import codecs as codec
from multiprocessing import Pool
config_data = util.read_config_file("file.yaml")
# read_config_file logs a YAML error and returns None instead of raising;
# stop here with a clear message rather than a TypeError in a later cell.
if not isinstance(config_data, dict):
    raise ValueError("file.yaml did not parse to a mapping; check the logged YAML error")

In [504]:
# read the file using config file
class SourceFile:
    '''Location of the input file described by file.yaml (computed once, when the class is defined).'''
    # YAML `file_type`, reused as the file extension
    file_type = config_data['file_type']
    source_file = "".join(("./csvchunks/", config_data['file_name'], f'.{file_type}'))


In [505]:
#1st Way to read csv file: Using PANDAS
# start = time.time()

# def sum_column(chunk, column_idx):
#     return chunk.iloc[:,column_idx].sum()

# chunksize = 10 ** 6
# total_sum = 0
# column_index = 0
#df = pd.DataFrame
# with pd.read_csv('survival_data.csv', chunksize=chunksize) as reader:
#     for chunk in reader:
#         total_sum += sum_column(chunk, column_index)
#          df = pd.concat([chunk])

# print(f"Total rows: {total_sum}")
# print(df.head())
# print(f"Done in {time.time()-start} seconds")

In [506]:
#2nd Way to read csv file: Using DASK
# %%time
# dask_df = dd.read_csv('survival_data.csv', blocksize=25e6)
# pandas_df = dask_df.compute()

In [507]:
#3rd Way to read csv file: Using MODIN 
# %%time
# os.environ["MODIN_ENGINE"] = "dask"
# from distributed import Client
# client = Client(memory_limit='8GB')
# df = mpd.read_csv('survival_data.csv')

In [508]:
#4th Way to read csv file: Using RAY
#%%time
# NOTE: reads a chunk produced by the chunking cell below; on a fresh checkout run that cell first.
# The windowed/split variant below returns a list of datasets, not a single Ray dataset.
#df = ray.data.read_csv(SourceFile.source_file).window(blocks_per_window=12).split(100, equal=True)
df = ray.data.read_csv(SourceFile.source_file).to_pandas()
# The id column presumably arrives with a literal 'Unnamed: 0' header (an unnamed
# index column round-tripped through pandas) -- give it its schema name.
df = df.rename(columns={'Unnamed: 0' : 'client_id'})
# Drop every row containing a missing value. `axis` is passed by keyword and the
# result reassigned: pandas 2.x no longer accepts `axis` positionally here.
df = df.dropna(axis=0)

print(df.head())

#df = pd.read_csv('test.csv')

#to take all csv file chunks into account, use glob
# import glob
# PATH = 'C:/Users/abdul/Documents/FileIngestionAndSchemaValidation/csvchunks'
# all_files = glob.glob(os.path.join(PATH, "*.csv"))     

# df_from_each_file = (pd.read_csv(f) for f in all_files)
# concatenated_df   = pd.concat(df_from_each_file, ignore_index=True)



Read progress: 100%|██████████| 1/1 [00:00<00:00, 309.45it/s]

   client_id  age_start_observed    age_end  is_truncated  is_censored  \
0   15113102            0.000000   9.097335         False         True   
1   41505894            0.000000  64.486689         False         True   
2   24774171            0.000000  33.071552         False         True   
3   97834936           34.834566  68.778258          True         True   
4   45793809            0.000000  95.948358         False        False   

   is_dead date_start_observed date_end_observed  
0    False          1908-11-17        1917-12-22  
1    False          1828-09-13        1893-03-10  
2    False          1911-02-07        1944-03-04  
3    False          1820-01-01        1853-12-10  
4     True          1870-05-29        1966-05-11  





In [509]:
# Split the large raw CSV into chunk files of CHUNK_SIZE rows, unless that was already done.
RAW_CSV_FILE = 'survival_data.csv'  # full source file (same name as in the commented-out reader cells)
CHUNK_SIZE = 40000                  # rows per chunk file
# Relative to the notebook, matching SourceFile.source_file ('./csvchunks/...'),
# instead of a machine-specific absolute path.
PATH = 'csvchunks'
isdir = os.path.isdir(PATH)
if isdir:
    print('CSV File Already segregated into chunks!')
else:
    # pandas' to_csv does not create missing directories
    os.makedirs(PATH)
    # Read the raw file, not SourceFile.source_file: that path lives inside PATH,
    # which does not exist in this branch.
    for i, chunk in enumerate(pd.read_csv(RAW_CSV_FILE, chunksize=CHUNK_SIZE)):
        chunk.to_csv(os.path.join(PATH, 'chunk{}.csv'.format(i)), index=False)

##total of 2220 chunks generated!! ##

CSV File Already segregated into chunks!


In [510]:
# Besides returning 1/0, col_header_val standardises df.columns in place,
# so the index printed next shows the normalised header names.
header_check = util.col_header_val(df, config_data)
print(df.columns)

column name and column length validation passed
Index(['client_id', 'age_start_observed', 'age_end', 'is_truncated',
       'is_censored', 'is_dead', 'date_start_observed', 'date_end_observed'],
      dtype='object')


In [511]:
# Side-by-side view of the (standardised) file header and the YAML schema.
for label, names in (("columns of files are:", df.columns),
                     ("columns of YAML are:", config_data['columns'])):
    print(label, names)

columns of files are: Index(['client_id', 'age_start_observed', 'age_end', 'is_truncated',
       'is_censored', 'is_dead', 'date_start_observed', 'date_end_observed'],
      dtype='object')
columns of YAML are: ['Client_id', 'age_start_observed', 'age_end', 'is_truncated', 'is_censored', 'is_dead', 'date_start_observed', 'date_end_observed']


In [512]:
import base64
def convert_to_gzip(source_file, frame=None, output_path='test.gz'):
    '''
    Write the validated dataframe as a gzip-compressed, delimiter-separated text file.

    source_file -- path of the validated input file. It is not read here; the rows
                   come from `frame`. Kept so existing calls keep working.
    frame       -- DataFrame to export; defaults to the notebook-global `df`.
    output_path -- destination file (default 'test.gz').

    Fields are separated by config_data['outbound_delimiter'] and no index column
    is written. Decompressing the output yields the delimited text itself.
    '''
    if frame is None:
        frame = df

    print("File {} is being Converted".format(config_data['file_name']))

    # pandas compresses while writing: no intermediate temp file, no per-line
    # re-encoding, and the file handle is closed even if writing fails.
    frame.to_csv(output_path, sep=config_data['outbound_delimiter'], index=False, compression='gzip')

    # To convert all chunk files in PATH in parallel, drive a multiprocessing.Pool
    # from outside this function (e.g. pool.map over os.listdir(PATH)); calling
    # pool.map(convert_to_gzip, ...) from inside this function would recurse.


In [513]:

# Export only when the header matches the YAML schema.
header_ok = util.col_header_val(df, config_data) != 0
if header_ok:
    print("col validation passed")
    convert_to_gzip(SourceFile.source_file)
else:
    print("validation failed")
    # TODO: write code to reject the file

column name and column length validation passed
col validation passed
File chunk0 is being Converted


In [514]:
#parse csv file just for testing purposes
# def parseCSVfile():
#     with open('test.csv', newline='') as infile:
#         reader = csv.reader(infile, dialect = 'excel')
#         with open('temp.txt', mode='w') as outfile:        
#             writer = csv.writer(outfile, delimiter= config_data['outbound_delimiter'])
#             writer.writerows(reader)

In [515]:
# parseCSVfile()

In [516]:
##Another way for converting to gz format but this takes a lot of time
# file_type = 'gz'
# with gzip.open(source_file, "wt") as f:
#     file_type = 'csv'
#     reader =  open(source_file, 'rt') 

#     writer =csv.writer(f)
#     for row in reader:
#         writer.writerow(codec.encode(row))

# f.close()
# reader.close()






In [517]:
#Time Statistics of dataframe reader
#1. pandas = 114.24626564979553 seconds
#2. dask = (Wall time) 2 min 56 s
#3. ray = (Wall time) 1 min 30 s
#4. modin = took much longer than expected


In [518]:
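#File Summary
#Total no. rows: 3946661545 (~approx 4 Billion rows)
#   NOTE(review): this is unlikely to be the row count -- 6.07 GB over ~3.95 billion rows
#   would be under 2 bytes per row. It probably comes from the commented-out pandas cell,
#   whose "Total rows" value is actually the sum of column 0. The chunking cell reports
#   2220 chunks of 40,000 rows, i.e. about 88.8 million rows. TODO: recount.
#Total no. cols: 7   (NOTE(review): the chunked data read above has 8 columns incl. client_id -- TODO reconcile)
#File size: 6.07 GB (6,525,517,581 bytes)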