Parameterization for reading files from a data lake

The cell below writes the utility module `testutility.py`. It contains the helper functions used in this notebook: reading a YAML config file, and comparing the column names listed in the YAML config with those of the raw data file.

In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed content.

    The file is opened in binary mode so that PyYAML performs its own
    encoding detection (BOM-based, UTF-8 when no BOM is present) instead of
    depending on the platform's default text encoding.

    Parameters
    ----------
    filepath : str or path-like
        Location of the YAML file.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` produces (a dict for a mapping document).
        ``None`` is returned when the document cannot be parsed; the parse
        error is logged rather than raised.

    Raises
    ------
    OSError
        If the file cannot be opened (not caught here).
    """
    with open(filepath, 'rb') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best-effort: record the parse failure; the caller receives None.
            logging.error(exc)
            return None


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into a single one.

    ``char`` is treated as literal text, never as a regular expression, so
    characters such as ``.``, ``+``, ``?`` or a backslash are handled safely.
    A multi-character ``char`` is collapsed as a whole unit.

    Parameters
    ----------
    string : str
        Text to clean.
    char : str
        Literal character (or substring) whose repetitions are collapsed.

    Returns
    -------
    str
        ``string`` with each run of repeated ``char`` replaced by one ``char``.
        Returned unchanged when ``char`` is empty.
    """
    if not char:
        # Nothing to collapse; an empty pattern would not be a valid repeat.
        return string
    # Escape so regex metacharacters match literally; the group makes the
    # quantifier apply to the whole of a multi-character ``char``.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement avoids backslash-escape processing of ``char``.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
    """Check that the columns of ``df`` match the columns listed in the config.

    Both lists of names are normalised (converted to ``str``, lower-cased,
    leading/trailing spaces removed) and then compared element by element,
    so the names, their number, and their order must all agree. ``df`` itself
    is not modified.

    Parameters
    ----------
    df : object exposing an iterable ``columns`` attribute
        The loaded data (e.g. a pandas or dask DataFrame).
    table_config : mapping
        Parsed configuration; its ``'columns'`` entry holds the expected
        column names.

    Returns
    -------
    int
        ``1`` when validation passes, ``0`` otherwise. On failure the
        differing names are printed (in their original order) and both
        normalised lists are logged at INFO level.

    Raises
    ------
    KeyError
        If ``table_config`` has no ``'columns'`` entry.
    """
    def _normalise(label):
        # str() lets non-string labels (e.g. integer headers) be compared too.
        return str(label).lower().strip(' ')

    raw_columns = [_normalise(label) for label in df.columns]
    expected_col = [_normalise(label) for label in table_config['columns']]

    # List equality already covers both length and order.
    if raw_columns == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    expected_set = set(expected_col)
    raw_set = set(raw_columns)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # report is deterministic (a plain set difference is not).
    mismatched_columns_file = [
        name for name in dict.fromkeys(raw_columns) if name not in expected_set
    ]
    print("Following File columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = [
        name for name in dict.fromkeys(expected_col) if name not in raw_set
    ]
    print("Following YAML columns are not in the file uploaded", missing_YAML_file)
    logging.info(f'df columns: {raw_columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Writing testutility.py


Write the YAML configuration file

In [2]:
%%writefile file.yaml
# Settings describing the input data file; this notebook loads them with
# read_config_file("file.yaml").
# file_type and file_name are joined as "./<file_name>.<file_type>" to build
# the path of the data file that is read.
file_type: csv
dataset_name: testfile
file_name: Parking_Violations_Issued_-_Fiscal_Year_2017
table_name: edsurv
# NOTE(review): dataset_name, table_name, outbound_delimiter and
# skip_leading_rows are not read anywhere in the visible notebook cells;
# presumably they are meant for later pipeline steps - confirm.
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected header names, in file order. col_header_val compares them with the
# data file's columns after lower-casing and trimming surrounding spaces.
columns:
- Summons Number
- Plate ID
- Registration State
- Plate Type
- Issue Date
- Violation Code
- Vehicle Body Type
- Vehicle Make
- Issuing Agency
- Street Code1
- Street Code2
- Street Code3
- Vehicle Expiration Date
- Violation Location
- Violation Precinct
- Issuer Precinct
- Issuer Code
- Issuer Command
- Issuer Squad
- Violation Time
- Time First Observed
- Violation County
- Violation In Front Of Or Opposite
- House Number
- Street Name
- Intersecting Street
- Date First Observed
- Law Section
- Sub Division
- Violation Legal Code
- Days Parking In Effect
- From Hours In Effect
- To Hours In Effect
- Vehicle Color
- Unregistered Vehicle?
- Vehicle Year
- Meter Number
- Feet From Curb
- Violation Post Code
- Violation Description
- No Standing or Stopping Violation
- Hydrant Violation
- Double Parking Violation

Overwriting file.yaml


In [3]:
# Parse file.yaml into a Python object using the helper module written above.
# read_config_file logs the error and returns None if the YAML cannot be parsed.
import testutility as util
config_data = util.read_config_file("file.yaml")

In [4]:
# Spot-check one setting: the delimiter declared for the incoming file.
config_data['inbound_delimiter']

','

In [5]:
# Display the whole parsed configuration.
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'Parking_Violations_Issued_-_Fiscal_Year_2017',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Summons Number',
  'Plate ID',
  'Registration State',
  'Plate Type',
  'Issue Date',
  'Violation Code',
  'Vehicle Body Type',
  'Vehicle Make',
  'Issuing Agency',
  'Street Code1',
  'Street Code2',
  'Street Code3',
  'Vehicle Expiration Date',
  'Violation Location',
  'Violation Precinct',
  'Issuer Precinct',
  'Issuer Code',
  'Issuer Command',
  'Issuer Squad',
  'Violation Time',
  'Time First Observed',
  'Violation County',
  'Violation In Front Of Or Opposite',
  'House Number',
  'Street Name',
  'Intersecting Street',
  'Date First Observed',
  'Law Section',
  'Sub Division',
  'Violation Legal Code',
  'Days Parking In Effect',
  'From Hours In Effect',
  'To Hours In Effect',
  'Vehicle Color',
  'Unregistered Vehicle?',
  'Vehicle Year',


In [6]:
# Build the data file path from the config and read it with dask.
import dask.dataframe as dd
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"

data_dask = dd.read_csv(
    source_file,
    # Take the delimiter from the config instead of relying on the reader's
    # default, so the YAML file really drives how the file is parsed.
    sep=config_data['inbound_delimiter'],
    # Force these two columns to be read as generic objects.
    # NOTE(review): presumably needed because type inference on them is
    # unreliable - confirm.
    dtype={'House Number': 'object',
           'Time First Observed': 'object'},
)
data_dask.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [None]:
# Show the column labels of the loaded data as a plain list.
list(data_dask.columns)

In [8]:
# Compare the data's column names with the config's `columns` list;
# col_header_val returns 1 when they match and 0 otherwise.
util.col_header_val(data_dask,config_data)

column name and column length validation passed


1

GZIP conversion: compress `file.yaml` into `file.yaml.gz`

In [1]:
# Compress file.yaml into file.yaml.gz by streaming its bytes through gzip.
import gzip
import shutil
with open('file.yaml', 'rb') as f_in, gzip.open('file.yaml.gz', 'wb') as f_out:
    # copyfileobj streams from one file object to the other.
    shutil.copyfileobj(f_in, f_out)