## Data Pipeline

A data pipeline is a series of processing steps that move data from a source to a destination, usually cleaning or transforming it along the way. In Python, you can build a data pipeline with libraries such as pandas, NumPy, and scikit-learn.

For example, you can use pandas to extract data from a database, clean and transform the data, and then use scikit-learn to train a machine learning model on the transformed data. Finally, you can use the trained model to make predictions on new data. This series of steps constitutes a data pipeline in Python.
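As a minimal sketch of the steps just described, the snippet below uses a tiny in-memory DataFrame in place of a database extract, cleans it with pandas, and trains a scikit-learn model. The column names, the values, and the choice of `LinearRegression` are illustrative assumptions, not part of the dataset used later in this lesson.

```python
import pandas as pd
from sklearn.linear_model import LinearRegression

# Extract: a tiny in-memory table stands in for a database query (made-up values)
raw = pd.DataFrame({
    "Income": [30.0, 45.0, None, 60.0, 75.0],
    "Gift": [3.0, 4.5, 5.0, 6.0, 7.5],
})

# Transform: lower-case the column names and drop rows with missing values
clean = raw.rename(columns=str.lower).dropna()

# Train: fit a model on the transformed data
model = LinearRegression()
model.fit(clean[["income"]], clean["gift"])

# Predict: score new, unseen data
new_data = pd.DataFrame({"income": [50.0]})
prediction = model.predict(new_data)
print(prediction)
```

Each commented stage corresponds to one step of the pipeline: extract, transform, train, predict.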

By building a data pipeline, you can automate many of the repetitive and time-consuming tasks involved in working with data, such as data loading, cleaning, and transformation, making the process more efficient and scalable.

In [1]:
import numpy as np
import pandas as pd
%matplotlib inline
import seaborn as sns
pd.options.display.max_rows = 50
from scipy.stats import scoreatpercentile as pct,iqr

In [2]:
def load_original_data():
    file1 = pd.read_csv('Data/file1.csv')
    file2 = pd.read_csv('Data/file2.txt', sep = '\t')
    file3 = pd.read_excel('Data/file3.xlsx')
    file4 = pd.read_excel('Data/file4.xlsx')
    return pd.concat([file1,file2,file3, file4], axis=0)

In [3]:
def remove_outliers_in_col(df,col):
    col = str(col)
    print("removing outliers in columns: ", col)
    pct_75 = pct(df[col], 75)  # Calculate percentile 75 using scipy function scoreatpercentile
    pct_25 = pct(df[col], 25)  # Calculate percentile 25 using scipy function scoreatpercentile
    upper_bound = pct_75 + 1.5*iqr(df[col])  # iqr - > Scipy function to calculate the Interquartile Range
    lower_bound = pct_25 - 1.5*iqr(df[col])
    return df[(df[col] <= upper_bound) & (df[col] >= lower_bound)][col]  # Filter out the outliers
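To make the 1.5 × IQR rule used above concrete, here is a small worked example on made-up numbers. It uses `numpy.percentile` rather than `scoreatpercentile`, purely for convenience; for this data both should give the same quartiles.

```python
import numpy as np
from scipy.stats import iqr

# Made-up sample: 100 is the value we expect to be flagged as an outlier
values = np.array([10, 12, 13, 15, 16, 18, 100])

q1, q3 = np.percentile(values, [25, 75])   # 12.5 and 17.0
spread = iqr(values)                       # q3 - q1 = 4.5
lower_bound = q1 - 1.5 * spread            # 5.75
upper_bound = q3 + 1.5 * spread            # 23.75

# Keep only the values inside the two bounds
kept = values[(values >= lower_bound) & (values <= upper_bound)]
print(lower_bound, upper_bound, kept)
```

Only the value 100 falls outside the interval from 5.75 to 23.75, so it is the only one removed.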

In [4]:
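def remove_outliers(df):
    # The concatenated files share index labels, so give every row a unique label first
    df = df.reset_index(drop=True)
    for col in df.select_dtypes(include="number").columns:
        # Assignment aligns on the index: rows flagged as outliers become NaN
        df[col] = remove_outliers_in_col(df, col)
    return df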


In [5]:
def lower_case_column_names(hk_df):
    hk_df.columns=[i.lower() for i in hk_df.columns]
    return hk_df

In [6]:
def rename_columns(hk_df):
    hk_df.rename(columns={'controln':'id','hv1':'median_home_val', 'ic1':'median_household_income',"ic2":"med_fam_income", "ic3":"avg_household_income","ic4": "avg_fam_income","ic5":"per_capita_income"}, inplace=True )
    return hk_df

In [7]:
def drop_columns(hk_df) :
    hk_df.drop(columns=["id","tcode",'pobc1',"dob"], inplace=True)
    return hk_df


In [8]:
def clean_gender(hk_df):
    # Placeholder: add the gender cleaning logic here.
    # For now the "gender" column is returned unchanged.
    return hk_df
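The function above is left as a placeholder. One possible way to fill it in is sketched below. It assumes the `gender` column holds free-text entries such as `"M"`, `"male"`, or `" f"`, which is a guess about the data rather than something shown in this lesson. The name `clean_gender_sketch` and the `"U"` code for unknown values are hypothetical.

```python
import pandas as pd

def clean_gender_sketch(df):
    """Hypothetical example: collapse free-text gender entries to 'M', 'F' or 'U'."""
    # Trim whitespace, upper-case, and keep only the first letter of each entry
    first_letter = df["gender"].astype("string").str.strip().str.upper().str[:1]
    # Anything that is not M or F (including missing values) becomes 'U' (unknown)
    df["gender"] = first_letter.where(first_letter.isin(["M", "F"]), "U")
    return df

# Made-up sample values, for illustration only
sample = pd.DataFrame({"gender": ["M", "male", " f", "Female", None, "x"]})
print(clean_gender_sketch(sample)["gender"].tolist())
```

Because the function takes a DataFrame and returns a DataFrame, it can be dropped into a `.pipe()` chain like the other steps.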


## Pipeline Controller

In [11]:
#list(hk_df._get_numeric_data().columns)

In [10]:
hk_df = load_original_data()

hk_df= hk_df.pipe(lower_case_column_names).pipe(rename_columns).pipe(drop_columns).pipe(clean_gender).pipe(remove_outliers)



removing outliers in columns:  median_household_income
removing outliers in columns:  avg_fam_income
removing outliers in columns:  hvp1
removing outliers in columns:  pobc2
removing outliers in columns:  med_fam_income
removing outliers in columns:  avg_household_income
removing outliers in columns:  avggift
removing outliers in columns:  target_d
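The chain of `.pipe()` calls above works because each function takes a DataFrame and returns a DataFrame. As a small illustration with two made-up functions (`add_one` and `double` are not part of this pipeline), `df.pipe(f).pipe(g)` is equivalent to `g(f(df))` but reads in execution order:

```python
import pandas as pd

def add_one(df):
    return df + 1

def double(df):
    return df * 2

df = pd.DataFrame({"x": [1, 2, 3]})

# Chained form: steps are listed in the order they run
chained = df.pipe(add_one).pipe(double)

# Nested form: same result, but reads inside-out
nested = double(add_one(df))

print(chained.equals(nested))  # True
```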


## Pipelining using Sklearn - Another Example

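In the example below, the pipeline first applies the SimpleImputer to fill missing values with the mean of each column, then applies the StandardScaler to scale the data. The cleaned and transformed data is stored in the df_cleaned variable. Two caveats: mean imputation only works when every column passed to the pipeline is numeric, and by default fit_transform returns a NumPy array rather than a DataFrame, so the column names are not kept.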

In [None]:
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# Load data into a pandas DataFrame
df = pd.read_csv("data.csv")

# Define a pipeline with steps for imputing missing values and scaling the data
pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
])

# Fit the pipeline to the DataFrame and transform the data
df_cleaned = pipeline.fit_transform(df)
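The snippet above depends on a `data.csv` file that is not included here. As a self-contained sketch on made-up data, the version below selects the numeric columns before fitting and wraps the result back into a DataFrame. The column names and values are assumptions for illustration.

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Made-up data: one missing value in "a" and one non-numeric column "label"
df = pd.DataFrame({
    "a": [1.0, 2.0, np.nan, 4.0],
    "b": [10.0, 20.0, 30.0, 40.0],
    "label": ["w", "x", "y", "z"],
})

pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler()),
])

# Mean imputation needs numeric input, so select those columns first
numeric = df.select_dtypes(include="number")
scaled = pipeline.fit_transform(numeric)  # NumPy array; column names are lost

# Wrap the array back into a DataFrame to restore the column names and index
df_scaled = pd.DataFrame(scaled, columns=numeric.columns, index=numeric.index)
print(df_scaled.round(3))
```

After scaling, each column has a mean of approximately zero and no missing values remain.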


## Running the pipeline as a Python script

Now that we have written the code for this analysis once, we want to automate it so that we can run it again next year without modifying the code. To do this, we organize the code into functions, one for each step of the pipeline, and collect them in a single script.

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.stats import scoreatpercentile as pct, iqr

# Note: %matplotlib inline is an IPython magic and is a syntax error in a plain .py script
pd.options.display.max_rows = 50

def load_original_data():
    file1 = pd.read_csv('Data/file1.csv')
    file2 = pd.read_csv('Data/file2.txt', sep = '\t')
    file3 = pd.read_excel('Data/file3.xlsx')
    file4 = pd.read_excel('Data/file4.xlsx')
    return pd.concat([file1,file2,file3, file4], axis=0)

def remove_outliers_in_col(df,col):
    col = str(col)
    print("removing outliers in columns: ", col)
    pct_75 = pct(df[col], 75)  # Calculate percentile 75 using scipy function scoreatpercentile
    pct_25 = pct(df[col], 25)  # Calculate percentile 25 using scipy function scoreatpercentile
    upper_bound = pct_75 + 1.5*iqr(df[col])  # iqr - > Scipy function to calculate the Interquartile Range
    lower_bound = pct_25 - 1.5*iqr(df[col])
    return df[(df[col] <= upper_bound) & (df[col] >= lower_bound)][col]  # Filter out the outliers

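def remove_outliers(df):
    # The concatenated files share index labels, so give every row a unique label first
    df = df.reset_index(drop=True)
    for col in df.select_dtypes(include="number").columns:
        # Assignment aligns on the index: rows flagged as outliers become NaN
        df[col] = remove_outliers_in_col(df, col)
    return df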

def lower_case_column_names(hk_df):
    hk_df.columns=[i.lower() for i in hk_df.columns]
    return hk_df

def rename_columns(hk_df):
    hk_df.rename(columns={'controln':'id','hv1':'median_home_val', 'ic1':'median_household_income',"ic2":"med_fam_income", "ic3":"avg_household_income","ic4": "avg_fam_income","ic5":"per_capita_income"}, inplace=True )
    return hk_df

def drop_columns(hk_df) :
    hk_df.drop(columns=["id","tcode",'pobc1',"dob"], inplace=True)
    return hk_df

def clean_gender(hk_df):
    # Placeholder: add the gender cleaning logic here.
    # For now the "gender" column is returned unchanged.
    return hk_df

## Executing the main program
if __name__ == '__main__':
    hk_df = load_original_data()
    hk_df= hk_df.pipe(lower_case_column_names).pipe(rename_columns).pipe(drop_columns).pipe(clean_gender).pipe(remove_outliers)


Note that we have also added an if __name__ == '__main__' statement at the bottom, where the functions are actually called. Use this statement for code that the Python interpreter should execute only when the script is run as the main program, not when it is imported into another Python file.
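A stripped-down sketch of how this guard behaves is shown below. The `main` function and its return value are placeholders for illustration; in the real script the guarded block would call `load_original_data()` and the `.pipe()` chain.

```python
def main():
    # Placeholder for the real work (loading the data and running the pipeline)
    return "pipeline ran"

# __name__ equals "__main__" only when this file is executed directly.
# When the file is imported, __name__ is the module's own name instead,
# so the block below is skipped and only the function definitions are loaded.
if __name__ == "__main__":
    print(main())
```

This makes it possible to import the cleaning functions into another script or a notebook without triggering a full pipeline run.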

We are going to save this code in a Python file called health_care_data_cleaning_pipeline.py that we can then run from the command line.
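If the input files move from one run to the next, one option is to pass their location on the command line instead of editing the script. The sketch below uses the standard-library `argparse` module. The `--data-dir` option and the `Data_2024` folder name are hypothetical and not part of the script above.

```python
import argparse
from pathlib import Path

def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Run the data cleaning pipeline")
    # --data-dir is a hypothetical option; it defaults to the folder used in this lesson
    parser.add_argument("--data-dir", type=Path, default=Path("Data"))
    return parser.parse_args(argv)

# Simulate a command line such as:
#   python health_care_data_cleaning_pipeline.py --data-dir Data_2024
args = parse_args(["--data-dir", "Data_2024"])
print("Would read:", args.data_dir / "file1.csv")
```

In the script itself you would call `parse_args()` with no arguments inside the main block, so that the real command-line arguments are used.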

If your pipeline does not need any user intervention, you can schedule it to run automatically at a desired frequency (daily, weekly, monthly, annually, etc.). The instructions for doing so vary significantly from one operating system to another, depend on where Python is installed on your machine, and are ultimately beyond the scope of this lesson. However, scheduling is something that you should research for your operating system.

Mac users should search for tutorials on cron and launchd. Windows users will want to investigate the Task Scheduler that comes with Windows. Both operating systems make it possible to run Python scripts automatically on a schedule, but neither of them has a way to do so that is both "official" and easy.