## Task: File Ingestion and Schema validation

* Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

* Read the file ( Present approach of reading the file )

* Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational     efficiency

* Perform basic validation on data columns : eg: remove special character , white spaces from the col name

* As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of   
  read and write file, column name in YAML

* Validate number of columns and column name of ingested file with YAML.

* Write the file in pipe separated text file (|) in gz format.

* Create a summary of the file:

    Total number of rows,

    total number of columns

    file size

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import os
import time

### Read in the data with Dask

In [5]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.1403210163116455 sec


### Read in the data with Pandas

In [3]:
import pandas as pd
start = time.time()
df = pd.read_csv('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  974.9344456195831 sec


### Here Dask is better than Pandas

### Read data

In [4]:
from dask import dataframe as dd
df = dd.read_csv('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv',delimiter=',')

In [5]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 19 entries, VendorID to total_amount
dtypes: object(3), float64(12), int64(4)

In [6]:
#No. of Rows
len(df.index)

12748986

In [7]:
#No, of Columns
len(df.columns)

19

In [8]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

In [9]:
#Size of the file
os.path.getsize('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv')

1985964692

In [10]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [11]:
data=df.columns
data

Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'pickup_longitude',
       'pickup_latitude', 'RateCodeID', 'store_and_fwd_flag',
       'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount',
       'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
       'improvement_surcharge', 'total_amount'],
      dtype='object')

### Validation

In [8]:
pip install pyaml

Collecting pyaml
  Downloading pyaml-21.10.1-py2.py3-none-any.whl (24 kB)
Installing collected packages: pyaml
Successfully installed pyaml-21.10.1
Note: you may need to restart the kernel to use updated packages.


In [1]:
pip install pyyaml

Note: you may need to restart the kernel to use updated packages.


In [11]:
pip install yamlmagic

Collecting yamlmagic
  Downloading yamlmagic-0.2.0-py2.py3-none-any.whl (5.5 kB)
Installing collected packages: yamlmagic
Successfully installed yamlmagic-0.2.0
Note: you may need to restart the kernel to use updated packages.


In [12]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [36]:
%%writefile utility.py

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: x.replace(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [37]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: yellow_tripdata_2015-01
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - VendorID 
    - tpep_pickup_datetime
    - tpep_dropoff_datetime
    - passenger_count 
    - trip_distance
    - pickup_longitude
    - pickup_latitude
    - RateCodeID
    - store_and_fwd_flag
    - dropoff_longitude
    - dropoff_latitude
    - payment_type
    - fare_amount
    - extra
    - mta_tax
    - tip_amount
    - tolls_amount
    - improvement_surcharge
    - total_amount

Overwriting store.yaml


In [38]:
import yaml
with open('store.yaml') as f:
    my_dict = yaml.safe_load(f)

In [29]:
#data of config file
my_dict

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'yellow_tripdata_2015-01',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['VendorID',
  'tpep_pickup_datetime',
  'tpep_dropoff_datetime',
  'passenger_count',
  'trip_distance',
  'pickup_longitude',
  'pickup_latitude',
  'RateCodeID',
  'store_and_fwd_flag',
  'dropoff_longitude',
  'dropoff_latitude',
  'payment_type',
  'fare_amount',
  'extra',
  'mta_tax',
  'tip_amount',
  'tolls_amount',
  'improvement_surcharge',
  'total_amount']}

In [17]:
# Reading process of the file using Dask
from dask import dataframe as dd
df_sample = dd.read_csv('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv',delimiter=',')
df_sample.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896,40.750111,1,N,-73.974785,40.750618,1,12.0,1.0,0.5,3.25,0.0,0.3,17.05
1,1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.3,-74.001648,40.724243,1,N,-73.994415,40.759109,1,14.5,0.5,0.5,2.0,0.0,0.3,17.8
2,1,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.8,-73.963341,40.802788,1,N,-73.95182,40.824413,2,9.5,0.5,0.5,0.0,0.0,0.3,10.8
3,1,2015-01-10 20:33:39,2015-01-10 20:35:31,1,0.5,-74.009087,40.713818,1,N,-74.004326,40.719986,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8
4,1,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.0,-73.971176,40.762428,1,N,-74.004181,40.742653,2,15.0,0.5,0.5,0.0,0.0,0.3,16.3


In [18]:
#Reading the file using config file
file_type = my_dict['file_type']
source_file = "C:/Users/starinfo/Desktop/Summer/DFI/" + my_dict['file_name'] + f'.{file_type}'

In [19]:
import pandas as pd
df = pd.read_csv(source_file,my_dict['inbound_delimiter'])
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896,40.750111,1,N,-73.974785,40.750618,1,12.0,1.0,0.5,3.25,0.0,0.3,17.05
1,1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.3,-74.001648,40.724243,1,N,-73.994415,40.759109,1,14.5,0.5,0.5,2.0,0.0,0.3,17.8
2,1,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.8,-73.963341,40.802788,1,N,-73.95182,40.824413,2,9.5,0.5,0.5,0.0,0.0,0.3,10.8
3,1,2015-01-10 20:33:39,2015-01-10 20:35:31,1,0.5,-74.009087,40.713818,1,N,-74.004326,40.719986,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8
4,1,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.0,-73.971176,40.762428,1,N,-74.004181,40.742653,2,15.0,0.5,0.5,0.0,0.0,0.3,16.3


In [40]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,my_dict['columns'])

columns of files are: Index(['vendorid', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'pickup_longitude',
       'pickup_latitude', 'ratecodeid', 'store_and_fwd_flag',
       'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount',
       'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
       'improvement_surcharge', 'total_amount'],
      dtype='object')
columns of YAML are: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'RateCodeID', 'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']


In [51]:
import gzip
f_in = open('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv')
f_out = gzip.open('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv.gz', 'wb')
#f_out.writelines(f_in)
f_out.close()
f_in.close()

In [52]:
#number of files in gz format folder
import os
entries = os.listdir('C:/Users/starinfo/Desktop/Summer/DFI/')
for entry in entries:
    print(entry)

.ipynb_checkpoints
File ingestion and schema validation.ipynb
store.yaml
store.yaml.gz
utility.py
yellow_tripdata.csv
yellow_tripdata_2015-01.csv
yellow_tripdata_2015-01.csv.gz
__pycache__


In [53]:
#size of the gz format folder
os.path.getsize('C:/Users/starinfo/Desktop/Summer/DFI/yellow_tripdata_2015-01.csv.gz')

48