### Data Injection Pipeline

######  Reading 2+GB file using various libraries

In [1]:
# Importing the necessary libraries

import warnings
warnings.filterwarnings('ignore')
import csv
import time
import pandas as pd
import dask.dataframe as dd

In [2]:
crime_data = "crime_data2013-2022.csv"

In [30]:
# Using pandas

start_time = time.time()
pandas_df = pd.read_csv(crime_data, sep=', ')
print("Reading the csv file using pandas took %s seconds" % (time.time() - start_time))

Reading the csv file using pandas took 929.8297851085663 seconds


In [3]:
# Using pandas chuncks

start_time = time.time()
pandas_chunck = pd.read_csv(crime_data, chunksize=100000)
print("Reading the csv file using pandas with chunksize took %s seconds" % (time.time() - start_time))

Reading the csv file using pandas with chunksize took 0.007946491241455078 seconds


In [9]:
%%time

pandas_chuncks2 = pd.read_csv(crime_data, chunksize=100000)
total_length = 0
for chunk in df_chunks:
    total_length += len(chunk)
print("Total length of file:",total_length)

56165388
Wall time: 3min 38s


In [3]:
# Using Dask

start_time = time.time()
dask_df = dd.read_csv(crime_data)
print("Reading the csv file using dask took %s seconds" % (time.time() - start_time))

Reading the csv file using dask took 0.026749849319458008 seconds


In [4]:
# Using csv.DictReader

start_time = time.time()

df_4 = csv.DictReader(open(crime_data))
print(" Reading the csv file using csv.DictReader took %s seconds" % (time.time() - start_time))


 Reading the csv file using csv.DictReader took 0.00751805305480957 seconds


In term of the computational efficency, CSV.DictReader appears to be the most efficient with the least execution time. However, dask.dataframe reader will be used because the data need to be written in to a dataframe.


In [5]:
dask_df.head()

Unnamed: 0,crime_id,Year,Month,LSOA_code,LSOA_name,Crime_type
0,1,2013,1,E01014399,Bath and North East Somerset 001A,Anti-social behaviour
1,2,2013,1,E01014399,Bath and North East Somerset 001A,Burglary
2,3,2013,1,E01014399,Bath and North East Somerset 001A,Other theft
3,4,2013,1,E01014400,Bath and North East Somerset 001B,Anti-social behaviour
4,5,2013,1,E01014400,Bath and North East Somerset 001B,Anti-social behaviour


In [6]:
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 6 entries, crime_id to Crime_type
dtypes: object(3), int64(3)

In [7]:
print(dask_df.dtypes)

crime_id       int64
Year           int64
Month          int64
LSOA_code     object
LSOA_name     object
Crime_type    object
dtype: object


In [8]:
import numpy as np

# Compute the memory usage 
memory_usage = dask_df.memory_usage(deep=True).compute()
print(memory_usage)

Crime_type    4269186601
Index               7040
LSOA_code     3706915608
LSOA_name     4059667726
Month          449323104
Year           449323104
crime_id       449323104
dtype: int64


In [9]:
# Getting the file size, numbers of rows and columns
import os

file_size = os.path.getsize(crime_data)
print("File size:", file_size, "bytes")
print("Number of rows:", dask_df.shape[0].compute())
print("Number of columns:", dask_df.shape[1])

File size: 3557434225 bytes
Number of rows: 56165388
Number of columns: 6


###### Performin basic validation on the columns

In [10]:
%%writefile testutility.py
import dask.dataframe as dd
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

################
# File Reading #
################

# Creating the config file
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0
 

Writing testutility.py


In [21]:
%%writefile crimefile.yaml
file_type: csv
dataset_name: crime_data2013-2022
file_name: test_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Crime ID
    - Year
    - Month
    - LSOA Name
    - LSOA Code
    - Crime Type

Writing crimefile.yaml


In [22]:
# Read config file
import testutility as util
config_data = util.read_config_file("crimefile.yaml")

In [23]:
config_data['inbound_delimiter']

','

In [24]:
# Checking the content of the config file
config_data

{'file_type': 'csv',
 'dataset_name': 'crime_data2013-2022',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Crime ID',
  'Year',
  'Month',
  'LSOA Name',
  'LSOA Code',
  'Crime Type']}

In [15]:
# Creating the test file
testdata = {
    'crime_id' : [0,1,2,3,4],
    'Year': [2013, 2013, 2013,2013, 2013],
    'Month':[1,1,1,1,1],
    'LSOA_code':['E01014399','E01014399','E01014399','E01014400','E01014400'],
    'LSOA_name':['Bath and North East Somerset 001A','Bath and North East Somerset 001A','Bath and North East Somerset 001A','Bath and North East Somerset 001B','Bath and North East Somerset 001B'],
    'Crime_type':['Anti-social behaviour','Burglary','Other theft','Anti-social behaviou','Shoplifting']
}

df = pd.DataFrame(testdata, columns=['crime_id', 'Year','Month','LSOA_code','LSOA_name','Crime_type'])
df.to_csv("test_data.csv",index=False)

In [16]:
# Normal reading process of the file using Pandas

import pandas as pd
sample_df = pd.read_csv("test_data.csv",delimiter=',')
sample_df.head()

Unnamed: 0,crime_id,Year,Month,LSOA_code,LSOA_name,Crime_type
0,0,2013,1,E01014399,Bath and North East Somerset 001A,Anti-social behaviour
1,1,2013,1,E01014399,Bath and North East Somerset 001A,Burglary
2,2,2013,1,E01014399,Bath and North East Somerset 001A,Other theft
3,3,2013,1,E01014400,Bath and North East Somerset 001B,Anti-social behaviou
4,4,2013,1,E01014400,Bath and North East Somerset 001B,Shoplifting


In [17]:
# Reading the file using config file


file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

 ./test_data.csv


Unnamed: 0,crime_id,Year,Month,LSOA_code,LSOA_name,Crime_type
0,0,2013,1,E01014399,Bath and North East Somerset 001A,Anti-social behaviour
1,1,2013,1,E01014399,Bath and North East Somerset 001A,Burglary
2,2,2013,1,E01014399,Bath and North East Somerset 001A,Other theft
3,3,2013,1,E01014400,Bath and North East Somerset 001B,Anti-social behaviou
4,4,2013,1,E01014400,Bath and North East Somerset 001B,Shoplifting


In [18]:
# Readin the of the crime file using config

file_type = config_data['file_type']
source_file = "./" + config_data['dataset_name'] + f'.{file_type}'
print("",source_file)
main_df = pd.read_csv(source_file,config_data['inbound_delimiter'])
main_df.head()

 ./crime_data2013-2022.csv


Unnamed: 0,crime_id,Year,Month,LSOA_code,LSOA_name,Crime_type
0,1,2013,1,E01014399,Bath and North East Somerset 001A,Anti-social behaviour
1,2,2013,1,E01014399,Bath and North East Somerset 001A,Burglary
2,3,2013,1,E01014399,Bath and North East Somerset 001A,Other theft
3,4,2013,1,E01014400,Bath and North East Somerset 001B,Anti-social behaviour
4,5,2013,1,E01014400,Bath and North East Somerset 001B,Anti-social behaviour


In [25]:
# Checking the contenet of the config file
config_data

{'file_type': 'csv',
 'dataset_name': 'crime_data2013-2022',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Crime ID',
  'Year',
  'Month',
  'LSOA Name',
  'LSOA Code',
  'Crime Type']}

In [26]:
# Validating the file column names 

util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['lsoa_code', 'crime_id', 'lsoa_name', 'crime_type']
Following YAML columns are not in the file uploaded ['crime type', 'crime id', 'lsoa name', 'lsoa code']


0

In [27]:
#
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['crime_id', 'year', 'month', 'lsoa_code', 'lsoa_name', 'crime_type'], dtype='object')
columns of YAML are: ['Crime ID', 'Year', 'Month', 'LSOA Name', 'LSOA Code', 'Crime Type']


In [28]:
# Writing the file in pipe separated text file (|) in gz format.
import datetime
import csv
import gzip

# Write csv in gz format in pipe separated text file (|)
df.to_csv('test.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

In [29]:
# Creating the file summary

import dask.dataframe as dd
import os

# Read the gzip-compressed file into a Dask DataFrame
df = dd.read_csv('test.gz')

# Getting the number of rows and columns
num_rows = df.shape[0].compute()
num_columns = df.shape[1]

# Getting the file size of the gzip-compressed file
file_size = os.path.getsize('test.gz')

# Print the summary
print("Summary:")
print("Number of rows:", num_rows)
print("Number of columns:", num_columns)
print("File size:", file_size, "bytes")


Summary:
Number of rows: 5
Number of columns: 1
File size: 202 bytes
