In [1]:
import time
import os

### Size of the file

In [2]:
os.path.getsize('embeddings.csv')

3447449884

### Read file with Pandas

In [3]:
import pandas as pd
start = time.time()
df = pd.read_csv('embeddings.csv')
end = time.time()
print("Read file with with pandas: ",(end-start),"sec")

Read file with with pandas:  58.42142200469971 sec


### Read file with Dask

In [4]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('embeddings.csv')
end = time.time()
print("Read file with Dask: ",(end-start),"sec")

Read file with Dask:  0.08638119697570801 sec


### Read file with Modin

In [5]:
import modin.pandas as md
start = time.time()
df = md.read_csv('embeddings.csv')
end = time.time()
print("Read file with modin: ",(end-start),"sec")


    from distributed import Client

    client = Client()

2024-01-12 23:15:00,043 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/xb/1hqqlkbx5cqg9t4pk4rd0d0h0000gn/T/dask-worker-space/worker-5xkcp4rt', purging
2024-01-12 23:15:00,044 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/xb/1hqqlkbx5cqg9t4pk4rd0d0h0000gn/T/dask-worker-space/worker-s9l47h63', purging
2024-01-12 23:15:00,045 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/xb/1hqqlkbx5cqg9t4pk4rd0d0h0000gn/T/dask-worker-space/worker-4swapd_q', purging
2024-01-12 23:15:00,046 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/xb/1hqqlkbx5cqg9t4pk4rd0d0h0000gn/T/dask-worker-space/worker-41d3u1bv', purging
2024-01-12 23:15:00,047 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/xb/1hqqlkbx5cqg9t4pk4rd0d0h0000gn/T/dask-worker-space/worker-xntqjs3m', purging
2024-

Read file with modin:  44.58029222488403 sec


### My resolution is that Dask showed the fastest computational speed

### Basic validation on data columns: remove special character and white spaces from the columns names

In [6]:
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 1025 entries, site_id to feature_1023
dtypes: object(1), float64(1024)

In [7]:
dask_df.head()

Unnamed: 0,site_id,feature_0,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,feature_7,feature_8,...,feature_1014,feature_1015,feature_1016,feature_1017,feature_1018,feature_1019,feature_1020,feature_1021,feature_1022,feature_1023
0,HRCE-1_10_AA02_1,2.355969,-0.058361,-0.169764,-0.316499,-0.891334,0.581174,-0.284587,-0.279198,-0.146575,...,0.391186,0.786199,-1.739626,-1.317543,-1.208275,-0.507439,-0.317298,0.285018,-0.091285,-1.553895
1,HRCE-1_10_AA02_2,2.325652,-0.202519,-0.296017,-0.481136,-0.641461,0.702847,0.334191,-0.077498,-0.314538,...,0.638674,1.051621,-1.355659,-1.28521,-1.341911,-0.271349,-0.157707,0.081128,-0.447174,-1.614872
2,HRCE-1_10_AA02_3,2.207082,-0.379794,-0.365562,-0.196667,-0.799039,0.735813,0.081227,-0.393452,0.066324,...,0.552127,0.775428,-1.616731,-0.868382,-1.334486,-0.774456,-0.36325,0.161023,-0.066745,-1.325545
3,HRCE-1_10_AA02_4,2.452741,0.050658,-0.444642,-0.309758,-0.7249,0.658327,-0.128331,-0.164759,0.279105,...,0.689145,1.167066,-1.275786,-2.088455,-1.231964,-0.364454,-0.135066,0.49301,-0.54468,-1.29015
4,HRCE-1_10_AA03_1,2.106868,0.032095,-0.175542,-0.808618,-0.806217,0.72134,-0.213196,-0.187649,-0.070294,...,0.385696,0.503924,-1.192554,-1.123965,-1.323457,-0.359761,-0.10034,0.185773,-0.394267,-1.71276


In [8]:
# remove special character
dask_df.columns=dask_df.columns.str.replace('[@,#,$,%,&,*,+]','')
# remove white space from columns
dask_df.columns = dask_df.columns.str.replace(' ', '')

# all columns
print(dask_df.columns.tolist())

['site_id', 'feature_0', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5', 'feature_6', 'feature_7', 'feature_8', 'feature_9', 'feature_10', 'feature_11', 'feature_12', 'feature_13', 'feature_14', 'feature_15', 'feature_16', 'feature_17', 'feature_18', 'feature_19', 'feature_20', 'feature_21', 'feature_22', 'feature_23', 'feature_24', 'feature_25', 'feature_26', 'feature_27', 'feature_28', 'feature_29', 'feature_30', 'feature_31', 'feature_32', 'feature_33', 'feature_34', 'feature_35', 'feature_36', 'feature_37', 'feature_38', 'feature_39', 'feature_40', 'feature_41', 'feature_42', 'feature_43', 'feature_44', 'feature_45', 'feature_46', 'feature_47', 'feature_48', 'feature_49', 'feature_50', 'feature_51', 'feature_52', 'feature_53', 'feature_54', 'feature_55', 'feature_56', 'feature_57', 'feature_58', 'feature_59', 'feature_60', 'feature_61', 'feature_62', 'feature_63', 'feature_64', 'feature_65', 'feature_66', 'feature_67', 'feature_68', 'feature_69', 'feature_70', 'fea

### Schema Validation

In [9]:
%%writefile utility.py

import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing utility.py


### Write YAML file

In [10]:
%%writefile embeddings.yaml
file_type: csv
dataset_name: COVID_19_Image_Embeddings
file_name: embeddings
table_name: COVID_19_Image_Embeddings
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - site_id
    - feature_0
    - feature_1
    - feature_2
    - feature_3
    - feature_4
    - feature_5

Writing embeddings.yaml


In [11]:
file1 = open("embeddings.yaml", "a")
for i in range(6, 1024):
    L = '    - feature_' + str(i) + '\n'
    file1.writelines(L)
file1.close()

In [12]:
import utility as util
config_data = util.read_config_file("embeddings.yaml")

In [13]:
# inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'COVID_19_Image_Embeddings',
 'file_name': 'embeddings',
 'table_name': 'COVID_19_Image_Embeddings',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['site_id',
  'feature_0',
  'feature_1',
  'feature_2',
  'feature_3',
  'feature_4',
  'feature_5',
  'feature_6',
  'feature_7',
  'feature_8',
  'feature_9',
  'feature_10',
  'feature_11',
  'feature_12',
  'feature_13',
  'feature_14',
  'feature_15',
  'feature_16',
  'feature_17',
  'feature_18',
  'feature_19',
  'feature_20',
  'feature_21',
  'feature_22',
  'feature_23',
  'feature_24',
  'feature_25',
  'feature_26',
  'feature_27',
  'feature_28',
  'feature_29',
  'feature_30',
  'feature_31',
  'feature_32',
  'feature_33',
  'feature_34',
  'feature_35',
  'feature_36',
  'feature_37',
  'feature_38',
  'feature_39',
  'feature_40',
  'feature_41',
  'feature_42',
  'feature_43',
  'feature_44',
  'feature_45',
  'feature_46',
  'feature_47',

In [14]:
config_data['inbound_delimiter']

','

In [15]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file)
df.head()

Unnamed: 0,site_id,feature_0,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,feature_7,feature_8,...,feature_1014,feature_1015,feature_1016,feature_1017,feature_1018,feature_1019,feature_1020,feature_1021,feature_1022,feature_1023
0,HRCE-1_10_AA02_1,2.355969,-0.058361,-0.169764,-0.316499,-0.891334,0.581174,-0.284587,-0.279198,-0.146575,...,0.391186,0.786199,-1.739626,-1.317543,-1.208275,-0.507439,-0.317298,0.285018,-0.091285,-1.553895
1,HRCE-1_10_AA02_2,2.325652,-0.202519,-0.296017,-0.481136,-0.641461,0.702847,0.334191,-0.077498,-0.314538,...,0.638674,1.051621,-1.355659,-1.28521,-1.341911,-0.271349,-0.157707,0.081128,-0.447174,-1.614872
2,HRCE-1_10_AA02_3,2.207082,-0.379794,-0.365562,-0.196667,-0.799039,0.735813,0.081227,-0.393452,0.066324,...,0.552127,0.775428,-1.616731,-0.868382,-1.334486,-0.774456,-0.36325,0.161023,-0.066745,-1.325545
3,HRCE-1_10_AA02_4,2.452741,0.050658,-0.444642,-0.309758,-0.7249,0.658327,-0.128331,-0.164759,0.279105,...,0.689145,1.167066,-1.275786,-2.088455,-1.231964,-0.364454,-0.135066,0.49301,-0.54468,-1.29015
4,HRCE-1_10_AA03_1,2.106868,0.032095,-0.175542,-0.808618,-0.806217,0.72134,-0.213196,-0.187649,-0.070294,...,0.385696,0.503924,-1.192554,-1.123965,-1.323457,-0.359761,-0.10034,0.185773,-0.394267,-1.71276


In [16]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 305520 entries, 0 to 305519
Columns: 1025 entries, site_id to feature_1023
dtypes: float64(1024), object(1)
memory usage: 2.3+ GB


In [22]:
# get the total number of rows and columns
num_rows = len(df)
num_cols = len(df.columns)

# print the summary
print(f"Total number of rows: {num_rows}")
print(f"Total number of columns: {num_cols}")

Total number of rows: 305520
Total number of columns: 1025


In [18]:
import datetime
import csv
import gzip

# Write csv in gz format in pipe separated text file (|)
dask_df.to_csv('embeddings.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True)

['/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/00.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/01.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/02.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/03.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/04.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/05.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/06.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/07.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/08.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/09.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddings.csv.gz/10.part',
 '/Users/anastasiia/repos/data-glacier-internship/Week06/embeddin

In [19]:
# number of files in gz format folder
parties = os.listdir('embeddings.csv.gz/')
for part in parties:
    print(part)

26.part
30.part
47.part
10.part
06.part
51.part
50.part
07.part
11.part
46.part
31.part
27.part
41.part
16.part
00.part
20.part
36.part
37.part
21.part
01.part
17.part
40.part
02.part
14.part
43.part
38.part
18.part
34.part
22.part
23.part
35.part
19.part
39.part
42.part
15.part
03.part
08.part
49.part
32.part
24.part
04.part
12.part
45.part
28.part
29.part
44.part
13.part
05.part
52.part
25.part
33.part
48.part
09.part


In [20]:
# size of the gz format folder
os.path.getsize('embeddings.csv.gz/')

1760