# DATA SCIENCE INTERNSHIP AT DATA GLACIER


## Week 06 Assignment

### Project: File ingestion and schema validation

#### Author: _Chooladeva Piyasiri_
---

In [1]:
import os
import time

import warnings
warnings.filterwarnings("ignore")

## 1. Reading the data 

- ### with Dask

In [3]:
from dask import dataframe as dd

# NOTE(review): Dask dataframes are evaluated lazily, so this most likely times
# only the construction of the task graph rather than a full read of the file --
# keep that in mind when comparing against the eager pandas timing.
start = time.time()
dask_df = dd.read_csv('train.csv')
end = time.time()
dask_duration = end - start

# The label names the library that was actually timed (Dask, not Modin).
print("Time to read with Dask: {} seconds".format(round(dask_duration, 3)))

Time to read with Modin: 0.0 seconds


- ### with Pandas

In [4]:
import pandas as pd

# Time a pandas read of train.csv using wall-clock timestamps taken
# immediately before and after the call.
start = time.time()
df = pd.read_csv('train.csv')
end = time.time()
pandas_duration = end - start

# Report the elapsed time rounded to milliseconds.
print("Time to read with pandas: {} seconds".format(round(pandas_duration, 3)))

Time to read with pandas: 1.274 seconds


- ### with Ray & Modin

In [5]:
# NOTE(review): this cell sits under the "with Ray & Modin" heading, but it
# imports and times Dask exactly like the earlier Dask cell -- no Modin or Ray
# code runs here, so the "Modin" label printed below actually describes a
# Dask read. TODO: replace with a real Modin read if that comparison is wanted.
from dask import dataframe as dd

start = time.time()
dask_df = dd.read_csv('train.csv')
end = time.time()
dask_duration = end - start

print("Time to read with Modin: {} seconds".format(round(dask_duration, 3)))

Time to read with Modin: 0.016 seconds


## 2. Perform basic validation on the data columns, e.g. remove special characters and white space from the column names

In [6]:
from dask import dataframe as dd

# Load train.csv as a Dask dataframe; its column labels are cleaned in the
# cells that follow.
dask_data = dd.read_csv('train.csv',delimiter=',')

- ### Remove special character

In [7]:
# '[#,@,&]' is a regex character class, so it has to be applied with
# regex=True: pandas >= 2.0 defaults to regex=False and would otherwise look
# for the literal text "[#,@,&]" and strip nothing. The commas inside the
# class are matched as well, so ',' is removed along with '#', '@' and '&'.
dask_data.columns = dask_data.columns.str.replace('[#,@,&]', '', regex=True)

- ### Remove white space from columns

In [8]:
# Drop every space character from the column labels; a single space needs no
# regex, so it is matched as plain text.
dask_data.columns = dask_data.columns.str.replace(' ', '', regex=False)

In [9]:
# Display the column labels after both clean-up steps.
dask_data.columns

Index(['key', 'fare_amount', 'pickup_datetime', 'pickup_longitude',
       'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
       'passenger_count'],
      dtype='object')

- ### Validation

In [10]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load the YAML document stored at ``filepath`` and return its parsed content.

    A ``yaml.YAMLError`` raised while parsing is logged with ``logging.error``
    and not re-raised; in that case the function returns ``None``. Errors from
    opening the file (e.g. a missing path) propagate to the caller.
    """
    with open(filepath, 'r') as config_stream:
        try:
            parsed = yaml.safe_load(config_stream)
        except yaml.YAMLError as parse_error:
            logging.error(parse_error)
            return None
        return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into a single one.

    ``char`` is treated as literal text, so regex metacharacters such as
    ``.`` or ``|`` (and multi-character values) are handled correctly.

    Parameters:
        string: text to clean.
        char: the character (or literal substring) whose repeats are collapsed.

    Returns:
        ``string`` with each run of repeated ``char`` replaced by one ``char``.
        An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # Nothing to collapse; also avoids building a pattern that repeats an
        # empty group.
        return string
    # Escape the literal and wrap it in a group so the quantifier applies to
    # the whole of ``char`` rather than only its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps ``char`` literal (no backslash expansion).
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Normalise the column names of ``df`` and validate them against the config.

    Normalisation (applied to ``df.columns`` in place, so the caller sees it):
    lower-case, replace every non-word character with ``_``, strip leading and
    trailing ``_``, and collapse runs of ``_`` into one.

    The normalised names are then compared, ignoring order, with the
    lower-cased entries of ``table_config['columns']``.

    Parameters:
        df: pandas DataFrame whose header is validated (columns are renamed).
        table_config: mapping with a ``'columns'`` list of expected names.

    Returns:
        1 when the names match exactly, otherwise 0 after printing which
        names are missing on either side.
    '''
    # Raw strings keep ``\w`` a regex class instead of an invalid str escape.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(name.lower() for name in table_config['columns'])
    actual_col = sorted(df.columns)
    # Comparing the sorted lists covers both the count and the names.
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1
    print("column name and column length validation failed")
    # Sorted so the report is deterministic from run to run.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info('df columns: %s', actual_col)
    logging.info('expected columns: %s', expected_col)
    return 0

Writing testutility.py


## 3. Create a YAML file and write the column names in it. Also define the separators used for reading and writing the file in the YAML

In [11]:
%%writefile file.yaml
# Ingestion settings for the notebook; loaded with testutility.read_config_file.
file_type: csv
dataset_name: taxi_fare
# file_name and file_type are joined as "./<file_name>.<file_type>" to build
# the path of the source file.
file_name: train
# NOTE(review): "edsurv" does not obviously relate to the taxi-fare data --
# confirm it is not left over from a template.
table_name: edsurv
# Separator used when reading the source file.
inbound_delimiter: ","
# Separator intended for the exported file.
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected header names; col_header_val compares them lower-cased and
# independent of order.
columns: 
    - key
    - fare_amount
    - pickup_datetime
    - pickup_longitude
    - pickup_latitude
    - dropoff_longitude
    - dropoff_latitude
    - passenger_count

Writing file.yaml


In [12]:
# Load the ingestion settings from file.yaml through the helper module that
# was written to testutility.py earlier in the notebook.
import testutility as util
config_data = util.read_config_file("file.yaml")

In [13]:
# Spot-check one value to confirm the YAML was parsed into a mapping.
config_data['inbound_delimiter']

','

In [14]:
# Inspect the full parsed configuration.
config_data

{'file_type': 'csv',
 'dataset_name': 'taxi_fare',
 'file_name': 'train',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['key',
  'fare_amount',
  'pickup_datetime',
  'pickup_longitude',
  'pickup_latitude',
  'dropoff_longitude',
  'dropoff_latitude',
  'passenger_count']}

In [15]:
# Read the file with Dask using a hard-coded path and delimiter, then preview
# the first rows.
from dask import dataframe as dd
dask_df = dd.read_csv('train.csv',delimiter=',')
dask_df.head()

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,26:21.0,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,52:16.0,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,35:00.0,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,30:42.0,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,51:00.0,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1


In [16]:
import pandas as pd
# Build the source path and the delimiter from the YAML config instead of
# hard-coding them.
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# The delimiter must be passed by keyword: in pandas >= 2.0 every read_csv
# argument after the path is keyword-only, so the positional form raises
# TypeError there.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,26:21.0,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,52:16.0,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,35:00.0,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,30:42.0,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,51:00.0,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1


## 4. Validate number of columns and column name of ingested file with YAML.

In [17]:
# Validate the file header against the YAML column list. The call also
# rewrites df.columns with the normalised names; it returns 1 on success and
# 0 on failure.
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [18]:
# Show both column lists side by side for a manual comparison.
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['key', 'fare_amount', 'pickup_datetime', 'pickup_longitude',
       'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
       'passenger_count'],
      dtype='object')
columns of YAML are: ['key', 'fare_amount', 'pickup_datetime', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count']


In [19]:
# Gate the rest of the pipeline on the header check (0 means it failed).
if util.col_header_val(df, config_data) != 0:
    print("col validation passed")
    # write the code to perform further action
    # in the pipeline
else:
    print("validation failed")
    # write code to reject the file

column name and column length validation passed
col validation passed


## 5. Write the file as a pipe-separated (|) text file in gz format.

In [20]:
import datetime
import csv
import gzip

from dask import dataframe as dd
# Re-read the source with Dask for the export step.
df = dd.read_csv('train.csv',delimiter=',')

# Write a gzip-compressed, delimiter-separated export. Dask writes one file
# per partition inside a directory named 'train.csv.gz' (0.part, 1.part, ...)
# and returns the list of paths it wrote.
df.to_csv('train.csv.gz',
          # Use the separator declared in file.yaml ("|") rather than a literal.
          sep=config_data['outbound_delimiter'],
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          # Named `line_terminator` before pandas 1.5; that spelling was
          # removed in pandas 2.0, so this form needs pandas >= 1.5.
          lineterminator='\n')

['C:/Users/HP/Documents/Vrtual Internships/Data Glacier/Weeks/Week 6/train.csv.gz\\0.part',
 'C:/Users/HP/Documents/Vrtual Internships/Data Glacier/Weeks/Week 6/train.csv.gz\\1.part']

## 6. Create a summary of the file:

In [21]:
# Locate the first partition written by the export step relative to the
# working directory, instead of a machine-specific absolute path.
# Only this first part file is measured; other partitions are not included.
path = os.path.join('train.csv.gz', '0.part')
size = os.path.getsize(path)

print('File summary:')
print(f'File size: {size} bytes')


File summary:
File size: 7770800 bytes
