In [5]:
import os
import sys
import gc
import yaml
import logging
import subprocess
import datetime
import pandas as pd
import dask
import time
import dask.dataframe as dd
import re

In [6]:
file_path = 'D:/GGProject/data/testdata.csv'

In [7]:
def time_count(func):
    start = time.time()
    func()
    end = time.time()
    print("It takes {} second to finish reading".format(end - start))

In [9]:
#read file using pandas
@time_count
def readCsvwithPandas():
    file = pd.read_csv(file_path, low_memory=False)
    del file
    gc.collect()
    
readCsvwithPandas

It takes 99.53842759132385 second to finish reading


In [10]:
# read file using dask
@time_count
def readCsvwithDask():
    file = dd.read_csv(file_path)
    del file
    gc.collect()

readCsvwithDask

It takes 0.08823323249816895 second to finish reading


In [32]:
ff = dd.read_csv(file_path, low_memory=False, assume_missing=True)

In [33]:
ff.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 51 entries, Summons Number to NTA
dtypes: object(24), float64(27)

In [13]:
ff.columns

Index(['Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
       'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
       'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
       'Vehicle Expiration Date', 'Violation Location', 'Violation Precinct',
       'Issuer Precinct', 'Issuer Code', 'Issuer Command', 'Issuer Squad',
       'Violation Time', 'Time First Observed', 'Violation County',
       'Violation In Front Of Or Opposite', 'House Number', 'Street Name',
       'Intersecting Street', 'Date First Observed', 'Law Section',
       'Sub Division', 'Violation Legal Code', 'Days Parking In Effect    ',
       'From Hours In Effect', 'To Hours In Effect', 'Vehicle Color',
       'Unregistered Vehicle?', 'Vehicle Year', 'Meter Number',
       'Feet From Curb', 'Violation Post Code', 'Violation Description',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parking Violation', 'Latitude', 'Longitude', 'Comm

In [14]:
from pyparsing import col


def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            
def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_converson(df):
    columns_ = df.columns
    columns_ = columns_.str.lower()
    columns_ = columns_.str.replace('[^\w]','_',regex=True)
    columns_ = list(map(lambda x: x.strip('_'), list(columns_)))
    columns_ = list(map(lambda x: replacer(x,'_'), list(columns_)))
    columns_ =list(map(lambda x: x.lower(), list(columns_)))
    return columns_
    
def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = col_converson(df)
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    cols = sorted(df.columns)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(cols):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

In [15]:
yf = {}
yf['file_type'] = 'csv'
yf['dataset_name'] = 'testdataset'
yf['file_name'] = 'testdata'
yf['table_name'] = 'testtable'
yf['inbound_delimiter'] = ","
yf['outbound_delimiter'] = "|"
yf['skip_leading_rows'] = 1
yf['columns'] = col_converson(ff)
yf['file_path'] = 'D:/GGProject/data/'
with open("config.yaml", 'w') as stream:
    yaml.dump(yf, stream)

In [16]:
print(yaml.dump(yf))

columns:
- summons_number
- plate_id
- registration_state
- plate_type
- issue_date
- violation_code
- vehicle_body_type
- vehicle_make
- issuing_agency
- street_code1
- street_code2
- street_code3
- vehicle_expiration_date
- violation_location
- violation_precinct
- issuer_precinct
- issuer_code
- issuer_command
- issuer_squad
- violation_time
- time_first_observed
- violation_county
- violation_in_front_of_or_opposite
- house_number
- street_name
- intersecting_street
- date_first_observed
- law_section
- sub_division
- violation_legal_code
- days_parking_in_effect
- from_hours_in_effect
- to_hours_in_effect
- vehicle_color
- unregistered_vehicle
- vehicle_year
- meter_number
- feet_from_curb
- violation_post_code
- violation_description
- no_standing_or_stopping_violation
- hydrant_violation
- double_parking_violation
- latitude
- longitude
- community_board
- community_council
- census_tract
- bin
- bbl
- nta
dataset_name: testdataset
file_name: testdata
file_path: D:/GGProject/dat

In [17]:
config_data = read_config_file("config.yaml")

In [19]:
config_data

{'columns': ['summons_number',
  'plate_id',
  'registration_state',
  'plate_type',
  'issue_date',
  'violation_code',
  'vehicle_body_type',
  'vehicle_make',
  'issuing_agency',
  'street_code1',
  'street_code2',
  'street_code3',
  'vehicle_expiration_date',
  'violation_location',
  'violation_precinct',
  'issuer_precinct',
  'issuer_code',
  'issuer_command',
  'issuer_squad',
  'violation_time',
  'time_first_observed',
  'violation_county',
  'violation_in_front_of_or_opposite',
  'house_number',
  'street_name',
  'intersecting_street',
  'date_first_observed',
  'law_section',
  'sub_division',
  'violation_legal_code',
  'days_parking_in_effect',
  'from_hours_in_effect',
  'to_hours_in_effect',
  'vehicle_color',
  'unregistered_vehicle',
  'vehicle_year',
  'meter_number',
  'feet_from_curb',
  'violation_post_code',
  'violation_description',
  'no_standing_or_stopping_violation',
  'hydrant_violation',
  'double_parking_violation',
  'latitude',
  'longitude',
  'comm

In [20]:
file_type = config_data['file_type']
source_file = config_data['file_path'] + config_data['file_name'] + f'.{file_type}'
ff = dd.read_csv(source_file, low_memory=False, assume_missing=True, dtype={'Meter Number': 'object',
       'Time First Observed': 'object',
       'Violation Location': 'float64'})
ff.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,8002531292,EPC5238,NY,PAS,10/01/2014,21,SUBN,CHEVR,T,20390,...,,,,,,,,,,
1,8015318440,5298MD,NY,COM,03/06/2015,14,VAN,FRUEH,T,27790,...,,,,,,,,,,
2,7611181981,FYW2775,NY,PAS,07/28/2014,46,SUBN,SUBAR,T,8130,...,,,,,,,,,,
3,7445908067,GWE1987,NY,PAS,04/13/2015,19,4DSD,LEXUS,T,59990,...,,,,,,,,,,
4,7037692864,T671196C,NY,PAS,05/19/2015,19,4DSD,CHRYS,T,36090,...,,,,,,,,,,


In [21]:
col_header_val(ff, config_data)

column name and column length validation passed


1

In [36]:
print("this data file has {} rows, {} columns and size of file is {}MB" .format(len(ff), len(ff.columns), os.stat(file_path).st_size/(1024**2)))

this data file has 11809233 rows, 51 columns and size of file is 2731.391342163086MB
