## Week 6: File ingestion and schema validation


Take any CSV or text file of your choice that is at least 2 GB in size. (You can do this assignment on Google Colab.)

Read the file and present your approach to reading it.

Try different methods of reading the file (e.g. Dask, Modin, Ray, pandas) and present your findings in terms of computational efficiency.

Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

Since you already know the schema, create a YAML file and write the column names in it. Also define the separators of the read and write files in the YAML.

Validate the number of columns and the column names of the ingested file against the YAML.

Write the file as a pipe-separated (|) text file in gz format.

Create a summary of the file:

Total number of rows
Total number of columns
File size
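
The last two steps (writing a pipe-separated gz file and summarising it) can be sketched with the standard library alone. This is a minimal illustration, not the approach used later in this notebook; `write_pipe_gz` and `summarise` are hypothetical helper names, and the file is assumed to have a single header row.

```python
import csv
import gzip
import os


def write_pipe_gz(src_path, dst_path, in_sep=",", out_sep="|"):
    """Stream src_path row by row into a gzip-compressed, out_sep-delimited file.

    Returns (data_rows, columns); the header row is not counted as data.
    """
    rows = 0
    columns = 0
    with open(src_path, newline="") as src, \
            gzip.open(dst_path, "wt", newline="") as dst:
        reader = csv.reader(src, delimiter=in_sep)
        writer = csv.writer(dst, delimiter=out_sep, lineterminator="\n")
        for i, row in enumerate(reader):
            if i == 0:
                columns = len(row)  # header row defines the column count
            else:
                rows += 1
            writer.writerow(row)
    return rows, columns


def summarise(path, rows, columns):
    """Collect the three summary figures asked for in the assignment."""
    return {"rows": rows, "columns": columns, "size_bytes": os.path.getsize(path)}
```

Streaming row by row keeps memory use flat regardless of file size, at the cost of being slower than a vectorised writer.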

---
## File ingestion

In [1]:
# import libraries
import os
import time

In [2]:
# file size confirmation
os.path.getsize('transactions_train.csv')

3488002253

In [3]:
# read data with Dask
from dask import dataframe as dd
start_time = time.time()
dask_df = dd.read_csv('transactions_train.csv')
end_time = time.time()
print(f'It took {end_time - start_time} seconds to read the csv file with Dask.')

It took 0.010052919387817383 seconds to read the csv file with Dask.


In [4]:
# read data with Pandas
import pandas as pd
start_time = time.time()
pd_df = pd.read_csv('transactions_train.csv')
end_time = time.time()
print(f'It took {end_time - start_time} seconds to read the csv file with Pandas.')

It took 37.013733863830566 seconds to read the csv file with Pandas.


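## Dask is more efficient at reading the CSV file

Note that `dd.read_csv` is lazy: it samples the start of the file and builds a task graph without loading the data, so the 0.01 s above is not a like-for-like comparison with the 37 s pandas needed to load the whole file into memory.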
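
One way to make the comparison fairer is to time a step that forces each library to actually read the data, such as `len(dask_df)` for Dask. The helper below is a minimal sketch of that idea; `time_call` is a hypothetical name, and the usage lines are left as comments because they depend on the notebook's file.

```python
import time


def time_call(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


# Hypothetical usage with the notebook's file (not run here):
#   n_rows, secs = time_call(lambda: len(dd.read_csv('transactions_train.csv')))
#   pd_df, secs = time_call(pd.read_csv, 'transactions_train.csv')
```

`time.perf_counter` is used instead of `time.time` because it is a monotonic, high-resolution clock intended for measuring intervals.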

In [5]:
# number of columns and data frame information
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, t_dat to sales_channel_id
dtypes: object(2), float64(1), int64(2)

In [6]:
# number of rows
len(dask_df)

28425882

In [27]:
# remove special characters (pass regex=True explicitly so the pattern is treated as a character class)
dask_df.columns = dask_df.columns.str.replace('[#@&,]', '', regex=True)


In [8]:
# remove white space from columns
dask_df.columns = dask_df.columns.str.replace(' ', '')

In [9]:
# column names
data=dask_df.columns
data

Index(['t_dat', 'customer_id', 'article_id', 'price', 'sales_channel_id'], dtype='object')
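
The two clean-up cells above can also be folded into a single function that is applied to each column name. This is a minimal sketch using only `re`; `clean_column_name` and the sample names are hypothetical and chosen for illustration, since the columns in this file are already clean.

```python
import re


def clean_column_name(name):
    """Lower-case a name and reduce it to letters, digits and single underscores."""
    name = name.strip().lower()
    name = re.sub(r"[^\w]+", "_", name)  # runs of non-word characters -> '_'
    name = re.sub(r"_+", "_", name)      # collapse repeated underscores
    return name.strip("_")


print([clean_column_name(c) for c in [" Customer ID#", "t_dat", "Price (@USD)"]])
# ['customer_id', 't_dat', 'price_usd']
```

It could be applied to the frame with `dask_df.columns = [clean_column_name(c) for c in dask_df.columns]`.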

---
## Schema Validation

In [17]:
# import libraries
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [19]:
%%writefile utility.py

import logging
import re

import yaml


def read_config_file(filepath):
    # load the YAML config; a parse error is logged and None is returned
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    # not defined in the original cell; assumed to collapse repeated `char` into one
    return re.sub('(?:' + re.escape(char) + '){2,}', char, string)


def col_header_val(dask_df, table_config):
    # normalise names: lower-case, non-word characters -> '_', trim and collapse '_'
    dask_df.columns = dask_df.columns.str.lower()
    dask_df.columns = dask_df.columns.str.replace(r'[^\w]', '_', regex=True)
    dask_df.columns = [replacer(x.strip('_'), '_') for x in dask_df.columns]
    # compare sorted names so that column order does not matter
    expected_col = sorted(x.lower() for x in table_config['columns'])
    actual_col = sorted(dask_df.columns)
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(actual_col).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(actual_col))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {actual_col}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [24]:
%%writefile tran.yaml
file_type: csv
dataset_name: file
file_name: transactions
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - t_dat
    - customer_id
    - article_id
    - price
    - sales_channel_id

Writing tran.yaml
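
The same column check can also be done without loading any data, by reading only the header row of the file. The sketch below assumes the config has already been loaded into a dict with the keys used in tran.yaml (`inbound_delimiter`, `columns`); `validate_header` is a hypothetical helper and is not part of utility.py.

```python
import csv


def validate_header(path, config):
    """Compare the header row of `path` with config['columns'], ignoring order and case."""
    with open(path, newline="") as f:
        header = next(csv.reader(f, delimiter=config["inbound_delimiter"]))
    actual = {c.strip().lower() for c in header}
    expected = {c.lower() for c in config["columns"]}
    return {
        "ok": len(header) == len(config["columns"]) and actual == expected,
        "extra_in_file": sorted(actual - expected),    # in the file, not in the YAML
        "missing_in_file": sorted(expected - actual),  # in the YAML, not in the file
    }


# Hypothetical usage with the files from this notebook (not run here):
#   validate_header('transactions_train.csv', read_config_file('tran.yaml'))
```

Returning the two difference lists, rather than only a pass/fail flag, makes it easier to see which side needs correcting when validation fails.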
