In [1]:
import os
import time

In [2]:
# files size
os.path.getsize("custom_1988_2020.csv") 

4544707885

### Read data with Dask

In [3]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv("custom_1988_2020.csv")
end = time.time()
print("Read csv(dask): ",(end-start),"seconds")

Read csv(dask):  0.004055023193359375 seconds


In [4]:
import pandas as pd
start = time.time()
df = pd.read_csv("custom_1988_2020.csv")
end = time.time()
print("Read csv(pandas): ",(end-start),"seconds")

Read csv(pandas):  28.9464008808136 seconds


### Read data with Modin and Ray

In [5]:
import modin.pandas as pd
import ray
ray.shutdown()
ray.init()
start = time.time()
df = pd.read_csv("custom_1988_2020.csv")
end = time.time()
print("Read csv with modin and ray: ",(end-start),"seconds")

2023-11-11 18:57:23,127	INFO worker.py:1673 -- Started a local Ray instance.
[36m(raylet)[0m Spilled 3463 MiB, 5 objects, write throughput 1432 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[36m(raylet)[0m Spilled 6065 MiB, 8 objects, write throughput 1814 MiB/s.


Read csv with modin and ray:  20.278686046600342 seconds


### Dask proves to be better than Pandas, Modin and Ray because the time it took for Dask was the fastest: 0.0037 seconds

In [6]:
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 8 entries, 198801 to 34353
dtypes: int64(8)

In [7]:
dask_df = dask_df.drop(columns= '0')

In [8]:
# number of rows
len(dask_df.index)

113607321

In [9]:
# number of columns
len(dask_df.columns)

7

In [10]:
# removed special characters
dask_df.columns=dask_df.columns.str.replace('[#,@,&]','')

In [11]:
#To remove white space from columns
dask_df.columns = dask_df.columns.str.replace(' ', '')

In [12]:
dask_df.columns

Index(['198801', '1', '103', '100', '000000190', '35843', '34353'], dtype='object')

In [13]:
dask_df = dask_df.rename(columns={'198801' : 'year_month', '1' : 'export_1_import_2', '103' : 'HS_code', '100': 'customs', '000000190': 'country', '35843': 'quantity', '34353': 'value(yen)'})

In [14]:
dask_df.columns

Index(['year_month', 'export_1_import_2', 'HS_code', 'customs', 'country',
       'quantity', 'value(yen)'],
      dtype='object')

In [15]:
!pip install virtualenv -p python3


Usage:   
  pip install [options] <requirement specifier> [package-index-options] ...
  pip install [options] -r <requirements file> [package-index-options] ...
  pip install [options] [-e] <vcs project url> ...
  pip install [options] [-e] <local project path> ...
  pip install [options] <archive url/path> ...

no such option: -p


In [16]:
!pip install pyyaml



In [17]:
%%writefile utility.py

import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [18]:
%%writefile custom.yaml
file_type: csv
dataset_name: file
file_name: custom_1988_2020
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - year_month
      - export_1_import_2
      - HS_code
      - customs
      - country
      - quantity
      - value(yen)

Overwriting custom.yaml


In [19]:
!pip install python-utils



In [20]:
# Reading config file
import utility as util
config_data = util.read_config_file("custom.yaml")
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'custom_1988_2020',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['year_month - export_1_import_2 - HS_code - customs - country - quantity - value(yen)']}

In [21]:
config_data['file_type']

'csv'

In [22]:
config_data['inbound_delimiter']

','

In [23]:
#Reading the file using config file
file_type = config_data['file_type']
source_file = config_data['file_name'] + f'.{file_type}'

In [24]:
import pandas as pd
df = pd.read_csv(source_file)
df.head()

Unnamed: 0,198801,1,103,100,000000190,0,35843,34353
0,198801,1,103,100,120991000,0,1590,4154
1,198801,1,103,100,210390900,0,4500,2565
2,198801,1,103,100,220890200,0,3000,757
3,198801,1,103,100,240220000,0,26000,40668
4,198801,1,103,100,250410000,0,5,8070


In [None]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('custom_1988_2020.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('custom_1988_2020.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          lineterminator = '\n')

In [None]:
#number of files in gz format folder
import os
entries = os.listdir('custom_1988_2020.csv.gz/')
for entry in entries:
    print(entry)

In [None]:
#size of the gz format folder
os.path.getsize('custom_1988_2020.csv.gz')