# Summary

- **Total number of rows:** 42,448,764
- **Total number of columns:** 9
- **File size:** about 5.67 GB (5,668,612,855 bytes)

## Dataset Download Link:

https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store?fbclid=IwAR3Rji50q1WtaHD8By7X-_E6WO52Ksj7SQmulpV7-flbmhQKerdWR1M2cdc&select=2019-Oct.csv


## Task:

* Take any CSV/text file of 2+ GB of your choice. (You can do this assignment on Google Colab.)

* Read the file and present your approach to reading it.

* Try different methods of reading the file, e.g. Dask, Modin, Ray, pandas, and present your findings in terms of computational efficiency.

* Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

* As you already know the schema, create a YAML file and write the column names in it. Also define the separators for reading and writing the file in the YAML.

* Validate the number of columns and the column names of the ingested file against the YAML.

* Write the file as a pipe-separated (|) text file in gz format.
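
The task asks for a comparison of reading methods by computational efficiency. One way to make such a comparison repeatable is a small timing harness. This is only a sketch: `time_reader`, `count_rows_stdlib` and `compare_readers` are hypothetical helper names, and the baseline reader uses the standard-library `csv` module so that the example runs without extra packages.

```python
import csv
import os
import tempfile
import time


def time_reader(reader, path, repeats=3):
    """Return the best wall-clock time (seconds) of reader(path) over `repeats` runs."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        reader(path)
        best = min(best, time.perf_counter() - start)
    return best


def count_rows_stdlib(path):
    """Baseline reader: stream the file with the csv module and count data rows."""
    with open(path, newline="") as handle:
        rows = csv.reader(handle)
        next(rows, None)  # skip the header row
        return sum(1 for _ in rows)


def compare_readers(readers, path, repeats=3):
    """Map each reader name to its best time, fastest first."""
    timings = {name: time_reader(fn, path, repeats) for name, fn in readers.items()}
    return dict(sorted(timings.items(), key=lambda item: item[1]))


# Demo on a tiny temporary file standing in for the real CSV.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("a,b\n1,2\n3,4\n")
sample_rows = count_rows_stdlib(tmp.name)
sample_timings = compare_readers({"csv": count_rows_stdlib}, tmp.name, repeats=2)
os.remove(tmp.name)
```

Other readers can be added to the dictionary as callables that take a path, for example `pd.read_csv` or `lambda p: len(dd.read_csv(p))`, assuming those libraries are installed. Dask is lazy, so the callable has to force a computation (such as `len`) for the timing to be meaningful.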

    
# Data Ingestion sample code walkthrough

* Create a utility file
* Create a config file
* Build the data ingestion pipeline


# File Reading

In [3]:
%%writefile testutility.py
import logging
import re
import yaml


def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df, table_config):
    '''
    Standardize the column names of df in place (lower-case, with each run
    of non-word characters replaced by a single underscore) and compare
    them with table_config['columns']. Return 1 on a match, otherwise 0.
    '''
    df.columns = df.columns.str.lower()
    # raw string, so that \w is not treated as an invalid string escape
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [replacer(col.strip('_'), '_') for col in df.columns]
    expected_col = [col.lower() for col in table_config['columns']]
    # list equality also covers the column-count check
    if list(df.columns) == expected_col:
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py
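
To see what the name clean-up in `col_header_val` does to an individual column name, the same steps can be applied to plain strings. This is a minimal sketch: `normalize_column` is a hypothetical helper that is not part of `testutility.py`, and the sample names are made up for illustration.

```python
import re


def normalize_column(name):
    """Lower-case a name and reduce each run of non-word characters to one underscore."""
    name = name.lower()
    name = re.sub(r"[^\w]", "_", name)   # special characters and whitespace -> "_"
    name = name.strip("_")               # drop leading/trailing underscores
    return re.sub(r"_{2,}", "_", name)   # collapse repeated underscores


raw_columns = [" Event Time ", "Product-ID", "user  session!!"]
clean_columns = [normalize_column(c) for c in raw_columns]
print(clean_columns)  # ['event_time', 'product_id', 'user_session']
```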


In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Write YAML File

In [4]:
%%writefile ecommerce.yaml
file_type: csv
dataset_name: testfile
file_name: 2019-Nov
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - event_time
    - event_type
    - product_id
    - category_id
    - category_code
    - brand
    - price
    - user_id
    - user_session

Writing ecommerce.yaml


# Read Config File

In [5]:

import testutility as util
config_data = util.read_config_file("ecommerce.yaml")

In [6]:
config_data['columns']

['event_time',
 'event_type',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id',
 'user_session']

In [7]:
# Inspect the full contents of the loaded config
config_data

{'columns': ['event_time',
  'event_type',
  'product_id',
  'category_id',
  'category_code',
  'brand',
  'price',
  'user_id',
  'user_session'],
 'dataset_name': 'testfile',
 'file_name': '2019-Nov',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}
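
Before the config is used, it can help to check that the loaded dictionary has the keys the pipeline relies on. The sketch below is an assumption about what such a check could look like: `check_config` and `REQUIRED_KEYS` are hypothetical names, and the dictionary is a hand-copied subset of the config shown above rather than the result of `read_config_file`.

```python
REQUIRED_KEYS = {"file_type", "file_name", "inbound_delimiter",
                 "outbound_delimiter", "columns"}


def check_config(config):
    """Return a list of problems found in a loaded config dict (empty means OK)."""
    problems = []
    missing = REQUIRED_KEYS - set(config)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    for key in ("inbound_delimiter", "outbound_delimiter"):
        value = config.get(key)
        if value is not None and (not isinstance(value, str) or len(value) != 1):
            problems.append(f"{key} must be a single character, got {value!r}")
    columns = config.get("columns") or []
    duplicates = sorted({c for c in columns if columns.count(c) > 1})
    if duplicates:
        problems.append(f"duplicate column names: {duplicates}")
    return problems


# Hand-copied subset of the config above, used only for illustration.
example_config = {
    "file_type": "csv",
    "file_name": "2019-Nov",
    "inbound_delimiter": ",",
    "outbound_delimiter": "|",
    "columns": ["event_time", "event_type", "product_id", "category_id",
                "category_code", "brand", "price", "user_id", "user_session"],
}
example_problems = check_config(example_config)
bad_problems = check_config({"columns": ["a", "a"], "inbound_delimiter": ",,"})
```

An empty list means nothing was flagged. The second call shows the three kinds of problem the sketch reports: missing keys, a multi-character delimiter and duplicate column names.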

# Import Dataset

In [17]:
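#!pip install "dask[dataframe]"
import pandas as pd
import numpy as np
import dask.dataframe as dd

# dd.read_csv is lazy: it builds a partitioned dataframe without loading the whole file.
ecommerce = dd.read_csv("/content/drive/MyDrive/Week-6-DataIngestionPipeline/2019-Oct.csv", delimiter=',')
# head() reads only the rows it needs from the first partition.
ecommerce.head()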

|   | event_time | event_type | product_id | category_id | category_code | brand | price | user_id | user_session |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2019-10-01 00:00:00 UTC | view | 44600062 | 2103807459595387724 |  | shiseido | 35.79 | 541312140 | 72d76fde-8bb3-4e00-8c23-a032dfed738c |
| 1 | 2019-10-01 00:00:00 UTC | view | 3900821 | 2053013552326770905 | appliances.environment.water_heater | aqua | 33.2 | 554748717 | 9333dfbd-b87a-4708-9857-6336556b0fcc |
| 2 | 2019-10-01 00:00:01 UTC | view | 17200506 | 2053013559792632471 | furniture.living_room.sofa |  | 543.1 | 519107250 | 566511c2-e2e3-422b-b695-cf8e6e792ca8 |
| 3 | 2019-10-01 00:00:01 UTC | view | 1307067 | 2053013558920217191 | computers.notebook | lenovo | 251.74 | 550050854 | 7c90fc70-0e80-4590-96f3-13c02c18c713 |
| 4 | 2019-10-01 00:00:04 UTC | view | 1004237 | 2053013555631882655 | electronics.smartphone | apple | 1081.98 | 535871217 | c6bd7419-2748-4c56-95b4-8cec9ff8b80d |


# Summary about Data

In [18]:
print("The number of rows: ", len(ecommerce))

The number of rows:  42448764


In [19]:
print("The number of columns: ", len(ecommerce.columns))

The number of columns:  9


In [22]:
import os

file_size = os.path.getsize('/content/drive/MyDrive/Week-6-DataIngestionPipeline/2019-Oct.csv')
print("File Size is :", file_size, "bytes")

File Size is : 5668612855 bytes
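
The byte count is easier to read once converted to gigabytes. As a small worked example, the sketch below converts the reported size to both decimal gigabytes (10^9 bytes) and binary gibibytes (2^30 bytes). `describe_size` is a hypothetical helper name.

```python
def describe_size(num_bytes):
    """Return the size in decimal gigabytes (10**9) and binary gibibytes (2**30)."""
    return round(num_bytes / 10**9, 2), round(num_bytes / 2**30, 2)


gb, gib = describe_size(5668612855)
print(f"{gb} GB, {gib} GiB")  # 5.67 GB, 5.28 GiB
```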


# Validate the file header

In [23]:
util.col_header_val(ecommerce ,config_data)

column name and column length validation passed


1

In [24]:
print("columns of files are:" ,ecommerce.columns)
print("------------------------------------------------------------------------")
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
------------------------------------------------------------------------
columns of YAML are: ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']
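
For a file of this size, the header can also be checked before any dataframe is built, because only the first line is needed. The following is a sketch under that assumption: `read_header` and `header_matches` are hypothetical helpers, they compare the raw header without the clean-up done by `col_header_val`, and the demo uses a tiny temporary file in place of the real CSV.

```python
import csv
import os
import tempfile


def read_header(path, delimiter=","):
    """Return the first row of a delimited text file without reading the rest."""
    with open(path, newline="") as handle:
        return next(csv.reader(handle, delimiter=delimiter), [])


def header_matches(path, expected_columns, delimiter=","):
    """True when the file header equals the expected column list, in order."""
    return read_header(path, delimiter) == list(expected_columns)


# Demo on a tiny temporary file standing in for the real CSV.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("event_time,event_type,price\n2019-10-01 00:00:00 UTC,view,35.79\n")
header_ok = header_matches(tmp.name, ["event_time", "event_type", "price"])
header_bad = header_matches(tmp.name, ["event_time", "price"])
os.remove(tmp.name)
print(header_ok, header_bad)  # True False
```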


# Create a Pipeline

In [27]:
import gzip
import shutil

src_path = '/content/drive/MyDrive/Week-6-DataIngestionPipeline/2019-Oct.csv'
gz_path = src_path + '.gz'

if util.col_header_val(ecommerce, config_data) == 0:
    print("---------------------------------------------")
    print("Validation Failed! Please, check file columns!")
else:
    print("Column Validation Passed")
    # Copy in chunks so the whole CSV is never held in memory at once.
    with open(src_path, 'rb') as src, gzip.open(gz_path, 'wb') as dst:
        shutil.copyfileobj(src, dst)
    # Note: this compresses the original comma-separated file as-is;
    # it does not change the delimiter to '|'.
    print("Your Dataframe has been compressed to a .gzip file in the same folder.")

column name and column length validation passed
Column Validation Passed
Your Dataframe has been compressed to a .gzip file in the same folder.
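
The cell above gzips the original comma-separated file. The task also asks for pipe-separated output, so the sketch below shows one possible way to rewrite the delimiter while compressing, streaming row by row with the standard library. `write_pipe_gz` is a hypothetical helper and is not the approach used in this notebook. The delimiters are the ones defined in the YAML as `inbound_delimiter` and `outbound_delimiter`.

```python
import csv
import gzip
import os
import tempfile


def write_pipe_gz(src_path, dst_path, in_delim=",", out_delim="|"):
    """Stream src_path row by row into a gzip-compressed file using out_delim.

    Returns the number of rows written, header included.
    """
    rows_written = 0
    with open(src_path, newline="") as src, \
            gzip.open(dst_path, "wt", newline="") as dst:
        writer = csv.writer(dst, delimiter=out_delim, lineterminator="\n")
        for row in csv.reader(src, delimiter=in_delim):
            writer.writerow(row)
            rows_written += 1
    return rows_written


# Demo on a tiny temporary file standing in for the real CSV.
with tempfile.TemporaryDirectory() as workdir:
    src = os.path.join(workdir, "sample.csv")
    dst = os.path.join(workdir, "sample.txt.gz")
    with open(src, "w", newline="") as handle:
        handle.write("event_type,brand,price\nview,aqua,33.2\n")
    n_rows = write_pipe_gz(src, dst)
    with gzip.open(dst, "rt", newline="") as handle:
        first_line = handle.readline().rstrip("\n")
print(n_rows, first_line)  # 2 event_type|brand|price
```

Row-by-row streaming keeps memory use flat regardless of file size, at the cost of speed. A dataframe library's own CSV writer with a `|` separator and gzip compression would be an alternative if the data is already loaded.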
