<h1> Data Ingestion Pipeline

The aim of this project is to build a data ingestion pipeline, driven by a configuration file (here a YAML file), that performs basic validations on incoming data.
I defined the validation functions in a separate file called "functions.py".
The pipeline is meant to handle large datasets, so the data is read with Dask instead of being loaded into memory all at once with pandas.

In this example I use the Kaggle dataset "Riiid Answer Correctness Prediction", available here: https://www.kaggle.com/c/riiid-test-answer-prediction/data?select=train.csv

<h3>About this file:</h3>

Size: 5.45 GB

Rows: more than 101,000,000

Columns: 10
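To see why the size matters, here is a rough estimate of the memory the raw file would need if it were loaded in one go with pandas. It assumes pandas' default parsing of the 10 columns (the eight integer columns as int64, the elapsed-time column as float64, and the True/False column, which has gaps, as object, i.e. one 8-byte pointer per cell on a 64-bit build) and uses 101,000,000 as the row count. It ignores the Python objects behind the object column and any temporary copies made while parsing, so the real figure would be higher.

```python
import numpy as np

ROWS = 101_000_000  # lower bound for the number of rows in train.csv

# Assumed dtypes after a plain pandas.read_csv of the 10 columns:
# 8 integer columns -> int64, prior_question_elapsed_time -> float64,
# prior_question_had_explanation (True/False with gaps) -> object (one pointer per cell)
default_dtypes = ["int64"] * 8 + ["float64"] + ["object"]

# itemsize is the number of bytes one cell of that dtype occupies (8 for each here on 64-bit)
bytes_per_row = sum(np.dtype(dt).itemsize for dt in default_dtypes)
total_gb = bytes_per_row * ROWS / 1e9

print(f"{bytes_per_row} bytes per row -> about {total_gb:.2f} GB in memory")
# prints "80 bytes per row -> about 8.08 GB in memory"
```

That is already more than the 5.45 GB file on disk, which is why the pipeline reads the data with Dask and later casts the columns to smaller types.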


<h4>Import the required libraries

In [1]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import dask.dataframe as dd

<h4> Set the path of the directory that contains the config file

In [2]:
config_path = r'C:\Users\nacho\repositorios\Pipeline'  # directory that holds config.yaml

<h4> Import the 'functions.py' module

In [3]:
import functions as fxx

<h4> Load the config file

In [4]:
config_file = fxx.load_config_file('config.yaml', config_path)
config_file

{'data_directory': 'C:\\Users\\nacho\\repositorios\\Pipeline\\data',
 'data_name': 'train.csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'columns_names': ['time',
  'user',
  'content',
  'task',
  'answer',
  'answer_correctly',
  'elapsedt',
  'expl'],
 'drop_columns': ['row_id', 'content_type_id'],
 'data_type': {'time': 'int64',
  'user': 'int32',
  'content': 'int16',
  'task': 'int16',
  'answer': 'int8',
  'answer_correctly': 'int8',
  'elapsedt': 'float32',
  'expl': 'boolean'},
 'data_clean_folder': 'C:\\Users\\nacho\\repositorios\\Pipeline\\data_clean',
 'clean_data': 'data_clean.gzip'}
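The code of "functions.py" isn't shown in this notebook. As a minimal sketch, assuming load_config_file simply joins the directory and the file name and parses the YAML with PyYAML, it could look like this (the body is my assumption, not the actual implementation):

```python
import os

import yaml  # PyYAML


def load_config_file(config_name, config_path):
    """Hypothetical sketch: read <config_path>/<config_name> and return the YAML contents as a dict."""
    full_path = os.path.join(config_path, config_name)
    with open(full_path, "r", encoding="utf-8") as f:
        # safe_load builds only plain Python objects (dicts, lists, str, int, float, bool)
        return yaml.safe_load(f)
```

yaml.safe_load is preferred over yaml.load here because it refuses to construct arbitrary Python objects from the file. A config.yaml entry such as `drop_columns: [row_id, content_type_id]` comes back as the list shown in the output above.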

<h4> Read the dataset

In [5]:
data = fxx.load_data(config_file)

Data Uploaded from C:\Users\nacho\repositorios\Pipeline\data  name =  train.csv


In [6]:
data.head()

|  | row_id | timestamp | user_id | content_id | content_type_id | task_container_id | user_answer | answered_correctly | prior_question_elapsed_time | prior_question_had_explanation |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 115 | 5692 | 0 | 1 | 3 | 1 |  |  |
| 1 | 1 | 56943 | 115 | 5716 | 0 | 2 | 2 | 1 | 37000.0 | False |
| 2 | 2 | 118363 | 115 | 128 | 0 | 0 | 0 | 1 | 55000.0 | False |
| 3 | 3 | 131167 | 115 | 7860 | 0 | 3 | 0 | 1 | 19000.0 | False |
| 4 | 4 | 137965 | 115 | 7922 | 0 | 4 | 1 | 1 | 11000.0 | False |
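A possible sketch of load_data, assuming it builds the file path from data_directory and data_name and reads it with dask.dataframe.read_csv using the inbound delimiter. The reader parameter is my own addition so the function can also be tried on a small file with pandas.read_csv; it is not part of the original function.

```python
import os


def load_data(config, reader=None):
    """Hypothetical sketch: read the raw CSV described by the config.

    `reader` is an assumption added for testing; by default the file is read
    with dask.dataframe.read_csv, which splits it into partitions.
    """
    if reader is None:
        import dask.dataframe as dd  # imported here so pandas-only use does not need Dask

        reader = dd.read_csv
    path = os.path.join(config["data_directory"], config["data_name"])
    data = reader(path, sep=config["inbound_delimiter"])
    print(f"Data loaded from {path}")
    return data
```

With Dask, read_csv only reads a small sample of the file up front to infer the column types; the partitions (their size is controlled by the blocksize argument) are read when a result is computed, and data.head() only needs the first one.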


<h4> Drop Irrelevant Features

In [7]:
data = fxx.drop_irrelevant(data, config_file)

In [8]:
len(data.columns)

8
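drop_irrelevant presumably removes the columns listed under drop_columns in the config. A minimal sketch, assuming that is all it does; DataFrame.drop(columns=...) exists in both pandas and Dask, so the same function works on either kind of frame:

```python
import pandas as pd


def drop_irrelevant(data, config):
    """Hypothetical sketch: drop the columns listed under 'drop_columns' in the config."""
    # drop(columns=...) has the same form in pandas and Dask
    return data.drop(columns=config["drop_columns"])


# First two rows of the data.head() preview above (10 original columns)
sample = pd.DataFrame(
    [
        [0, 0, 115, 5692, 0, 1, 3, 1, None, None],
        [1, 56943, 115, 5716, 0, 2, 2, 1, 37000.0, False],
    ],
    columns=[
        "row_id", "timestamp", "user_id", "content_id", "content_type_id",
        "task_container_id", "user_answer", "answered_correctly",
        "prior_question_elapsed_time", "prior_question_had_explanation",
    ],
)
trimmed = drop_irrelevant(sample, {"drop_columns": ["row_id", "content_type_id"]})
print(len(trimmed.columns))  # prints 8
```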

<h4> Perform header validation

In [9]:
fxx.headers_validation(data,config_file)

Columns headers validation passed


1
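A sketch of how headers_validation could work: first compare the number of columns with columns_names, then the names themselves. It assumes the frame's columns already carry the names from columns_names (the notebook doesn't show where the renaming from timestamp, user_id, ... to time, user, ... happens), and it compares names in order after trimming whitespace and lowercasing, which is my own choice and not necessarily what the original does.

```python
import pandas as pd


def headers_validation(data, config):
    """Hypothetical sketch: True if `data` has the expected number and names of columns."""
    expected = [c.strip().lower() for c in config["columns_names"]]
    actual = [str(c).strip().lower() for c in data.columns]

    # Check 1: same number of columns
    if len(actual) != len(expected):
        print(f"Column count check failed: got {len(actual)}, expected {len(expected)}")
        return False

    # Check 2: same names, position by position
    mismatched = [(a, e) for a, e in zip(actual, expected) if a != e]
    if mismatched:
        print(f"Column name check failed (got, expected): {mismatched}")
        return False

    print("Column header validation passed")
    return True
```

Returning True/False instead of only printing makes the result usable in an `if` or an `assert` further down the pipeline.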

<h4> Reduce the dataset's memory usage

In [10]:
data = fxx.memory_usage_optimization(data,config_file)

In [11]:
data

| npartitions=92 | time | user | content | task | answer | answer_correctly | elapsedt | expl |
|---|---|---|---|---|---|---|---|---|
|  | int64 | int32 | int16 | int16 | int8 | int8 | float32 | boolean |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
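memory_usage_optimization presumably casts each column to the dtype given under data_type in the config. A minimal sketch of that idea with DataFrame.astype, which accepts a column-to-dtype dict in both pandas and Dask, tried on the five rows of the data.head() preview (trimmed to the eight kept columns and renamed to the config's names by hand):

```python
import io

import pandas as pd

# Rows from the data.head() preview above, without row_id/content_type_id
# and with the column names from config['columns_names']
SAMPLE_CSV = """time,user,content,task,answer,answer_correctly,elapsedt,expl
0,115,5692,1,3,1,,
56943,115,5716,2,2,1,37000.0,False
118363,115,128,0,0,1,55000.0,False
131167,115,7860,3,0,1,19000.0,False
137965,115,7922,4,1,1,11000.0,False
"""

# 'data_type' entry of config.yaml, as printed above
DATA_TYPE = {"time": "int64", "user": "int32", "content": "int16", "task": "int16",
             "answer": "int8", "answer_correctly": "int8", "elapsedt": "float32",
             "expl": "boolean"}


def memory_usage_optimization(data, config):
    """Hypothetical sketch: cast each column to the compact dtype in config['data_type']."""
    return data.astype(config["data_type"])


raw = pd.read_csv(io.StringIO(SAMPLE_CSV))  # default dtypes: int64, float64, object
small = memory_usage_optimization(raw, {"data_type": DATA_TYPE})

# deep=True also counts the Python objects behind object-dtype cells
raw_bytes = raw.memory_usage(index=False, deep=True).sum()
small_bytes = small.memory_usage(index=False, deep=True).sum()
print(f"{raw_bytes} bytes -> {small_bytes} bytes")
```

The two columns with gaps get dtypes that can represent missing values (float32 holds NaN, and pandas' nullable boolean holds <NA>), which is why the cast can run before the missing rows are dropped; casting them to int8 or to NumPy's plain bool would not preserve the gaps. Before narrowing integer columns it is also worth checking that each column's maximum fits the target type.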


<h4> Drop rows with missing values

In [12]:
data = fxx.drop_missing_values(data)
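drop_missing_values takes no config, so it most likely drops every incomplete row. A sketch under that assumption, using dropna(), which behaves the same way in pandas and Dask; in the preview above it would remove row 0, which has no prior_question_elapsed_time or prior_question_had_explanation:

```python
import pandas as pd


def drop_missing_values(data):
    """Hypothetical sketch: drop every row that has at least one missing value."""
    return data.dropna()


# Three rows shaped like the optimized data; the first one has no prior-question values
sample = pd.DataFrame({
    "time": [0, 56943, 118363],
    "elapsedt": pd.Series([None, 37000.0, 55000.0], dtype="float32"),
    "expl": pd.array([None, False, False], dtype="boolean"),
})
clean = drop_missing_values(sample)
print(clean["time"].tolist())  # prints [56943, 118363]
```

If only some columns should count, dropna(subset=[...]) restricts the check to those columns.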

<h4> Save the cleaned data in gzip format

In [13]:
fxx.save_clean_data(data,config_file)



('Data saved as ', 'data_clean.gzip')
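A sketch of save_clean_data, assuming it writes a delimited text file with the outbound delimiter (|) into data_clean_folder. compression='gzip' is passed explicitly: pandas infers compression from extensions such as .gz, and .gzip is not one of them, so with the default compression='infer' the file would be written uncompressed.

```python
import os

import pandas as pd


def save_clean_data(data, config):
    """Hypothetical sketch: write a gzip-compressed, delimited text file and return its path."""
    path = os.path.join(config["data_clean_folder"], config["clean_data"])
    # Explicit compression, because '.gzip' is not an extension pandas infers
    data.to_csv(path, sep=config["outbound_delimiter"], index=False, compression="gzip")
    return path
```

This version targets a pandas DataFrame. For a Dask DataFrame, to_csv writes one file per partition unless single_file=True is passed.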

<h4> Now I am going to build a different dataset to test the validation

In [13]:
testdata = {
    'city' : ['Delhi', 'Lima', 'Istanbul','Riyadh'],
    'age' : [34, 30, 16,33],
    'Country' : ['India','Peru','Turkey','Saudi Arabia'],
}

In [14]:
testdata = pd.DataFrame(testdata, columns=['city', 'age','Country'])

In [15]:
fxx.headers_validation(testdata,config_file)

Columns length validation failed
You have  3  colummns, you are supposed to read  8 columns


<h4> As you can see, this dataset did not make it through our validation pipeline
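Judging from the output above, headers_validation reports a failure by printing a message, so nothing stops the following cells from processing the rejected data. A minimal sketch of one way to make a failed check halt the pipeline: the validation step raises an exception and a small runner applies the steps in order. ValidationError, require_column_count and run_pipeline are hypothetical names, not part of "functions.py".

```python
import pandas as pd


class ValidationError(Exception):
    """Raised when incoming data fails a validation step."""


def require_column_count(data, expected_count):
    """Validation step: raise instead of printing, so later steps never run."""
    if len(data.columns) != expected_count:
        raise ValidationError(f"expected {expected_count} columns, got {len(data.columns)}")
    return data


def run_pipeline(data, steps):
    """Apply each step in order; any step may raise ValidationError to stop the run."""
    for step in steps:
        data = step(data)
    return data


testdata = pd.DataFrame({
    "city": ["Delhi", "Lima", "Istanbul", "Riyadh"],
    "age": [34, 30, 16, 33],
    "Country": ["India", "Peru", "Turkey", "Saudi Arabia"],
})
steps = [lambda df: require_column_count(df, 8), lambda df: df.dropna()]
try:
    run_pipeline(testdata, steps)
except ValidationError as err:
    print("Rejected:", err)  # prints "Rejected: expected 8 columns, got 3"
```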