# File Ingestion and Schema Validation

In [1]:
import os 
from timeit import default_timer as timer

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

## File Reading Using Different Methods

__DASK:__

In [3]:
from dask import dataframe as dd 
start = timer()
df = dd.read_csv('Dataset/eCommerce.csv')
end = timer()
print("Read CSV with Dask:", round(end - start, 6), "sec")

Read CSV with Dask: 0.023096 sec


__PANDAS__:

In [4]:
import pandas as pd 
start = timer()
df = pd.read_csv('Dataset/eCommerce.csv')
end = timer()
print("Read CSV with Pandas:", round(end - start, 6), "sec")

Read CSV with Pandas: 141.200817 sec


__RAY:__

In [5]:
import ray
start = timer()
df = ray.data.read_csv('Dataset/eCommerce.csv')
end = timer()
print("Read CSV with Ray:", round(end - start, 6), "sec")

2023-02-08 16:33:04,836	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
[2m[36m(raylet)[0m Spilled 2051 MiB, 4 objects, write throughput 111 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[2m[36m(raylet)[0m Spilled 4103 MiB, 8 objects, write throughput 139 MiB/s.


Read CSV with Ray: 107.579427 sec


__MODIN (POWERED BY RAY):__

In [6]:
import modin.pandas as mpd
start = timer()
df = mpd.read_csv('Dataset/eCommerce.csv')
end = timer()
print("Read CSV with Modin:", round(end - start, 6), "sec")



Read CSV with Modin: 198.760036 sec


#### Based on the computational efficiency, we can see that Dask did better in file reading compared to Pandas, Ray, and Modin, with a reading time at 0.02 

## Summary of the File (Working with Dask)

In [7]:
from dask import dataframe as dd 

df = dd.read_csv('Dataset/eCommerce.csv')
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 9 entries, event_time to user_session
dtypes: object(5), float64(1), int64(3)

In [8]:
# total number of rows
len(df.index)

42448764

In [9]:
# total number of columns
len(df.columns)

9

In [37]:
# size of the file
os.path.getsize('Dataset/eCommerce.csv')

5668612855

In [10]:
df_col = df.columns
df_col

Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')

* __The total number of observations the file have is 42248764__
* __The total number of columns the file have is 9__ 
* __The file size is 5.67 GB or 5668612855 bytes__

## Basic Validation
* __Removing special characters and white spaces from columns__

__Creating TestUtility.py File__

In [12]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


__Crating a YAML file__

In [13]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: eCommerce
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Event_time
    - Event_type 
    - ProductID
    - CategoryID
    - Category_code 
    - Brand
    - Price 
    - UserID 
    - User_session

Overwriting file.yaml


In [14]:
# Reading config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [15]:
config_data['inbound_delimiter']

','

In [16]:
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'eCommerce',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Event_time',
  'Event_type',
  'ProductID',
  'CategoryID',
  'Category_code',
  'Brand',
  'Price',
  'UserID',
  'User_session']}

In [47]:
# Reading process of file using Dask
from dask import dataframe as dd
df_sample = dd.read_csv('Dataset/eCommerce.csv')
df_sample.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [48]:
# Read the file using config file
file_type = config_data['file_type']
source_file = "Dataset/" + config_data['file_name'] + f'.{file_type}'
print("",source_file)

 Dataset/eCommerce.csv


In [49]:
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [50]:
#validating the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['product_id', 'user_id', 'category_id']
Following YAML columns are not in the file uploaded ['categoryid', 'userid', 'productid']


0

In [51]:
print("Columns of files are:" ,df.columns)
print("Columns of YAML are:" ,config_data['columns'])

Columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
Columns of YAML are: ['Event_time', 'Event_type', 'ProductID', 'CategoryID', 'Category_code', 'Brand', 'Price', 'UserID', 'User_session']


In [52]:
if util.col_header_val(df,config_data)==0:
    print("validation failed, rejecting the file! Look into the validation header of the file.")
else:
    print("col validation passed perform further action in the pipeline!")

column name and column length validation failed
Following File columns are not in the YAML file ['product_id', 'user_id', 'category_id']
Following YAML columns are not in the file uploaded ['categoryid', 'userid', 'productid']
validation failed, rejecting the file! Look into the validation header of the file.


## Write File in GZ Format  

__Writing the CSV format of the file in gz format in pipe separated text file (|)__

In [42]:
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('Dataset/eCommerce.csv',delimiter=',')

# Create CSV with gzip compression in Dask 
# to_csv = write object to a csv format
df.to_csv('eCommerce.csv.gz',
          sep='|',
          header=True,
          index=False,
          compression='gzip',
          line_terminator='\n')

['/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/00.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/01.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/02.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/03.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/04.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/05.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/06.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/07.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/08.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/09.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/10.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/11.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/12.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/13.part',
 '/Users/elissakuon/data_glacier_repos/eCommerce.csv.gz/14.part',
 '/Users/e

__Code outputs a list of all files within the CSV files in gz format, where there is a total of 87 files__

#### Size of the CSV file in gz format

In [25]:
# size of the gz format folder
os.path.getsize('eCommerce.csv.gz')

2880