### Write the `testutility.py` helper module

In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    '''
    Load a YAML configuration file and return its parsed contents.

    The file is decoded as UTF-8, matching how the notebook's ``%%writefile``
    cell writes ``file.yaml``; relying on the platform default encoding
    (e.g. cp1252 on Windows) can mis-decode non-ASCII values.

    :param filepath: path to the YAML file.
    :return: the object produced by ``yaml.safe_load`` (a dict for a YAML
             mapping such as ``file.yaml``), or ``None`` if the document
             cannot be parsed -- the parse error is logged, not raised.
    '''
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            # safe_load only builds plain Python types (no arbitrary object tags).
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best-effort: log the problem and let the caller decide what to do.
            logging.error(exc)
            return None


def replacer(string, char):
    '''
    Collapse every run of two or more consecutive ``char`` into a single ``char``.

    ``char`` is escaped before being placed in the pattern, so regex
    metacharacters such as ``'.'``, ``'*'`` or ``'|'`` are matched literally.

    :param string: text to clean.
    :param char: the character (or literal substring) whose repeats are collapsed.
    :return: the cleaned string.
    '''
    # Group the escaped text so that "{2,}" repeats the whole of ``char``.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A function replacement inserts ``char`` verbatim; a string replacement
    # would interpret backslashes in ``char`` as escape sequences.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Standardise the column names of ``df`` in place and validate them
    against the column list of a YAML table config.

    Normalisation (assigned back to ``df.columns``, so the caller sees it):
      1. lower-case every name;
      2. replace every non-word character (whitespace, punctuation) with '_';
      3. strip leading/trailing '_';
      4. collapse runs of '_' into a single '_'.

    The comparison is order-insensitive: both the normalised file columns and
    the lower-cased YAML columns are sorted before being compared.

    :param df: DataFrame whose ``columns`` are renamed in place.
    :param table_config: mapping with a ``'columns'`` list (as loaded from the YAML file).
    :return: 1 if the column names match exactly (same names, same count), else 0.
    '''
    # Cast labels to str first so non-string labels (e.g. the integer labels
    # pandas assigns to a header-less file) can go through the .str accessor.
    df.columns = (
        df.columns.astype(str)
        .str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )

    expected_col = sorted(str(col).lower() for col in table_config['columns'])
    # Compare sorted label lists rather than reindexing the frame: reindex
    # would build a full copy of the data just to reorder column labels.
    actual_col = sorted(df.columns)

    # Equal sorted lists imply equal names *and* equal column count.
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # sorted() keeps the report stable across runs (set iteration order is not).
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


### Write YAML file

In [2]:
%%writefile file.yaml
file_type: csv
dataset_name: dataset
file_name: custom_1988_2020
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - yr
    - exp_imp
    - hs_code
    - customs
    - country
    - q1
    - q2
    - value

Overwriting file.yaml


In [3]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [4]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'dataset',
 'file_name': 'custom_1988_2020',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['yr',
  'exp_imp',
  'hs_code',
  'customs',
  'country',
  'q1',
  'q2',
  'value']}

## 1st method of reading the csv file, through 'pandas' library

In [5]:
# read the file using config file
import pandas as pd
file_type = config_data['file_type']
source_file = "D:/" + config_data['file_name'] + f'.{file_type}'

%time dataset = pd.read_csv(source_file,config_data['inbound_delimiter'])
dataset.head()

  exec(code, glob, local_ns)


Wall time: 1min 14s


Unnamed: 0,yr,exp_imp,hs_code,customs,country,Q1,Q2,value
0,198801,1,103,100,190,0,35843,34353
1,198801,1,103,100,120991000,0,1590,4154
2,198801,1,103,100,210390900,0,4500,2565
3,198801,1,103,100,220890200,0,3000,757
4,198801,1,103,100,240220000,0,26000,40668


In [6]:
#validate the header of the file
util.col_header_val(dataset,config_data)

column name and column length validation passed


1

In [7]:
print("columns of csv file are:" ,dataset.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of csv file are: Index(['yr', 'exp_imp', 'hs_code', 'customs', 'country', 'q1', 'q2', 'value'], dtype='object')
columns of YAML are: ['yr', 'exp_imp', 'hs_code', 'customs', 'country', 'q1', 'q2', 'value']


In [8]:
# `os` must be imported here: importing it inside testutility does not put it
# in the notebook namespace, so the failure branch would raise NameError.
import os

# Delete the source file only when its header does not match the YAML schema.
# Reuse `source_file` from the read cell instead of re-typing the path.
if util.col_header_val(dataset,config_data)==0:
    print("validation failed")
    os.remove(source_file)
else:
    print("col validation passed")
    

column name and column length validation passed
col validation passed


### Further actions after passing column validation

In [9]:
%time dataset.groupby("exp_imp").exp_imp.count()

Wall time: 2.23 s


exp_imp
1    69088368
2    44518954
Name: exp_imp, dtype: int64

In [10]:
%time dataset.yr.max()

Wall time: 111 ms


202012

In [11]:
%time dataset.value.mean()

Wall time: 111 ms


32408.302375721876

In [12]:
%time dataset.value.std()

Wall time: 214 ms


376908.21176299546

In [18]:
%time dataset.describe()

Wall time: 29.7 s


Unnamed: 0,yr,exp_imp,hs_code,customs,country,q1,q2,value
count,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0
mean,200512.6,1.391867,193.2035,313.3187,654576300.0,47076.75,127167.0,32408.3
std,928.6563,0.4881672,121.0249,179.774,252176400.0,29145920.0,4571867.0,376908.2
min,198801.0,1.0,103.0,100.0,11.0,0.0,0.0,30.0
25%,199711.0,1.0,106.0,104.0,401693000.0,0.0,98.0,720.0
50%,200511.0,1.0,123.0,300.0,741521000.0,0.0,715.0,2475.0
75%,201309.0,2.0,222.0,423.0,851790000.0,7.0,6720.0,10052.0
max,202012.0,2.0,703.0,908.0,970600000.0,125500000000.0,1885790000.0,183278400.0


In [14]:
x = dataset.q2.values
y = dataset.value.values

In [15]:
x = x.reshape(len(x),1)
y = y.reshape(len(y),1)

In [16]:
from sklearn.linear_model import LinearRegression

In [17]:
%time model = LinearRegression().fit(x,y)

Wall time: 5.23 s


In [19]:
model.score(x,y)

0.1824214630085732

In [20]:
print('intercept:', model.intercept_)

intercept: [27930.60932872]


In [21]:
print('slope:', model.coef_)

slope: [[0.03521113]]


## 2nd method of reading the csv file, through 'dask' library
### We restart the kernel and run the first, second and third cells of the notebook again

### Then we load the 'dask' library

In [4]:
import dask.dataframe as dd
# read the file using config file
file_type = config_data['file_type']
source_file = "D:/" + config_data['file_name'] + f'.{file_type}'

%time dataset = dd.read_csv(source_file)
dataset.head()

Wall time: 15.6 ms


Unnamed: 0,yr,exp_imp,hs_code,customs,country,Q1,Q2,value
0,198801,1,103,100,190,0,35843,34353
1,198801,1,103,100,120991000,0,1590,4154
2,198801,1,103,100,210390900,0,4500,2565
3,198801,1,103,100,220890200,0,3000,757
4,198801,1,103,100,240220000,0,26000,40668


In [5]:
print("columns of csv file are:" ,dataset.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of csv file are: Index(['yr', 'exp_imp', 'hs_code', 'customs', 'country', 'Q1', 'Q2', 'value'], dtype='object')
columns of YAML are: ['yr', 'exp_imp', 'hs_code', 'customs', 'country', 'q1', 'q2', 'value']


### Further actions (note: the header validation was not re-run on the dask frame, so column names keep their original case, e.g. `Q1`, `Q2`)

In [6]:
%time dataset.groupby("exp_imp").exp_imp.count().compute()

Wall time: 22.9 s


exp_imp
1    69088368
2    44518954
Name: exp_imp, dtype: int64

In [7]:
%time dataset.yr.max().compute()

Wall time: 19.4 s


202012

In [8]:
%time dataset.value.mean().compute()

Wall time: 19.7 s


32408.302375721876

In [9]:
%time dataset.describe().compute()

Wall time: 35.1 s


Unnamed: 0,yr,exp_imp,hs_code,customs,country,Q1,Q2,value
count,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0
mean,200512.6,1.391867,193.2035,313.3187,654576300.0,47076.75,127167.0,32408.3
std,928.6563,0.4881672,121.0249,179.774,252176400.0,29145920.0,4571867.0,376908.2
min,198801.0,1.0,103.0,100.0,11.0,0.0,0.0,30.0
25%,199710.0,1.0,111.0,104.0,482010900.0,0.0,116.0,769.0
50%,200511.0,1.0,205.0,303.0,830710000.0,0.0,921.0,2804.0
75%,201308.0,2.0,302.0,500.0,852691900.0,26.0,9000.0,12080.0
max,202012.0,2.0,703.0,908.0,970600000.0,125500000000.0,1885790000.0,183278400.0


In [6]:
from dask_ml.linear_model import LinearRegression

In [13]:
from dask_glm.datasets import make_regression

In [None]:
model = LinearRegression()

In [8]:
x = dataset.Q2.values
y = dataset.value.values

In [16]:
# NOTE(review): this overwrites the x, y taken from the Q2/value columns in the
# previous cell with synthetic data from make_regression(), so the fit timed in
# the next cell is not run on the customs dataset.
x,y = make_regression()

In [21]:
%time model = LinearRegression().fit(x,y)

Wall time: 22.5 s


## 3rd method of reading the csv file, through 'modin' library
### We restart the kernel and run the first, second and third cells of the notebook again
### Then we load the 'modin' library

In [4]:
from dask.distributed import Client

# set up cluster and workers
client = Client(n_workers=4, 
                threads_per_worker=1,
                memory_limit='12GB')

import modin.pandas as mpd
# read the file using config file
file_type = config_data['file_type']
source_file = "D:/" + config_data['file_name'] + f'.{file_type}'

%time dataset = mpd.read_csv(source_file,config_data['inbound_delimiter'])
dataset.head()

Wall time: 52.7 s


Unnamed: 0,yr,exp_imp,hs_code,customs,country,Q1,Q2,value
0,198801,1,103,100,190,0,35843,34353
1,198801,1,103,100,120991000,0,1590,4154
2,198801,1,103,100,210390900,0,4500,2565
3,198801,1,103,100,220890200,0,3000,757
4,198801,1,103,100,240220000,0,26000,40668


### Further actions (note: the header validation was not re-run on the modin frame, so column names keep their original case, e.g. `Q1`, `Q2`)

In [5]:
%time dataset.groupby("exp_imp").exp_imp.count()

Wall time: 8.31 s


exp_imp
1    69088368
2    44518954
Name: exp_imp, dtype: int64

In [6]:
%time dataset.yr.max()

Wall time: 1.63 s


202012

In [7]:
%time dataset.value.mean()

Wall time: 3.03 s


32408.302375721876

In [8]:
%time dataset.value.std()

Wall time: 3.05 s


376908.21176299546

In [9]:
x = dataset["Q2"].values
y = dataset["value"].values

In [10]:
x = x.reshape(len(x),1)
y = y.reshape(len(y),1)

In [11]:
from sklearn.linear_model import LinearRegression

%time model = LinearRegression().fit(x,y)


Wall time: 4.17 s


In [12]:
print('intercept:', model.intercept_)

intercept: [27930.60932872]


In [13]:
print('slope:', model.coef_)

slope: [[0.03521113]]


In [None]:
# dataset.describe()

## 4th method of reading the csv file, through 'vaex' library
### We restart the kernel and run the first, second and third cells of the notebook again
### Then we load the 'vaex' library

In [4]:
import vaex


In [5]:
file_type = config_data['file_type']
source_file = "D:/" + config_data['file_name'] + f'.{file_type}'

source_file

'D:/custom_1988_2020.csv'

In [6]:
%time dataset = vaex.from_csv(source_file,config_data['inbound_delimiter'])

Wall time: 1min 27s


In [7]:
%time dataset.groupby('exp_imp' , agg=[vaex.agg.count(dataset.exp_imp)])

Wall time: 387 ms


#,exp_imp,exp_imp_count
0,1,69088368
1,2,44518954


In [8]:
%time dataset.yr.max()

Wall time: 111 ms


array(202012, dtype=int64)

In [9]:
%time dataset.value.mean()

Wall time: 245 ms


array(32408.30237572)

In [10]:
%time dataset.value.std()

Wall time: 660 ms


376908.21010742965

In [11]:
%time dataset.describe()

Wall time: 7.47 s


Unnamed: 0,yr,exp_imp,hs_code,customs,country,Q1,Q2,value,index
data_type,int64,int64,int64,int64,int64,int64,int64,int64,int64
count,113607322,113607322,113607322,113607322,113607322,113607322,113607322,113607322,113607322
,0,0,0,0,0,0,0,0,0
mean,200512.6231617008,1.3918669432239588,193.20352292962244,313.31866609794747,654576293.6521226,47076.754848697165,127166.98737165902,32408.302375721876,56803660.5
std,928.656401,0.488167,121.024864,179.77396,252176352.235584,29145919.720134,4571867.433452,376908.210107,32795608.969313
min,198801,1,103,100,11,0,0,30,0
max,202012,2,703,908,970600000,125500000000,1885789678,183278402,113607321


In [12]:
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LinearRegression

In [13]:
features = ['Q2']
model = Predictor(model=LinearRegression(), features=features, target='value', prediction_name='pred')

In [14]:
%time model = model.fit(dataset)

Wall time: 5.16 s


### Note how much faster `vaex` is when the data is stored in `.hdf5` format, as shown below

In [15]:
%time dataset =  vaex.open('D:\custom_1988_2020.csv.hdf5')

Wall time: 1.66 s


## Write the file in pipe-delimited format

In [None]:
# `dataset` is a vaex DataFrame at this point, and vaex writes CSV through
# export_csv (extra keyword arguments are forwarded to pandas' to_csv).
# Take the delimiter from the config rather than hard-coding "|".
dataset.export_csv(r"D:\custom_1988_2020.txt", sep=config_data['outbound_delimiter'])

## Compress the file to gzip (`.gz`) format

In [None]:
import gzip
import shutil

txt_path = r"D:\custom_1988_2020.txt"
# Stream the text file into the archive in fixed-size chunks; this is the
# idiom shown in the gzip module docs, rather than iterating line by line.
with open(txt_path, 'rb') as f_in, gzip.open(txt_path + ".gz", 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)