In [1]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import dask.dataframe as dd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath): #deserialization from yaml file
    """Load a YAML configuration file and return its parsed contents.

    Parameters
    ----------
    filepath : str or path-like
        Location of the YAML file to read.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` produces for the document (a dict for a
        mapping document). ``None`` is returned when the content is not valid
        YAML; in that case the parser error is logged and NOT re-raised, so
        callers should check for ``None`` before indexing the result.

    Raises
    ------
    OSError
        If the file cannot be opened (for example ``FileNotFoundError``).
    UnicodeDecodeError
        If the file is not valid UTF-8.
    """
    # Pin the codec: without it open() decodes with the platform's locale
    # encoding, so the same config file could parse differently per machine.
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            # Explicit best-effort result; previously an implicit fall-through.
            return None

def col_header_val(df,table_config):
    '''
    Normalise the column names of ``df`` in place and validate them against
    the expected columns from the configuration.

    Normalisation removes spaces and the characters ``# , @ & ?`` from every
    column name and lower-cases the result. The expected names from
    ``table_config['columns']`` are lower-cased only.

    Parameters
    ----------
    df : DataFrame-like
        Object whose ``columns`` attribute is a pandas Index of strings.
        Its ``columns`` are overwritten with the normalised names.
    table_config : dict
        Configuration mapping with a ``'columns'`` list of expected names.

    Returns
    -------
    int
        1 when the normalised names equal the expected names in the same
        order, otherwise 0 (details are printed and logged).
    '''
    # regex is passed explicitly: its default changed to False in pandas 2.0,
    # which would turn the character class below into a literal string and
    # silently stop stripping the special characters.
    stripped = df.columns.str.replace(' ', '', regex=False)
    # The commas inside the class are matched too, so ',' is removed as well.
    stripped = stripped.str.replace('[#,@,&,?]', '', regex=True)
    df.columns = [name.lower() for name in stripped]

    expected_col = [name.lower() for name in table_config['columns']]
    actual_col = list(df.columns)

    # List equality already covers both length and order.
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sorted so the diagnostic output is deterministic between runs.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {df.columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting utility.py


In [2]:
%%writefile file.yaml
# Ingestion settings; the notebook loads this file with utility.read_config_file.
# NOTE(review): file_type, dataset_name and file_name are not read by any code
# visible in this notebook - presumably descriptive metadata; confirm before relying on them.
file_type: csv
dataset_name: ddfile
file_name: Parking_Violations_Issued_-_Fiscal_Year_2015
# Location of the source file handed to dd.read_csv (absolute, machine-specific path).
file_path: D:\Purity\DataGlacier Internship\week6\Parking_Violations_Issued_-_Fiscal_Year_2015.csv
# Delimiter used when reading the source file.
inbound_delimiter: ","
# Second delimiter; the notebook passes it to dd.read_csv for its "pipe separated" read.
outbound_delimiter: "|"
# Expected header names, in file order. col_header_val lower-cases these and compares
# them with the file's headers after removing spaces and the characters # , @ & ?
# and lower-casing, so the mixed casing below does not affect validation.
columns: 
    - summonsnumber
    - plateid
    - registrationstate
    - platetype
    - issuedate
    - violationcode
    - vehiclebodytype
    - vehiclemake
    - IssuingAgency
    - StreetCode1
    - StreetCode2
    - StreetCode3
    - VehicleExpirationDate
    - ViolationLocation
    - ViolationPrecinct
    - IssuerPrecinct
    - IssuerCode
    - IssuerCommand
    - IssuerSquad
    - ViolationTime
    - TimeFirstObserved
    - ViolationCounty
    - ViolationInFrontOfOrOpposite
    - HouseNumber
    - StreetName
    - IntersectingStreet
    - DateFirstObserved
    - LawSection
    - SubDivision
    - ViolationLegalCode
    - DaysParkingInEffect
    - FromHoursInEffect
    - ToHoursInEffect
    - VehicleColor
    - UnregisteredVehicle
    - VehicleYear
    - MeterNumber
    - FeetFromCurb
    - ViolationPostCode
    - ViolationDescription
    - NoStandingorStoppingViolation
    - HydrantViolation
    - DoubleParkingViolation
    - Latitude
    - Longitude
    - CommunityBoard
    - CommunityCouncil
    - CensusTract
    - BIN
    - BBL
    - NTA

Overwriting file.yaml


In [3]:
# Load the ingestion settings (file path, delimiters, expected columns) from file.yaml
# using the helper module written to utility.py in the first cell.
# read_config_file returns None if the YAML cannot be parsed.
import utility as util
config_data = util.read_config_file("file.yaml")

In [4]:
# Spot-check one setting: the delimiter that will be used to read the source file
config_data['inbound_delimiter']

','

In [5]:
# Display the whole parsed configuration to confirm every key and column name loaded
config_data

{'file_type': 'csv',
 'dataset_name': 'ddfile',
 'file_name': 'Parking_Violations_Issued_-_Fiscal_Year_2015',
 'file_path': 'D:\\Purity\\DataGlacier Internship\\week6\\Parking_Violations_Issued_-_Fiscal_Year_2015.csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'columns': ['summonsnumber',
  'plateid',
  'registrationstate',
  'platetype',
  'issuedate',
  'violationcode',
  'vehiclebodytype',
  'vehiclemake',
  'IssuingAgency',
  'StreetCode1',
  'StreetCode2',
  'StreetCode3',
  'VehicleExpirationDate',
  'ViolationLocation',
  'ViolationPrecinct',
  'IssuerPrecinct',
  'IssuerCode',
  'IssuerCommand',
  'IssuerSquad',
  'ViolationTime',
  'TimeFirstObserved',
  'ViolationCounty',
  'ViolationInFrontOfOrOpposite',
  'HouseNumber',
  'StreetName',
  'IntersectingStreet',
  'DateFirstObserved',
  'LawSection',
  'SubDivision',
  'ViolationLegalCode',
  'DaysParkingInEffect',
  'FromHoursInEffect',
  'ToHoursInEffect',
  'VehicleColor',
  'UnregisteredVehicle',
  'VehicleYe

In [6]:
# Lazily load the source CSV with Dask; both the location and the delimiter
# come from the parsed configuration.
import dask.dataframe as dd

path = f"{config_data['file_path']}"
csvdf = dd.read_csv(
    path,
    delimiter=config_data['inbound_delimiter'],
)
csvdf.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,8002531292,EPC5238,NY,PAS,10/01/2014,21,SUBN,CHEVR,T,20390,...,,,,,,,,,,
1,8015318440,5298MD,NY,COM,03/06/2015,14,VAN,FRUEH,T,27790,...,,,,,,,,,,
2,7611181981,FYW2775,NY,PAS,07/28/2014,46,SUBN,SUBAR,T,8130,...,,,,,,,,,,
3,7445908067,GWE1987,NY,PAS,04/13/2015,19,4DSD,LEXUS,T,59990,...,,,,,,,,,,
4,7037692864,T671196C,NY,PAS,05/19/2015,19,4DSD,CHRYS,T,36090,...,,,,,,,,,,


In [7]:
# Shape of the CSV frame: Dask reports the row count as a lazy Delayed object,
# while the column count is available immediately.
csvdf.shape

(Delayed('int-27b0c1d7-f984-44de-825a-2c05cc2058d8'), 51)

In [8]:
# Validate the ingested CSV header against the configured column list.
# col_header_val also rewrites csvdf.columns in place (spaces and special
# characters removed, lower-cased) and returns 1 on a match, 0 otherwise.
util.col_header_val(csvdf,config_data)

column name and column length validation passed


1

In [9]:
# Re-read the same source file, this time splitting on the outbound ('|') delimiter.
# NOTE(review): the source file is comma-delimited, so no '|' is found and every row
# is parsed into a single column holding the whole line (see the one-column result
# below). To produce pipe-separated output, the delimiter belongs on the write
# (to_csv sep=...), not on this read.

pipe_df = dd.read_csv(path, delimiter=config_data['outbound_delimiter'])
pipe_df.head()

Unnamed: 0,"Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,Street Code2,Street Code3,Vehicle Expiration Date,Violation Location,Violation Precinct,Issuer Precinct,Issuer Code,Issuer Command,Issuer Squad,Violation Time,Time First Observed,Violation County,Violation In Front Of Or Opposite,House Number,Street Name,Intersecting Street,Date First Observed,Law Section,Sub Division,Violation Legal Code,Days Parking In Effect ,From Hours In Effect,To Hours In Effect,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council ,Census Tract,BIN,BBL,NTA"
0,"8002531292,EPC5238,NY,PAS,10/01/2014,21,SUBN,C..."
1,"8015318440,5298MD,NY,COM,03/06/2015,14,VAN,FRU..."
2,"7611181981,FYW2775,NY,PAS,07/28/2014,46,SUBN,S..."
3,"7445908067,GWE1987,NY,PAS,04/13/2015,19,4DSD,L..."
4,"7037692864,T671196C,NY,PAS,05/19/2015,19,4DSD,..."


In [10]:
# Shape of the frame read with the '|' delimiter: the row count is a lazy Delayed
# object and there is only a single column.
pipe_df.shape

(Delayed('int-7712dd25-ba4a-4d52-8bf6-ad2d8d32c7cf'), 1)

In [11]:
# Write the ingested data as one gzip-compressed, pipe-delimited file.
# The frame written is csvdf (the correctly parsed, header-validated data) rather
# than pipe_df: pipe_df holds each source line in a single column, so exporting it
# would only re-emit the comma-separated text inside one quoted field.
# sep comes from the config so the output actually uses the outbound delimiter;
# index=False keeps the partition index out of the file, which would otherwise
# appear as an extra unnamed leading column when the file is read back.
csvdf.to_csv(
    r'D:\Purity\DataGlacier Internship\week6\file.gz',
    sep=config_data['outbound_delimiter'],
    index=False,
    single_file=True,
    compression='gzip',
)

['D:/Purity/DataGlacier Internship/week6/file.gz']