In [7]:
%%writefile ingestion.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting ingestion.py


In [8]:
%%writefile file.yaml
file_type: csv
dataset_name: NYC parking tickets
file_name: Parking_Violations_Issued_-_Fiscal_Year_2015
table_name: Parking_Violations_Issued_-_Fiscal_Year_2015
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Number
    - City
    - Gender
    - Age
    - Income

Overwriting file.yaml


In [9]:
import ingestion as ing
config_data = ing.read_config_file("file.yaml")

In [10]:
config_data

{'file_type': 'csv',
 'dataset_name': 'NYC parking tickets',
 'file_name': 'Parking_Violations_Issued_-_Fiscal_Year_2015',
 'table_name': 'Parking_Violations_Issued_-_Fiscal_Year_2015',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Number', 'City', 'Gender', 'Age', 'Income']}

In [23]:
from dask import dataframe as ddf
df_sample = ddf.read_csv("Parking_Violations_Issued_-_Fiscal_Year_2015.csv",delimiter=',',dtype={'Meter Number': 'object',
       'Time First Observed': 'object',
       'Violation Location': 'float64'})
df_sample.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,8002531292,EPC5238,NY,PAS,10/01/2014,21,SUBN,CHEVR,T,20390,...,,,,,,,,,,
1,8015318440,5298MD,NY,COM,03/06/2015,14,VAN,FRUEH,T,27790,...,,,,,,,,,,
2,7611181981,FYW2775,NY,PAS,07/28/2014,46,SUBN,SUBAR,T,8130,...,,,,,,,,,,
3,7445908067,GWE1987,NY,PAS,04/13/2015,19,4DSD,LEXUS,T,59990,...,,,,,,,,,,
4,7037692864,T671196C,NY,PAS,05/19/2015,19,4DSD,CHRYS,T,36090,...,,,,,,,,,,


In [24]:
import pandas as pd
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,8002531292,EPC5238,NY,PAS,10/01/2014,21,SUBN,CHEVR,T,20390,...,,,,,,,,,,
1,8015318440,5298MD,NY,COM,03/06/2015,14,VAN,FRUEH,T,27790,...,,,,,,,,,,
2,7611181981,FYW2775,NY,PAS,07/28/2014,46,SUBN,SUBAR,T,8130,...,,,,,,,,,,
3,7445908067,GWE1987,NY,PAS,04/13/2015,19,4DSD,LEXUS,T,59990,...,,,,,,,,,,
4,7037692864,T671196C,NY,PAS,05/19/2015,19,4DSD,CHRYS,T,36090,...,,,,,,,,,,


In [25]:
ing.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['issuer_command', 'issue_date', 'violation_post_code', 'violation_precinct', 'violation_time', 'sub_division', 'street_code2', 'street_code1', 'violation_location', 'feet_from_curb', 'house_number', 'street_code3', 'summons_number', 'vehicle_expiration_date', 'latitude', 'community_board', 'bin', 'nta', 'law_section', 'time_first_observed', 'violation_in_front_of_or_opposite', 'street_name', 'registration_state', 'vehicle_body_type', 'violation_legal_code', 'hydrant_violation', 'census_tract', 'issuer_squad', 'date_first_observed', 'violation_code', 'to_hours_in_effect', 'violation_description', 'plate_id', 'meter_number', 'violation_county', 'community_council', 'no_standing_or_stopping_violation', 'issuing_agency', 'issuer_code', 'issuer_precinct', 'bbl', 'plate_type', 'double_parking_violation', 'unregistered_vehicle', 'vehicle_color', 'intersecting_street', 'vehicle_year', 'from_hours_i

0

In [26]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['summons_number', 'plate_id', 'registration_state', 'plate_type',
       'issue_date', 'violation_code', 'vehicle_body_type', 'vehicle_make',
       'issuing_agency', 'street_code1', 'street_code2', 'street_code3',
       'vehicle_expiration_date', 'violation_location', 'violation_precinct',
       'issuer_precinct', 'issuer_code', 'issuer_command', 'issuer_squad',
       'violation_time', 'time_first_observed', 'violation_county',
       'violation_in_front_of_or_opposite', 'house_number', 'street_name',
       'intersecting_street', 'date_first_observed', 'law_section',
       'sub_division', 'violation_legal_code', 'days_parking_in_effect',
       'from_hours_in_effect', 'to_hours_in_effect', 'vehicle_color',
       'unregistered_vehicle', 'vehicle_year', 'meter_number',
       'feet_from_curb', 'violation_post_code', 'violation_description',
       'no_standing_or_stopping_violation', 'hydrant_violation',
       'double_parking_violation', 'latitude', '

In [27]:
if ing.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['issuer_command', 'issue_date', 'violation_post_code', 'violation_precinct', 'violation_time', 'sub_division', 'street_code2', 'street_code1', 'violation_location', 'feet_from_curb', 'house_number', 'street_code3', 'summons_number', 'vehicle_expiration_date', 'latitude', 'community_board', 'bin', 'nta', 'law_section', 'time_first_observed', 'violation_in_front_of_or_opposite', 'street_name', 'registration_state', 'vehicle_body_type', 'violation_legal_code', 'hydrant_violation', 'census_tract', 'issuer_squad', 'date_first_observed', 'violation_code', 'to_hours_in_effect', 'violation_description', 'plate_id', 'meter_number', 'violation_county', 'community_council', 'no_standing_or_stopping_violation', 'issuing_agency', 'issuer_code', 'issuer_precinct', 'bbl', 'plate_type', 'double_parking_violation', 'unregistered_vehicle', 'vehicle_color', 'intersecting_street', 'vehicle_year', 'from_hours_i

In [28]:
import os
os.path.getsize('Parking_Violations_Issued_-_Fiscal_Year_2015.csv')

2864071408

In [31]:
import gzip
import os
content1 = "Total number of rows:" + " " + str(df.shape[0]) 
content2 = "total number of columns:" + " " +  str(len(df.columns)) 
content3 = "file size:" + " " + str(os.path.getsize('Parking_Violations_Issued_-_Fiscal_Year_2015.csv')) + " " + "bytes" 
with gzip.open('file1.gz', 'wt') as f:
    f.write(content1 + '\n')
    f.write(content2 + '\n')
    f.write(content3 + '\n')