Data Ingestion

In [None]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import dask.dataframe as dd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    '''
    Load the YAML document stored at ``filepath``.

    Returns the parsed content, or ``None`` when the file is not valid
    YAML (the parser error is logged rather than raised).
    '''
    with open(filepath, 'r') as stream:
        try:
            config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Report the parse failure and hand back "no configuration".
            logging.error(exc)
            config = None
    return config


def replacer(string, char):
    '''
    Collapse every run of two or more consecutive ``char`` in ``string``
    into a single ``char`` and return the result.

    ``char`` is always treated as literal text, so regex metacharacters
    such as '.', '+' or '|' are safe to pass.
    '''
    # Escape the literal and group it so the quantifier applies to the
    # whole of ``char`` rather than being interpreted as regex syntax.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps backslashes in ``char`` from being
    # interpreted as replacement-template escapes.
    return re.sub(pattern, lambda _match: char, string)


def col_header_val(df, table_config):
    '''
    Standardize the column names of ``df`` and validate them against
    the columns listed in ``table_config['columns']``.

    The names are normalized in place on ``df``: lowercased, every
    non-word character replaced by '_', leading/trailing '_' stripped
    and runs of '_' collapsed to one. The normalized names are then
    compared, ignoring order, with the lowercased expected names.

    Returns 1 when both sets of names match, otherwise 0 after printing
    and logging the differences.
    '''
    normalized = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    df.columns = list(normalized)

    # Sort BOTH sides: the check is about which columns exist, not about
    # the order in which the file or the YAML happens to list them.
    expected_col = sorted(col.lower() for col in table_config['columns'])
    actual_col = sorted(df.columns)

    if actual_col == expected_col:
        print("Column name and column length validation passed")
        return 1

    print("Column name and column length validation failed")
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded", missing_YAML_file)
    logging.info('df columns: %s', df.columns)
    logging.info('expected columns: %s', expected_col)
    return 0

def details_of_file(df):
    '''
    Print the number of columns and the number of rows of ``df``.

    Accepts a Dask DataFrame, whose row count is lazy and has to be
    computed, as well as eager frames (e.g. pandas) whose ``shape[0]``
    is already a plain integer.
    '''
    num_col = df.shape[1]
    print("Total number of columns:", num_col)
    num_records = df.shape[0]
    # Only lazy row counts expose ``compute``; plain ints pass through.
    if hasattr(num_records, 'compute'):
        num_records = num_records.compute()
    print("Total number of rows:", num_records)

In [None]:
%%writefile file.yaml
# Extension appended to file_name when the notebook builds the source path
file_type : csv
# NOTE(review): not read by any visible cell - presumably the Drive folder name; confirm
dataset_name: MyDrive
# Base name (without extension) of the source file
file_name: dataset
# Separator passed as `sep` to the pandas / Dask / Modin CSV readers
inbound_delimiter : ","
# NOTE(review): the next four keys are not read by any visible cell -
# presumably settings for a later export step; confirm before relying on them
outbound_delimiter : "|"
skip_leading_rows : 1
gz_file : 'data.csv.gz'
gz_delimiter : '|'
# Expected column names, compared with the file header by col_header_val
columns :
    - en
    - fr
# Rows per chunk for the chunked reads (passed as `chunksize`)
batch_size : 4000000

In [None]:
# Mount Google Drive at /content/drive so the dataset stored there can be
# read by path (requires the google.colab package, i.e. a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

In [4]:
import pandas as pd
import testutility as util
import dask.dataframe as dd
import modin.pandas as mpd
import time
import ray
import warnings
# Silence every warning from here on (this also hides deprecation
# warnings raised by the imported libraries).
warnings.filterwarnings("ignore")

In [5]:
# Read config file: parse file.yaml with the helper from testutility.
# The helper logs the error and returns None if the YAML is malformed.

config_data = util.read_config_file("file.yaml")

In [None]:
# Spot-check one setting: the delimiter later passed as `sep` to the readers
config_data['inbound_delimiter']

In [None]:
# Inspect the full parsed configuration
config_data

In [8]:
# Build the path of the source file on the mounted Drive from the
# configured base name and extension.
ft = config_data['file_type']
source_file = f"/content/drive/MyDrive/{config_data['file_name']}.{ft}"

In [9]:

# Lazily open the source CSV with Dask in ~100 MB partitions. The delimiter
# comes from the config, like the other reads in this notebook, instead of
# silently relying on the reader's default.
df = dd.read_csv(source_file, sep=config_data['inbound_delimiter'], blocksize='100MB')

In [None]:
# Normalize the column names and validate them against the YAML config;
# col_header_val returns 0 when the names do not match.
validation_passed = util.col_header_val(df, config_data) != 0
if not validation_passed:
    print("Column validation failed for a chunk. File rejected.")
else:
    print("Column validation passed for all chunks.")

Details of Data

In [11]:
# Print the column count and the computed row count of the Dask DataFrame
util.details_of_file(df)

Total number of columns: 2
Total number of rows: 22520376


Read File

1.  Pandas

In [None]:
# Time one complete chunked pass over the file with pandas. The chunks are
# discarded, so only the read/parse cost is measured.
start = time.time()
pandas_df = pd.read_csv(
    source_file,
    sep=config_data['inbound_delimiter'],
    chunksize=config_data['batch_size'],
)
for chunk in pandas_df:
    pass
elapsed = round(time.time() - start, 3)
print("Time to read with pandas: {} seconds".format(elapsed))

In [13]:
# The object returned by pd.read_csv(..., chunksize=...) is a one-shot
# iterator: once it has been looped over it yields nothing more. Open a
# fresh reader here so the batches are actually listed.
pandas_df = pd.read_csv(
    source_file,
    sep=config_data['inbound_delimiter'],
    chunksize=config_data['batch_size'],
)
for c, chunk in enumerate(pandas_df, start=1):
    print('Batch', c)
    print(chunk.shape)

2.  Dask

In [None]:
# NOTE: dd.read_csv is lazy. It inspects the start of the file and builds a
# task graph without reading every row, so this figure is not directly
# comparable to the full chunked passes timed for pandas and Modin.
start = time.time()
dask_df = dd.read_csv(source_file, sep=config_data['inbound_delimiter'])
print("Time to read with Dask: {} seconds".format(round(time.time() - start, 3)))

In [None]:
# Shape of the Dask DataFrame; the row count stays lazy until .compute() is called on it
dask_df.shape

3.  Modin on Ray

In [None]:
# Start Ray as the execution engine for Modin. ignore_reinit_error lets this
# cell be re-run in the same session without raising because Ray is
# already initialized.
ray.init(ignore_reinit_error=True)
start = time.time()
modin_df = mpd.read_csv(source_file, sep=config_data['inbound_delimiter'], chunksize=config_data['batch_size'])
# Consume every chunk so the whole file is read inside the timed section.
for c in modin_df:
    pass
print("Time to read with Modin: {} seconds".format(round(time.time() - start, 3)))

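As a result, Dask reports the shortest read time in this comparison, ahead of Modin (on Ray) and pandas, handling the 2 GB+ file in seconds without the need for batch processing. Note, however, that `dd.read_csv` is lazy: its timing covers setting up the read rather than loading every row, so it is not directly comparable to the full passes timed for pandas and Modin.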