In [11]:
pip install modin

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [8]:
import modin.pandas as pd

In [2]:
import os

os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

In [3]:
# Load csv file using pandas
import pandas as pd
import time
s = time.time()
%time  pandas_df = pd.read_csv("/content/US_Accidents_Dec20_updated.csv")
e = time.time()
print("Pandas Loading Time = {}".format(e-s))
#> CPU times: user 2.91 s, sys: 226 ms, total: 3.14 s
#> Wall time: 3.09 s

CPU times: user 84.2 ms, sys: 18.2 ms, total: 102 ms
Wall time: 103 ms
Pandas Loading Time = 0.10377192497253418


In [9]:
pip install Ray

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting Ray
  Downloading ray-2.0.0-cp37-cp37m-manylinux2014_x86_64.whl (59.4 MB)
[K     |████████████████████████████████| 59.4 MB 1.4 MB/s 
Collecting virtualenv
  Downloading virtualenv-20.16.3-py2.py3-none-any.whl (8.8 MB)
[K     |████████████████████████████████| 8.8 MB 43.9 MB/s 
Collecting grpcio<=1.43.0,>=1.28.1
  Downloading grpcio-1.43.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.1 MB)
[K     |████████████████████████████████| 4.1 MB 46.5 MB/s 
Collecting platformdirs<3,>=2.4
  Downloading platformdirs-2.5.2-py3-none-any.whl (14 kB)
Collecting distlib<1,>=0.3.5
  Downloading distlib-0.3.5-py2.py3-none-any.whl (466 kB)
[K     |████████████████████████████████| 466 kB 38.5 MB/s 
[?25hInstalling collected packages: platformdirs, distlib, virtualenv, grpcio, Ray
  Attempting uninstall: grpcio
    Found existing installation: grpcio 1.47.0
    Uninstalling g

In [12]:
# Load csv file using Modin and Ray
import os
os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray
import ray
import modin.pandas as ray_pd
s = time.time()
%time  mray_df = ray_pd.read_csv("/content/US_Accidents_Dec20_updated.csv")
e = time.time()
print("Modin Loading Time = {}".format(e-s))
#> CPU times: user 762 ms, sys: 473 ms, total: 1.24 s
#> Wall time: 2.67 s

CPU times: user 36.4 ms, sys: 12.3 ms, total: 48.7 ms
Wall time: 164 ms
Modin Loading Time = 0.16568756103515625


In [13]:
# Load csv for Modin with Dask
import os
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

from distributed import Client
client = Client(memory_limit='8GB')
import modin.pandas as dask_pd
s = time.time()
%time  mdask_df = dask_pd.read_csv("/content/US_Accidents_Dec20_updated.csv")
e = time.time()
print("Modin Loading Time = {}".format(e-s))
#> CPU times: user 604 ms, sys: 288 ms, total: 892 ms
#> Wall time: 1.74 s

Perhaps you already have a cluster running?
Hosting the HTTP server on port 46315 instead


CPU times: user 36.6 ms, sys: 14.9 ms, total: 51.5 ms
Wall time: 159 ms
Modin Loading Time = 0.15970420837402344


In [14]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    '''
    Load a YAML config file with ``yaml.safe_load``.

    Returns the parsed document, or ``None`` when the content is not valid
    YAML (the ``yaml.YAMLError`` is reported via ``logging.error``).
    Errors raised by ``open`` (e.g. a missing file) are not caught.
    '''
    with open(filepath, 'r') as config_stream:
        try:
            config = yaml.safe_load(config_stream)
        except yaml.YAMLError as err:
            logging.error(err)
            return None
    return config


def replacer(string, char):
    '''
    Collapse every run of two or more consecutive ``char`` into one ``char``.

    ``char`` is matched literally (it is regex-escaped), so regex
    metacharacters such as ``.`` or ``*`` and multi-character separators
    work as expected. An empty ``char`` leaves ``string`` unchanged.

    Example: replacer('a__b___c', '_') -> 'a_b_c'
    '''
    if not char:
        # Nothing to collapse; the unescaped pattern would raise re.error.
        return string
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement avoids backslash-escape processing of ``char``.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Standardise the DataFrame's column names and validate them against
    the column list in the config.

    Column names are normalised *in place* on ``df`` (the caller's frame
    is modified): lower-cased, every non-word character replaced with
    ``_``, leading/trailing ``_`` stripped and runs of ``_`` collapsed.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose header is checked; its ``columns`` are overwritten
        with the normalised names.
    table_config : dict
        Parsed config; ``table_config['columns']`` holds the expected
        column names, compared after lower-casing and ignoring order.

    Returns
    -------
    int
        1 when the normalised columns match the expected columns,
        otherwise 0 (differences are printed and logged).
    '''
    df.columns = df.columns.str.lower()
    # Raw string: '\w' inside a plain literal is an invalid escape sequence.
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [replacer(col.strip('_'), '_') for col in df.columns]

    expected_col = sorted(col.lower() for col in table_config['columns'])
    # Compare sorted name lists instead of reindexing the frame: reindex
    # copies every column and raises on duplicate column labels.
    file_col = sorted(df.columns)

    if file_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = list(set(file_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(file_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    duplicated_cols = sorted(set(df.columns[df.columns.duplicated()]))
    if duplicated_cols:
        # e.g. 'Age' and 'age ' both normalise to 'age'
        print("Following File columns are duplicated after normalisation",duplicated_cols)
    logging.info(f'df columns: {file_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Writing testutility.py


In [18]:
import pandas as pd
# Peek at the first rows of the raw Titanic training data.
df = pd.read_csv(filepath_or_buffer='/content/train.csv')
df.head(n=2)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C


### Creating a YAML file

In [22]:
%%writefile file.yaml
# Config for reading and validating the Titanic training CSV (train.csv).
file_type: csv
dataset_name: titanic
file_name: train
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected header; testutility.col_header_val compares it case-insensitively.
columns: [PassengerId, Survived, Pclass, Name, Sex, Age, SibSp, Parch,
          Ticket, Fare, Cabin, Embarked]

Overwriting file.yaml


In [23]:
# Read the YAML config with the testutility helper module.
import importlib

import testutility as util

# Re-running %%writefile does not refresh an already-imported module;
# reload so edits to testutility.py take effect without a kernel restart.
util = importlib.reload(util)
config_data = util.read_config_file("file.yaml")
# read_config_file returns None (after logging) when file.yaml is not valid YAML.
if config_data is None:
    raise ValueError("file.yaml could not be parsed; see the logged YAML error")

In [24]:
# Inspect the parsed config (the dict loaded from file.yaml)
config_data

{'file_type': 'csv',
 'dataset_name': 'titanic',
 'file_name': 'train',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['PassengerId',
  'Survived',
  'Pclass',
  'Name',
  'Sex',
  'Age',
  'SibSp',
  'Parch',
  'Ticket',
  'Fare',
  'Cabin',
  'Embarked']}

### Reading the file directly (without the config)

In [26]:
# Baseline read of the CSV with an explicit comma separator, no config involved.
import pandas as pd

df_sample = pd.read_csv("/content/train.csv", sep=',')
df_sample.head(n=2)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C


### Reading the file using the config file

In [27]:
# Read the file using the config: path and delimiter both come from file.yaml.
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
# read_csv accepts only the path positionally in pandas >= 2.0,
# so the delimiter must be passed by keyword.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()



Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [28]:
# Validate the file header against the YAML column list
# (this also normalises df.columns in place).
util.col_header_val(df, table_config=config_data)

column name and column length validation passed


1

In [29]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['passengerid', 'survived', 'pclass', 'name', 'sex', 'age', 'sibsp',
       'parch', 'ticket', 'fare', 'cabin', 'embarked'],
      dtype='object')
columns of YAML are: ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']


In [30]:
# Gate the rest of the pipeline on the header check (col_header_val returns 1 or 0).
if util.col_header_val(df, config_data):
    print("col validation passed")
    # TODO: perform further actions in the pipeline
else:
    print("validation failed")
    # TODO: reject the file

column name and column length validation passed
col validation passed


### Creating a gzip file

In [34]:
# Write the validated frame as a gzip-compressed CSV. The delimiter is taken
# from the config so it stays in sync with outbound_delimiter in file.yaml.
df.to_csv('titanic_gzip.csv.gz',
          sep=config_data['outbound_delimiter'],
          header=True,
          index=False,
          compression='gzip'
          )

In [38]:
import os
# Read the gzip file back with the same configured delimiter and check the round trip.
gz_file = pd.read_csv('titanic_gzip.csv.gz', compression='gzip',
                      sep=config_data['outbound_delimiter'])
assert gz_file.shape == df.shape, "gzip round-trip changed the frame's shape"
gz_file.shape

(891, 12)

In [39]:
# Display the round-tripped frame (pandas truncates long frames in its repr)
gz_file

Unnamed: 0,passengerid,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S
...,...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27.0,0,0,211536,13.0000,,S
887,888,1,1,"Graham, Miss. Margaret Edith",female,19.0,0,0,112053,30.0000,B42,S
888,889,0,3,"Johnston, Miss. Catherine Helen ""Carrie""",female,,1,2,W./C. 6607,23.4500,,S
889,890,1,1,"Behr, Mr. Karl Howell",male,26.0,0,0,111369,30.0000,C148,C
