<a href="https://colab.research.google.com/github/iammustafatz/ETL-using-Luigi/blob/main/ETL_using_Luigi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Installation of Luigi

In [35]:
!pip install luigi

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


##Cloning the git repository to extract data

In [32]:
!git clone https://github.com/iammustafatz/ETL-using-Luigi.git

Cloning into 'ETL-using-Luigi'...
remote: Enumerating objects: 6, done.
remote: Counting objects: 100% (6/6), done.
remote: Compressing objects: 100% (4/4), done.
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (6/6), done.
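
Before wiring the clone into the pipeline, it can help to confirm that the CSV is where the later cells expect it. The following is a minimal sketch using only the standard library; `peek_csv` is a hypothetical helper, not part of the repository.

```python
import csv
from pathlib import Path


def peek_csv(path, n_rows=3):
    """Return the header and the first n_rows data rows of a CSV file."""
    path = Path(path)
    if not path.is_file():
        raise FileNotFoundError(f"expected a CSV file at {path}")
    with path.open(newline="") as handle:
        reader = csv.reader(handle)
        header = next(reader, [])  # empty list if the file has no lines
        # zip stops after n_rows, so large files are not read in full
        rows = [row for _, row in zip(range(n_rows), reader)]
    return header, rows
```

In this notebook the call would be `peek_csv('/content/ETL-using-Luigi/train.csv')`, assuming the repository was cloned into the default Colab working directory.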


##Luigi Pipeline

In [36]:
#importing libraries
import pandas as pd
import numpy as np
import datetime as dt
import luigi
from pathlib import Path
import sqlite3
from sqlalchemy import create_engine

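#creating the sqlite db file (if it does not exist yet) and closing the connection right away
sqlite3.connect('/content/my.db').close()
#creating a sqlalchemy engine that points at the same db file
engine=create_engine('sqlite:////content/my.db')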

#defining file paths
file_path='/content/ETL-using-Luigi/train.csv' # path to fetch data
OUTPUT_PATH = Path('/content')  #path to store data

#function to impute null values in numerical columns with the column median
def impute_num(x):
    numerical_cols=x.select_dtypes(include=np.number).columns.tolist() #all numerical cols in the dataframe
    null_counts=x[numerical_cols].isna().sum() #no. of null values in each numerical col
    for col,n_null in null_counts.items():
        #only columns that actually contain nulls are touched
        if n_null>0:
            #assigning the result back avoids in-place fillna on a column selection,
            #which newer pandas versions warn about and may not apply to the dataframe
            x[col]=x[col].fillna(x[col].median()) #median() skips nulls by default

#function to impute null values in categorical columns with the column mode
def impute_cat(x):
    categorical_cols=x.select_dtypes(exclude=np.number).columns.tolist() #all non-numerical cols in the dataframe
    null_counts=x[categorical_cols].isna().sum() #no. of null values in each categorical col
    for col,n_null in null_counts.items():
        if n_null>0:
            #mode() can return several values; the first one is used
            x[col]=x[col].fillna(x[col].mode()[0])

class Extract(luigi.Task):
    
    #date parameter - to handle filename with date in luigi tasks
    date = luigi.DateParameter(default=dt.date.today())

    #path of the extracted file
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/f'extracted_train_{self.date}.csv')

    def run(self):
      #reading the data from input path
      data=pd.read_csv(file_path)
      #storing extracted data as csv in output path
      data.to_csv(self.output().path, index=False)

        
class Transform(luigi.Task):
    
    date = luigi.DateParameter(default=dt.date.today())
    
    def requires(self):
        # Passing the luigi parameter back to the upstream task
        return Extract(self.date) 

    #path of the transformed file       
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/f'transformed_train_{self.date}.csv')

    def run(self):
      #reading the extracted data
      extracted_data=pd.read_csv(self.input().path)
      #getting the percentage of null values in each column
      percent = extracted_data.isnull().mean()*100
      #dropping the columns that have more than 80% null values
      mostly_null_cols = percent[percent>80].index.tolist()
      extracted_data.drop(mostly_null_cols,axis=1,inplace=True)
      #calling the impute_num function
      impute_num(extracted_data)
      #filling nulls in 'MasVnrType' with the string 'None', which the column already uses as a category
      extracted_data['MasVnrType']=extracted_data['MasVnrType'].fillna('None')
      #calling the impute_cat function
      impute_cat(extracted_data)
      #storing transformed data as csv in output path
      extracted_data.to_csv(self.output().path, index=False)

class Load(luigi.Task):
    
    date = luigi.DateParameter(default=dt.date.today())
    
    def requires(self):
        # Passing the luigi parameter back to the upstream task
        return Transform(self.date) 

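    #Load writes to a database table rather than a file, so no output target is defined;
    #without an output, luigi never considers this task complete and reruns it on every build
    def output(self):
        pass

    def run(self):
      #reading the transformed data
      transformed_data=pd.read_csv(self.input().path)
      #loading the data into the 'output' table of the sqlite db, replacing the table if it exists
      transformed_data.to_sql('output',engine,index=False,if_exists='replace') 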

#running the pipeline with the local scheduler; Load pulls in Transform and Extract through requires()
luigi.build([Load()], local_scheduler=True)

DEBUG: Checking if Load(date=2022-11-01) is complete
DEBUG:luigi-interface:Checking if Load(date=2022-11-01) is complete
  is_complete = task.complete()
DEBUG: Checking if Transform(date=2022-11-01) is complete
DEBUG:luigi-interface:Checking if Transform(date=2022-11-01) is complete
INFO: Informed scheduler that task   Load_2022_11_01_215ad7575f   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   Load_2022_11_01_215ad7575f   has status   PENDING
DEBUG: Checking if Extract(date=2022-11-01) is complete
DEBUG:luigi-interface:Checking if Extract(date=2022-11-01) is complete
INFO: Informed scheduler that task   Transform_2022_11_01_215ad7575f   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   Transform_2022_11_01_215ad7575f   has status   PENDING
INFO: Informed scheduler that task   Extract_2022_11_01_215ad7575f   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   Extract_2022_11_01_215ad7575f   has status   PENDING
INFO: 

True
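
The cleaning logic in `Transform` (drop columns that are more than 80% null, then fill numeric gaps with the median and the remaining gaps with the mode) can be tried on a toy DataFrame without running Luigi. This is a minimal sketch rather than the notebook's own code: `clean_frame` and the sample values are made up for illustration, and the function works on a copy instead of modifying its argument.

```python
import numpy as np
import pandas as pd


def clean_frame(df, max_null_pct=80):
    """Drop mostly-null columns, then impute median (numeric) or mode (other)."""
    df = df.copy()  # leave the caller's frame untouched
    null_pct = df.isna().mean() * 100  # share of nulls per column, in percent
    df = df.drop(columns=null_pct[null_pct > max_null_pct].index)
    for col in df.columns:
        if not df[col].isna().any():
            continue  # nothing to impute in this column
        if pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(df[col].median())
        else:
            df[col] = df[col].fillna(df[col].mode()[0])
    return df


# Hypothetical sample data, loosely modelled on the housing columns
toy = pd.DataFrame({
    "LotFrontage": [60.0, np.nan, 80.0, 70.0, 100.0],
    "Electrical": ["SBrkr", "SBrkr", None, "FuseA", "SBrkr"],
    "PoolQC": [None, None, None, "Gd", "Gd"],  # 60% null, so it is kept
    "Alley": [None, None, None, None, None],   # 100% null, so it is dropped
})
cleaned = clean_frame(toy)
```

Here `Alley` is dropped, the missing `LotFrontage` value becomes 75.0 (the median of 60, 70, 80 and 100), and the missing `Electrical` value becomes `SBrkr`, the most frequent category.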

##Checking that the output table in my.db has no null values
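
Besides reading the table back with pandas, the standard-library `sqlite3` module can confirm that the table exists and count nulls per column directly in SQL. The helper below is a hypothetical sketch (`null_counts` is not part of the notebook). It assumes the table name comes from trusted code, because identifiers are interpolated into the query text.

```python
import sqlite3
from contextlib import closing


def null_counts(db_path, table):
    """Return {column: number of NULLs} for one table in a SQLite database."""
    with closing(sqlite3.connect(db_path)) as conn:
        found = conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (table,),
        ).fetchone()
        if found is None:
            raise ValueError(f"table {table!r} not found in {db_path}")
        # PRAGMA table_info yields one row per column; index 1 is the name
        columns = [row[1] for row in conn.execute(f'PRAGMA table_info("{table}")')]
        counts = {}
        for col in columns:
            # Double quotes allow column names such as 1stFlrSF
            query = f'SELECT COUNT(*) FROM "{table}" WHERE "{col}" IS NULL'
            counts[col] = conn.execute(query).fetchone()[0]
    return counts
```

For this notebook the call would be `null_counts('/content/my.db', 'output')`, assuming the database sits in the default Colab working directory.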

In [37]:
df=pd.read_sql('output',engine)
print(df.isna().sum().to_string())

Id               0
MSSubClass       0
MSZoning         0
LotFrontage      0
LotArea          0
Street           0
LotShape         0
LandContour      0
Utilities        0
LotConfig        0
LandSlope        0
Neighborhood     0
Condition1       0
Condition2       0
BldgType         0
HouseStyle       0
OverallQual      0
OverallCond      0
YearBuilt        0
YearRemodAdd     0
RoofStyle        0
RoofMatl         0
Exterior1st      0
Exterior2nd      0
MasVnrType       0
MasVnrArea       0
ExterQual        0
ExterCond        0
Foundation       0
BsmtQual         0
BsmtCond         0
BsmtExposure     0
BsmtFinType1     0
BsmtFinSF1       0
BsmtFinType2     0
BsmtFinSF2       0
BsmtUnfSF        0
TotalBsmtSF      0
Heating          0
HeatingQC        0
CentralAir       0
Electrical       0
1stFlrSF         0
2ndFlrSF         0
LowQualFinSF     0
GrLivArea        0
BsmtFullBath     0
BsmtHalfBath     0
FullBath         0
HalfBath         0
BedroomAbvGr     0
KitchenAbvGr     0
KitchenQual 