# Data Ingestion code

## Write a utility file

In [1]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    Standardizes the column names.
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w\s]+', '', regex=True)
    df.columns = df.columns.str.replace('\s', '_', regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    # for Dask DataFrame
    df = df[list(sorted(df.columns))]
    # for Pandas DataFrmae
    # df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("Column name and column length validation passed.")
        return (1, df)
    else:
        print("Column name and column length validation failed.")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file:",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded:",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'Expected columns: {expected_col}')
        return (0, df)

Overwriting utility.py


File address: https://www.kaggle.com/datasets/new-york-city/nyc-parking-tickets?select=Parking_Violations_Issued_-_Fiscal_Year_2017.csv

## Write an YAML file

In [2]:
%%writefile file.yaml
file_type: csv
dataset_name: parknig_data
file_name: Parking_Violations_Issued_-_Fiscal_Year_2017
table_name: Parking_Violations_Issued_-_Fiscal_Year_2017
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - summons_number
    - plate_id
    - registration_state
    - plate_type
    - issue_date
    - violation_code
    - vehicle_body_type
    - vehicle_make
    - issuing_agency
    - street_code1
    - street_code2
    - street_code3
    - vehicle_expiration_date
    - violation_location
    - violation_precinct
    - issuer_precinct
    - issuer_code
    - issuer_command
    - issuer_squad
    - violation_time
    - time_first_observed
    - violation_county
    - violation_in_front_of_or_opposite
    - house_number
    - street_name
    - intersecting_street
    - date_first_observed
    - law_section
    - sub_division
    - violation_legal_code
    - days_parking_in_effect
    - from_hours_in_effect
    - to_hours_in_effect
    - vehicle_color
    - unregistered_vehicle
    - vehicle_year
    - meter_number
    - feet_from_curb
    - violation_post_code
    - violation_description
    - no_standing_or_stopping_violation
    - hydrant_violation
    - double_parking_violation


Overwriting file.yaml


In [3]:
# Read config file
import utility as util
config_data = util.read_config_file("file.yaml")

In [4]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'parknig_data',
 'file_name': 'Parking_Violations_Issued_-_Fiscal_Year_2017',
 'table_name': 'Parking_Violations_Issued_-_Fiscal_Year_2017',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['summons_number',
  'plate_id',
  'registration_state',
  'plate_type',
  'issue_date',
  'violation_code',
  'vehicle_body_type',
  'vehicle_make',
  'issuing_agency',
  'street_code1',
  'street_code2',
  'street_code3',
  'vehicle_expiration_date',
  'violation_location',
  'violation_precinct',
  'issuer_precinct',
  'issuer_code',
  'issuer_command',
  'issuer_squad',
  'violation_time',
  'time_first_observed',
  'violation_county',
  'violation_in_front_of_or_opposite',
  'house_number',
  'street_name',
  'intersecting_street',
  'date_first_observed',
  'law_section',
  'sub_division',
  'violation_legal_code',
  'days_parking_in_effect',
  'from_hours_in_effect',
  'to_hours_in_effect',
  'vehicle_color',
  '

# Reading the file

In [5]:
import time
import pandas as pd
import ray
import modin.pandas as mpd
from dask import dataframe as dd
import os

## Read in the file using Pandas

In [6]:
start = time.time()
pd_df = pd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv')
end = time.time()
print("Read the file with pandas: ",(end-start),"seconds")



Read the file with pandas:  45.911070108413696 seconds


## Read in the file using Modin and Ray

In [7]:
if not ray.is_initialized():
    ray.init()
start = time.time()
ray_df = mpd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv')
end = time.time()
print("Read the file with Modin and Ray:", (end - start), "seconds")
ray.shutdown()

2024-03-13 21:31:43,250	INFO worker.py:1724 -- Started a local Ray instance.
[33m(raylet)[0m [2024-03-13 21:35:44,990 E 5948 9948] (raylet.exe) worker_pool.cc:553: Some workers of the worker process(13740) have not registered within the timeout. The process is still alive, probably it's hanging during start.
[33m(raylet)[0m [2024-03-13 21:38:56,113 E 5948 9948] (raylet.exe) worker_pool.cc:553: Some workers of the worker process(12064) have not registered within the timeout. The process is still alive, probably it's hanging during start.[32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m


Read the file with Modin and Ray: 542.6628022193909 seconds


[33m(raylet)[0m [2024-03-13 21:40:42,885 E 5948 9948] (raylet.exe) worker_pool.cc:553: Some workers of the worker process(13852) have not registered within the timeout. The process is still alive, probably it's hanging during start.[32m [repeated 3x across cluster][0m


## Read in the file using Dask

In [8]:
start = time.time()
dask_df = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv', 
                      dtype={'House Number': 'object', 'Time First Observed': 'object'}, low_memory=False)
end = time.time()
print("Read the file with dask: ",(end-start),"seconds")

Read the file with dask:  0.6460757255554199 seconds


Conclusion: Dask is unquestionably a leader when it comes to efficiently working with large files.

## Read in the file using config file

In [9]:
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()



Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


## Validate the header of the file

In [10]:
result_df = util.col_header_val(dask_df,config_data)[1]

Column name and column length validation passed.


In [11]:
result_df.columns

Index(['date_first_observed', 'days_parking_in_effect',
       'double_parking_violation', 'feet_from_curb', 'from_hours_in_effect',
       'house_number', 'hydrant_violation', 'intersecting_street',
       'issue_date', 'issuer_code', 'issuer_command', 'issuer_precinct',
       'issuer_squad', 'issuing_agency', 'law_section', 'meter_number',
       'no_standing_or_stopping_violation', 'plate_id', 'plate_type',
       'registration_state', 'street_code1', 'street_code2', 'street_code3',
       'street_name', 'sub_division', 'summons_number', 'time_first_observed',
       'to_hours_in_effect', 'unregistered_vehicle', 'vehicle_body_type',
       'vehicle_color', 'vehicle_expiration_date', 'vehicle_make',
       'vehicle_year', 'violation_code', 'violation_county',
       'violation_description', 'violation_in_front_of_or_opposite',
       'violation_legal_code', 'violation_location', 'violation_post_code',
       'violation_precinct', 'violation_time'],
      dtype='object')

In [12]:
if util.col_header_val(dask_df,config_data)[0]==0:
    print("Validation failed.")
    # write code to reject the file
else:
    print("Validation passed.")
    subset_df = result_df.head(1000)
    subset_df.to_csv('output_file.csv.gz', sep='|', compression='gzip', index=False)

Column name and column length validation passed.
Validation passed.


## Summary

In [16]:
print(f"Total number of rows: {len(dask_df)}")
print(f"Total number of columns: {len(dask_df.columns)}")
print(f"Size of the short version of the file: {os.path.getsize('output_file.csv.gz')} bytes")

Total number of rows: 10803028
Total number of columns: 43
Size of the short version of the file: 64006 bytes
