In [1]:
import os
import time

In [2]:
#Size of the file
os.path.getsize('transactions_test.csv')

3488002253

In [3]:
#Dask reading process
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('transactions_test.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.03997302055358887 sec


In [4]:
#Normal reading process of the file
import pandas as pd
start = time.time()
df_sample = pd.read_csv("transactions_test.csv",delimiter=',')
end = time.time()
print("Read csv with panda: ",(end-start),"sec")
#df_sample.head(10)

Read csv with panda:  22.656229257583618 sec


In [5]:
#Modin and Ray reading process
import modin.pandas as pd
import ray
ray.shutdown()
ray.init()
start = time.time()
df = pd.read_csv('transactions_test.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2022-09-11 10:07:14,418	INFO services.py:1456 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


Read csv with modin and ray:  37.67683672904968 sec


In [10]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: transactions_test
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - t_dat
    - customer_id
    - article_id
    - Price
    - Sales_channel_id

Overwriting file.yaml


In [11]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [12]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [13]:
config_data['inbound_delimiter']

','

In [14]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'transactions_test',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['t_dat',
  'customer_id',
  'article_id',
  'Price',
  'Sales_channel_id']}

In [15]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id
0,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001,0.050831,2
1,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023,0.030492,2
2,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004,0.015237,2
3,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003,0.016932,2
4,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004,0.016932,2


In [16]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [17]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['t_dat', 'customer_id', 'article_id', 'price', 'sales_channel_id'], dtype='object')
columns of YAML are: ['t_dat', 'customer_id', 'article_id', 'Price', 'Sales_channel_id']


In [18]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation passed
col validation passed


In [8]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('transactions_test.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('transactions_test.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['C:/Users/schou/Documents/transactions_test.csv.gz\\00.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\01.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\02.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\03.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\04.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\05.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\06.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\07.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\08.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\09.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\10.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\11.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\12.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\13.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\14.part',
 'C:/Users/schou/Documents/transactions_test.csv.gz\\15

In [9]:
#number of files in gz format folder
import os
entries = os.listdir('transactions_test.csv.gz/')
for entry in entries:
    print(entry)

00.part
01.part
02.part
03.part
04.part
05.part
06.part
07.part
08.part
09.part
10.part
11.part
12.part
13.part
14.part
15.part
16.part
17.part
18.part
19.part
20.part
21.part
22.part
23.part
24.part
25.part
26.part
27.part
28.part
29.part
30.part
31.part
32.part
33.part
34.part
35.part
36.part
37.part
38.part
39.part
40.part
41.part
42.part
43.part
44.part
45.part
46.part
47.part
48.part
49.part
50.part
51.part
52.part
53.part
54.part


In [10]:
#size of the gz format folder
os.path.getsize('transactions_test.csv.gz')

8192