**Task: File Ingestion and Schema validation**

Take any CSV/text file of 2+ GB of your choice.

Read the file (present your approach to reading the file).

Try different methods of reading the file, e.g. Dask, Modin, Ray, and pandas, and present your findings in terms of computational efficiency.

Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

As you already know the schema, create a YAML file and write the column names in it. Also define the separators for
reading and writing the file in the YAML.

Validate the number of columns and the column names of the ingested file against the YAML.

Write the file as a pipe-separated (|) text file in gz format.

Create a summary of the file:

*   Total number of rows
*   Total number of columns
*   File size



# Amazon Books Reviews dataset from Kaggle
# (10 columns)

In [1]:
import os
import time

# Size of the file

In [2]:
os.path.getsize('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv')

2859504349

**Read the data with Pandas**

In [3]:
import pandas as pd

start = time.time()
pd_data = pd.read_csv('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv')
end = time.time()

print("Read data with Pandas: ",(end-start),"sec")


Read data with Pandas:  59.0678277015686 sec


**Read the data with Dask** 

In [4]:
from dask import dataframe as dd

start = time.time()
dask_data = dd.read_csv('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv')
end = time.time()

print("Read data with Dask: ",(end-start),"sec")


Read data with Dask:  0.1530764102935791 sec


**Read the data with Modin and Ray**

In [31]:
# os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray
# NOTE: this import rebinds `pd` from pandas to Modin for all later cells.
import modin.pandas as pd

start = time.time()
modin_data = pd.read_csv('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv')
end = time.time()

print("Read data with Modin: ",(end-start),"sec")



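# Dask returned fastest (0.15 sec vs. 59 sec for pandas), but `dd.read_csv` is lazy and only builds a task graph, so the timings are not like-for-like; no Modin timing was recorded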
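
A caveat on these timings: `dd.read_csv` is lazy, so the 0.15 sec covers setting up the read rather than loading the 2.8 GB file, whereas `pd.read_csv` loads everything. A fairer comparison would time each reader through to a materialized result (for Dask, for example, by calling `.compute()` on the frame). The sketch below only illustrates the effect with standard-library stand-ins; `timed`, `eager_read` and `lazy_read` are hypothetical names and not part of any of the libraries above.

```python
import time

def timed(fn, *args):
    """Return (result, elapsed_seconds) for a single call of fn."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start

def eager_read(n):
    # Stand-in for an eager reader: does all the work immediately.
    return [i * i for i in range(n)]

def lazy_read(n):
    # Stand-in for a lazy reader: returns a recipe, no work is done yet.
    return (i * i for i in range(n))

N = 1_000_000
eager_rows, eager_sec = timed(eager_read, N)
lazy_rows, lazy_build_sec = timed(lazy_read, N)   # only builds the generator
total, lazy_total_sec = timed(sum, lazy_rows)     # forces the actual work

print(f"eager read:          {eager_sec:.4f} sec")
print(f"lazy, build only:    {lazy_build_sec:.6f} sec")
print(f"lazy, materialized:  {lazy_total_sec:.4f} sec")
```

Timing only the "build" step of a lazy reader will look much faster than an eager read, which is the pattern seen in the Dask result above.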

In [38]:
from dask import dataframe as dd
d_data= dd.read_csv('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv')


In [39]:
d_data.info()


<class 'dask.dataframe.core.DataFrame'>
Columns: 10 entries, Id to review/text
dtypes: object(6), float64(2), int64(2)

**Remove special characters from column names**

In [41]:

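# Strip '_', ',', '@' and '&' from the column names; regex=True makes the
# character-class pattern explicit (newer pandas defaults to a literal match).
d_data.columns = d_data.columns.str.replace('[_,@&]', '', regex=True)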





In [42]:
# Remove white space from column names
d_data.columns = d_data.columns.str.replace(' ', '')

In [43]:
d_data.columns

Index(['Id', 'Title', 'Price', 'Userid', 'profileName', 'review/helpfulness',
       'review/score', 'review/time', 'review/summary', 'review/text'],
      dtype='object')
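
The same cleaning can be sketched without a DataFrame, which makes it easy to check on a handful of names. This is a minimal sketch, not the notebook's method: `clean_column_name` is a hypothetical helper, and apart from `User_id` the sample names are made up for illustration.

```python
import re

def clean_column_name(name):
    """Drop '_', ',', '@', '&' and all whitespace from a column name."""
    name = re.sub(r"[_,@&]", "", name)
    return re.sub(r"\s+", "", name)

raw = ["Id", "User_id", " profile Name ", "review/score"]
cleaned = [clean_column_name(c) for c in raw]
print(cleaned)  # ['Id', 'Userid', 'profileName', 'review/score']
```

Note that `/` is left in place, which matches the column names shown above.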

# Validation

**Create YAML file**

**File Reading**

In [44]:
%%writefile Testutility.py
import logging
import re

import yaml

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string


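def col_header_val(df, table_config):
    # Normalize the file's column names: lowercase, replace non-word
    # characters with '_', then trim and collapse underscores.
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x, '_'), list(df.columns)))
    # NOTE: the expected names are only lowercased, not normalized, so YAML
    # names containing '/' cannot match the normalized file names above.
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns = list(map(lambda x: x.lower(), list(df.columns)))
    # NOTE: reindex is a pandas method; the Dask DataFrame passed in the
    # later cells does not provide it and raises AttributeError here.
    df = df.reindex(sorted(df.columns), axis=1)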
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting Testutility.py


In [45]:
%%writefile books.yaml
file_type: csv
dataset_name: file
file_name: books_rating
table_name : edsurv
inbound_delimiter : ","
outbound_delimiter : "|"
skip_leading_rows: 1
columns: 
    - Id
    - Title
    - Price
    - Userid
    - profileName
    - review/helpfulness
    - review/score
    - review/time
    - review/summary
    - review/text


Overwriting books.yaml


**Read config file**

In [46]:
import Testutility as util

data_config =util.read_config_file("books.yaml")

**Data of config file**

In [47]:

data_config

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'books_rating',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Id',
  'Title',
  'Price',
  'Userid',
  'profileName',
  'review/helpfulness',
  'review/score',
  'review/time',
  'review/summary',
  'review/text']}
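
Before using the config, it can help to check that the keys the later cells rely on are present. The sketch below is an assumption about what a reasonable check looks like, not part of `Testutility.py`: `REQUIRED_KEYS` and `check_config` are hypothetical names, and the dict repeats the relevant values shown above.

```python
REQUIRED_KEYS = ("file_type", "file_name", "inbound_delimiter",
                 "outbound_delimiter", "columns")

def check_config(config):
    """Return a list of problems found in a loaded config dict."""
    problems = [f"missing key: {k}" for k in REQUIRED_KEYS if k not in config]
    for key in ("inbound_delimiter", "outbound_delimiter"):
        value = config.get(key)
        if value is not None and len(value) != 1:
            problems.append(f"{key} must be a single character")
    columns = config.get("columns") or []
    if len(columns) != len(set(columns)):
        problems.append("duplicate column names")
    return problems

config = {
    "file_type": "csv",
    "file_name": "books_rating",
    "inbound_delimiter": ",",
    "outbound_delimiter": "|",
    "columns": ["Id", "Title", "Price", "Userid", "profileName",
                "review/helpfulness", "review/score", "review/time",
                "review/summary", "review/text"],
}
print(check_config(config))  # an empty list means no problems were found
```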

**Normal reading process of the file**

In [61]:

from dask import dataframe as dd
df= dd.read_csv('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv')


**Reading the file using config file**

In [62]:

file_type = data_config['file_type']
source_file = "./" + data_config['file_name'] + f'.{file_type}'
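# NOTE: `pd` was rebound to modin.pandas in an earlier cell, so this call
# requires the Ray engine to be installed.
df_data = pd.read_csv(source_file, sep=data_config['inbound_delimiter'])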
df_data.head()


ModuleNotFoundError: No module named 'ray'

**Validate the header of the file**

In [59]:

util.col_header_val(df,data_config)


AttributeError: 'DataFrame' object has no attribute 'reindex'

In [63]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,data_config['columns'])

columns of files are: Index(['Id', 'Title', 'Price', 'User_id', 'profileName', 'review/helpfulness',
       'review/score', 'review/time', 'review/summary', 'review/text'],
      dtype='object')
columns of YAML are: ['Id', 'Title', 'Price', 'Userid', 'profileName', 'review/helpfulness', 'review/score', 'review/time', 'review/summary', 'review/text']


In [64]:
if util.col_header_val(df,data_config)==0:
    print("validation failed")
else:
    print("col validation passed")

AttributeError: 'DataFrame' object has no attribute 'reindex'
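
The header check only needs the column names, so it can be sketched on plain lists, which sidesteps the `reindex` call that fails on the Dask frame. This is a hedged sketch rather than a drop-in replacement for `col_header_val`: `normalize` and `compare_columns` are hypothetical names, and unlike the original it normalizes the YAML names as well as the file names. The two lists are the ones printed above; `df` still has `User_id` because the earlier cleaning was applied to `d_data`, not to `df`.

```python
import re

def normalize(name):
    """Lowercase, replace non-word characters with '_', collapse and trim '_'."""
    name = re.sub(r"[^\w]", "_", name.lower())
    return re.sub(r"_{2,}", "_", name).strip("_")

def compare_columns(file_columns, yaml_columns):
    """Return (ok, extra_in_file, missing_from_file) after normalizing both sides."""
    actual = {normalize(c) for c in file_columns}
    expected = {normalize(c) for c in yaml_columns}
    ok = len(file_columns) == len(yaml_columns) and actual == expected
    return ok, sorted(actual - expected), sorted(expected - actual)

file_columns = ["Id", "Title", "Price", "User_id", "profileName",
                "review/helpfulness", "review/score", "review/time",
                "review/summary", "review/text"]
yaml_columns = ["Id", "Title", "Price", "Userid", "profileName",
                "review/helpfulness", "review/score", "review/time",
                "review/summary", "review/text"]

ok, extra, missing = compare_columns(file_columns, yaml_columns)
print(ok, extra, missing)  # False ['user_id'] ['userid']
```

With these inputs the check fails only on `User_id` versus `Userid`, so either the YAML entry or the cleaning step would need to change for the validation to pass.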

In [67]:
import csv
import gzip
import datetime

from dask import dataframe as dd
df = dd.read_csv('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('Books_rating.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+--------+--------+----------+
| Column | Found  | Expected |
+--------+--------+----------+
| Id     | object | int64    |
+--------+--------+----------+

The following columns also raised exceptions on conversion:

- Id
  ValueError("invalid literal for int() with base 10: 'B000L4056E'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'Id': 'object'}

to the call to `read_csv`/`read_table`.
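
The error comes from Dask's dtype inference treating `Id` as `int64`, and the message itself suggests passing `dtype={'Id': 'object'}` to `read_csv`. Independently of Dask, the conversion can also be sketched with only the standard library, streaming row by row so the file never has to fit in memory. This is an alternative technique, not the notebook's approach: `csv_to_pipe_gz` and the sample file names are hypothetical.

```python
import csv
import gzip
import os
import tempfile

def csv_to_pipe_gz(src_path, dst_path, in_sep=",", out_sep="|"):
    """Stream a delimited text file into a gzip-compressed, pipe-separated file.

    Returns the number of rows written, header included.
    """
    rows = 0
    with open(src_path, newline="", encoding="utf-8") as src, \
         gzip.open(dst_path, "wt", newline="", encoding="utf-8") as dst:
        reader = csv.reader(src, delimiter=in_sep)
        writer = csv.writer(dst, delimiter=out_sep, quoting=csv.QUOTE_ALL,
                            lineterminator="\n")
        for row in reader:
            writer.writerow(row)
            rows += 1
    return rows

# Small demonstration on a throwaway file (contents are made up).
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "sample.csv")
    dst = os.path.join(tmp, "sample.txt.gz")
    with open(src, "w", newline="", encoding="utf-8") as f:
        f.write("Id,Title\nB000L4056E,A | B\n")
    written = csv_to_pipe_gz(src, dst)
    with gzip.open(dst, "rt", encoding="utf-8") as f:
        demo_output = f.read()
    print(written, "rows written")
    print(demo_output)
```

Quoting every field (`csv.QUOTE_ALL`, as in the cell above) keeps values that themselves contain `|` unambiguous.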

**Number of files in the gz output folder**

**Size of the gz output folder**

In [66]:

partitions = os.listdir('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv.gz')
for partition in partitions:
    print(partition)


os.path.getsize('E:/solo projects/Data_Glacier_virtual_internship/Data_Glacier_virtual_internship/Week 6/Books_rating.csv.gz')    

0
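
The task also asks for a summary with the row count, column count and file size. The `0` above is what `os.path.getsize` reported for the output folder itself, and since the write failed the folder appears to be empty. A minimal standard-library sketch of the summary follows; `path_size` and `summarize` are hypothetical names, the demonstration file is made up, and for a folder of partitions the size is taken as the sum of the files inside it.

```python
import csv
import os
import tempfile

def path_size(path):
    """Size in bytes of a file, or the total of all files under a directory."""
    if os.path.isdir(path):
        return sum(os.path.getsize(os.path.join(root, name))
                   for root, _, names in os.walk(path) for name in names)
    return os.path.getsize(path)

def summarize(path, sep=","):
    """Return row count (header excluded), column count and size in bytes."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f, delimiter=sep)
        header = next(reader)
        n_rows = sum(1 for _ in reader)
    return {"rows": n_rows, "columns": len(header),
            "size_bytes": path_size(path)}

# Small demonstration on a throwaway file.
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "sample.csv")
    with open(sample, "w", newline="", encoding="utf-8") as f:
        f.write("Id,Title,Price\n1,A,2.5\n2,B,3.0\n")
    summary = summarize(sample)
    folder_bytes = path_size(tmp)
    print(summary)
```

Counting rows through `csv.reader` rather than by counting lines keeps quoted fields that contain newlines from being counted twice.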