In [1]:
#from google.colab import drive
#drive.mount('/content/gdrive')

In [2]:
%%writefile test_utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath): 
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.replace(' ', '')#removing the white spaces in the column names
    df.columns = df.columns.str.replace('[#,@,&,?]', '') #removing special characters
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting test_utility.py


In [3]:
%%writefile file.yaml
file_type: csv
dataset_name: taxifile
file_name: yellow_tripdata_2015-03
file_path: /content/gdrive/yellow_tripdata_2015-03.csv
inbound_delimiter: ","
outbound_delimiter: "|"
columns: 
    - vendorid
    - tpep_pickup_datetime
    - tpep_dropoff_datetime
    - passenger_count
    - trip_distance
    - pickup_longitude
    - pickup_latitude
    - ratecodeid
    - store_and_fwd_flag
    - dropoff_longitude
    - dropoff_latitude
    - payment_type
    - fare_amount
    - extra
    - mta_tax
    - tip_amount
    - tolls_amount
    - improvement_surcharge
    - total_amount
  

Overwriting file.yaml


In [4]:
# Reading config file
import test_utility as util
config_data = util.read_config_file("file.yaml")

In [5]:
config_data['inbound_delimiter']

','

In [6]:
#inspecting data of config file
config_data

{'columns': ['vendorid',
  'tpep_pickup_datetime',
  'tpep_dropoff_datetime',
  'passenger_count',
  'trip_distance',
  'pickup_longitude',
  'pickup_latitude',
  'ratecodeid',
  'store_and_fwd_flag',
  'dropoff_longitude',
  'dropoff_latitude',
  'payment_type',
  'fare_amount',
  'extra',
  'mta_tax',
  'tip_amount',
  'tolls_amount',
  'improvement_surcharge',
  'total_amount'],
 'dataset_name': 'taxifile',
 'file_name': 'yellow_tripdata_2015-03',
 'file_path': '/content/gdrive/yellow_tripdata_2015-03.csv',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|'}

In [7]:
import tracemalloc
import time

In [8]:
def tracing_start():
    tracemalloc.stop()
    print("nTracing Status : ", tracemalloc.is_tracing())
    tracemalloc.start()
    print("Tracing Status : ", tracemalloc.is_tracing())
def tracing_mem():
    first_size, first_peak = tracemalloc.get_traced_memory()
    peak = first_peak/(1024*1024)
    print("Peak Size in MB - ", peak)

In [9]:
# Normal reading process of the file
import pandas as pd
tracing_start()
start = time.time()
df_sample = pd.read_csv('/content/gdrive/My Drive/yellow_tripdata_2015-03.csv')

end = time.time()
print("time elapsed {} milli seconds".format((end-start)*1000))
tracing_mem()

nTracing Status :  False
Tracing Status :  True
time elapsed 70665.42172431946 milli seconds
Peak Size in MB -  7488.050775527954


In [10]:
df_sample.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,2015-03-06 08:02:31,2015-03-06 08:09:55,1,1.2,-73.990211,40.750969,1,N,-73.987892,40.738037,2,7.0,0.0,0.5,0.0,0.0,0.3,7.8
1,1,2015-03-06 08:02:31,2015-03-06 08:15:23,1,3.2,-73.935188,40.80072,1,N,-73.952553,40.765373,2,11.5,0.0,0.5,0.0,0.0,0.3,12.3
2,1,2015-03-06 08:02:31,2015-03-06 08:12:27,1,1.1,-73.963753,40.767937,1,N,-73.956947,40.78027,2,8.0,0.0,0.5,0.0,0.0,0.3,8.8
3,1,2015-03-06 08:02:31,2015-03-06 08:09:09,1,0.8,-73.997177,40.742168,1,N,-74.008064,40.739281,1,6.0,0.0,0.5,1.0,0.0,0.3,7.8
4,1,2015-03-06 08:02:32,2015-03-06 08:19:37,1,2.7,-74.006844,40.730267,1,N,-73.97686,40.750671,1,13.0,0.0,0.5,2.75,0.0,0.3,16.55


In [11]:
df_sample.shape

(13351609, 19)

In [12]:
# Reading file in Dask
import dask.dataframe as dd
tracing_start()
start = time.time()
df_sample1 = dd.read_csv('/content/gdrive/My Drive/yellow_tripdata_2015-03.csv')
end = time.time()
print("time elapsed {} milli seconds".format((end-start)*1000))
tracing_mem()

nTracing Status :  False
Tracing Status :  True
time elapsed 146.23236656188965 milli seconds
Peak Size in MB -  1.5486268997192383


In [13]:
#!pip install ray[rllib]

In [14]:
#pip install -U pyarrow

In [15]:
# Reading file in Ray
import ray
tracing_start()
start = time.time()
df_sample2 = ray.data.read_csv('/content/gdrive/My Drive/yellow_tripdata_2015-03.csv')
end = time.time()
print("time elapsed {} milli seconds".format((end-start)*1000))
tracing_mem()

nTracing Status :  False
Tracing Status :  True
time elapsed 62025.37298202515 milli seconds
Peak Size in MB -  2.4924240112304688


In [16]:
#pip install modin

In [17]:
# Reading file in Modin
import modin.pandas as mp
tracing_start()
start = time.time()
df_sample3 = mp.read_csv('/content/gdrive/My Drive/yellow_tripdata_2015-03.csv')
end = time.time()
print("time elapsed {} milli seconds".format((end-start)*1000))
tracing_mem()

nTracing Status :  False
Tracing Status :  True


[2m[33m(raylet)[0m [2021-10-08 23:48:08,057 E 4223 4223] local_object_manager.cc:271: Failed to send object spilling request: IOError: 14: Socket closed


time elapsed 184496.8090057373 milli seconds
Peak Size in MB -  997.195990562439


In [18]:
#reading file in csv using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
df = dd.read_csv(source_file, delimiter=config_data['inbound_delimiter'],assume_missing=True)
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1.0,2015-03-06 08:02:31,2015-03-06 08:09:55,1.0,1.2,-73.990211,40.750969,1.0,N,-73.987892,40.738037,2.0,7.0,0.0,0.5,0.0,0.0,0.3,7.8
1,1.0,2015-03-06 08:02:31,2015-03-06 08:15:23,1.0,3.2,-73.935188,40.80072,1.0,N,-73.952553,40.765373,2.0,11.5,0.0,0.5,0.0,0.0,0.3,12.3
2,1.0,2015-03-06 08:02:31,2015-03-06 08:12:27,1.0,1.1,-73.963753,40.767937,1.0,N,-73.956947,40.78027,2.0,8.0,0.0,0.5,0.0,0.0,0.3,8.8
3,1.0,2015-03-06 08:02:31,2015-03-06 08:09:09,1.0,0.8,-73.997177,40.742168,1.0,N,-74.008064,40.739281,1.0,6.0,0.0,0.5,1.0,0.0,0.3,7.8
4,1.0,2015-03-06 08:02:32,2015-03-06 08:19:37,1.0,2.7,-74.006844,40.730267,1.0,N,-73.97686,40.750671,1.0,13.0,0.0,0.5,2.75,0.0,0.3,16.55


In [19]:
#shape of csv file
df.shape

(Delayed('int-1ebb0b79-4aed-44cf-9424-3734357915c0'), 19)

In [20]:
#validate the header of the ingested csv file
util.col_header_val(df,config_data)

column name and column length validation passed




1

In [21]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['vendorid', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'pickup_longitude',
       'pickup_latitude', 'ratecodeid', 'store_and_fwd_flag',
       'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount',
       'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
       'improvement_surcharge', 'total_amount'],
      dtype='object')
columns of YAML are: ['vendorid', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'ratecodeid', 'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']


In [22]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed




In [24]:
#reading file in pipe separated format using config file

out_df = dd.read_csv(source_file, delimiter=config_data['outbound_delimiter'])
out_df.head()

Unnamed: 0,"VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount"
0,"1,2015-03-06 08:02:31,2015-03-06 08:09:55,1,1...."
1,"1,2015-03-06 08:02:31,2015-03-06 08:15:23,1,3...."
2,"1,2015-03-06 08:02:31,2015-03-06 08:12:27,1,1...."
3,"1,2015-03-06 08:02:31,2015-03-06 08:09:09,1,.8..."
4,"1,2015-03-06 08:02:32,2015-03-06 08:19:37,1,2...."


In [25]:
#shape of pipe separated file
out_df.shape

(Delayed('int-d95bfdd0-51bf-461b-911b-ee69300efbd2'), 1)

In [26]:
#creating a gz file
out_df.to_csv(r'/content/gdrive/My Drive/file.gz', single_file=True, compression='gzip')

['/content/gdrive/My Drive/file.gz']