# Setup File

In [1]:
import pandas as pd
import numpy as np
#First attempt with Dask to deal with large dataset
import dask.dataframe as dd
import dask
#Second attempt with PySpark
from pyspark.sql import SparkSession
import multiprocessing as mp
#Third Attempt with Csv.Disk Reader
import csv
#Fourth attempt with Datatable.Fread 
import datatable as dt
import yaml
from box import Box
import gzip
import os
from subprocess import check_call
import warnings
warnings.filterwarnings('ignore')

# Compare several approaches for efficiently reading a 2 GB+ CSV dataset


## Pandas option without chunking

In [2]:
%%time
df_pandas = pd.read_csv("Parking_2016.csv")
df_pandas.head()

Wall time: 1min 9s


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,1363745270,GGY6450,99,PAS,07/09/2015,46,SDN,HONDA,P,0,...,,,,,,,,,,
1,1363745293,KXD355,SC,PAS,07/09/2015,21,SUBN,CHEVR,P,55730,...,,,,,,,,,,
2,1363745438,JCK7576,PA,PAS,07/09/2015,21,SDN,ME/BE,P,42730,...,,,,,,,,,,
3,1363745475,GYK7658,NY,OMS,07/09/2015,21,SUBN,NISSA,P,58130,...,,,,,,,,,,
4,1363745487,GMT8141,NY,PAS,07/09/2015,21,P-U,LINCO,P,58130,...,,,,,,,,,,


## Pandas option with chunking (`chunksize`)

In [5]:
%%time
chunks = pd.read_csv("Parking_2016.csv", chunksize=100000)
df_pandas_chunks = pd.concat(chunks)
df_pandas_chunks.head()

Wall time: 1min 3s


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,1363745270,GGY6450,99,PAS,07/09/2015,46,SDN,HONDA,P,0,...,,,,,,,,,,
1,1363745293,KXD355,SC,PAS,07/09/2015,21,SUBN,CHEVR,P,55730,...,,,,,,,,,,
2,1363745438,JCK7576,PA,PAS,07/09/2015,21,SDN,ME/BE,P,42730,...,,,,,,,,,,
3,1363745475,GYK7658,NY,OMS,07/09/2015,21,SUBN,NISSA,P,58130,...,,,,,,,,,,
4,1363745487,GMT8141,NY,PAS,07/09/2015,21,P-U,LINCO,P,58130,...,,,,,,,,,,


## Dask solution 

In [106]:
%%time
df_dask = dd.read_csv("Parking_2016.csv", dtype={'House Number': 'object',
       'Intersecting Street': 'object',
       'Issuer Squad': 'object',
       'Time First Observed': 'object',
       'Unregistered Vehicle?': 'float64',
       'Violation Description': 'object',
       'Violation Legal Code': 'object',
       'Violation Post Code': 'object'})
#We have to adding dtypes based on error generated 
#from dask extension or have to increase sample size but I choose the first option with adding dtypes
df_dask.head()

Wall time: 1.32 s


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,1363745270,GGY6450,99,PAS,07/09/2015,46,SDN,HONDA,P,0,...,,,,,,,,,,
1,1363745293,KXD355,SC,PAS,07/09/2015,21,SUBN,CHEVR,P,55730,...,,,,,,,,,,
2,1363745438,JCK7576,PA,PAS,07/09/2015,21,SDN,ME/BE,P,42730,...,,,,,,,,,,
3,1363745475,GYK7658,NY,OMS,07/09/2015,21,SUBN,NISSA,P,58130,...,,,,,,,,,,
4,1363745487,GMT8141,NY,PAS,07/09/2015,21,P-U,LINCO,P,58130,...,,,,,,,,,,


## Pyspark Solution


In [10]:
# PySpark failed with "Java gateway process exited before sending the driver its port
# number" until JAVA_HOME pointed at a JDK (see https://github.com/jupyter/notebook/issues/743).
# The path below is specific to the author's machine, so it is only used as a fallback
# when JAVA_HOME is not already configured; an existing setting is left untouched.
# (`os` is imported in the setup cell.)
os.environ.setdefault("JAVA_HOME", "C:/Program Files/Java/jdk1.8.0_311")


In [11]:
%%time

spark = SparkSession.builder.appName("Exploratory Analysis").getOrCreate()
df_spark = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("Parking_2016.csv")
df_spark.show(5)

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+----+----+----+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehic

In [46]:
%%time
df_mp = pd.read_csv("Parking_2016.csv", chunksize=1000)
total_length = 0
for chunk in df_mp:
    total_length += len(chunk)
print(total_length)

10626899
Wall time: 1min 33s


## CSV Dict Reader Solution

In [8]:
%%time

df_csv = csv.DictReader(open("Parking_2016.csv"))
i=0
for row in df_csv:
        print(row)
        i += 1
        if i == 5:
            break

{'Summons Number': '1363745270', 'Plate ID': 'GGY6450', 'Registration State': '99', 'Plate Type': 'PAS', 'Issue Date': '07/09/2015', 'Violation Code': '46', 'Vehicle Body Type': 'SDN', 'Vehicle Make': 'HONDA', 'Issuing Agency': 'P', 'Street Code1': '0', 'Street Code2': '40404', 'Street Code3': '40404', 'Vehicle Expiration Date': '20170602', 'Violation Location': '0074', 'Violation Precinct': '74', 'Issuer Precinct': '301', 'Issuer Code': '358160', 'Issuer Command': 'T301', 'Issuer Squad': '0000', 'Violation Time': '1037A', 'Time First Observed': '', 'Violation County': 'K', 'Violation In Front Of Or Opposite': 'F', 'House Number': '142', 'Street Name': 'MACDOUNGH ST', 'Intersecting Street': '', 'Date First Observed': '0', 'Law Section': '408', 'Sub Division': 'D1', 'Violation Legal Code': '', 'Days Parking In Effect    ': 'BBBBBBB', 'From Hours In Effect': 'ALL', 'To Hours In Effect': 'ALL', 'Vehicle Color': 'WHITE', 'Unregistered Vehicle?': '0', 'Vehicle Year': '2010', 'Meter Number':

## Datatable `fread` Solution

In [9]:
%%time
df_dt = dt.fread("Parking_2016.csv")
df_dt.head()

Wall time: 6.95 s


Unnamed: 0_level_0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,…,Community Council,Census Tract,BIN,BBL,NTA
Unnamed: 0_level_1,▪▪▪▪▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
0,1363745270,GGY6450,99,PAS,07/09/2015,46,SDN,HONDA,P,0,…,(unknown),(unknown),(unknown),(unknown),(unknown)
1,1363745293,KXD355,SC,PAS,07/09/2015,21,SUBN,CHEVR,P,55730,…,(unknown),(unknown),(unknown),(unknown),(unknown)
2,1363745438,JCK7576,PA,PAS,07/09/2015,21,SDN,ME/BE,P,42730,…,(unknown),(unknown),(unknown),(unknown),(unknown)
3,1363745475,GYK7658,NY,OMS,07/09/2015,21,SUBN,NISSA,P,58130,…,(unknown),(unknown),(unknown),(unknown),(unknown)
4,1363745487,GMT8141,NY,PAS,07/09/2015,21,P-U,LINCO,P,58130,…,(unknown),(unknown),(unknown),(unknown),(unknown)
5,1363745517,GYK3760,NY,PAS,07/09/2015,21,SUBN,HONDA,P,46730,…,(unknown),(unknown),(unknown),(unknown),(unknown)
6,1363745529,GYK3760,NY,PAS,07/09/2015,75,SUBN,HONDA,P,46730,…,(unknown),(unknown),(unknown),(unknown),(unknown)
7,1363745542,GWL9925,NY,PAS,07/09/2015,71,SDN,TOYOT,P,85730,…,(unknown),(unknown),(unknown),(unknown),(unknown)
8,1363745554,GPH9963,PA,PAS,07/09/2015,21,SDN,MITSU,P,55730,…,(unknown),(unknown),(unknown),(unknown),(unknown)
9,1363745578,GWF8627,NY,PAS,07/09/2015,21,SUBN,LINCO,P,31530,…,(unknown),(unknown),(unknown),(unknown),(unknown)


# Conclusion for each solution
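Based on the measured times, `csv.DictReader` looks like the fastest option, but this comparison is misleading. That cell only reads the first five rows, whereas the pandas cells load the entire file. Among the cells that load the whole file, `datatable.fread` was the fastest (6.95 s, versus over a minute for pandas).

`dask` is another good choice for working in DataFrame mode. However, its 1.32 s only covers building the lazy DataFrame and computing `head()`, not loading every row. Dask also has trouble inferring dtypes for some columns, so we have to force the dtypes of those variables explicitly (or increase the sample size).

I will therefore use pandas, because it is stable.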



# Use a YAML Config File


In [22]:
# Load the pipeline settings (input/output names, formats, delimiters, expected columns).
# safe_load is used so the YAML cannot construct arbitrary Python objects.
with open("config.yaml") as config_file:
    cfg = Box(yaml.safe_load(config_file))

In [23]:
def read_data_summary(config):
    """Load the configured input file and print its size on disk and its shape.

    Parameters
    ----------
    config : Box-like
        Object exposing ``input.name``, ``input.format`` and ``input.delimiter``.
        The file read is ``"<input.name>.<input.format>"``.

    Returns
    -------
    pandas.DataFrame
        The full contents of the input file.
    """
    data_path = f"{config.input.name}.{config.input.format}"
    df_test = pd.read_csv(data_path, delimiter=config.input.delimiter)
    # Describe the frame that was just loaded, using the `config` argument rather than
    # globals from other cells (previously `cfg` and `df_dask`). The summary then always
    # matches this file and works on a fresh kernel.
    n_rows, n_cols = df_test.shape
    print(f'The size of the file is: {os.path.getsize(data_path)} Bytes')
    print(f'It has: {n_cols} Columns and {n_rows} Rows')
    return df_test

In [24]:
df = read_data_summary(config=cfg)

The size of the file is: 2151937808 Bytes
It has: 51 Columns and 10626899 Rows


In [27]:
def validate_data(raw_df, config=None):
    """Check ``raw_df``'s column names against the config and, if they match, write it out.

    Column names on both sides are lower-cased and stripped of surrounding spaces
    before comparison, so e.g. ``'Days Parking In Effect    '`` matches
    ``'days parking in effect'``. Column order does not matter.

    Parameters
    ----------
    raw_df : pandas.DataFrame
        The frame loaded from the input file.
    config : Box-like, optional
        Object exposing ``input.columns`` and ``output.name``, ``output.format``,
        ``output.path`` and ``output.delimiter``. Defaults to the notebook-level
        ``cfg`` loaded from ``config.yaml``.

    Returns
    -------
    bool
        True if the columns matched and the file was written, False otherwise.
    """
    if config is None:
        config = cfg  # notebook-level config loaded from config.yaml

    trusted_columns = [str(col).lower().strip(' ') for col in config.input.columns]
    raw_columns = [str(col).lower().strip(' ') for col in raw_df.columns]

    if len(raw_columns) != len(trusted_columns):
        print(f'Count of columns are invalid! It should be {len(trusted_columns)}, but it is {len(raw_columns)}')
        return False

    # sorted() returns new lists. list.sort() returns None, which would make this
    # comparison always None != None, i.e. a mismatch would never be detected.
    if sorted(raw_columns) != sorted(trusted_columns):
        only_raw = sorted(set(raw_columns).difference(trusted_columns))
        only_trusted = sorted(set(trusted_columns).difference(raw_columns))
        print('Columns are invalid!')
        print(f'Columns in Uploaded Dataset: {only_raw} VS. Columns in Config File: {only_trusted}')
        return False

    output_file = config.output.name + "." + config.output.format
    output_path = os.path.join(config.output.path, output_file)
    # Write the validated frame itself, not a global. compression='infer' selects the
    # codec from the extension (e.g. .gz -> gzip).
    # NOTE(review): header and index are omitted, as before. Confirm downstream
    # consumers expect a header-less file.
    raw_df.to_csv(output_path, header=False, index=False,
                  sep=config.output.delimiter, compression='infer')
    print(f'File is uploaded successfully and written to: {output_file}')
    return True

In [28]:
validate_data(raw_df=df);


File is uploaded successfully and written to: trusted_Parking_2016.gz
