# File ingestion and schema validation

## Task:
- Take any CSV/text file of 2+ GB of your choice. (You can do this assignment on Google Colab.)
- Read the file and present your approach to reading it.
- Try different methods of reading the file, e.g. Dask, Modin, Ray, and pandas, and present your findings in terms of computational efficiency.
- Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.
- Since the schema is already known, create a YAML file that lists the column names and defines the separators used for reading and writing the file.
- Validate the number of columns and the column names of the ingested file against the YAML file.
- Write the file as a pipe-separated (|) text file in gz format.
- Create a summary of the file:
    - total number of rows
    - total number of columns
    - file size

# Read the data file

In [1]:
import os
import time

In [2]:
# get the size of data file
size = os.path.getsize("C:/Users/chitr/Documents/DataGlacier/Data/Books_rating.csv")
print(f"size of the data file : {size}")

size of the data file : 2859504349


## Read the data with Pandas

In [3]:
import pandas as pd
start = time.time()
df = pd.read_csv('C:/Users/chitr/Documents/DataGlacier/Data/Books_rating.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  35.40927815437317 sec


## Read the data with Dask

In [4]:
import dask.dataframe as dd
start = time.time()
df = dd.read_csv('C:/Users/chitr/Documents/DataGlacier/Data/Books_rating.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.8652992248535156 sec
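`dd.read_csv` does not parse the whole file when it is called, so the timing above is best compared with the others only after an operation that forces a full read. The sketch below shows one way to time such an operation. The helper names `time_call`, `dask_row_count`, and `pandas_row_count` are hypothetical and not part of the original notebook, and using `len()` to trigger the computation is just one possible choice.

```python
import time


def time_call(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def dask_row_count(path):
    """Count rows with Dask; len() forces the whole file to be parsed."""
    import dask.dataframe as dd  # imported here; assumes dask is installed
    return len(dd.read_csv(path))


def pandas_row_count(path):
    """Count rows with pandas, which parses the file eagerly."""
    import pandas as pd  # imported here; assumes pandas is installed
    return len(pd.read_csv(path))


# Stand-in workload so the sketch runs without the 2.8 GB data file.
total, seconds = time_call(sum, range(1_000_000))
print(f"sum={total} computed in {seconds:.4f} sec")
```

With the real file, `time_call(dask_row_count, path)` and `time_call(pandas_row_count, path)` would measure comparable work. On real data Dask may also need explicit `dtype=` arguments if its sample-based type inference turns out to be wrong.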


## Read the data with Modin and Ray

In [5]:
import modin.pandas as mpd  # separate alias so the plain pandas import (pd) is not shadowed
import ray
ray.shutdown()
ray.init()
start = time.time()
df = mpd.read_csv('C:/Users/chitr/Documents/DataGlacier/Data/Books_rating.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2023-02-06 10:27:23,369	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8266


Read csv with modin and ray:  61.704468965530396 sec


## Read the data with Vaex

In [6]:
import vaex
start = time.time()
df = vaex.open('C:/Users/chitr/Documents/DataGlacier/Data/Books_rating.csv')
end = time.time()
print("Read csv with vaex: ",(end-start),"sec")

Read csv with vaex:  6.6056108474731445 sec


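## Dask has the shortest read time here (approximately 0.87 sec), ahead of Vaex, pandas, and Modin on Ray

Note that `dd.read_csv` is lazy: it samples the file and builds a task graph, so this figure is not a like-for-like comparison with the eager pandas read.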

In [31]:
import dask.dataframe as dd
df = dd.read_csv('C:/Users/chitr/Documents/DataGlacier/Data/Books_rating.csv', delimiter = ',')

In [32]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 10 entries, Id to review/text
dtypes: object(6), float64(2), int64(2)

In [33]:
print("Number of columns : ", len(df.columns))

Number of columns :  10


In [34]:
df.columns

Index(['Id', 'Title', 'Price', 'User_id', 'profileName', 'review/helpfulness',
       'review/score', 'review/time', 'review/summary', 'review/text'],
      dtype='object')

In [38]:
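# Remove spaces and the listed special characters; regex=True is required in recent pandas versions
df.columns = df.columns.str.replace(r'[ ,/&#@_-]', '', regex=True)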
df.columns



Index(['Id', 'Title', 'Price', 'Userid', 'profileName', 'reviewhelpfulness',
       'reviewscore', 'reviewtime', 'reviewsummary', 'reviewtext'],
      dtype='object')

In [39]:
df.columns = df.columns.str.lower()
df.columns

Index(['id', 'title', 'price', 'userid', 'profilename', 'reviewhelpfulness',
       'reviewscore', 'reviewtime', 'reviewsummary', 'reviewtext'],
      dtype='object')
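The two cleaning steps above can also be expressed as one small function that works on a plain list of names, which makes the rule easy to test without loading the data. This is a minimal sketch: the function name `clean_column_names` is hypothetical, and it assumes that every character other than a letter or digit should be dropped, which is slightly broader than the character list used above.

```python
import re


def clean_column_names(names):
    """Lower-case each name and drop every character that is not a letter or digit."""
    return [re.sub(r"[^0-9a-zA-Z]", "", name).lower() for name in names]


raw = ['Id', 'Title', 'Price', 'User_id', 'profileName', 'review/helpfulness',
       'review/score', 'review/time', 'review/summary', 'review/text']
cleaned = clean_column_names(raw)
print(cleaned)

# Stripping characters can make two different names collide, so check uniqueness.
assert len(set(cleaned)) == len(cleaned), "cleaning produced duplicate column names"
```

On a DataFrame the result would be assigned back with `df.columns = clean_column_names(df.columns)`.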

# Validation

In [48]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [49]:
%%writefile utility.py

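# utility.py is a separate module, so it needs its own imports
import logging
import re

import yaml


def read_config_file(filepath):
    """Load the YAML config; log the error and return None if it cannot be parsed."""
    with open(filepath, 'r') as stream:
        try:
            # safe_load is sufficient for a plain config file
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    """Collapse runs of two or more `char` into one.

    Assumed implementation: col_header_val calls this helper, but it was not defined.
    """
    return re.sub(re.escape(char) + '{2,}', char, string)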

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [52]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: Books_rating
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - id
    - title
    - price
    - userid
    - profilename
    - reviewhelpfulness
    - reviewscore
    - reviewtime
    - reviewsummary
    - reviewtext

Overwriting store.yaml
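Because the config stores both the inbound delimiter and the expected columns, the header can be checked before the full 2.8 GB file is loaded, by reading only its first line. The sketch below illustrates this with the standard library. The function names `read_header` and `validate_header` are hypothetical, the `config` dict is a hand-written stand-in for the parsed `store.yaml`, and the temporary file is a tiny substitute for `Books_rating.csv`.

```python
import csv
import os
import re
import tempfile

# Mirrors the relevant keys of store.yaml; in the notebook these values
# would come from the parsed YAML file instead of a literal dict.
config = {
    "inbound_delimiter": ",",
    "columns": ["id", "title", "price", "userid", "profilename",
                "reviewhelpfulness", "reviewscore", "reviewtime",
                "reviewsummary", "reviewtext"],
}


def read_header(path, delimiter):
    """Return the first row of a delimited file without reading the rest."""
    with open(path, newline="", encoding="utf-8") as handle:
        return next(csv.reader(handle, delimiter=delimiter))


def validate_header(header, expected):
    """Return (ok, extra_in_file, missing_from_file) after normalising names."""
    normalised = [re.sub(r"[^0-9a-z]", "", name.lower()) for name in header]
    extra = sorted(set(normalised) - set(expected))
    missing = sorted(set(expected) - set(normalised))
    ok = len(normalised) == len(expected) and not extra and not missing
    return ok, extra, missing


# Tiny stand-in file with the same header as the data set.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False,
                                 newline="", encoding="utf-8") as tmp:
    tmp.write("Id,Title,Price,User_id,profileName,review/helpfulness,"
              "review/score,review/time,review/summary,review/text\n")
    tmp.write("1,Some title,9.99,U1,Alice,1/2,5.0,940636800,Good,Nice book\n")
    sample_path = tmp.name

header = read_header(sample_path, config["inbound_delimiter"])
ok, extra, missing = validate_header(header, config["columns"])
print(ok, extra, missing)
os.remove(sample_path)
```

Failing early on a header mismatch avoids spending the full read time on a file that does not match the schema.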


In [1]:
# Read config file
import utility as util
config_data = util.read_config_file("store.yaml")

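NameError: name 'yaml' is not defined

This error is raised when `utility.py` does not itself contain `import yaml`: a module written with `%%writefile` has its own namespace and does not see imports made in the notebook. After the import is present in `utility.py`, re-run the `%%writefile` cell and restart the kernel (or reload the module) before calling `read_config_file` again.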
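The last two task items, writing a pipe-separated gz file and summarising it, are not reached in the notebook. The following is a minimal sketch of both steps using only the standard library on a tiny stand-in data set. The function names `write_pipe_gz` and `summarize`, the sample rows, and the output file name are assumptions for illustration, not the author's implementation.

```python
import csv
import gzip
import os
import tempfile


def write_pipe_gz(header, rows, out_path, delimiter="|"):
    """Write header and rows to a gzip-compressed, delimiter-separated text file."""
    with gzip.open(out_path, "wt", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, delimiter=delimiter)
        writer.writerow(header)
        writer.writerows(rows)


def summarize(path, delimiter="|"):
    """Return the row count (excluding the header), column count and size on disk."""
    with gzip.open(path, "rt", newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter=delimiter)
        header = next(reader)
        n_rows = sum(1 for _ in reader)
    return {"rows": n_rows, "columns": len(header),
            "size_bytes": os.path.getsize(path)}


# Stand-in data; a field containing "|" is quoted by the csv writer.
header = ["id", "title", "price"]
rows = [[1, "A | B", 9.99], [2, "Plain title", 4.5]]
out_path = os.path.join(tempfile.mkdtemp(), "books_rating.txt.gz")

write_pipe_gz(header, rows, out_path)
summary = summarize(out_path)
print(summary)
```

With a pandas DataFrame the write step can be done in one call, `df.to_csv(path, sep="|", compression="gzip", index=False)`, where `sep` would be taken from `outbound_delimiter` in the config.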