# Week 6: Data Ingestion Pipeline

* Take any CSV/text file of 2+ GB of your choice (you can do this assignment on Google Colab).

* Read the file (present your approach to reading the file).

* Try different methods of file reading, e.g. Dask, Modin, Ray and pandas, and present your findings in terms of computational efficiency.

* Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

* Since you already know the schema, create a YAML file and write the column names in it. Define the separators for reading and writing the file, and the column names, in the YAML file.

* Validate the number of columns and the column names of the ingested file against the YAML file.

* Write the file as a pipe-separated (|) text file in gz format.

* Create a summary of the file: total number of rows, total number of columns and file size.


## Read File

In [2]:
import pandas as pd
url = 'https://raw.githubusercontent.com/agbaysa/dataglacier_week6/main/train2.csv'
df = pd.read_csv(url)
df.head(3)

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.25 | NaN | S |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 2 | 3 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | STON/O2. 3101282 | 7.925 | NaN | S |


In [3]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 57024 entries, 0 to 57023
Data columns (total 12 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   PassengerId  57024 non-null  int64  
 1   Survived     57024 non-null  int64  
 2   Pclass       57024 non-null  int64  
 3   Name         57024 non-null  object 
 4   Sex          57024 non-null  object 
 5   Age          45696 non-null  float64
 6   SibSp        57024 non-null  int64  
 7   Parch        57024 non-null  int64  
 8   Ticket       57024 non-null  object 
 9   Fare         57024 non-null  float64
 10  Cabin        13056 non-null  object 
 11  Embarked     56896 non-null  object 
dtypes: float64(2), int64(5), object(5)
memory usage: 5.2+ MB


## Test Dask

In [4]:
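from dask import dataframe as dd
import time

# Note: dd.read_csv is lazy. It reads only a sample of the file to infer
# dtypes and builds a task graph, so this measures setup, not a full read.
start = time.time()
dask_df = dd.read_csv(url)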
end = time.time()
print('Using Dask: ',(end-start),'secs')

Using Dask:  0.47667980194091797 secs
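`dd.read_csv` is lazy: it reads only a sample of the file to infer column types and builds a task graph, so the Dask number above measures setup rather than a full read and is not directly comparable with the pandas timing. A minimal sketch of a fairer benchmark, assuming a local CSV file: each reader is a callable that must return a fully loaded DataFrame, and the best of a few runs is kept. `time_reader` and the throwaway `sample.csv` are hypothetical, for illustration only.

```python
import os
import tempfile
import time

import pandas as pd


def time_reader(reader, path, repeats=3):
    """Best wall-clock time in seconds of reader(path) over `repeats` runs.

    `reader` must return a fully loaded DataFrame, so lazy libraries
    have to be forced to compute inside it.
    """
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        result = reader(path)
        elapsed = time.perf_counter() - start
        if len(result) == 0:
            raise ValueError(f"{path} produced an empty DataFrame")
        best = min(best, elapsed)
    return best


# Hypothetical throwaway file so the sketch runs on its own
csv_path = os.path.join(tempfile.mkdtemp(), "sample.csv")
pd.DataFrame({"a": range(10_000), "b": ["x"] * 10_000}).to_csv(csv_path, index=False)

pandas_secs = time_reader(pd.read_csv, csv_path)
print(f"pandas: {pandas_secs:.4f} secs")
```

With Dask installed, the comparable call would be `time_reader(lambda p: dd.read_csv(p).compute(), csv_path)`, where `.compute()` forces every partition to be read into one pandas DataFrame. Keeping the best of several runs reduces noise from caching and first-call overhead, which can dominate on small files.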


## Test Pandas

In [5]:
import pandas as pd

start = time.time()
df = pd.read_csv(url)
end = time.time()
print('Using Pandas: ',(end-start),'secs')

Using Pandas:  0.2733190059661865 secs
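For the 2+ GB file the task asks for, `pd.read_csv` as used above loads the whole file into memory at once. One option with plain pandas is to stream the file with the `chunksize` parameter, which makes `read_csv` return an iterator of DataFrames. A minimal sketch, assuming a local file, that counts rows and columns chunk by chunk; `count_rows_in_chunks` and `demo.csv` are hypothetical.

```python
import os
import tempfile

import pandas as pd


def count_rows_in_chunks(path, sep=",", chunksize=100_000):
    """Stream a delimited file in chunks and return (n_rows, n_columns)."""
    n_rows = 0
    n_cols = 0
    # With chunksize set, read_csv returns an iterator of DataFrames,
    # so only one chunk is held in memory at a time
    with pd.read_csv(path, sep=sep, chunksize=chunksize) as reader:
        for chunk in reader:
            n_rows += len(chunk)
            n_cols = chunk.shape[1]
    return n_rows, n_cols


# Hypothetical small file standing in for the real 2+ GB input
path = os.path.join(tempfile.mkdtemp(), "demo.csv")
pd.DataFrame({"id": range(250), "name": ["n"] * 250, "value": [1.5] * 250}).to_csv(path, index=False)

rows, cols = count_rows_in_chunks(path, chunksize=100)
print(rows, cols)  # 250 3
```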


In [16]:
# Install Modin dependencies and Ray to run Modin on the Ray engine (if not already installed)
# !pip install modin[ray]


## Test Modin and Ray

In [6]:
import modin.pandas as pd  # note: rebinds `pd` to Modin until pandas is imported again
import ray

ray.shutdown()
ray.init()
start = time.time()
df = pd.read_csv(url)
end = time.time()
print('Using Modin and Ray',(end-start), 'secs')

2023-01-16 23:34:41,917	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265


Using Modin and Ray 1.9231727123260498 secs


## Data Cleaning

In [7]:
# Replace NaNs in the numeric columns with the column mean
df['Age'] = df['Age'].fillna(df['Age'].mean())
df['Fare'] = df['Fare'].fillna(df['Fare'].mean())

# Impute missing Embarked values with 'Q'
df['Embarked'] = df['Embarked'].fillna('Q')
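The cell above fills numeric gaps with the column mean and hard-codes `'Q'` for `Embarked`. A minimal sketch of the same idea as a reusable function, assuming plain pandas; note that it swaps the hard-coded `'Q'` for the column's most frequent value (mode), which is a different choice from the notebook's. `impute_missing` is a hypothetical helper.

```python
import numpy as np
import pandas as pd


def impute_missing(df, mean_cols, mode_cols):
    """Return a copy of df with NaNs filled: column mean for mean_cols, mode for mode_cols."""
    out = df.copy()
    for col in mean_cols:
        out[col] = out[col].fillna(out[col].mean())
    for col in mode_cols:
        # mode() can return several values on a tie; take the first one
        out[col] = out[col].fillna(out[col].mode().iloc[0])
    return out


demo = pd.DataFrame({
    "Age": [22.0, np.nan, 26.0],
    "Embarked": ["S", "S", np.nan],
})
clean = impute_missing(demo, mean_cols=["Age"], mode_cols=["Embarked"])
print(clean)
```

Returning a copy keeps the raw frame available for comparison, at the cost of extra memory on large data.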

In [8]:
# Check for nans
df.isna().sum()

PassengerId        0
Survived           0
Pclass             0
Name               0
Sex                0
Age                0
SibSp              0
Parch              0
Ticket             0
Fare               0
Cabin          43968
Embarked           0
dtype: int64

## Data Ingestion

In [9]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

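def read_config_file(filepath):
    '''Load a YAML config file; log the error and return None if it cannot be parsed.'''
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    '''Collapse runs of two or more `char` into a single `char`.'''
    # re.escape keeps this safe when `char` is a regex metacharacter such as '.'
    pattern = re.escape(char) + '{2,}'
    string = re.sub(pattern, char, string)
    return string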

def col_header_val(df,table_config):
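    '''
    Standardize the column names of df in place (lowercase, replace
    whitespace and other non-word characters with '_', strip and collapse
    underscores), then validate the number and names of the columns
    against table_config['columns']. Returns 1 on success, 0 on failure.
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]','_',regex=True)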
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py
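The standardization steps in `col_header_val` are easier to check on single strings. A minimal sketch that applies the same four steps to one name, plus a hypothetical `validate_columns` helper that, unlike `col_header_val`, standardizes the YAML names too and returns the mismatches instead of printing them. Both function names are illustrative and not part of `testutility.py`.

```python
import re


def standardize_name(name):
    """Apply the same steps as col_header_val to a single column name."""
    name = name.lower()
    name = re.sub(r"[^\w]", "_", name)  # any non-word character -> '_'
    name = name.strip("_")              # drop leading/trailing underscores
    name = re.sub(r"_{2,}", "_", name)  # collapse runs of underscores
    return name


def validate_columns(actual, expected):
    """Compare standardized names; return (passed, extra_in_file, missing_from_file)."""
    actual_std = sorted(standardize_name(c) for c in actual)
    expected_std = sorted(standardize_name(c) for c in expected)
    extra = sorted(set(actual_std) - set(expected_std))
    missing = sorted(set(expected_std) - set(actual_std))
    return actual_std == expected_std, extra, missing


print(standardize_name(" Passenger Id! "))  # passenger_id
print(validate_columns(["Pass-Id", "Fare ($)"], ["pass_id", "fare", "age"]))  # (False, [], ['age'])
```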


In [10]:
%%writefile file.yaml
file_type: csv
dataset_name: file
file_name: train2_final
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - PassengerId
    - Survived
    - Pclass
    - Name
    - Sex
    - Age
    - SibSp
    - Parch
    - Ticket
    - Fare
    - Cabin
    - Embarked

Overwriting file.yaml
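A minimal sketch of how the YAML settings can drive reading, validation and writing, assuming PyYAML and pandas are installed. The config and CSV are inlined as strings (a cut-down, hypothetical stand-in for `file.yaml` and the real data) so the example runs on its own.

```python
import io

import pandas as pd
import yaml

# Cut-down stand-in for file.yaml (only the keys used below)
CONFIG_TEXT = """
inbound_delimiter: ","
outbound_delimiter: "|"
columns:
    - PassengerId
    - Survived
    - Name
"""

# Stand-in for the real CSV file
CSV_TEXT = 'PassengerId,Survived,Name\n1,0,"Braund, Mr. Owen Harris"\n2,1,"Heikkinen, Miss. Laina"\n'

config = yaml.safe_load(CONFIG_TEXT)

# Pass the delimiter by keyword: recent pandas versions make every
# read_csv argument after the path keyword-only
df = pd.read_csv(io.StringIO(CSV_TEXT), sep=config["inbound_delimiter"])

# Compare the file header with the YAML column list (case-insensitive)
file_cols = [c.lower() for c in df.columns]
yaml_cols = [c.lower() for c in config["columns"]]
print("columns match:", file_cols == yaml_cols)

# Write back out with the outbound delimiter (returns a string when no path is given)
out = df.to_csv(sep=config["outbound_delimiter"], index=False)
print(out)
```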


In [11]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [12]:
# Inspect the contents of the config file
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'train2_final',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['PassengerId',
  'Survived',
  'Pclass',
  'Name',
  'Sex',
  'Age',
  'SibSp',
  'Parch',
  'Ticket',
  'Fare',
  'Cabin',
  'Embarked']}

In [15]:
# Reading config file
import testutility as util
config_data = util.read_config_file('file.yaml')

In [15]:
# Read the file using the settings from the config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()



|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.25 | NaN | S |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 2 | 3 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | STON/O2. 3101282 | 7.925 | NaN | S |
| 3 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1 | C123 | S |
| 4 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.05 | NaN | S |


In [18]:
config_data['inbound_delimiter']

','

In [22]:
# Read the file directly with pandas (without the config file)
import pandas as pd
df_sample = pd.read_csv("train2.csv",delimiter=',')
df_sample.head()

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.25 | NaN | S |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 2 | 3 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | STON/O2. 3101282 | 7.925 | NaN | S |
| 3 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1 | C123 | S |
| 4 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.05 | NaN | S |


In [23]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [24]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['passengerid', 'survived', 'pclass', 'name', 'sex', 'age', 'sibsp',
       'parch', 'ticket', 'fare', 'cabin', 'embarked'],
      dtype='object')
columns of YAML are: ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']


In [25]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further actions
    # in the pipeline

column name and column length validation passed
col validation passed
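The two branches above are left as placeholders. A minimal sketch of the reject branch, assuming a hypothetical `rejected/` folder: when validation returns 0 the file is moved aside so later pipeline steps never pick it up; otherwise it is left in place. `route_file` is a hypothetical helper, not part of `testutility.py`.

```python
import os
import shutil
import tempfile


def route_file(path, validation_result, reject_dir):
    """Move `path` into `reject_dir` when validation returned 0; return the file's final path."""
    if validation_result == 0:
        os.makedirs(reject_dir, exist_ok=True)
        target = os.path.join(reject_dir, os.path.basename(path))
        shutil.move(path, target)
        return target
    return path  # validation passed: leave the file where it is


# Hypothetical input file in a scratch directory
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "train2_final.csv")
with open(src, "w") as fh:
    fh.write("a,b\n1,2\n")

final_path = route_file(src, 0, os.path.join(workdir, "rejected"))
print(final_path)
```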


In [27]:
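import csv

from dask import dataframe as dd
df = dd.read_csv('train2.csv',delimiter=',')

# Write a gzip-compressed, pipe-separated (|) text file.
# Dask writes one file per partition, so 'train2_final.csv.gz' is created as a
# directory holding the compressed part files (here a single '0.part').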
df.to_csv('train2_final.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          lineterminator='\n')

['/content/train2_final.csv.gz/0.part']
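Because Dask writes a directory of part files, a consumer expecting a single `train2_final.csv.gz` file has to look inside the folder (Dask's `to_csv` also offers a `single_file=True` option). When the data fits in memory, a minimal sketch with plain pandas writes one gzip-compressed, pipe-separated file and reads it back to check the round trip; the small DataFrame and the output file name are placeholders.

```python
import csv
import os
import tempfile

import pandas as pd

df = pd.DataFrame({
    "PassengerId": [1, 2],
    "Name": ["Braund, Mr. Owen Harris", "Heikkinen, Miss. Laina"],
    "Fare": [7.25, 7.925],
})

out_path = os.path.join(tempfile.mkdtemp(), "train2_final.txt.gz")

# compression would be inferred from the .gz suffix; it is spelled out for clarity
df.to_csv(out_path, sep="|", index=False, quoting=csv.QUOTE_ALL, compression="gzip")

# Read it back with the same separator to verify the round trip
round_trip = pd.read_csv(out_path, sep="|", compression="gzip")
print(round_trip.shape)  # (2, 3)
```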

In [28]:
# List the files in the current working directory
import os
entries = os.listdir()
for entry in entries:
    print(entry)

.config
__pycache__
.ipynb_checkpoints
testutility.py
data.yaml
train2_final.csv.gz
file.yaml
train2.csv
sample_data
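The task list also asks for a summary of the output (total rows, total columns, file size), which the cells above do not compute. A minimal sketch, assuming the pipe-separated gzip output with one header row per part file (Dask's default); it accepts either a single `.gz` file or the directory Dask creates. `gz_parts` and `summarize` are hypothetical helpers. Parsing with the `csv` module rather than counting raw lines keeps quoted fields that contain newlines from being counted as extra rows.

```python
import csv
import gzip
import os
import tempfile


def gz_parts(path):
    """List the gzip files to read: the file itself, or every file in a Dask output directory."""
    if os.path.isdir(path):
        return sorted(
            os.path.join(path, name)
            for name in os.listdir(path)
            if os.path.isfile(os.path.join(path, name))
        )
    return [path]


def summarize(path, sep="|"):
    """Total rows (excluding one header per part), columns and size on disk in bytes."""
    n_rows, n_cols, size = 0, 0, 0
    for part in gz_parts(path):
        size += os.path.getsize(part)
        with gzip.open(part, "rt", newline="") as fh:
            reader = csv.reader(fh, delimiter=sep)
            header = next(reader, None)
            if header is None:  # empty part file
                continue
            n_cols = len(header)
            n_rows += sum(1 for _ in reader)
    return {"rows": n_rows, "columns": n_cols, "size_bytes": size}


# Hypothetical Dask-style output: a directory holding two gzip part files,
# each starting with its own header row
demo_dir = os.path.join(tempfile.mkdtemp(), "train2_final.csv.gz")
os.makedirs(demo_dir)
parts = [
    [["1", "Braund, Mr. Owen Harris"], ["2", "Cumings, Mrs. John Bradley"]],
    [["3", "Heikkinen, Miss. Laina"]],
]
for i, rows in enumerate(parts):
    with gzip.open(os.path.join(demo_dir, f"{i}.part"), "wt", newline="") as fh:
        writer = csv.writer(fh, delimiter="|")
        writer.writerow(["PassengerId", "Name"])
        writer.writerows(rows)

summary = summarize(demo_dir)
print(summary)
```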
