**Dataingestion, Google colab and Yaml:**

Data ingestion is performed for large csv file size>2GB downloaded from Kaggle using Google Colab and Yaml schema. The test data is also created and saved in gz format. How to read the rows, columns and filesize is also presented in this notebook.

In [None]:
from google.colab import drive

In [None]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Overwriting testutility.py


In [None]:
%%writefile parkinglot.yaml
file_type: csv
dataset_name: parkingviolation
file_name: parking_violation
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Plate_ID
    - Plate_Type
    - Violation

Overwriting parkinglot.yaml


In [None]:
import testutility as util
config_data = util.read_config_file("parkinglot.yaml")

In [None]:
config_data['inbound_delimiter']

','

In [None]:
config_data

{'columns': ['Plate_ID', 'Plate_Type', 'Violation'],
 'dataset_name': 'parkingviolation',
 'file_name': 'parking_violation',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}

In [None]:
config_data

{'columns': ['Plate_ID', 'Plate_Type', 'Violation'],
 'dataset_name': 'parkingviolation',
 'file_name': 'parking_violation',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}

In [None]:
import pandas as pd
df=pd.read_csv('/content/drive/MyDrive/dataglacier/parking_violation.csv',delimiter=',',chunksize=10000)

In [None]:
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
print("",source_file)

 ./parking_violation.csv


In [None]:
file_type = config_data['file_type']
source_file = "/content/drive/MyDrive/dataglacier/" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'],nrows=10000)

In [None]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['hydrant_violation', 'house_number', 'issuer_squad', 'registration_state', 'intersecting_street', 'vehicle_body_type', 'violation_location', 'bbl', 'date_first_observed', 'month', 'street_code3', 'time_first_observed', 'violation_in_front_of_or_opposite', 'year', 'from_hours_in_effect', 'vehicle_expiration_date', 'street_code1', 'summons_number', 'vehicle_make', 'violation_post_code', 'bin', 'street_name', 'unregistered_vehicle', 'issue_date', 'community_council', 'meter_number', 'unnamed_0', 'violation_description', 'to_hours_in_effect', 'longitude', 'days_parking_in_effect', 'vehicle_color', 'issuer_command', 'violation_legal_code', 'sub_division', 'violation_county', 'feet_from_curb', 'nta', 'community_board', 'issuer_precinct', 'vehicle_year', 'violation_code', 'violation_precinct', 'issuer_code', 'no_standing_or_stopping_violation', 'latitude', 'law_section', 'violation_time', 'census_

0

In [None]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['unnamed_0', 'summons_number', 'plate_id', 'registration_state',
       'plate_type', 'issue_date', 'violation_code', 'vehicle_body_type',
       'vehicle_make', 'issuing_agency', 'street_code1', 'street_code2',
       'street_code3', 'vehicle_expiration_date', 'violation_location',
       'violation_precinct', 'issuer_precinct', 'issuer_code',
       'issuer_command', 'issuer_squad', 'violation_time',
       'time_first_observed', 'violation_county',
       'violation_in_front_of_or_opposite', 'house_number', 'street_name',
       'intersecting_street', 'date_first_observed', 'law_section',
       'sub_division', 'violation_legal_code', 'days_parking_in_effect',
       'from_hours_in_effect', 'to_hours_in_effect', 'vehicle_color',
       'unregistered_vehicle', 'vehicle_year', 'meter_number',
       'feet_from_curb', 'violation_post_code', 'violation_description',
       'no_standing_or_stopping_violation', 'hydrant_violation',
       'double_parking_viola

In [None]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['hydrant_violation', 'house_number', 'issuer_squad', 'registration_state', 'intersecting_street', 'vehicle_body_type', 'violation_location', 'bbl', 'date_first_observed', 'month', 'street_code3', 'time_first_observed', 'violation_in_front_of_or_opposite', 'year', 'from_hours_in_effect', 'vehicle_expiration_date', 'street_code1', 'summons_number', 'vehicle_make', 'violation_post_code', 'bin', 'street_name', 'unregistered_vehicle', 'issue_date', 'community_council', 'meter_number', 'unnamed_0', 'violation_description', 'to_hours_in_effect', 'longitude', 'days_parking_in_effect', 'vehicle_color', 'issuer_command', 'violation_legal_code', 'sub_division', 'violation_county', 'feet_from_curb', 'nta', 'community_board', 'issuer_precinct', 'vehicle_year', 'violation_code', 'violation_precinct', 'issuer_code', 'no_standing_or_stopping_violation', 'latitude', 'law_section', 'violation_time', 'census_

0

In [None]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['unnamed_0', 'summons_number', 'plate_id', 'registration_state',
       'plate_type', 'issue_date', 'violation_code', 'vehicle_body_type',
       'vehicle_make', 'issuing_agency', 'street_code1', 'street_code2',
       'street_code3', 'vehicle_expiration_date', 'violation_location',
       'violation_precinct', 'issuer_precinct', 'issuer_code',
       'issuer_command', 'issuer_squad', 'violation_time',
       'time_first_observed', 'violation_county',
       'violation_in_front_of_or_opposite', 'house_number', 'street_name',
       'intersecting_street', 'date_first_observed', 'law_section',
       'sub_division', 'violation_legal_code', 'days_parking_in_effect',
       'from_hours_in_effect', 'to_hours_in_effect', 'vehicle_color',
       'unregistered_vehicle', 'vehicle_year', 'meter_number',
       'feet_from_curb', 'violation_post_code', 'violation_description',
       'no_standing_or_stopping_violation', 'hydrant_violation',
       'double_parking_viola

In [None]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation failed
Following File columns are not in the YAML file ['hydrant_violation', 'house_number', 'issuer_squad', 'registration_state', 'intersecting_street', 'vehicle_body_type', 'violation_location', 'bbl', 'date_first_observed', 'month', 'street_code3', 'time_first_observed', 'violation_in_front_of_or_opposite', 'year', 'from_hours_in_effect', 'vehicle_expiration_date', 'street_code1', 'summons_number', 'vehicle_make', 'violation_post_code', 'bin', 'street_name', 'unregistered_vehicle', 'issue_date', 'community_council', 'meter_number', 'unnamed_0', 'violation_description', 'to_hours_in_effect', 'longitude', 'days_parking_in_effect', 'vehicle_color', 'issuer_command', 'violation_legal_code', 'sub_division', 'violation_county', 'feet_from_curb', 'nta', 'community_board', 'issuer_precinct', 'vehicle_year', 'violation_code', 'violation_precinct', 'issuer_code', 'no_standing_or_stopping_violation', 'latitude', 'law_section', 'violation_time', 'census_

In [None]:
testdata = {
    'Plate_ID' : ['GBB9093', '78755JZ', 'T60DAR','GBH9379'],
    'Plate_Type' : ['PAS', 'COM', 'COM','PAS'],
    'Violation' : [46, 14,24,46]
}
import pandas as pd
df = pd.DataFrame(testdata, columns=['Plate_ID', 'Plate_Type','Violation'])
df.to_csv("test_data.csv",index=False)

In [None]:
df

Unnamed: 0,Plate_ID,Plate_Type,Violation
0,GBB9093,PAS,46
1,78755JZ,COM,14
2,T60DAR,COM,24
3,GBH9379,PAS,46


In [None]:
import gzip

In [None]:
df.to_csv(r'/content/drive/MyDrive/dataglacier/testviolation.gz',sep = '|', index=False,compression='gzip')

In [None]:
Size of the file:

In [None]:
import os
import struct

with open(r'/content/drive/MyDrive/dataglacier/testviolation.gz',"rb") as f:
    f.seek(-4, os.SEEK_END)
    size, = struct.unpack("<I", f.read(4))
    print (size)
        

89


No of Rows:

In [None]:
myfile=r'/content/drive/MyDrive/dataglacier/testviolation.gz'
with gzip.open(myfile, 'rb') as f:
    for i, l in enumerate(f):
        pass
print("File {1} contain {0} lines".format(i + 1, myfile))
with gzip.open(myfile, 'rb') as f:
    for i, l in enumerate(f):
        pass
print("File {1} contain {0} lines".format(i + 1, myfile))

File /content/drive/MyDrive/dataglacier/testviolation.gz contain 5 lines
File /content/drive/MyDrive/dataglacier/testviolation.gz contain 5 lines


No of Columns:

In [None]:
import csv

with gzip.open(myfile, 'rt') as gzf:
    reader = csv.reader(gzf, dialect=csv.excel_tab)
    print(len(next(reader)))

1
