In [5]:
import pandas as pd
import csv
import numpy as np
import modin.pandas as mpd
import ray
from ray.data import read_csv
import dask.dataframe as dd
import yaml
import os
import re
import logging



# DIFFERENT METHODS OF FILE READING:
- Pandas
- Python CSV
- Dask


## Reading Data Using Pandas

In [9]:
%%time
Pandas_df = pd.read_csv('recommendations.csv')

CPU times: total: 1min 7s
Wall time: 1min 35s


## Reading Data Using Python CSV

In [6]:
%%time
with open('recommendations.csv', 'r') as file:
    csv_df = csv.reader(file)
   

CPU times: total: 0 ns
Wall time: 0 ns


## Reading Data With Dask

In [10]:
%%time
Dask_df = dd.read_csv('recommendations.csv')

CPU times: total: 188 ms
Wall time: 346 ms


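At face value, Python's `csv` module looks fastest (CPU times: total: 0 ns, Wall time: 0 ns), followed by Dask (CPU times: total: 188 ms, Wall time: 346 ms) and then Pandas (CPU times: total: 1min 7s, Wall time: 1min 35s).
However, the timings recorded above are not a like-for-like comparison: `csv.reader(file)` and `dd.read_csv(...)` are lazy and return without parsing the full file, whereas `pd.read_csv(...)` parses the whole file into memory. A fair comparison requires each method to actually read every row (e.g. by iterating over the `csv` reader, or by forcing the Dask computation).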

#  CREATE  YAML FILE

In [18]:
%%writefile schema_file.yaml
# Schema / configuration for the recommendations dataset.
file_type: csv
file_name: recommendations

# Field separators for the inbound file and for the exported file.
inbound_delimiter: ','
outbound_delimiter: '|'

# Column names the loaded DataFrame is validated against.
columns:
  - app_id
  - helpful
  - funny
  - date
  - is_recommended
  - hours
  - user_id
  - review_id

Overwriting schema_file.yaml


# CREATE UTILITY FILE

In [19]:
%%writefile utility_file.py

def read_yaml_file(file):
    with open(file, 'r') as f:
        try:
            return yaml.safe_load(f)
        except yaml.YAMLError as error:
            logging.error(error)

            
def validate(data, config_data):
    """Check that a DataFrame's column names match the schema's ``columns`` list.

    The DataFrame's column names are normalized in place (stripped,
    lower-cased, spaces replaced by underscores) and then compared with the
    configured names. The comparison ignores column order.

    Args:
        data: DataFrame to validate. Its ``columns`` attribute is overwritten
            with the normalized names whenever the config has a ``columns`` key.
        config_data: Parsed schema mapping; must contain a ``columns`` key.

    Returns:
        1 if the normalized column names match the configured ones, else 0.
    """
    if config_data is None or 'columns' not in config_data:
        print('Error: Missing Columns Dictionary!!!')
        return 0

    # Normalize BEFORE comparing; otherwise headers such as " App ID" would be
    # rejected even though they normalize to a configured name.
    data.columns = [str(col).strip().lower().replace(' ', '_') for col in data.columns]

    expected_cols = sorted(config_data['columns'])
    actual_cols = sorted(data.columns)

    # List equality already covers a differing number of columns.
    if expected_cols != actual_cols:
        print('Error: Invalid number of columns or column names not matching as per config file.')
        return 0

    print('Validation Successful')
    return 1


Overwriting utility_file.py


In [20]:
import utility_file as util

# Build the schema path from the current working directory, then parse it.
schema_path = os.path.join(os.getcwd(), 'schema_file.yaml')
config_data = util.read_yaml_file(schema_path)
config_data

{'file_type': 'csv',
 'file_name': 'recommendations',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'columns': ['app_id',
  'helpful',
  'funny',
  'date',
  'is_recommended',
  'hours',
  'user_id',
  'review_id']}

In [21]:
# Preview the first rows of the loaded frame to inspect its columns and values.
Pandas_df.head()

Unnamed: 0,app_id,helpful,funny,date,is_recommended,hours,user_id,review_id
0,975370,0,0,2022-12-12,True,36.3,51580,0
1,304390,4,0,2017-02-17,False,11.5,2586,1
2,1085660,2,0,2019-11-17,True,336.5,253880,2
3,703080,0,0,2022-09-23,True,27.4,259432,3
4,526870,0,0,2021-01-10,True,7.9,23869,4


# VALIDATION

In [22]:
# Compare the frame's columns with the schema; returns 1 on success, 0 on failure.
# Note: validate() also rewrites Pandas_df's column names in place (normalized).
util.validate(Pandas_df, config_data)

Success


1

In [23]:
# Column names of the frame after validation (validate() normalizes them in place).
Pandas_df.columns

Index(['app_id', 'helpful', 'funny', 'date', 'is_recommended', 'hours',
       'user_id', 'review_id'],
      dtype='object')

In [28]:
# Column names listed in the schema config, for comparison with the frame's columns.
config_data['columns']

['app_id',
 'helpful',
 'funny',
 'date',
 'is_recommended',
 'hours',
 'user_id',
 'review_id']

# SAVE IN 'GZ' FORMAT

In [30]:
# Separator for the exported file, taken from the schema config.
# (The variable name's spelling is kept because the export cell refers to it.)
outbound_delimeter = config_data['outbound_delimiter']
outbound_delimeter

'|'

In [31]:
# Export the frame as a gzip-compressed, delimiter-separated text file,
# leaving the row index out of the output.
Pandas_df.to_csv(
    'final_recommendations_file.csv.gz',
    sep=outbound_delimeter,
    index=False,
    compression='gzip',
)

# DATASET SUMMARY

In [32]:
# Report the frame's dimensions and the on-disk size of the compressed export.
column_count = len(Pandas_df.columns)
row_count = len(Pandas_df)
print('No. of Columns: {}'.format(column_count))
print('No. of Rows: {}'.format(row_count))

# st_size is in bytes; dividing by 1024**3 gives the figure printed as "GB".
compressed_bytes = os.stat('final_recommendations_file.csv.gz').st_size
print('Final compressed file size: {} GB'.format(compressed_bytes / (1024 * 1024 * 1024)))

No. of Columns: 8
No. of Rows: 41154794
Final compressed file size: 0.5544608971104026 GB


The file size was reduced from 1.88 GB to 0.55 GB after compression