In [1]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re
import time
import dask.dataframe as dd
import modin.pandas as mpd

In [2]:
file_path = '/content/Crimes_-_2001_to_Present.csv'
import warnings
warnings.filterwarnings("ignore")

## Read the file in different format

In [4]:
## Dask
time1 = time.time()
df = dd.read_csv(file_path)
time2 = time.time()
duration_d = time2-time1
print('Dask duration:', duration_d)

## Modin
time1 = time.time()
df = mpd.read_csv(file_path)
time2 = time.time()
duration_d = time2-time1
print('Modin duration:', duration_d)

## Pandas
time1 = time.time()
df = pd.read_csv(file_path,delimiter=',')
time2 = time.time()
duration_d = time2-time1
print('Pandas duration:', duration_d)

Dask duration: 0.0138397216796875


INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:44339
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33715'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46397'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34321'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39155'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42261'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33143'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42301'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:36723'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43509'
INFO:distributed.nanny:      

Modin duration: 2.9903266429901123
Pandas duration: 0.06632447242736816


Conclusion: Dask has the most efficient computation.

In [6]:
df.head(10)

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11646166,JC213529,09/01/2018 12:01:00 AM,082XX S INGLESIDE AVE,810,THEFT,OVER $500,RESIDENCE,False,True,...,8.0,44.0,06,,,2018.0,04/06/2019 04:04:43 PM,,,
1,11645836,JC212333,05/01/2016 12:25:00 AM,055XX S ROCKWELL ST,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,,False,False,...,15.0,63.0,11,,,2016.0,04/06/2019 04:04:43 PM,,,
2,11449702,JB373031,07/31/2018 01:30:00 PM,009XX E HYDE PARK BLVD,2024,NARCOTICS,POSS: HEROIN(WHITE),STREET,True,False,...,5.0,41.0,18,,,2018.0,04/09/2019 04:24:58 PM,,,
3,11643334,JC209972,12/19/2018 04:30:00 PM,056XX W WELLINGTON AVE,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,...,31.0,19.0,14,,,2018.0,04/04/2019 04:16:11 PM,,,
4,11645527,JC212744,02/02/2015 10:00:00 AM,069XX W ARCHER AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,OTHER,False,False,...,23.0,56.0,11,,,2015.0,04/06/2019 04:04:43 PM,,,
5,11034701,JA366925,01/01/2001 11:00:00 AM,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,8.0,45.0,11,,,2001.0,08/05/2017 03:50:08 PM,,,
6,10224881,HY411873,09/03/2015 06:00:00 PM,044XX S UNIVERSITY AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,False,False,...,4.0,39.0,14,1184667.0,1875669.0,2015.0,02/10/2018 03:50:01 PM,41.813999,-87.598138,"(41.81399924, -87.598137918)"
7,11230640,JB152083,12/04/2017 12:00:00 AM,035XX S MICHIGAN AVE,4650,OTHER OFFENSE,SEX OFFENDER: FAIL TO REGISTER,GOVERNMENT BUILDING/PROPERTY,True,False,...,3.0,35.0,26,,,2017.0,04/12/2018 03:55:17 PM,,,
8,11645648,JC212959,01/01/2018 08:00:00 AM,024XX N MONITOR AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,30.0,19.0,11,,,2018.0,04/06/2019 04:04:43 PM,,,
9,11645959,JC211511,12/20/2018 04:00:00 PM,045XX N ALBANY AVE,2820,OTHER OFFENSE,TELEPHONE THREAT,RESIDENCE,False,False,...,33.0,14.0,08A,,,2018.0,04/06/2019 04:04:43 PM,,,


## Perform basic validation on data columns

In [5]:
df.columns

Index(['ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type',
       'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat',
       'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate',
       'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude',
       'Location'],
      dtype='object')

In [7]:
def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

## create a YAML file

In [32]:
%%writefile file.yaml
file_type: csv
dataset_name: Crimes_-_2001_to_Present
file_name: data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - ID
    - Case_Number
    - Date
    - Block
    - IUCR
    - Primary_Type
    - Description
    - Location_Description
    - Arrest
    - Domestic
    - Beat
    - District
    - Ward
    - Community_Area
    - FBI_Code
    - X_Coordinate
    - Y_Coordinate
    - Year
    - Location
    - Name


Overwriting file.yaml


In [33]:
with open('/content/file.yaml', 'r') as stream:
  config_data =  yaml.safe_load(stream)

In [34]:
col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['updated_on', 'longitude', 'latitude']
Following YAML columns are not in the file uploaded ['name']


0

## Validate number of columns and column name of ingested file with YAML.

In [53]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['id', 'case_number', 'date', 'block', 'iucr', 'primary_type',
       'description', 'location_description', 'arrest', 'domestic', 'beat',
       'district', 'ward', 'community_area', 'fbi_code', 'x_coordinate',
       'y_coordinate', 'year', 'updated_on', 'latitude', 'longitude',
       'location'],
      dtype='object')
columns of YAML are: ['ID', 'Case_Number', 'Date', 'Block', 'IUCR', 'Primary_Type', 'Description', 'Location_Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community_Area', 'FBI_Code', 'X_Coordinate', 'Y_Coordinate', 'Year', 'Location', 'Name']


In [54]:
if col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['updated_on', 'longitude', 'latitude']
Following YAML columns are not in the file uploaded ['name']
validation failed


## Write the file in pipe separated text file (|) in gz format.

In [42]:
import csv
import gzip

df = pd.read_csv(file_path,delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('crimedata.csv.gz', sep='|', compression='gzip', index=False)

In [48]:
# get the file size
file_size = os.path.getsize('/content/crimedata.csv.gz')

In [50]:
## Create a summary of the file:
## Total number of rows,
print("rows of rows:", len(df))

## total number of columns
print("number of columns:", len(df.columns))

## file size
print("Size (In bytes):" , file_size)

rows of rows: 2391128
number of columns: 22
Size (In bytes): 137324232
