In [2]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Overwriting testutility.py


Write YAML file

In [2]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: test_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - city
    - price
    - distance

Overwriting file.yaml


In [3]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [4]:
config_data['inbound_delimiter']

','

In [5]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['city', 'price', 'distance']}

In [2]:
from google.colab import drive 
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
##pandas
import pandas as pd
import time

start_time = time.time()

chunksize = 10 ** 2

chunks = []

for chunk in pd.read_csv('/content/gdrive/My Drive/Colab Notebooks/vgg19_features_val.csv', chunksize=chunksize):
    chunks.append(chunk)  

df_sample = pd.concat(chunks, axis=0)

end_time = time.time()
elapsed_time = end_time - start_time

print(f'The code took {elapsed_time} seconds to run.')


Read the files 

In [21]:
## dask
import dask.dataframe as dd
import time

start_time = time.time()

df_sample = dd.read_csv("/content/gdrive/My Drive/Colab Notebooks/vgg19_features_val.csv",delimiter=',')
df_sample.head()

end_time = time.time()
elapsed_time = end_time - start_time

print(f'The code took {elapsed_time} seconds to run.')##17.445751905441284 seconds



The code took 17.445751905441284 seconds to run.


In [23]:
!pip install modin[dask]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting modin[dask]
  Downloading modin-0.22.1-py3-none-any.whl (1.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: modin
Successfully installed modin-0.22.1


In [7]:
##modin
import modin.pandas as pd
import time

start_time = time.time()

chunksize = 10 ** 3

chunks = []

for chunk in pd.read_csv('/content/gdrive/My Drive/Colab Notebooks/vgg19_features_val.csv', chunksize=chunksize):
    chunks.append(chunk)  

df_sample = pd.concat(chunks, axis=0)

end_time = time.time()
elapsed_time = end_time - start_time

print(f'The code took {elapsed_time} seconds to run.')#951.532466173172 seconds




    from distributed import Client

    client = Client()

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:45197
INFO:distributed.scheduler:  dashboard at:            127.0.0.1:8787
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35337'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44169'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:44297', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44297
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:38548
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:39913', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute

The code took 951.532466173172 seconds to run.


In [17]:
import os
import time
import modin.pandas as pd

os.environ["MODIN_ENGINE"] = "ray"  # Set Modin to use Ray

start_time = time.time()

chunksize = 10 ** 3 

chunks = []

for chunk in pd.read_csv('/content/gdrive/My Drive/Colab Notebooks/vgg19_features_val.csv', chunksize=chunksize):
    chunks.append(chunk)  

df_sample = pd.concat(chunks, axis=0)

end_time = time.time()
elapsed_time = end_time - start_time

print(f'The code took {elapsed_time} seconds to run.')#904.9927513599396 seconds


INFO:distributed.utils_perf:full garbage collection released 141.14 MiB from 1511 reference cycles (threshold: 9.54 MiB)


The code took 904.9927513599396 seconds to run.


In [9]:
##Perform basic validation on data columns : eg: remove special character , white spaces from the col name
util.col_header_val(df_sample,config_data)

INFO:distributed.scheduler:User asked for computation on lost data, _deploy_dask_func-3f376a84-63c5-43cf-98a5-5b82d3b646eb


column name and column length validation failed
Following File columns are not in the YAML file ['15248', '8831', '15488', '11981', '7670', '4254', '1485', '3141', '12199', '1293', '12087', '16296', '17123', '11953', '15414', '5522', '16684', '2877', '14227', '15155', '11756', '13185', '3225', '16005', '16092', '1632', '12732', '8073', '4004', '17690', '11113', '14946', '3165', '4980', '4249', '12798', '10584', '12960', '14886', '13941', '11401', '15382', '3259', '4143', '2783', '4441', '13908', '790', '10420', '8241', '9623', '15670', '8351', '9290', '5506', '17422', '14226', '11094', '18233', '14111', '2798', '1418', '2428', '9029', '12288', '14131', '8191', '9062', '2882', '6543', '7738', '10652', '9522', '16584', '16708', '4726', '1606', '1716', '4750', '11493', '17891', '12028', '13028', '2757', '4204', '15099', '1313', '13311', '15614', '12632', '8733', '15924', '1374', '14816', '6264', '15130', '833', '15146', '15244', '18049', '16236', '4665', '9952', '16726', '8584', '11013', 

INFO:distributed.scheduler:User asked for computation on lost data, lambda-75748b1c2145199bac1a54bd0fb3675c


0

INFO:distributed.scheduler:User asked for computation on lost data, lambda-f7af3a38076ef0d31c220a335a14620a
INFO:distributed.scheduler:User asked for computation on lost data, lambda-95bf50260dd21f234e63e84bf94fd80b
INFO:distributed.scheduler:User asked for computation on lost data, lambda-9bea1a78e93efb1c4ae4bff1415e798a
INFO:distributed.scheduler:User asked for computation on lost data, _deploy_dask_func-723aad79-68b6-4390-bc5a-98803d1c258f
INFO:distributed.scheduler:User asked for computation on lost data, lambda-cfdb36dcc5c0abb53acfb735ef55f99a
INFO:distributed.scheduler:User asked for computation on lost data, lambda-9efad1c336e85b393ec0a3f7c4b76f2b
INFO:distributed.scheduler:User asked for computation on lost data, lambda-e00b577f3b42df832a9cd0e0574c7799
INFO:distributed.scheduler:User asked for computation on lost data, lambda-8995a4accd249d7b0a2bf63dd975844c
INFO:distributed.scheduler:User asked for computation on lost data, _deploy_dask_func-743c7133-8d86-4364-a999-9bd8b13e3e4

In [12]:
df_sample.columns

Index(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
       ...
       '18423', '18424', '18425', '18426', '18427', '18428', '18429', '18430',
       '18431', '1'],
      dtype='object', length=18433)

In [13]:
##As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML
%%writefile vgg.yaml
file_type: csv
dataset_name: vgg19_features
file_name: vgg19_features_val
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - 0
    - 1
    - 2
    - ...
    - 18341
    - 1

Writing vgg.yaml


In [16]:
##Validate number of columns and column name of ingested file with YAML.
config_data = util.read_config_file("vgg.yaml")
print("columns of files are:" ,df_sample.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
       ...
       '18423', '18424', '18425', '18426', '18427', '18428', '18429', '18430',
       '18431', '1'],
      dtype='object', length=18433)
columns of YAML are: [0, 1, 2, '...', 18341, 1]


In [20]:
##Write the file in pipe separated text file (|) in gz format.
df_sample.to_csv('your_file_name.txt.gz', sep='|', compression='gzip', index=False)


AssertionError: ignored

In [22]:
##Create a summary of the file:

#Total number of rows,
print(df_sample.shape[0])#50000
#total number of columns
print(df_sample.shape[1])#18433
#file size:5.05GB


50000
18433
