In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import time # runtime 

In [2]:
#Size of the file
import os
print("File size: ",round((os.path.getsize('./Data/yelp_academic_dataset_review.csv'))/10**9,2) , "gb")

File size:  4.85 gb


### Reading dataset using Pandas

In [3]:
start = time.time()
df = pd.read_csv('./Data/yelp_academic_dataset_review.csv')
end = time.time()
p_time = round(end-start,5)
print("Read csv with pandas: ",p_time,"sec")

Read csv with pandas:  42.5321 sec


### Reading dataset using Dask

In [4]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('./Data/yelp_academic_dataset_review.csv')
end = time.time()
d_time = round(end-start,5)
print("Read csv with dask: ",d_time,"sec")

Read csv with dask:  0.015 sec


### Reading dataset with Modin and Ray

In [5]:
import modin.pandas as mod
import ray
ray.shutdown()
ray.init()
start = time.time()
df = mod.read_csv('./Data/yelp_academic_dataset_review.csv')
end = time.time()
m_r_time = round(end-start,5)
print("Read csv with modin and ray: ",m_r_time,"sec")

2023-06-19 12:33:51,879	INFO worker.py:1636 -- Started a local Ray instance.


Read csv with modin and ray:  81.88658 sec


In [6]:
print('Dask is much faster than Pandas and modin, and ray. with reading time of {}'.format(d_time))

Dask is much faster than Pandas and modin, and ray. with reading time of 0.015


In [7]:
df = dd.read_csv('./Data/yelp_academic_dataset_review.csv',delimiter=',')
df.info()


<class 'dask.dataframe.core.DataFrame'>
Columns: 9 entries, user_id to stars
dtypes: object(5), float64(1), int64(3)

In [8]:
# Number of rows and columns
print("length of rows:{} and length of columns:{}".format(len(df.index),len(df.columns)))

length of rows:6685900 and length of columns:9


In [9]:
# remove special character and white spaces
df.columns=df.columns.str.replace('[#,@,&]','')
df.columns = df.columns.str.replace(' ', '')
df.head()



Unnamed: 0,user_id,text,date,review_id,business_id,funny,cool,useful,stars
0,b'hG7b0MtEbXx5QzbzE6C_VA',b'Total bill for this horrible service? Over $...,b'2013-05-07 04:34:36',b'Q1sbwvVQXV2734tPgoKj4Q',b'ujmEBvifdJM6h6RLv4wQIg',1,0,6,1.0
1,b'yXQM5uF2jS6es16SJzNHfg',"b""I *adore* Travis at the Hard Rock's new Kell...",b'2017-01-14 21:30:33',b'GJXCdrto3ASJOqKeVWPi6Q',b'NZnhc2sEQy3RmzKTZnqtwQ',0,0,0,5.0
2,b'n6-Gk65cPZL6Uz8qRm3NYw',"b""I have to say that this office really has it...",b'2016-11-09 20:09:03',b'2TzJjDVDEuAW6MR5Vuc1ug',b'WTqjgwHlXbSFevF32_DJVw',0,0,3,5.0
3,b'dacAIZ6fTM6mqwW5uxkskg',"b""Went in for a lunch. Steak sandwich was deli...",b'2018-01-09 20:56:38',b'yi0R0Ugj_xUx_Nek0-_Qig',b'ikCg8xy5JIg_NGPx-MSIDA',0,0,0,5.0
4,b'ssoyf2_x0EQMed6fgHeMyQ',b'Today was my second out of three sessions I ...,b'2018-01-30 23:07:38',b'11a8sVPMUFtaC7_ABRkmtw',b'b1b1eb3uo-w561D0ZfCEiQ',0,0,7,1.0


In [10]:
columns = df.columns
columns

Index(['user_id', 'text', 'date', 'review_id', 'business_id', 'funny', 'cool',
       'useful', 'stars'],
      dtype='object')

In [11]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing testutility.py


In [12]:
%%writefile file.yaml
columns: 
    - user_id
    - text
    - date
    - review_id
    - business_id
    - funny
    - cool
    - useful
    - stars
dataset_name: yelp_academic_dataset_review
dtypes:
    user_id: string
    text: string
    date: string
    review_id: string
    business_id: string
    funny: float32
    cool: float32
    useful: float32
    stars: float32
file_type: csv

file_name: data
table name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1


Writing file.yaml


In [13]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [14]:
#inspecting data of config file
config_data

{'columns': ['user_id',
  'text',
  'date',
  'review_id',
  'business_id',
  'funny',
  'cool',
  'useful',
  'stars'],
 'dataset_name': 'yelp_academic_dataset_review',
 'dtypes': {'user_id': 'string',
  'text': 'string',
  'date': 'string',
  'review_id': 'string',
  'business_id': 'string',
  'funny': 'float32',
  'cool': 'float32',
  'useful': 'float32',
  'stars': 'float32'},
 'file_type': 'csv',
 'file_name': 'data',
 'table name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1}

In [15]:
import pandas as pd
# read the file using config file
file_type = config_data['file_type']
source_file = "./"+config_data['file_name']+"/"+config_data['dataset_name'] + f'.{file_type}'
datatypes = config_data['dtypes']

# Reading using Pandas
df = pd.read_csv(source_file, delimiter=config_data['inbound_delimiter'], dtype=datatypes)
df.head()
     

Unnamed: 0,user_id,text,date,review_id,business_id,funny,cool,useful,stars
0,b'hG7b0MtEbXx5QzbzE6C_VA',b'Total bill for this horrible service? Over $...,b'2013-05-07 04:34:36',b'Q1sbwvVQXV2734tPgoKj4Q',b'ujmEBvifdJM6h6RLv4wQIg',1.0,0.0,6.0,1.0
1,b'yXQM5uF2jS6es16SJzNHfg',"b""I *adore* Travis at the Hard Rock's new Kell...",b'2017-01-14 21:30:33',b'GJXCdrto3ASJOqKeVWPi6Q',b'NZnhc2sEQy3RmzKTZnqtwQ',0.0,0.0,0.0,5.0
2,b'n6-Gk65cPZL6Uz8qRm3NYw',"b""I have to say that this office really has it...",b'2016-11-09 20:09:03',b'2TzJjDVDEuAW6MR5Vuc1ug',b'WTqjgwHlXbSFevF32_DJVw',0.0,0.0,3.0,5.0
3,b'dacAIZ6fTM6mqwW5uxkskg',"b""Went in for a lunch. Steak sandwich was deli...",b'2018-01-09 20:56:38',b'yi0R0Ugj_xUx_Nek0-_Qig',b'ikCg8xy5JIg_NGPx-MSIDA',0.0,0.0,0.0,5.0
4,b'ssoyf2_x0EQMed6fgHeMyQ',b'Today was my second out of three sessions I ...,b'2018-01-30 23:07:38',b'11a8sVPMUFtaC7_ABRkmtw',b'b1b1eb3uo-w561D0ZfCEiQ',0.0,0.0,7.0,1.0


In [16]:
#validating the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [17]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['user_id', 'text', 'date', 'review_id', 'business_id', 'funny', 'cool',
       'useful', 'stars'],
      dtype='object')
columns of YAML are: ['user_id', 'text', 'date', 'review_id', 'business_id', 'funny', 'cool', 'useful', 'stars']


In [18]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [19]:
import gzip

# Write the file in pipe-separated text file (|) in gz format
output_file = "output_data.txt"
df.to_csv(output_file, sep="|", index=False)

# Compress the file to gz format
compressed_file = "output_data.txt.gz"
with open(output_file, "rt") as f_in:
    with gzip.open(compressed_file, "wt") as f_out:
        f_out.writelines(f_in)