## **Data Ingestion and Schema Validation**

Hneri Edwards - 
LISUM10: 30

### Dataset Description

- Data is sourced from Kaggle - **NYC Parking Violation Tickets**
- Data was produced by the NYC Department of Finance.
- Columns include information about the vehicle ticketed, the ticket issued, location and time.

### Table of Contents

1. Reading the Data
2. Utilities
3. YAML file
4. Validation
5. Export as gzip

In [None]:
# necessary packages

import gzip
import os
import time
import pandas as pd
import modin.pandas as mpd
import dask.dataframe as dd
import vaex
import ray

In [None]:
# import data using Pandas

# Time a full pandas read of the CSV.
# perf_counter() is the clock meant for measuring elapsed intervals; the
# wall clock returned by time.time() can jump if the system time is adjusted.
start_time = time.perf_counter()
df = pd.read_csv('tickets/nyc_tickets.csv')
elapsed = time.perf_counter() - start_time
print("--- %s seconds ---" % elapsed)



--- 80.33357524871826 seconds ---


In [None]:
# import data using Dask

# Column dtypes passed explicitly instead of relying on Dask's inference.
# NOTE(review): presumably these are the columns whose inferred type did not
# hold for the whole file -- confirm against the Dask dtype error message.
ticket_dtypes = {
    'House Number': 'object',
    'Issuer Command': 'object',
    'Issuer Squad': 'object',
    'Time First Observed': 'object',
    'Unregistered Vehicle?': 'float64',
    'Violation Description': 'object',
    'Violation Legal Code': 'object',
    'Violation Location': 'float64',
    'Violation Post Code': 'object',
    'Date First Observed': 'float64',
    'Feet From Curb': 'float64',
    'Law Section': 'float64',
    'Vehicle Year': 'float64',
}

# NOTE: dd.read_csv is lazy -- it sets up the computation without loading the
# whole file, so this timing is not a like-for-like comparison with an eager
# read of the same CSV.
# perf_counter() is the clock meant for measuring elapsed intervals.
start_time = time.perf_counter()
df = dd.read_csv('tickets/nyc_tickets.csv', dtype=ticket_dtypes)
elapsed = time.perf_counter() - start_time
print("--- %s seconds ---" % elapsed)

--- 0.1541757583618164 seconds ---


In [None]:
# import data using modin

# Time a Modin read of the CSV.
# perf_counter() is the clock meant for measuring elapsed intervals; the
# wall clock returned by time.time() can jump if the system time is adjusted.
start_time = time.perf_counter()
df = mpd.read_csv('tickets/nyc_tickets.csv')
elapsed = time.perf_counter() - start_time
print("--- %s seconds ---" % elapsed)


    import ray
    ray.init()

[2m[36m(raylet)[0m Spilled 2515 MiB, 3 objects, write throughput 276 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


--- 145.39502716064453 seconds ---


Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.


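Dask has the fastest read time, under a second for about 2 GB of data. Note that `dd.read_csv` is lazy: it does not load the whole file up front, so this figure is not directly comparable with the full reads timed for pandas and Modin.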

In [None]:
# write the shared helper module (config reading and header validation) to utility.py

%%writefile utility.py
import logging
import os
import time
import modin.pandas as mpd
import dask.dataframe as dd
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed contents.

    Args:
        filepath: Path of the YAML file, opened in text read mode.

    Returns:
        The object produced by ``yaml.safe_load``, or ``None`` when the
        content cannot be parsed. In that case the ``yaml.YAMLError`` is
        logged at error level instead of being raised.
    """
    with open(filepath, 'r') as stream:
        try:
            parsed = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Parse failures are reported through logging; the caller gets None.
            logging.error(exc)
            parsed = None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one.

    ``char`` is matched literally, so regex metacharacters such as ``.`` or
    ``*`` are safe to pass.

    Args:
        string: Text to clean up.
        char: The character (or literal substring) whose repeats are
            collapsed. An empty value leaves ``string`` unchanged.

    Returns:
        ``string`` with each run of repeated ``char`` replaced by a single
        ``char``.
    """
    if not char:
        # Nothing to repeat; an empty pattern would otherwise be invalid.
        return string
    # Escape so that ``char`` is never interpreted as regex syntax.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps backslashes in ``char`` from being read
    # as replacement-template escapes.
    return re.sub(pattern, lambda _match: char, string)

def check_header(header, df):
    """Return ``df`` with its columns conformed to ``header``.

    Args:
        header: Column labels the result should have, in order.
        df: DataFrame to conform.

    Returns:
        The result of reindexing ``df`` along the column axis with
        ``header``; labels absent from ``df`` are filled with ``0``.
    """
    conformed = df.reindex(columns=header, fill_value=0)
    return conformed

def _normalize_column_name(name):
    """Return ``name`` lower-cased with non-word characters turned into single underscores."""
    cleaned = re.sub(r'[^\w]', '_', str(name).lower()).strip('_')
    # Adjacent replaced characters leave runs of underscores; collapse them.
    return re.sub(r'_{2,}', '_', cleaned)


def col_header_val(df, table_config):
    """Standardize ``df``'s column names in place and check them against the config.

    Every column name of ``df`` is lower-cased, has each non-word character
    replaced by ``_``, loses leading/trailing underscores and has repeated
    underscores collapsed. The names listed under ``table_config['columns']``
    go through the same normalization before the comparison, so a config
    entry such as ``"Plate ID"`` matches the file column ``plate_id``.
    The comparison ignores column order.

    Args:
        df: DataFrame whose header is validated. Its ``columns`` attribute
            is overwritten with the normalized names.
        table_config: Mapping with a ``'columns'`` entry listing the
            expected column names.

    Returns:
        ``1`` when the normalized names of the file and of the config are
        the same, ``0`` otherwise. On failure the differences are printed
        and both lists are logged at info level.
    """
    df.columns = [_normalize_column_name(col) for col in df.columns]

    # Sort plain lists of names; reindexing the DataFrame itself would copy
    # all of its data just to compare headers.
    actual_col = sorted(df.columns)
    expected_col = sorted(
        _normalize_column_name(col) for col in table_config['columns']
    )

    # List equality already covers the length check.
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sorted so the report is stable from run to run.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info('df columns: %s', actual_col)
    logging.info('expected columns: %s', expected_col)
    return 0

Overwriting utility.py


In [None]:
%%writefile file.yaml
file_type: csv
dataset_name: nyc_tickets
file_name: nyc_tickets
table_name: tickets
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Summons Number
    - Plate ID
    - Registration State
    - Plate Type
    - Issue Date
    - Violation Code
    - Vehicle Body Type
    - Vehicle Make
    - Issuing Agency
    - Street Code1
    - Street Code2
    - Street Code3
    - Vehicle Expiration Date
    - Violation Location
    - Violation Precinct
    - Issuer Precinct
    - Issuer Code
    - Issuer Command
    - Issuer Squad
    - Violation Time
    - Time First Observed
    - Violation County
    - Violation In Front Of Or Opposite
    - House Number
    - Street Name
    - Intersecting Street
    - Date First Observed
    - Law Section
    - Sub Division
    - Violation Legal Code
    - Days Parking In Effect
    - From Hours In Effect
    - To Hours In Effect
    - Vehicle Color
    - Unregistered Vehicle?
    - Vehicle Year
    - Meter Number
    - Feet From Curb
    - Violation Post Code
    - Violation Description
    - No Standing or Stopping Violation
    - Hydrant Violation
    - Double Parking Violation
    - Latitude
    - Longitude
    - Community Board
    - Community Council
    - Census Tract
    - BIN
    - BBL
    - NTA

Overwriting file.yaml


In [None]:
# Load the settings declared in file.yaml (delimiters, expected columns, ...)
# through the utility module written above. read_config_file returns None
# if the YAML cannot be parsed.
import utility as util
config_data = util.read_config_file("file.yaml")

In [None]:
# Spot-check one setting: the inbound delimiter declared in file.yaml
config_data['inbound_delimiter']

','

In [None]:
# Inspect the full parsed configuration (a dict keyed by the top-level YAML names)
config_data

{'columns': ['Summons Number',
  'Plate ID',
  'Registration State',
  'Plate Type',
  'Issue Date',
  'Violation Code',
  'Vehicle Body Type',
  'Vehicle Make',
  'Issuing Agency',
  'Street Code1',
  'Street Code2',
  'Street Code3',
  'Vehicle Expiration Date',
  'Violation Location',
  'Violation Precinct',
  'Issuer Precinct',
  'Issuer Code',
  'Issuer Command',
  'Issuer Squad',
  'Violation Time',
  'Time First Observed',
  'Violation County',
  'Violation In Front Of Or Opposite',
  'House Number',
  'Street Name',
  'Intersecting Street',
  'Date First Observed',
  'Law Section',
  'Sub Division',
  'Violation Legal Code',
  'Days Parking In Effect',
  'From Hours In Effect',
  'To Hours In Effect',
  'Vehicle Color',
  'Unregistered Vehicle?',
  'Vehicle Year',
  'Meter Number',
  'Feet From Curb',
  'Violation Post Code',
  'Violation Description',
  'No Standing or Stopping Violation',
  'Hydrant Violation',
  'Double Parking Violation',
  'Latitude',
  'Longitude',
  'Com

In [None]:
# validation

In [None]:
# Validate the header of the file against the column list from file.yaml.
# col_header_val returns 1 on a match and 0 otherwise. It also rewrites
# df.columns in place (lower-cased, non-word characters replaced by "_").

util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['issuer_command', 'violation_post_code', 'date_first_observed', 'summons_number', 'vehicle_body_type', 'street_code3', 'to_hours_in_effect', 'sub_division', 'violation_location', 'feet_from_curb', 'intersecting_street', 'violation_description', 'vehicle_make', 'census_tract', 'unregistered_vehicle', 'registration_state', 'violation_precinct', 'plate_id', 'from_hours_in_effect', 'no_standing_or_stopping_violation', 'violation_time', 'issuer_precinct', 'law_section', 'issuing_agency', 'issue_date', 'violation_county', 'vehicle_color', 'violation_in_front_of_or_opposite', 'double_parking_violation', 'issuer_squad', 'time_first_observed', 'days_parking_in_effect', 'street_name', 'street_code1', 'house_number', 'vehicle_year', 'violation_legal_code', 'hydrant_violation', 'meter_number', 'violation_code', 'community_board', 'community_council', 'vehicle_expiration_date', 'issuer_code', 'street_co

0

In [None]:
# Show the file's column names next to the names listed in the YAML config
file_columns = df.columns
yaml_columns = config_data['columns']
print("columns of files are:", file_columns)
print("columns of YAML are:", yaml_columns)

columns of files are: Index(['summons_number', 'plate_id', 'registration_state', 'plate_type',
       'issue_date', 'violation_code', 'vehicle_body_type', 'vehicle_make',
       'issuing_agency', 'street_code1', 'street_code2', 'street_code3',
       'vehicle_expiration_date', 'violation_location', 'violation_precinct',
       'issuer_precinct', 'issuer_code', 'issuer_command', 'issuer_squad',
       'violation_time', 'time_first_observed', 'violation_county',
       'violation_in_front_of_or_opposite', 'house_number', 'street_name',
       'intersecting_street', 'date_first_observed', 'law_section',
       'sub_division', 'violation_legal_code', 'days_parking_in_effect',
       'from_hours_in_effect', 'to_hours_in_effect', 'vehicle_color',
       'unregistered_vehicle', 'vehicle_year', 'meter_number',
       'feet_from_curb', 'violation_post_code', 'violation_description',
       'no_standing_or_stopping_violation', 'hydrant_violation',
       'double_parking_violation', 'latitude', '

In [None]:
# col_header_val returns 0 on a header mismatch, anything else means success
header_ok = util.col_header_val(df, config_data) != 0
if header_ok:
    print("col validation passed")
else:
    print("validation failed")

column name and column length validation failed
Following File columns are not in the YAML file ['issuer_command', 'violation_post_code', 'date_first_observed', 'summons_number', 'vehicle_body_type', 'street_code3', 'to_hours_in_effect', 'sub_division', 'violation_location', 'feet_from_curb', 'intersecting_street', 'violation_description', 'vehicle_make', 'census_tract', 'unregistered_vehicle', 'registration_state', 'violation_precinct', 'plate_id', 'from_hours_in_effect', 'no_standing_or_stopping_violation', 'violation_time', 'issuer_precinct', 'law_section', 'issuing_agency', 'issue_date', 'violation_county', 'vehicle_color', 'violation_in_front_of_or_opposite', 'double_parking_violation', 'issuer_squad', 'time_first_observed', 'days_parking_in_effect', 'street_name', 'street_code1', 'house_number', 'vehicle_year', 'violation_legal_code', 'hydrant_violation', 'meter_number', 'violation_code', 'community_board', 'community_council', 'vehicle_expiration_date', 'issuer_code', 'street_co

In [None]:
# Write the data as a gzip-compressed, delimiter-separated text file.
# The output name and separator are read from file.yaml (file_name,
# file_type, outbound_delimiter) so the export follows the config instead of
# repeating its values as literals. With the current config this is
# "nyc_tickets.csv.gz" separated by "|".
output_path = f"{config_data['file_name']}.{config_data['file_type']}.gz"
df.to_csv(output_path,
          index=False,
          sep=config_data['outbound_delimiter'],
          compression="gzip")

In [None]:
# File summary

# Report the DataFrame dimensions, then the size in bytes of the exported archive
row_count, column_count = df.shape
print("Total Rows = " + str(row_count) + " and Total Columns = " + str(column_count))
archive_size = os.path.getsize('nyc_tickets.csv.gz')
print("File size = " + str(archive_size))

Total Rows = 9100278 and Total Columns = 51
File size = 365104705
