# Task: File Ingestion and Schema validation

In [None]:
import os
import time

In [None]:
#Size of the file
os.path.getsize('/content/parking violations.csv')

27262976

# Read in the data with Dask

In [None]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('/content/parking violations.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.06425046920776367 sec


# Read in the data with Pandas

In [None]:
import pandas as pd
start = time.time()
df = pd.read_csv('/content/parking violations.csv')
df.head(2)
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  3.231543779373169 sec


# Read in the data with Modin and Ray

In [None]:
pip install modin

Collecting modin
  Downloading modin-0.30.1-py3-none-any.whl (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m15.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pandas<2.3,>=2.2 (from modin)
  Downloading pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m67.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pandas, modin
  Attempting uninstall: pandas
    Found existing installation: pandas 2.0.3
    Uninstalling pandas-2.0.3:
      Successfully uninstalled pandas-2.0.3
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
cudf-cu12 24.4.1 requires pandas<2.2.2dev0,>=2.0, but you have pandas 2.2.2 which is incompatible.
google-colab 1.0.0 requires pandas==2.0.3, but you have pandas 2.2.2 wh

In [None]:
!pip install --upgrade pandas



In [None]:
pip install ray

Collecting ray
  Downloading ray-2.24.0-cp310-cp310-manylinux2014_x86_64.whl (65.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.9/65.9 MB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ray
Successfully installed ray-2.24.0


In [None]:
!pip install --upgrade modin



In [None]:
import pandas as pd
import modin.pandas as pd
import ray
import time
ray.shutdown()
ray.init()

start = time.time()
df = pd.read_csv('/content/parking violations.csv')
end = time.time()

print("Read csv with modin and ray: ",(end-start),"sec")

2024-06-12 23:38:09,694	INFO worker.py:1753 -- Started a local Ray instance.


Read csv with modin and ray:  55.29525184631348 sec


Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.


# Here Dask is better than Pandas, Modin and Ray, with the least reading time of 0.0642 sec

In [None]:
from dask import dataframe as dd
df = dd.read_csv('/content/parking violations.csv', delimiter=',')

In [None]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 43 entries, Summons Number to Double Parking Violation
dtypes: float64(7), int64(13), string(23)

In [None]:
df = dd.read_csv('/content/parking violations.csv', delimiter=',', dtype={'Feet From Curb': 'float64',
       'House Number': 'object',
       'Time First Observed': 'object',
       'Vehicle Year': 'float64'})

In [None]:
df = pd.read_csv('/content/parking violations.csv')

In [None]:
#No. of Rows
len(df.index)

3371486

In [None]:
#No, of Columns
len(df.columns)

43

In [None]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

In [None]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [None]:
data=df.columns
data

Index(['SummonsNumber', 'PlateID', 'RegistrationState', 'PlateType',
       'IssueDate', 'ViolationCode', 'VehicleBodyType', 'VehicleMake',
       'IssuingAgency', 'StreetCode1', 'StreetCode2', 'StreetCode3',
       'VehicleExpirationDate', 'ViolationLocation', 'ViolationPrecinct',
       'IssuerPrecinct', 'IssuerCode', 'IssuerCommand', 'IssuerSquad',
       'ViolationTime', 'TimeFirstObserved', 'ViolationCounty',
       'ViolationInFrontOfOrOpposite', 'HouseNumber', 'StreetName',
       'IntersectingStreet', 'DateFirstObserved', 'LawSection', 'SubDivision',
       'ViolationLegalCode', 'DaysParkingInEffect', 'FromHoursInEffect',
       'ToHoursInEffect', 'VehicleColor', 'UnregisteredVehicle?',
       'VehicleYear', 'MeterNumber', 'FeetFromCurb', 'ViolationPostCode',
       'ViolationDescription', 'NoStandingorStoppingViolation',
       'HydrantViolation', 'DoubleParkingViolation'],
      dtype='object')

# Validation

In [None]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [None]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: parking violations
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - SummonsNumber
    - PlateID
    - RegistrationState
    - PlateType
    - IssueDate
    - ViolationCode
    - VehicleBodyType
    - VehicleMake
    - IssuingAgency
    - StreetCode1
    - StreetCode2
    - StreetCode3
    - VehicleExpirationDate
    - ViolationLocation
    - ViolationPrecinct
    - IssuerPrecinct
    - IssuerCode
    - IssuerCommand
    - IssuerSquad
    - ViolationTime
    - TimeFirstObserved
    - ViolationCounty
    - ViolationInFrontOfOrOpposite
    - HouseNumber
    - StreetName
    - IntersectingStreet
    - DateFirstObserved
    - LawSection
    - SubDivision
    - ViolationLegalCode
    - DaysParkingInEffect
    - FromHoursInEffect
    - ToHoursInEffect
    - VehicleColor
    - UnregisteredVehicle?
    - VehicleYear
    - MeterNumber
    - FeetFromCurb
    - ViolationPostCode
    - ViolationDescription
    - NoStandingorStoppingViolation
   -  HydrantViolation
   -  DoubleParkingViolation

Overwriting store.yaml


In [None]:
!pip install PyYAML



In [None]:
import yaml

In [None]:
# Read config file
import utility as util
config_data = util.read_config_file("store.yaml")

In [None]:
config_data['inbound_delimiter']

','

In [None]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'parking violations',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['SummonsNumber',
  'PlateID',
  'RegistrationState',
  'PlateType',
  'IssueDate',
  'ViolationCode',
  'VehicleBodyType',
  'VehicleMake',
  'IssuingAgency',
  'StreetCode1',
  'StreetCode2',
  'StreetCode3',
  'VehicleExpirationDate',
  'ViolationLocation',
  'ViolationPrecinct',
  'IssuerPrecinct',
  'IssuerCode',
  'IssuerCommand',
  'IssuerSquad',
  'ViolationTime',
  'TimeFirstObserved',
  'ViolationCounty',
  'ViolationInFrontOfOrOpposite',
  'HouseNumber',
  'StreetName',
  'IntersectingStreet',
  'DateFirstObserved',
  'LawSection',
  'SubDivision',
  'ViolationLegalCode',
  'DaysParkingInEffect',
  'FromHoursInEffect',
  'ToHoursInEffect',
  'VehicleColor',
  'UnregisteredVehicle?',
  'VehicleYear',
  'MeterNumber',
  'FeetFromCurb',
  'ViolationPostCode',
  'ViolationDescription

In [None]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("/content/parking violations.csv",delimiter=',')
df_sample.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [None]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("/content/parking violations.csv",source_file)
# Read the file using the delimiter character specified in the config file
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [None]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['violation_time', 'street_code1', 'violation_county', 'street_code3', 'days_parking_in_effect', 'time_first_observed', 'unregistered_vehicle', 'violation_code', 'registration_state', 'issuer_squad', 'to_hours_in_effect', 'sub_division', 'violation_in_front_of_or_opposite', 'violation_location', 'double_parking_violation', 'vehicle_body_type', 'street_name', 'vehicle_expiration_date', 'issuing_agency', 'issuer_command', 'meter_number', 'house_number', 'violation_precinct', 'law_section', 'feet_from_curb', 'from_hours_in_effect', 'vehicle_make', 'issue_date', 'issuer_precinct', 'hydrant_violation', 'plate_id', 'vehicle_color', 'violation_description', 'plate_type', 'summons_number', 'violation_post_code', 'vehicle_year', 'violation_legal_code', 'intersecting_street', 'issuer_code', 'date_first_observed', 'no_standing_or_stopping_violation', 'street_code2']
Following YAML columns are not in th

0

In [None]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['summons_number', 'plate_id', 'registration_state', 'plate_type',
       'issue_date', 'violation_code', 'vehicle_body_type', 'vehicle_make',
       'issuing_agency', 'street_code1', 'street_code2', 'street_code3',
       'vehicle_expiration_date', 'violation_location', 'violation_precinct',
       'issuer_precinct', 'issuer_code', 'issuer_command', 'issuer_squad',
       'violation_time', 'time_first_observed', 'violation_county',
       'violation_in_front_of_or_opposite', 'house_number', 'street_name',
       'intersecting_street', 'date_first_observed', 'law_section',
       'sub_division', 'violation_legal_code', 'days_parking_in_effect',
       'from_hours_in_effect', 'to_hours_in_effect', 'vehicle_color',
       'unregistered_vehicle', 'vehicle_year', 'meter_number',
       'feet_from_curb', 'violation_post_code', 'violation_description',
       'no_standing_or_stopping_violation', 'hydrant_violation',
       'double_parking_violation'],
      dtype=

In [None]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation failed
Following File columns are not in the YAML file ['violation_time', 'street_code1', 'violation_county', 'street_code3', 'days_parking_in_effect', 'time_first_observed', 'unregistered_vehicle', 'violation_code', 'registration_state', 'issuer_squad', 'to_hours_in_effect', 'sub_division', 'violation_in_front_of_or_opposite', 'violation_location', 'double_parking_violation', 'vehicle_body_type', 'street_name', 'vehicle_expiration_date', 'issuing_agency', 'issuer_command', 'meter_number', 'house_number', 'violation_precinct', 'law_section', 'feet_from_curb', 'from_hours_in_effect', 'vehicle_make', 'issue_date', 'issuer_precinct', 'hydrant_violation', 'plate_id', 'vehicle_color', 'violation_description', 'plate_type', 'summons_number', 'violation_post_code', 'vehicle_year', 'violation_legal_code', 'intersecting_street', 'issuer_code', 'date_first_observed', 'no_standing_or_stopping_violation', 'street_code2']
Following YAML columns are not in th

In [None]:
pd.read_csv("/content/parking violations.csv")

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001.0,,0.0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001.0,,0.0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004.0,,0.0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007.0,,0.0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007.0,,0.0,31 6,69-Failure to Disp Muni Recpt,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1908011,8473633660,20768JY,NY,COM,04/03/2017,21,VAN,FORD,T,8390,...,WH,,2007.0,,0.0,25-A,21-No Parking (street clean),,,
1908012,7852198790,HDL1802,NY,PAS,03/06/2017,21,4DSD,ME/BE,T,35490,...,BK,,0.0,,0.0,R,21-No Parking (street clean),,,
1908013,4007052529,FPR5896,NY,PAS,03/09/2017,5,4DSD,TOYOT,V,0,...,GY,,2011.0,,0.0,,BUS LANE VIOLATION,,,
1908014,8512661276,14478JY,NY,COM,04/14/2017,46,DELV,FRUEH,T,25390,...,WHITE,,2007.0,,0.0,23,46B-Double Parking (Com-100Ft),,,


In [None]:
df

Unnamed: 0,summons_number,plate_id,registration_state,plate_type,issue_date,violation_code,vehicle_body_type,vehicle_make,issuing_agency,street_code1,...,vehicle_color,unregistered_vehicle,vehicle_year,meter_number,feet_from_curb,violation_post_code,violation_description,no_standing_or_stopping_violation,hydrant_violation,double_parking_violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1262984,7537579817,HFM3753,NY,PAS,11/17/2016,21,SUBN,HONDA,T,88130,...,BR,,2013,,0,11 3,21-No Parking (street clean),,,
1262985,8363523975,DMT8683,NY,PAS,12/06/2016,71,SUBN,INFIN,T,57790,...,BK,,2015,,0,Y 41,71A-Insp Sticker Expired (NYS),,,
1262986,7395477571,83655MG,NY,COM,11/16/2016,84,VAN,FRUEH,T,28490,...,WH,,2016,,0,P 99,84-Platform lifts in low posit,,,
1262987,4633392761,GWV5701,NY,PAS,05/04/2017,36,SUBN,LEXUS,V,0,...,GY,,2015,,0,,PHTO SCHOOL ZN SPEED VIOLATION,,,


In [16]:
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('/content/parking violations.csv', delimiter="\t")

# Write csv in gz format in pipe separated text file (|)
df.to_csv('parking violations.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True)

['/content/parking violations.csv.gz/0.part',
 '/content/parking violations.csv.gz/1.part']

In [18]:
import os
# Get file summary
file_size = os.path.getsize('parking violations.csv.gz')
num_rows = len(df)
num_cols = len(df.columns)

# Print file summary
print("File summary:")
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")
print(f"File size: {file_size} bytes")

File summary:
Number of rows: 975696
Number of columns: 1
File size: 4096 bytes
