In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed content.

    Parameters
    ----------
    filepath : str
        Path of the YAML file to open (opened in text mode for reading).

    Returns
    -------
    object or None
        The value produced by ``yaml.safe_load`` for the file. When the
        content cannot be parsed, the ``yaml.YAMLError`` is logged through
        ``logging.error`` and ``None`` is returned instead of raising.
    """
    with open(filepath, 'r') as stream:
        try:
            parsed = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Parse failures are reported in the log only; the caller gets None.
            logging.error(exc)
            parsed = None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into a single one.

    ``char`` is treated as literal text, both when searching and when
    substituting, so regex metacharacters such as ``.``, ``+`` or ``\\`` are
    handled like any other character.

    Parameters
    ----------
    string : str
        Text to clean up.
    char : str
        Character (or literal substring) whose repetitions are collapsed.

    Returns
    -------
    str
        ``string`` with each repeated run of ``char`` replaced by one ``char``.
        An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # There is nothing to collapse; avoid building a degenerate pattern.
        return string
    # Escape so metacharacters match themselves; group so the quantifier
    # applies to the whole of ``char`` rather than only its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts ``char`` verbatim (no template escapes).
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
    """Standardise the column headers of ``df`` and validate them against the config.

    Each header is lower-cased, every non-word character is replaced by an
    underscore, leading/trailing underscores are stripped and repeated
    underscores are collapsed to one. The cleaned names are assigned back to
    ``df.columns``, so the caller's frame is renamed as a side effect.

    The cleaned names are then compared, ignoring order, with the lower-cased
    names listed under ``table_config['columns']``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose headers are cleaned (in place) and checked.
    table_config : dict
        Configuration mapping; its ``'columns'`` entry lists the expected names.

    Returns
    -------
    int
        ``1`` when the names match, ``0`` otherwise. On a mismatch the
        differences are printed and both name lists are logged at INFO level.
    """
    # Vectorised clean-up of the labels; only the labels are touched, the
    # frame's data is never copied or reordered.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )

    expected_col = sorted(name.lower() for name in table_config['columns'])
    # Comparing sorted name lists (rather than reindexing the frame) also copes
    # with headers that become duplicates after normalisation.
    actual_col = sorted(df.columns)

    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded", missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


## Write the YAML configuration file

In [15]:
%%writefile file.yaml
# Configuration consumed by the notebook when reading, validating and exporting the file.
# File type; the notebook appends it to file_name as the source file's extension.
file_type: csv
dataset_name: testfile
# Base name of the source file, without the extension.
file_name: 5m Sales Records
table_name: edsurv
# Field separator used when reading the source file.
inbound_delimiter: ","
# Field separator used when writing the exported text file.
outbound_delimiter: "|"
# NOTE(review): not read by any code visible in this notebook - confirm it is still needed.
skip_leading_rows: 1
# Column names the header validation expects (compared case-insensitively, order ignored).
columns:
    - region
    - country
    - item_type
    - sales_channel
    - order_priority
    - order_date
    - order_id
    - ship_date
    - units_sold
    - unit_price
    - unit_cost
    - total_revenue
    - total_cost
    - total_profit


Overwriting file.yaml


In [16]:
# Load the YAML configuration through the testutility helper module.
# read_config_file returns the parsed content, or None if the YAML is invalid.
import testutility as util
config_data = util.read_config_file("file.yaml")

In [17]:
# Display the parsed configuration to confirm that every key loaded as expected
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': '5m Sales Records',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['region',
  'country',
  'item_type',
  'sales_channel',
  'order_priority',
  'order_date',
  'order_id',
  'ship_date',
  'units_sold',
  'unit_price',
  'unit_cost',
  'total_revenue',
  'total_cost',
  'total_profit']}

In [5]:
# Baseline read of the full CSV with plain pandas (author's recorded timing: 16 sec)
import pandas as pd
df_sample_1 = pd.read_csv("5m Sales Records.csv",delimiter=',')
df_sample_1.head()

Unnamed: 0,Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
0,Australia and Oceania,Palau,Office Supplies,Online,H,3/6/2016,517073523,3/26/2016,2401,651.21,524.96,1563555.21,1260428.96,303126.25
1,Europe,Poland,Beverages,Online,L,4/18/2010,380507028,5/26/2010,9340,47.45,31.79,443183.0,296918.6,146264.4
2,North America,Canada,Cereal,Online,M,1/8/2015,504055583,1/31/2015,103,205.7,117.11,21187.1,12062.33,9124.77
3,Europe,Belarus,Snacks,Online,C,1/19/2014,954955518,2/27/2014,1414,152.58,97.44,215748.12,137780.16,77967.96
4,Middle East and North Africa,Oman,Cereal,Offline,H,4/26/2019,970755660,6/2/2019,7027,205.7,117.11,1445453.9,822931.97,622521.93


In [6]:
# Same read through Dask's dataframe API (author's recorded timing: 3 sec)
# NOTE(review): dd.read_csv is presumably lazy, so head() may parse only part of
# the file - confirm before comparing this timing with the pandas one.
import dask.dataframe as dd
df_sample_2 = dd.read_csv("5m Sales Records.csv",delimiter=',')
df_sample_2.head()

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



Unnamed: 0,Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
0,Australia and Oceania,Palau,Office Supplies,Online,H,3/6/2016,517073523,3/26/2016,2401,651.21,524.96,1563555.21,1260428.96,303126.25
1,Europe,Poland,Beverages,Online,L,4/18/2010,380507028,5/26/2010,9340,47.45,31.79,443183.0,296918.6,146264.4
2,North America,Canada,Cereal,Online,M,1/8/2015,504055583,1/31/2015,103,205.7,117.11,21187.1,12062.33,9124.77
3,Europe,Belarus,Snacks,Online,C,1/19/2014,954955518,2/27/2014,1414,152.58,97.44,215748.12,137780.16,77967.96
4,Middle East and North Africa,Oman,Cereal,Offline,H,4/26/2019,970755660,6/2/2019,7027,205.7,117.11,1445453.9,822931.97,622521.93


In [7]:
# Normal reading process of the file: Modin and Ray 41sec
!pip install modin[ray]




In [8]:
# Same read through Modin's pandas-compatible API; the recorded output shows a
# local Ray instance being started when this cell ran.
import modin.pandas as md
df_sample_3 = md.read_csv("5m Sales Records.csv",delimiter=',')
df_sample_3.head()


2024-08-29 21:01:43,504	INFO worker.py:1783 -- Started a local Ray instance.


Unnamed: 0,Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
0,Australia and Oceania,Palau,Office Supplies,Online,H,3/6/2016,517073523,3/26/2016,2401,651.21,524.96,1563555.21,1260428.96,303126.25
1,Europe,Poland,Beverages,Online,L,4/18/2010,380507028,5/26/2010,9340,47.45,31.79,443183.0,296918.6,146264.4
2,North America,Canada,Cereal,Online,M,1/8/2015,504055583,1/31/2015,103,205.7,117.11,21187.1,12062.33,9124.77
3,Europe,Belarus,Snacks,Online,C,1/19/2014,954955518,2/27/2014,1414,152.58,97.44,215748.12,137780.16,77967.96
4,Middle East and North Africa,Oman,Cereal,Offline,H,4/26/2019,970755660,6/2/2019,7027,205.7,117.11,1445453.9,822931.97,622521.93


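In these runs Dask took the shortest time to read the large CSV file (about 3 seconds, against about 16 seconds for pandas and about 41 seconds for Modin with Ray). Note that `dd.read_csv` is lazy and `head()` only needs the first rows, so the Dask figure is not a like-for-like comparison with a full pandas read. The file I used is probably not big enough for Modin and Ray to pay off, which is why they did not perform as well as pandas; with a larger file I believe Modin and Ray would take less time than pandas.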

In [18]:
# Build the source path from the configuration and load the file with the
# delimiter declared there
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
df = pd.read_csv(source_file, delimiter=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
0,Australia and Oceania,Palau,Office Supplies,Online,H,3/6/2016,517073523,3/26/2016,2401,651.21,524.96,1563555.21,1260428.96,303126.25
1,Europe,Poland,Beverages,Online,L,4/18/2010,380507028,5/26/2010,9340,47.45,31.79,443183.0,296918.6,146264.4
2,North America,Canada,Cereal,Online,M,1/8/2015,504055583,1/31/2015,103,205.7,117.11,21187.1,12062.33,9124.77
3,Europe,Belarus,Snacks,Online,C,1/19/2014,954955518,2/27/2014,1414,152.58,97.44,215748.12,137780.16,77967.96
4,Middle East and North Africa,Oman,Cereal,Offline,H,4/26/2019,970755660,6/2/2019,7027,205.7,117.11,1445453.9,822931.97,622521.93


In [19]:
# Validate the file header against the YAML definition.
# The check is run once and its result reused: col_header_val returns 0 on a
# mismatch and 1 on success, and prints its own diagnostic messages.
header_ok = util.col_header_val(df, config_data)

if header_ok == 0:
    print("validation failed")
else:
    print("col validation passed")


column name and column length validation passed
column name and column length validation passed
col validation passed


In [22]:
# Preview the path of the text file the export will be written to
output_file = f"./{config_data['file_name']}.txt"
print("", output_file)

 ./5m Sales Records.txt


In [24]:
# Write the frame as a delimited text file using the configured outbound
# delimiter (pipe). The file is plain text at this point; gzip compression is a
# separate step.
# mode='w' replaces any existing file, so re-running the cell does not append a
# second copy of the data (and a second header row) to it.
output_file = "./" + config_data['file_name'] + '.txt'
df.to_csv(output_file, header=True, index=False, sep=config_data['outbound_delimiter'], mode='w')

In [26]:
#gz format
!gzip '5m Sales Records.txt'

In [29]:
# Summary of the export: number of rows and columns of the frame, and the size
# on disk of the compressed file
total_rows, total_cols = df.shape
print ("Total number of rows: ", total_rows)
print ("Total number of columns: ", total_cols)

import os
file_size = os.path.getsize("5m Sales Records.txt.gz")
print ("File Size is :", file_size, "bytes")

Total number of rows:  5000000
Total number of columns:  14
File Size is : 196157917 bytes
