# Data Pipeline 

<p>For data pipelining, one likely approach is scikit-learn pipelines: write the individual steps as functions or classes, combine them in a scikit-learn pipeline so the same steps can be applied to any new dataset, and write logs to monitor what happens while the pipeline runs.</p>

https://www.geeksforgeeks.org/create-a-pipeline-in-pandas/
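
As a rough illustration of the idea above, the sketch below combines one function step and one class step in a scikit-learn `Pipeline` and logs what each step does. The step names (`drop_id`, `MeanImputer`), the logger name and the toy data are assumptions made for this example; they are not taken from the linked article.

```python
import logging

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(message)s")
logger = logging.getLogger("pipeline")  # hypothetical logger name


class MeanImputer(BaseEstimator, TransformerMixin):
    """Hypothetical class step: fill nulls in `cols` with the mean learned in fit()."""

    def __init__(self, cols):
        self.cols = cols

    def fit(self, X, y=None):
        self.means_ = X[self.cols].mean()
        logger.info("MeanImputer.fit learned means: %s", self.means_.to_dict())
        return self

    def transform(self, X):
        logger.info("MeanImputer.transform on %d rows", len(X))
        return X.fillna(self.means_)


def drop_id(X):
    """Hypothetical function step: drop the identifier column."""
    logger.info("drop_id on %d rows", len(X))
    return X.drop(columns=["Loan_ID"])


pipe = Pipeline([
    ("drop_id", FunctionTransformer(drop_id)),
    ("impute", MeanImputer(cols=["LoanAmount"])),
])

# Toy data standing in for a training set and a new dataset
train = pd.DataFrame({"Loan_ID": ["a", "b", "c"],
                      "LoanAmount": [100.0, float("nan"), 200.0]})
new = pd.DataFrame({"Loan_ID": ["d"], "LoanAmount": [float("nan")]})

train_out = pipe.fit_transform(train)  # learns the mean from `train`
new_out = pipe.transform(new)          # reuses that mean on the new data
```

Fitting once and then calling `transform` on later data is what lets the same steps be applied to any new dataset without relearning anything from it.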

<p>This link also gives some conceptual background on communication between two modules (message queues):</p>

https://www.activestate.com/blog/how-to-create-scalable-data-pipelines-with-python/ 
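
To make the message-queue idea concrete, here is a minimal in-process sketch in which one "module" produces records and another consumes them through a queue. It uses Python's standard `queue.Queue` as a stand-in for a real message broker; the function names and the doubling transform are invented for the example and do not come from the linked article.

```python
import queue
import threading

SENTINEL = None  # marks the end of the stream


def producer(out_q, records):
    """'Module A': push raw records onto the queue, then signal completion."""
    for rec in records:
        out_q.put(rec)
    out_q.put(SENTINEL)


def consumer(in_q, results):
    """'Module B': pull records off the queue and transform them."""
    while True:
        rec = in_q.get()
        if rec is SENTINEL:
            break
        results.append(rec * 2)  # placeholder transformation


# Bounded queue: a fast producer blocks instead of piling up unprocessed records
q = queue.Queue(maxsize=10)
results = []
t_prod = threading.Thread(target=producer, args=(q, [1, 2, 3]))
t_cons = threading.Thread(target=consumer, args=(q, results))
t_prod.start()
t_cons.start()
t_prod.join()
t_cons.join()
print(results)  # [2, 4, 6]
```

The two sides only share the queue, so either one can be replaced or scaled without the other knowing how it works.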


# Pipeline in pandas using df.pipe

In [1]:
# https://github.com/SuperDataWorld/Python/blob/main/Pipe.ipynb
import pandas as pd
df = pd.read_csv('https://raw.githubusercontent.com/SuperDataWorld/Python/main/loan.csv')
df.head(8)

Unnamed: 0,Loan_ID,Gender,Married,Dependents,Education,Self_Employed,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,Credit_History,Property_Area,Loan_Status
0,LP001002,Male,No,0,Graduate,No,5849,0.0,,360.0,1.0,Urban,Y
1,LP001003,Male,Yes,1,Graduate,No,4583,1508.0,128.0,360.0,1.0,Rural,N
2,LP001005,Male,Yes,0,Graduate,Yes,3000,0.0,66.0,360.0,1.0,Urban,Y
3,LP001006,Male,Yes,0,Not Graduate,No,2583,2358.0,120.0,360.0,1.0,Urban,Y
4,LP001008,Male,No,0,Graduate,No,6000,0.0,141.0,360.0,1.0,Urban,Y
5,LP001011,Male,Yes,2,Graduate,Yes,5417,4196.0,267.0,360.0,1.0,Urban,Y
6,LP001013,Male,Yes,0,Not Graduate,No,2333,1516.0,95.0,360.0,1.0,Urban,Y
7,LP001014,Male,Yes,3+,Graduate,No,3036,2504.0,158.0,360.0,0.0,Semiurban,N


In [2]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 614 entries, 0 to 613
Data columns (total 13 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   Loan_ID            614 non-null    object 
 1   Gender             601 non-null    object 
 2   Married            611 non-null    object 
 3   Dependents         599 non-null    object 
 4   Education          614 non-null    object 
 5   Self_Employed      582 non-null    object 
 6   ApplicantIncome    614 non-null    int64  
 7   CoapplicantIncome  614 non-null    float64
 8   LoanAmount         592 non-null    float64
 9   Loan_Amount_Term   600 non-null    float64
 10  Credit_History     564 non-null    float64
 11  Property_Area      614 non-null    object 
 12  Loan_Status        614 non-null    object 
dtypes: float64(4), int64(1), object(8)
memory usage: 62.5+ KB


In [3]:
df['Dependents'].value_counts()

0     345
1     102
2     101
3+     51
Name: Dependents, dtype: int64

In [4]:
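# 1. Drop rows that have nulls in any of the given columns
def drop_nulls(df, cols):
    return df.dropna(subset=cols)

# 2. Fill nulls in the given columns with the column mean
def fill_vals(df, cols):
    # Pass a {column: mean} dict so fillna returns a new frame; calling
    # fillna(inplace=True) on df[i] may act on a temporary copy instead
    return df.fillna({i: df[i].mean() for i in cols})

# 3. Replace the string '3+' with a number and convert the column to numeric
def replace_strings(df, cols):
    # '3+' is mapped to 4 here, which is a choice made in the original notebook
    return df.assign(**{i: pd.to_numeric(df[i].replace('3+', 4)) for i in cols})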


null_cols = ['Gender','Married','Dependents','Credit_History','Self_Employed','Loan_Amount_Term']
av_cols = ['LoanAmount']
rp_cols = ['Dependents']


df = df.pipe(drop_nulls, null_cols).pipe(fill_vals,av_cols).pipe(replace_strings, rp_cols)

df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 499 entries, 0 to 613
Data columns (total 13 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   Loan_ID            499 non-null    object 
 1   Gender             499 non-null    object 
 2   Married            499 non-null    object 
 3   Dependents         499 non-null    int64  
 4   Education          499 non-null    object 
 5   Self_Employed      499 non-null    object 
 6   ApplicantIncome    499 non-null    int64  
 7   CoapplicantIncome  499 non-null    float64
 8   LoanAmount         499 non-null    float64
 9   Loan_Amount_Term   499 non-null    float64
 10  Credit_History     499 non-null    float64
 11  Property_Area      499 non-null    object 
 12  Loan_Status        499 non-null    object 
dtypes: float64(4), int64(2), object(7)
memory usage: 54.6+ KB


In [5]:
df.head(8)

Unnamed: 0,Loan_ID,Gender,Married,Dependents,Education,Self_Employed,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,Credit_History,Property_Area,Loan_Status
0,LP001002,Male,No,0,Graduate,No,5849,0.0,144.735417,360.0,1.0,Urban,Y
1,LP001003,Male,Yes,1,Graduate,No,4583,1508.0,128.0,360.0,1.0,Rural,N
2,LP001005,Male,Yes,0,Graduate,Yes,3000,0.0,66.0,360.0,1.0,Urban,Y
3,LP001006,Male,Yes,0,Not Graduate,No,2583,2358.0,120.0,360.0,1.0,Urban,Y
4,LP001008,Male,No,0,Graduate,No,6000,0.0,141.0,360.0,1.0,Urban,Y
5,LP001011,Male,Yes,2,Graduate,Yes,5417,4196.0,267.0,360.0,1.0,Urban,Y
6,LP001013,Male,Yes,0,Not Graduate,No,2333,1516.0,95.0,360.0,1.0,Urban,Y
7,LP001014,Male,Yes,4,Graduate,No,3036,2504.0,158.0,360.0,0.0,Semiurban,N


In [6]:
df['Dependents'].value_counts()

0    284
2     88
1     84
4     43
Name: Dependents, dtype: int64
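
The introduction mentions writing logs while a pipeline runs. One possible way to do that with `df.pipe` is a small decorator that logs the frame's shape before and after each step. The sketch below uses a made-up toy frame, and `log_step` is a hypothetical helper, not part of pandas.

```python
import functools
import logging

import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipe_demo")  # hypothetical logger name


def log_step(func):
    """Log the shape before and after a pipe step (hypothetical helper)."""
    @functools.wraps(func)
    def wrapper(df, *args, **kwargs):
        out = func(df, *args, **kwargs)
        logger.info("%s: %s -> %s", func.__name__, df.shape, out.shape)
        return out
    return wrapper


@log_step
def drop_nulls(df, cols):
    # Returns a new frame; the caller's frame is left untouched
    return df.dropna(subset=cols)


@log_step
def fill_mean(df, cols):
    # Fill nulls in each listed column with that column's mean
    return df.fillna({c: df[c].mean() for c in cols})


toy = pd.DataFrame({
    "Gender": ["Male", None, "Female"],
    "LoanAmount": [100.0, 120.0, float("nan")],
})
clean = toy.pipe(drop_nulls, ["Gender"]).pipe(fill_mean, ["LoanAmount"])
```

Because each step returns a new frame, `toy` keeps its three rows and its null values, while `clean` holds the processed result.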

# Pipeline in pandas using pdpipe

<p><a href="https://www.nbshare.io/notebook/400231434/Data-Cleaning-With-Python-Pdpipe/" rel="noreferrer">[SOURCE]</a></p>
<h1 id="types-of-pipeline-stages">Types of Pipeline Stages</h1>
<p>All built-in stages are thoroughly documented, including examples; if you find any documentation lacking please open an issue. A list of briefly described available built-in stages follows:</p>
<h2 id="built-in-pandas-methods">Built-in pandas methods</h2>
<p>Ad-hoc pipeline stages that wrap any <code>pandas.DataFrame</code> built-in method that returns a dataframe object can be easily created using the <code><a title="pdpipe.df" href="df.html">pdpipe.df</a></code> submodule:</p>
<pre><code class="language-python hljs">  pipeline = pdp.PdPipeline([
    pdp.df.set_axis(labels=<span class="hljs-string">'datetime'</span>),
    pdp.ColDrop(<span class="hljs-string">'age'</span>),
  ])
</code></pre>
<p>Refer to the <code><a title="pdpipe.df" href="df.html">pdpipe.df</a></code> module for a more detailed documentation.</p>
<h2 id="basic-stages">Basic Stages</h2>
<p>Refer to submodule <code><a title="pdpipe.basic_stages" href="basic_stages.html">pdpipe.basic_stages</a></code></p>
<ul>
<li>AdHocStage - Define custom pipeline stages on the fly.</li>
<li>ColDrop - Drop columns by name.</li>
<li>ValDrop - Drop rows by their value in specific or all columns.</li>
<li>ValKeep - Keep rows by their value in specific or all columns.</li>
<li>ColRename - Rename columns.</li>
<li>DropNa - Drop null values. Supports all parameters supported by the pandas.dropna function.</li>
<li>FreqDrop - Drop rows by value frequency threshold on a specific column.</li>
<li>ColReorder - Reorder columns.</li>
<li>RowDrop - Drop rows by callable conditions.</li>
<li>Schematize - Learn a dataframe schema on fit and transform to it on future transforms.</li>
<li>DropDuplicates - Drop duplicate values in a subset of columns.</li>
</ul>
<h2 id="column-generation">Column Generation</h2>
<p>Refer to submodule <code><a title="pdpipe.col_generation" href="col_generation.html">pdpipe.col_generation</a></code></p>
<ul>
<li>Bin - Convert a continuous valued column to categoric data using binning.</li>
<li>OneHotEncode - Convert a categorical column to the several binary columns corresponding to it.</li>
<li>MapColVals - Replace column values by a map.</li>
<li>ApplyToRows - Generate columns by applying a function to each row.</li>
<li>ApplyByCols - Generate columns by applying an element-wise function to columns.</li>
<li>ColByFrameFunc - Add a column by applying a dataframe-wide function.</li>
<li>AggByCols - Generate columns by applying a series-wise function to columns.</li>
<li>Log - Log-transform numeric data, possibly shifting data before.</li>
</ul>
<h2 id="text-stages">Text Stages</h2>
<p>Refer to submodule <code><a title="pdpipe.text_stages" href="text_stages.html">pdpipe.text_stages</a></code></p>
<ul>
<li>RegexReplace - Replace regex occurrences in columns of strings.</li>
<li>DropTokensByLength - Drop tokens in token lists by token length.</li>
<li>DropTokensByList - Drop every occurrence of a given set of string tokens in token lists.</li>
</ul>
<h2 id="scikit-learn-dependent-stages">Scikit-learn-dependent Stages</h2>
<p>Refer to submodule <code><a title="pdpipe.sklearn_stages" href="sklearn_stages.html">pdpipe.sklearn_stages</a></code></p>
<ul>
<li>Encode - Encode a categorical column to corresponding number values.</li>
<li>Scale - Scale data with any of the sklearn scalers.</li>
<li>TfidfVectorizeTokenLists - Transform a column of token lists into the corresponding set of tfidf vector columns.</li>
</ul>
<h2 id="nltk-dependent-stages">nltk-dependent Stages</h2>
<p>Refer to submodule <code><a title="pdpipe.nltk_stages" href="nltk_stages.html">pdpipe.nltk_stages</a></code></p>
<ul>
<li>TokenizeWords - Tokenize a sentence into a list of tokens by whitespaces.</li>
<li>UntokenizeWords - Joins token lists into whitespace-separated strings.</li>
<li>RemoveStopwords - Remove stopwords from a tokenized list.</li>
<li>SnowballStem - Stems tokens in a list using the Snowball stemmer.</li>
<li>DropRareTokens - Drop rare tokens from token lists.</li>
</ul>
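
To make a few of the stage descriptions above more concrete, the sketch below shows roughly equivalent operations in plain pandas. This is not how pdpipe implements these stages, and details such as default arguments may differ; the column names and values are invented for the example.

```python
import pandas as pd

df = pd.DataFrame({
    "Name": ["a", "b", "c", "d"],
    "Label": ["x", "y", "x", "y"],
    "Children": [0, -1, 2, 1],
    "Age": [23, 35, 47, 61],
})

# ColDrop: drop a column by name
out = df.drop(columns=["Name"])

# ValDrop: drop rows whose value in a column is in a given list
out = out[~out["Children"].isin([-1])]

# Bin: convert a continuous column to categories using bin edges
out = out.assign(
    AgeGroup=pd.cut(out["Age"], bins=[0, 30, 50, 120],
                    labels=["young", "mid", "senior"])
)

# OneHotEncode: expand a categorical column into binary columns
out = pd.get_dummies(out, columns=["Label"])
print(out)
```

A pipeline library mainly adds value on top of calls like these by naming each step, keeping them in order, and letting the same sequence be reapplied to new data.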

<h1>Creating Pipelines</h1>
<code>pipeline = pdp.PdPipeline([pdp.ColDrop("Name"), pdp.OneHotEncode("Label")])
pipeline = pdp.make_pdpipeline(pdp.ColDrop("Name"), pdp.OneHotEncode("Label"))
pipeline = pdp.ColDrop("Name").OneHotEncode("Label").ValDrop([-1], "Children")
pipeline = pdp.ColDrop("Name") + pdp.OneHotEncode("Label")
pipeline += pdp.MapColVals("Job", {"Part": True, "Full":True, "No": False})
pipeline += pdp.PdPipeline([pdp.ColRename({"Job": "Employed"})])
</code>

<h1>Pipeline Slicing &amp; Printing Pipelines</h1>
<code>print(pipeline)
pipeline[1:2]
</code>
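
The `+`, `+=`, slicing and printing shown above can be understood as ordinary Python operator overloading. The toy classes below sketch one way such an interface could be built; `Stage` and `MiniPipeline` are hypothetical names for this illustration and are not pdpipe's actual implementation.

```python
import pandas as pd


class Stage:
    """Hypothetical single step wrapping a DataFrame -> DataFrame function."""

    def __init__(self, func, desc):
        self.func, self.desc = func, desc

    def __call__(self, df):
        return self.func(df)

    def __add__(self, other):
        # stage + stage (or stage + pipeline) yields a pipeline
        return MiniPipeline([self]) + other


class MiniPipeline:
    """Hypothetical ordered list of stages supporting +, slicing and printing."""

    def __init__(self, stages):
        self.stages = list(stages)

    def __call__(self, df):
        for stage in self.stages:
            df = stage(df)
        return df

    def __add__(self, other):
        extra = other.stages if isinstance(other, MiniPipeline) else [other]
        return MiniPipeline(self.stages + extra)

    def __getitem__(self, index):
        picked = self.stages[index]
        # A slice gives back a smaller pipeline, an int gives a single stage
        return MiniPipeline(picked) if isinstance(index, slice) else picked

    def __str__(self):
        return "\n".join(f"[{i}] {s.desc}" for i, s in enumerate(self.stages))


drop_name = Stage(lambda df: df.drop(columns=["Name"]), "Drop columns Name")
fill_zero = Stage(lambda df: df.fillna(0), "Fill nulls with 0")
pipeline = drop_name + fill_zero
pipeline += Stage(lambda df: df.rename(columns={"Job": "Employed"}), "Rename Job")
print(pipeline)

# Run only the first two stages on a toy frame
result = pipeline[0:2](pd.DataFrame({"Name": ["a"], "Job": [float("nan")]}))
```

Since `+=` falls back to `__add__`, each addition builds a new pipeline object rather than modifying the old one in place.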

In [7]:
# !pip install pdpipe
import pandas as pd
import pdpipe as pdp

df = pd.read_csv('./resources/cars.csv')
df.columns = df.columns.str.strip()
df.tail()

Unnamed: 0,mpg,cylinders,cubicinches,hp,weightlbs,time-to-60,year,brand
256,17.0,8,305,130,3840,15,1980,US.
257,36.1,4,91,60,1800,16,1979,Japan.
258,22.0,6,232,112,2835,15,1983,US.
259,18.0,6,232,100,3288,16,1972,US.
260,22.0,6,250,105,3353,15,1977,US.


In [12]:
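def classifyYear(n):
    # Label a model year relative to 1980; 1980 itself falls in 'after 1980s'
    if n < 1980:
        return 'before 1980s'
    else:
        return 'after 1980s'

def numberOfCylinders(x):
    # Returns 'No' for 4 or fewer cylinders and 'Yes' otherwise. Rows labelled
    # 'No' are dropped later, so only cars with more than 4 cylinders are kept.
    if x <= 4:
        return 'No'
    else:
        return 'Yes'
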
# Build the pipeline one stage at a time and keep a snapshot of the output after
# every stage, so that df_array[i] is the result of running stages 0..i
df_array = []
project_pipeline = pdp.ColDrop("time-to-60")
df_array.append(project_pipeline(df))
project_pipeline += pdp.ApplyByCols("year", classifyYear, "Year_Classification")
df_array.append(project_pipeline(df))
project_pipeline += pdp.OneHotEncode('Year_Classification')
df_array.append(project_pipeline(df))
project_pipeline += pdp.ApplyByCols('cylinders', numberOfCylinders, 'CylindersLessThan_4', drop=False)
df_array.append(project_pipeline(df))
project_pipeline += pdp.ValDrop(['No'], 'CylindersLessThan_4')
df_array.append(project_pipeline(df))
project_pipeline += pdp.ColDrop('CylindersLessThan_4')
df_array.append(project_pipeline(df))
project_pipeline += pdp.RowDrop({'hp': lambda x: x <= 100})
df_array.append(project_pipeline(df))
project_pipeline += pdp.Scale('MinMaxScaler', exclude_columns=['mpg', 'year', 'brand', 'cubicinches'])
df_array.append(project_pipeline(df))
final_df = project_pipeline(df)
print("*" * 100)
print("-" * 5, "Printing pipeline", "-" * 5)
print(project_pipeline)

# Print each stage next to the dataframe produced once that stage has run
for i, snapshot in enumerate(df_array):
    print("*" * 100)
    print(project_pipeline[i], "\n")
    print(snapshot)
    print("*" * 100, "\n")

****************************************************************************************************
----- Printing pipeline -----
A pdpipe pipeline:
[ 0]  Drop columns time-to-60
[ 1]  Apply a function  to columns year
[ 2]  One-hot encode Year_Classification
[ 3]  Apply a function  to columns cylinders
[ 4]  Drop values No in columns CylindersLessThan_4
[ 5]  Drop columns CylindersLessThan_4
[ 6]  Drop rows in columns hp by conditions
[ 7]  Scale columns Columns of dtypes <class 'numpy.number'> (except mpg,
      year, brand, cubicinches)

****************************************************************************************************
PdPipelineStage: Drop columns time-to-60 

      mpg  cylinders cubicinches   hp weightlbs  year     brand
0    14.0          8         350  165      4209  1972       US.
1    31.9          4          89   71      1925  1980   Europe.
2    17.0          8         302  140      3449  1971       US.
3    15.0          8         400  150      3761  197

# Pipeline using Sklearn and Feature-engine

In [9]:
# https://github.com/krishnaik06/Pipelines-Using-Sklearn/blob/master/SklearnPipeline.ipynb 