# Setup

In [None]:
!pip install modin[ray]
!pip install 'ray[default]'

In [None]:
!pip install "dask[dataframe]"

In [None]:
import zipfile
# Unpack the zipped CSV into the current working directory.
archive_path = './parking_violations_issued_fiscal_year_2016.csv.zip'
with zipfile.ZipFile(archive_path, mode='r') as archive:
    archive.extractall(path='./')

In [None]:
%%writefile configuration.yaml
# Ingestion settings for the parking-violations file.
# file_type is appended to file_name (as ".<file_type>") to build the source path.
file_type: csv
# NOTE(review): dataset_name is not read by any code visible in this notebook.
dataset_name: parking_violations_issued
# Base name of the source file; also used as the base name of the .txt.gz output.
file_name: parking_violations_issued_fiscal_year_2016
# NOTE(review): table_name is not read by any code visible in this notebook.
table_name: parking_violations
# Separator used when reading the source file.
inbound_delimiter: ","
# Separator used when writing the saved text file.
outbound_delimiter: "|"
# NOTE(review): skip_leading_rows is not read by any code visible in this notebook.
skip_leading_rows: 1
# Expected column names; the validation compares them with the file's
# standardized column names, ignoring order.
columns:
    - plate_type
    - vehicle_body_type
    - violation_code
    - registration_state
    - vehicle_make
    - issue_date
    - summons_number
    - issuing_agency
    - street_code1
    - plate_id
    - violation_post_code
    - bbl
    - street_code2
    - street_code3
    - meter_number
    - double_parking_violation
    - violation_time
    - nta
    - community_council
    - date_first_observed
    - feet_from_curb
    - bin
    - street_name
    - issuer_code
    - issuer_precinct
    - intersecting_street
    - vehicle_year
    - time_first_observed
    - no_standing_or_stopping_violation
    - law_section
    - issuer_command
    - violation_in_front_of_or_opposite
    - violation_location
    - days_parking_in_effect
    - longitude
    - unregistered_vehicle
    - violation_description
    - latitude
    - violation_precinct
    - from_hours_in_effect
    - house_number
    - violation_legal_code
    - census_tract
    - vehicle_expiration_date
    - issuer_squad
    - violation_county
    - sub_division
    - community_board
    - hydrant_violation
    - vehicle_color
    - to_hours_in_effect

Overwriting configuration.yaml


# Read Large File

In [None]:
import dask.dataframe
import modin.pandas as m_ray_pd # using the Ray core in modin
import pandas as pd
import time
import ray

input_file = 'parking_violations_issued_fiscal_year_2016.csv' # 6+ GB file


def report_elapsed(label, started_at):
    """Print `label` followed by the seconds elapsed since `started_at`.

    `started_at` must be a value previously returned by time.perf_counter().
    """
    print('%s %s seconds' % (label, time.perf_counter() - started_at))


# Pandas - eager read of the whole file into memory
# (author's note: runs out of RAM and crashes Google Colab)
start_time = time.perf_counter()
data = pd.read_csv(input_file)
report_elapsed('Pandas took', start_time)

# Pandas[chunksize] - read_csv only returns an iterator when chunksize is
# given; nothing is parsed until the chunks are consumed. Iterate inside the
# timed region so the figure reflects an actual pass over the file. Rebinding
# `data` first releases the full frame loaded above.
start_time = time.perf_counter()
data = pd.read_csv(input_file, chunksize=100000)
chunked_rows = 0
for chunk in data:
    chunked_rows += len(chunk)
report_elapsed('Pandas took with chunksize', start_time)

# Modin[Ray]
start_time = time.perf_counter()
data = m_ray_pd.read_csv(input_file)
report_elapsed('Modin[Ray]', start_time)

# dask - read_csv is lazy: this only measures building the task graph, not
# reading the data. It is left lazy on purpose, because forcing a compute
# would materialize the whole file in memory.
start_time = time.perf_counter()
data = dask.dataframe.read_csv(input_file)
report_elapsed('Dask took', start_time)





Pandas took 69.10192894935608 seconds
Pandas took with chunksize 1.3400604724884033 seconds



    import ray
    ray.init()

[2m[36m(pid=4811)[0m tcmalloc: large alloc 1075970048 bytes == 0x55ce2ac20000 @  0x7f3f4756a1e7 0x55ce26f94e68 0x55ce26f5f637 0x55ce27040a6e 0x55ce26f62b59 0x55ce27053fed 0x55ce26fd6988 0x55ce26fd14ae 0x55ce26ea3e2c 0x55ce26fd3bb5 0x55ce26ea3d14 0x55ce26fd3bb5 0x55ce26fd17ad 0x55ce26ea3eb1 0x7f3f4432821f 0x7f3f443cede9 0x7f3f4433019a 0x7f3f444a0a3f 0x7f3f444154e2 0x7f3f444e301d 0x7f3f444e39ca 0x7f3f44446472 0x7f3f4486b238 0x7f3f44968bf1 0x7f3f44968d21 0x7f3f4496a820 0x7f3f444aa950 0x7f3f4432b0a7 0x55ce26f62bb1 0x55ce27053fed 0x55ce26fd6988
[2m[36m(pid=4810)[0m tcmalloc: large alloc 1075970048 bytes == 0x55e36d614000 @  0x7f026756f1e7 0x55e36a9d2e68 0x55e36a99d637 0x55e36aa7ea6e 0x55e36a9a0b59 0x55e36aa91fed 0x55e36aa14988 0x55e36aa0f4ae 0x55e36a8e1e2c 0x55e36aa11bb5 0x55e36a8e1d14 0x55e36aa11bb5 0x55e36aa0f7ad 0x55e36a8e1eb1 0x7f026432d21f 0x7f02643d3de9 0x7f026433519a 0x7f02644a5a3f 0x7f026441a4e2 0x7f02644e801d 0x7f02644e89ca 0x7f026444b472 0x7f0

Modin[Ray] 193.85107588768005 seconds
Dask took 0.1345052719116211 seconds


# Utility Functions


In [None]:
%%writefile utility.py
import math
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import gzip
import shutil


################
# File Reading #
################

def read_config_file(filepath):
    """Parse the YAML file at `filepath` and return the loaded object.

    A malformed document does not raise: the yaml.YAMLError is logged at
    error level and None is returned. Errors from opening the file are not
    caught here.
    """
    try:
        with open(filepath, 'r') as config_stream:
            parsed = yaml.safe_load(config_stream)
    except yaml.YAMLError as parse_error:
        logging.error(parse_error)
        return None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive `char` into one `char`.

    `char` is matched as literal text: it is escaped before being placed in
    the regular expression, so characters such as '.' or '*' are not
    interpreted as regex syntax. The replacement is inserted verbatim as well
    (no backslash processing).

    Args:
        string: text to clean up.
        char: the character whose repeated occurrences are collapsed.

    Returns:
        A new string; `string` itself when `char` is empty.
    """
    if not char:
        # Nothing to collapse; an empty pattern would match everywhere.
        return string
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps `char` literal even if it is a backslash.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Standardize the column names of `df` and validate them against the config.

    The column names of `df` are rewritten in place: lower-cased, every
    non-word character replaced by '_', leading/trailing '_' removed and
    runs of '_' collapsed to a single '_'. The resulting names are then
    compared, ignoring order, with the lower-cased names listed under
    table_config['columns'].

    Only the names are compared; the frame's data is never copied or
    reordered, and duplicate standardized names are reported as a mismatch
    rather than raising.

    Returns 1 when the names match exactly, otherwise prints the differences
    and returns 0.
    '''
    df.columns = (
        df.columns.str.lower()
        # raw string: '\w' inside a plain literal is an invalid escape sequence
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )

    actual_col = sorted(df.columns)
    expected_col = sorted(name.lower() for name in table_config['columns'])
    if actual_col == expected_col:
        return 1

    print("column name and column length validation failed")
    # Sorted so the report is the same from run to run.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

def human_size(nbytes):
    """Format a byte count as a short human-readable string, e.g. '1.5 KB'.

    Units are binary (1 KB = 1024 B) and the unit is chosen on the same
    1024 scale that is used for the division. The value is printed with up
    to two decimals, trailing zeros removed. Counts beyond the largest
    suffix stay expressed in 'PB'.

    Args:
        nbytes: non-negative number of bytes.

    Returns:
        A string such as '0 B', '512 B', '1.5 KB' or '2 GB'.

    Raises:
        ValueError: if `nbytes` is negative.
    """
    suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    if nbytes < 0:
        raise ValueError('nbytes must be non-negative, got %r' % (nbytes,))
    human = float(nbytes)
    rank = 0
    # Step up one unit at a time, never past the last suffix.
    while human >= 1024.0 and rank < len(suffixes) - 1:
        human /= 1024.0
        rank += 1
    f = ('%.2f' % human).rstrip('0').rstrip('.')
    return '%s %s' % (f, suffixes[rank])

def file_summary(df,table_config):
    """Print the name, row count, column count and on-disk size of the source file.

    The file name is table_config['file_name'] plus '.' and
    table_config['file_type']; it is looked up in the directory that contains
    this module. Row and column counts are taken from df.shape.
    """
    extension = table_config['file_type']
    file_name = table_config['file_name'] + f'.{extension}'

    # Locate the file next to this module and measure it.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    size_in_bytes = os.stat(os.path.join(module_dir, file_name)).st_size
    size_readable = human_size(size_in_bytes)

    number_of_cols, number_of_rows = df.shape[1], df.shape[0]

    report_lines = (
        ('FILE SUMMARY FOR: ', file_name),
        ('Total number of rows: ', number_of_rows),
        ('Total number of columns: ', number_of_cols),
        ('File size: ', size_readable),
    )
    for label, value in report_lines:
        print(label, value)

def saveFile(df,table_config):
    """Write `df` to a delimited text file and to a gzip-compressed copy of it.

    The frame is written without header or index to './saved_data.txt' using
    table_config['outbound_delimiter'] as separator; that file is then
    compressed to '<table_config["file_name"]>.txt.gz'.

    Both files are overwritten on every call. The staging file used to be
    opened in append mode, so re-running the save duplicated every row in the
    compressed output.
    """
    staging_path = './saved_data.txt'
    archive_path = table_config['file_name'] + '.txt' + '.gz'

    # save dataframe to a text file separated by the outbound delimiter
    df.to_csv(staging_path, header=False, index=False, sep=table_config['outbound_delimiter'], mode='w')

    # compress the saved text file to gz format
    with open(staging_path, 'rb') as f_in, gzip.open(archive_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

Writing utility.py


# Main Execution

In [None]:
import pandas as pd
import utility as util

# Read config file
config_data = util.read_config_file("configuration.yaml")
if config_data is None:
    # read_config_file logs a YAML parse error and returns None; stop here
    # with a clear message instead of failing on the first key lookup.
    raise ValueError("configuration.yaml could not be parsed")

# Build the source path from the config and read the file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# The delimiter is passed by keyword: newer pandas releases accept only the
# path as a positional argument of read_csv.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])


if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipeline
    util.file_summary(df,config_data)
    util.saveFile(df,config_data)

  interactivity=interactivity, compiler=compiler, result=result)


col validation passed
FILE SUMMARY FOR:  parking_violations_issued_fiscal_year_2016.csv
Total number of rows:  10626899
Total number of columns:  51
File size:  2 GB
