# Data Ingestion: 

In [1]:
%%writefile testutility.py

# Utility functions used later in the notebook.

import logging

import yaml

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed contents.

    Args:
        filepath: Path of the YAML file to read.

    Returns:
        The object produced by ``yaml.safe_load`` for the file, or ``None``
        when the file content is not valid YAML (the error is logged).

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best-effort behaviour: report the parse failure (including which
            # file was affected) and hand back None instead of raising.
            logging.error("Could not parse YAML file %s: %s", filepath, exc)
            return None

def validate_columns(df, table_config):
    """Check that the dataframe's columns match the columns listed in the config.

    Spaces are removed from both sets of names before comparing, and the
    comparison ignores column order.

    Side effect: ``df.columns`` is reassigned to the space-stripped names.
    ``table_config`` is only read; its ``'columns'`` list keeps its original
    names and order.

    Args:
        df: DataFrame-like object whose ``columns`` supports ``.str.replace``.
        table_config: Mapping with a ``'columns'`` list of expected names.

    Returns:
        1 if the normalised column names match, otherwise 0 (after printing
        which names are missing on each side).
    """
    df.columns = df.columns.str.replace(' ', '')
    actual = sorted(df.columns)
    # Normalise into a new list so the caller's config is not rewritten.
    expected = sorted(name.replace(' ', '') for name in table_config['columns'])

    if actual == expected:
        print('Column name and length validation successful!')
        return 1

    print('Column name and length validation failed!')
    # Sorted so the report is stable from run to run.
    mismatch_yaml = sorted(set(actual).difference(expected))
    print('These file columns are not in the YAML file:', mismatch_yaml)
    mismatch_file = sorted(set(expected).difference(actual))
    print('These YAML columns are not in the file uploaded:', mismatch_file)
    return 0

Overwriting testutility.py


# Write YAML File: 

In [2]:
%%writefile file.yaml

# Ingestion configuration: source file details, delimiters, and expected column names.

file_type: csv
dataset_name: testfile
file_name: Parking_Violations_Issued_-_Fiscal_Year_2017
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 0
columns: 
    - Summons Number
    - Plate ID
    - Registration State
    - Plate Type
    - Issue Date
    - Violation Code
    - Vehicle Body Type
    - Vehicle Make
    - Issuing Agency 
    - Street Code1
    - Street Code2
    - Street Code3
    - Vehicle Expiration Date
    - Violation Location 
    - Violation Precinct
    - Issuer Precinct
    - Issuer Code
    - Issuer Command 
    - Issuer Squad
    - Violation Time 
    - Time First Observed 
    - Violation County 
    - Violation In Front Of Or Opposite
    - House Number 
    - Street Name 
    - Intersecting Street 
    - Date First Observed 
    - Law Section 
    - Sub Division 
    - Violation Legal Code
    - Days Parking In Effect 
    - From Hours In Effect 
    - To Hours In Effect
    - Vehicle Color 
    - Unregistered Vehicle? 
    - Vehicle Year
    - Meter Number 
    - Feet From Curb 
    - Violation Post Code
    - Violation Description 
    - No Standing or Stopping Violation 
    - Hydrant Violation 
    - Double Parking Violation

Overwriting file.yaml


In [3]:
import testutility as util
# Parse file.yaml with the helper that was written to testutility.py above.
config_data = util.read_config_file('file.yaml')

In [4]:
# Display the parsed configuration to confirm it loaded as expected.
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'Parking_Violations_Issued_-_Fiscal_Year_2017',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 0,
 'columns': ['Summons Number',
  'Plate ID',
  'Registration State',
  'Plate Type',
  'Issue Date',
  'Violation Code',
  'Vehicle Body Type',
  'Vehicle Make',
  'Issuing Agency',
  'Street Code1',
  'Street Code2',
  'Street Code3',
  'Vehicle Expiration Date',
  'Violation Location',
  'Violation Precinct',
  'Issuer Precinct',
  'Issuer Code',
  'Issuer Command',
  'Issuer Squad',
  'Violation Time',
  'Time First Observed',
  'Violation County',
  'Violation In Front Of Or Opposite',
  'House Number',
  'Street Name',
  'Intersecting Street',
  'Date First Observed',
  'Law Section',
  'Sub Division',
  'Violation Legal Code',
  'Days Parking In Effect',
  'From Hours In Effect',
  'To Hours In Effect',
  'Vehicle Color',
  'Unregistered Vehicle?',
  'Vehicle Year',


In [5]:
# Read the full CSV with pandas.
# Load time recorded by the author: 2m 36s
import pandas as pd
pd_df = pd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv', delimiter = ',')
# Preview the first rows.
pd_df.head()

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [7]:
# Read the same CSV with Dask.
# Load time recorded by the author: 6s
import dask.dataframe as dd
dd_df = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv', delimiter = ',')
dd_df.head()

# This cell finished much faster than the pandas cell (6s vs 2m 36s).
# NOTE(review): dd.read_csv is lazy and head() only reads from the first
# partition, so this is not a like-for-like timing against pandas — confirm
# before drawing conclusions from it.

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [8]:
# Read the dataset with Dask, taking the file name, delimiter and number of
# leading rows to skip from the YAML config instead of hard-coding them.

file_type = config_data['file_type']
source_file = f"{config_data['file_name']}.{file_type}"
print(source_file)
df_config = dd.read_csv(
    source_file,
    delimiter=config_data['inbound_delimiter'],
    # Honour `skip_leading_rows` from the config (0 in file.yaml, i.e. no skip).
    skiprows=config_data.get('skip_leading_rows', 0),
)
df_config.head()

Parking_Violations_Issued_-_Fiscal_Year_2017.csv


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [9]:
import testutility as util
# Validate column names and count on df_config: it is the frame that was read
# through file.yaml and the one written out afterwards, so it is the one that
# needs checking (previously the hard-coded dd_df read was validated instead).
# Note: validate_columns reassigns df.columns with spaces removed, so the
# headers of df_config are space-stripped from here on.
util.validate_columns(df_config, config_data)

Column name and length validation successful!


1

In [12]:
# Save the data as gzip-compressed, delimiter-separated text, taking the output
# name and separator from the YAML config rather than hard-coding them.
# NOTE: as the cell output shows, Dask creates a directory with this name and
# writes one `NN.part` file per partition inside it, not a single .gz file.
df_config.to_csv(
    f"{config_data['file_name']}.gz",
    sep=config_data['outbound_delimiter'],
    compression='gzip',
)

  result = (True, func(*args, **kwds))


['/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/00.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/01.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/02.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/03.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/04.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/05.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/06.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fiscal_Year_2017.gz/07.part',
 '/Users/alexlindberg/Desktop/DataGlacier/FileIngestion/Parking_Violations_Issued_-_Fisc

In [None]:
# Summary of Data: 
# Size: 678.5 MB
# Rows: 3059048
# Columns: 43
