# Pipeline

In [7]:
# Importing libraries
import numpy as np
import pandas as pd
import math
import random

In [8]:
# Extracting data from three different .csv sources

# Here we read employees.csv file and store in emp as a dataframe
emp = pd.read_csv('/home/sage_gaurab/chained/code_world/ds_py/data_src/employees.csv')

# Here we read departments.csv file and store in dept as a dataframe
dept = pd.read_csv('/home/sage_gaurab/chained/code_world/ds_py/data_src/departments.csv')

# Here we read bonus.csv file and store in bonus as a dataframe
bonus = pd.read_csv('/home/sage_gaurab/chained/code_world/ds_py/data_src/bonus.csv')


In [9]:
# Funciton for producing full_name column
def add_full_name(df):
    return df.assign(full_name = df['first_name'] + ' ' + df['last_name'])

# Funciton to replace null salary value
def replace_null_salary(df, method = 'mean'):
    if method == 'mean':
        fill_value = df['salary'].mean().round(0)
    elif method == 'dept_mean':
        fill_value = df.groupby('department_id')['salary'].transform('mean').round(0)
    elif method == 'median':
        fill_value = df['salary'].median().round(0)
    return df.assign(salary = df['salary'].fillna(fill_value))

# Function to join all three table
def joined_df(df, df_dept, df_bonus):
    return(
        df.merge(df_dept, on = 'department_id', how = 'left').
           merge(df_bonus, on = 'employee_id',  how = 'left')
    )
# Function to fill null bonus rows                                                                  
def replace_null_bonus(df):
    return df.assign(bonus_amount = df['bonus_amount'].fillna(0))
    
# Function to calculate annual salary
def add_annual_salary(df):
    return df.assign(annual_salary = df['salary']*12) # 1st chal

# Function to calculate perfomance bonus
def add_performance_bonus(df):
    return df.assign(performance_bonus = (df['bonus_amount'] * df['performance_score']/100).round(0))
    

# _________________________________________ Analysing function _________________________________________________________

def average_salary(df):
    return df.assign(average_salary = df['salary'].mean())

def total_salary(df):
    return df.assign(total_salary = df['salary'].sum())


In [10]:
cleaned_df = (
    emp.pipe(add_full_name)
        .pipe(replace_null_salary, method = 'dept_mean')
        .pipe(joined_df, dept, bonus)
        .pipe(replace_null_bonus)
        .pipe(add_annual_salary)
        .pipe(add_performance_bonus)
        .pipe(average_salary)
        .pipe(total_salary)
)
cleaned_df[['employee_id','full_name','salary','annual_salary','department_name','bonus_amount','performance_score', 'performance_bonus']].sample(5)

Unnamed: 0,employee_id,full_name,salary,annual_salary,department_name,bonus_amount,performance_score,performance_bonus
9,10,Olivia Thomas,78000.0,936000.0,Marketing,0.0,96,0.0
2,3,Mike Johnson,65000.0,780000.0,Sales,0.0,78,0.0
15,16,Isabella Garcia,82500.0,990000.0,IT,0.0,93,0.0
19,20,John Doe,60000.0,720000.0,Sales,0.0,85,0.0
0,1,John Doe,60000.0,720000.0,Sales,5000.0,85,4250.0


### PipelineFactory: Parameterized Reusable Pipeline
----

In [11]:
def outer_func(method = 'mean', join_choice = True, bonus_fill = False, performance_bonus = False):
    
    def pipeline(df, df_dept = None, df_bonus = None):
        result = df.pipe(add_full_name).pipe(replace_null_salary, method = method)
        if join_choice:
            result = result.pipe(joined_df, df_dept, df_bonus)
            
        if bonus_fill and join_choice:
            result = result.pipe(replace_null_bonus)
            
        if performance_bonus and join_choice:
            result = result.pipe(add_performance_bonus)
        return result
        
    return pipeline

In [12]:
# generating piple using pipline factory 
result_pipeline =  outer_func(method = 'mean', join_choice = False, bonus_fill = True, performance_bonus = True)

# pipline which transform data according to passed arguments
result_pipeline(emp, df_dept = dept, df_bonus = bonus).sample(5)

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name
13,14,Ava,Martin,10,63000.0,2021-04-25,Atlanta,80,Ava Martin
17,18,Charlotte,Robinson,20,76000.0,2021-08-25,Orlando,94,Charlotte Robinson
3,4,Emily,Brown,30,80000.0,2020-04-05,Houston,95,Emily Brown
8,9,James,Anderson,40,58000.0,2020-09-25,San Jose,72,James Anderson
7,8,Sophia,Taylor,10,62000.0,2020-08-30,Dallas,91,Sophia Taylor


In [13]:
result_pipeline2 = outer_func(method = 'mean', join_choice = True, bonus_fill = True, performance_bonus = True)
result_pipeline2(emp, df_dept = dept, df_bonus = bonus).head(5)

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name,department_name,location,budget,bonus_amount,bonus_date,performance_bonus
0,1,John,Doe,10,60000.0,2020-01-15,New York,85,John Doe,Sales,New York,500000,5000.0,2023-01-01,4250.0
1,2,Jane,Smith,20,75000.0,2020-02-20,Los Angeles,92,Jane Smith,Marketing,Los Angeles,800000,8000.0,2023-01-01,7360.0
2,3,Mike,Johnson,10,65000.0,2020-03-10,Chicago,78,Mike Johnson,Sales,New York,500000,0.0,,0.0
3,4,Emily,Brown,30,80000.0,2020-04-05,Houston,95,Emily Brown,IT,Chicago,1200000,10000.0,2023-01-01,9500.0
4,5,David,Davis,40,69667.0,2020-05-12,Phoenix,88,David Davis,HR,Houston,400000,0.0,,0.0


In [14]:
emp.sample()

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score
4,5,David,Davis,40,,2020-05-12,Phoenix,88


In [15]:
dept.sample()

Unnamed: 0,department_id,department_name,location,budget
3,40,HR,Houston,400000


In [16]:
bonus.sample()

Unnamed: 0,employee_id,bonus_amount,bonus_date
6,13,7500,2023-01-01


In [17]:
# 3rd chall
def outer_f():
    def inner_f(df):
        result = df.pipe(add_full_name).pipe(replace_null_salary, method = 'mean').pipe(joined_df, dept, bonus)
        return result
    return inner_f            

In [18]:
pipeline = outer_f()
pipeline(emp).sample(3)

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name,department_name,location,budget,bonus_amount,bonus_date
11,12,Emma,White,30,82000.0,2021-02-15,Denver,89,Emma White,IT,Chicago,1200000,,
12,13,Michael,Harris,20,74000.0,2021-03-20,Miami,87,Michael Harris,Marketing,Los Angeles,800000,7500.0,2023-01-01
10,11,William,Jackson,10,61000.0,2021-01-10,Seattle,83,William Jackson,Sales,New York,500000,5500.0,2023-01-01


In [19]:
# 4th chall
def outer_4f(method = 'mean'):
    def inner_4f(df):
        result = df.pipe(add_full_name).pipe(replace_null_salary, method = method).pipe(joined_df, dept, bonus)
        return result
    return inner_4f       

In [20]:
x = outer_4f(method = 'dept_mean')
x(emp).sample(5)

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name,department_name,location,budget,bonus_amount,bonus_date
17,18,Charlotte,Robinson,20,76000.0,2021-08-25,Orlando,94,Charlotte Robinson,Marketing,Los Angeles,800000,9000.0,2023-01-01
14,15,Daniel,Thompson,40,56000.0,2021-05-30,Boston,75,Daniel Thompson,HR,Houston,400000,4000.0,2023-01-01
2,3,Mike,Johnson,10,65000.0,2020-03-10,Chicago,78,Mike Johnson,Sales,New York,500000,,
6,7,Robert,Moore,30,85000.0,2020-07-22,San Antonio,79,Robert Moore,IT,Chicago,1200000,,
10,11,William,Jackson,10,61000.0,2021-01-10,Seattle,83,William Jackson,Sales,New York,500000,5500.0,2023-01-01


In [21]:
def outer_5f(method = 'mean', include_join = True):
    def inner_5f(df):
        result = df.pipe(add_full_name).pipe(replace_null_salary, method = method)
        if include_join:
            result = result.pipe(joined_df, dept, bonus).pipe(replace_null_bonus)
        return result
    return inner_5f    

In [22]:
# Instance 1
quick_analysis = outer_5f(method = 'mean', include_join = False)
quick_analysis(emp).sample()

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name
4,5,David,Davis,40,69667.0,2020-05-12,Phoenix,88,David Davis


In [23]:
# Instance 2
dept_analysis = outer_5f(method = 'dept_mean', include_join = True)
dept_analysis(emp).sample()

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name,department_name,location,budget,bonus_amount,bonus_date
10,11,William,Jackson,10,61000.0,2021-01-10,Seattle,83,William Jackson,Sales,New York,500000,5500.0,2023-01-01


In [24]:
# Instance 3
custom_analysis = outer_5f(method = 'median', include_join = True)
custom_analysis(emp).sample()

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name,department_name,location,budget,bonus_amount,bonus_date
15,16,Isabella,Garcia,30,68500.0,2021-06-15,Portland,93,Isabella Garcia,IT,Chicago,1200000,0.0,


In [25]:
def outer_func(method = 'mean', join_choice = True, bonus_fill = False, performance_bonus = False, analyze = True):
    
    def pipeline(df, df_dept = None, df_bonus = None):
        result = df.pipe(add_full_name).pipe(replace_null_salary, method = method)
        if join_choice:
            result = result.pipe(joined_df, df_dept, df_bonus)
            
        if bonus_fill and join_choice:
            result = result.pipe(replace_null_bonus)
            
        if performance_bonus and join_choice:
            result = result.pipe(add_performance_bonus)
            
        if analyze:
            result = result.pipe(average_salary).pipe(total_salary)
            
        return result
        
    return pipeline

In [26]:
test = outer_func(method = 'mean', join_choice = True, bonus_fill = True, performance_bonus = True, analyze = True)
test(emp, dept, bonus).sample(5)

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name,department_name,location,budget,bonus_amount,bonus_date,performance_bonus,average_salary,total_salary
1,2,Jane,Smith,20,75000.0,2020-02-20,Los Angeles,92,Jane Smith,Marketing,Los Angeles,800000,8000.0,2023-01-01,7360.0,69666.7,1393334.0
3,4,Emily,Brown,30,80000.0,2020-04-05,Houston,95,Emily Brown,IT,Chicago,1200000,10000.0,2023-01-01,9500.0,69666.7,1393334.0
6,7,Robert,Moore,30,85000.0,2020-07-22,San Antonio,79,Robert Moore,IT,Chicago,1200000,0.0,,0.0,69666.7,1393334.0
12,13,Michael,Harris,20,74000.0,2021-03-20,Miami,87,Michael Harris,Marketing,Los Angeles,800000,7500.0,2023-01-01,6525.0,69666.7,1393334.0
8,9,James,Anderson,40,58000.0,2020-09-25,San Jose,72,James Anderson,HR,Houston,400000,0.0,,0.0,69666.7,1393334.0


In [35]:
# config takes dictionary as a argument
# config got key : value pairs of 
# function and its arguments

def pipeline_factory(config):
    steps = [add_full_name] # first steps in the pipeline

    if config.get('method'): # if nothing is passed in the dictionary
            steps.append(lambda df: replace_null_salary(df, method=config['method']))

    if config.get('join_choice'):
            steps.append(lambda df: joined_df(df, config['df_dept'], config['df_bonus']))

    if config.get('bonus_fill') and config.get('join_choice'):
            steps.append(replace_null_bonus)

    def pipeline(df):
        for step in steps:
            df = df.pipe(step)
        return df
    return pipeline

In [39]:
pipe_fac_dic = {
    'method' : 'mean',
    'join_choice' : True,
    'bonus_fill' : True
}

In [40]:
pipeline_factory(pipe_fac_dic)

<function __main__.pipeline_factory.<locals>.pipeline(df)>

In [41]:
data_pipeline = pipeline(emp)

In [42]:
data_pipeline

Unnamed: 0,employee_id,first_name,last_name,department_id,salary,hire_date,city,performance_score,full_name,department_name,location,budget,bonus_amount,bonus_date
0,1,John,Doe,10,60000.0,2020-01-15,New York,85,John Doe,Sales,New York,500000,5000.0,2023-01-01
1,2,Jane,Smith,20,75000.0,2020-02-20,Los Angeles,92,Jane Smith,Marketing,Los Angeles,800000,8000.0,2023-01-01
2,3,Mike,Johnson,10,65000.0,2020-03-10,Chicago,78,Mike Johnson,Sales,New York,500000,,
3,4,Emily,Brown,30,80000.0,2020-04-05,Houston,95,Emily Brown,IT,Chicago,1200000,10000.0,2023-01-01
4,5,David,Davis,40,69667.0,2020-05-12,Phoenix,88,David Davis,HR,Houston,400000,,
5,6,Laura,Wilson,20,72000.0,2020-06-18,Philadelphia,90,Laura Wilson,Marketing,Los Angeles,800000,7000.0,2023-01-01
6,7,Robert,Moore,30,85000.0,2020-07-22,San Antonio,79,Robert Moore,IT,Chicago,1200000,,
7,8,Sophia,Taylor,10,62000.0,2020-08-30,Dallas,91,Sophia Taylor,Sales,New York,500000,6000.0,2023-01-01
8,9,James,Anderson,40,58000.0,2020-09-25,San Jose,72,James Anderson,HR,Houston,400000,,
9,10,Olivia,Thomas,20,78000.0,2020-10-15,Austin,96,Olivia Thomas,Marketing,Los Angeles,800000,,
