Creating Session, Context and Dummy Data

In [None]:
# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("paramo_challenge").getOrCreate()
sc = spark.sparkContext.getOrCreate()

employee_columns = ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"]
employee_body = [["10001","1953-09-02","Georgi","Facello","M","1986-06-26"],
    ["10002","1964-06-02","Bezalel","Simmel","F","1985-11-21"],
    ["10003","1959-12-03","Parto","Bamford","M","1986-08-28"],
    ["10004","1954-05-01","Chirstian","Koblick","M","1986-12-01"],
    ["10005","1955-01-21","Kyoichi","Maliniak","M","1989-09-12"]] 

job_columns = ["emp_no", "title", "from_date" , "to_date"] 
job_body = [["10001","Senior Engineer","1986-06-26","9999-01-01"],
    ["10002","Staff","1996-08-03","9999-01-01"],
    ["10003","Senior Engineer","1995-12-03","9999-01-01"],
    ["10004","Senior Engineer","1995-12-01","9999-01-01"],
    ["10005","Senior Staff","1996-09-12","9999-01-01"]] 

salary_columns = ["emp_no", "salary", "from_date" , "to_date"] 
salary_body = [["10001","66074","1988-06-25","1989-06-25"], 
    ["10001","62102","1987-06-26","1988-06-25"],
    ["10001","60117","1986-06-26","1987-06-26"], 
    ["10002","72527","2001-08-02","9999-01-01"],
    ["10002","71963","2000-08-02","2001-08-02"], 
    ["10002","69366","1999-08-03","2000-08-02"],
    ["10003","43311","2001-12-01","9999-01-01"], 
    ["10003","43699","2000-12-01","2001-12-01"],
    ["10003","43478","1999-12-02","2000-12-01"],
    ["10004","74057","2001-11-27","9999-01-01"], 
    ["10004","70698","2000-11-27","2001-11-27"],
    ["10004","69722","1999-11-28","2000-11-27"],
    ["10005","94692","2001-09-09","9999-01-01"], 
    ["10005","91453","2000-09-09","2001-09-09"],
    ["10005","90531","1999-09-10","2000-09-09"]] 

Create 3 data frames with the above data

In [None]:
employee = sc.parallelize(employee_body).toDF(employee_columns)
job = sc.parallelize(job_body).toDF(job_columns)
salary = sc.parallelize(salary_body).toDF(salary_columns)

Rename the columns by using using capital letters and replace '_' with space

In [None]:
for col in employee.columns:
    employee = employee.withColumnRenamed(col,col.replace("_"," "))
print(employee.columns)

for col in job.columns:
    job = job.withColumnRenamed(col,col.replace("_"," "))
print(job.columns)

for col in salary.columns:
    salary = salary.withColumnRenamed(col,col.replace("_"," "))
print(salary.columns)


Format birth_date as 01.Jan.2021

In [None]:
from pyspark.sql.functions import col, date_format

employee = employee.withColumn("birth date",date_format(col("birth date"),"d.MMM.y"))
employee.show()

Add a new column in employeeData where you compute the company email address by the
following rule: [first 2 letter of first_name][last_name]@company.com

In [None]:
from pyspark.sql.functions import concat, substring, lit

_concat = (
    concat(
        substring(col("first name"),0,2),
        col("last name"),
        lit("@company.com")
    )
)
employee = employee.withColumn("email", _concat)
employee.show()

 Calculate the average salary for each job role 

In [None]:
from pyspark.sql.functions import datediff, current_date, when

join_table = salary.alias("salary").join(other=job.alias("job"), on="emp no", how="inner")
join_table = join_table[[
    "salary.`emp no`",
    "job.`title`", 
    "salary.`salary`",
    "salary.`from date`",
    "salary.`to date`"
]]

# The approach is going to be first calculate the avg salary of each employee-role combination throughtout time so that we know 
# how much each person in each role earns on average, next we use that info to know the average salary of the role itself

final = (
    join_table
    .withColumn(
        "years",
        when(col("to date")=="9999-01-01",datediff(current_date(), col("from date"))/365)
        .otherwise(datediff(col("to date"), col("from date"))/365)
    )
    .withColumn(
        "total_earned",
        col("years")*col("salary")
    )
    .groupBy(
        col("emp no"),
        col("title")
    )
    .agg(
        (
            F.sum(col("total_earned"))/F.sum(col("years"))
        )
        .alias("avg_per_employee_role")
    )
    .groupBy(
        col("title")
    )
    .agg(
        (
            F.sum(col("avg_per_employee_role"))/F.count(col("emp no"))
        ).alias("avg_per_role")
    )
)

final.show()

Add a flag (set value to True) in salaryData if the average salary of the person is lower than the  average salary for their job role 

In [None]:
from pyspark.sql.window import Window as W

# The approach here is going to be the same as before but now we have to use window functions to avoid joins

final = (
    join_table # Making use of the join_table defined in the previous exercise
    .withColumn(
        "years",
        when(col("to date")=="9999-01-01",datediff(current_date(), col("from date"))/365)
        .otherwise(datediff(col("to date"), col("from date"))/365)
    )
    .withColumn(
        "total_earned",
        col("years")*col("salary")
    )
    .withColumn(
        "avg_per_employee_role",
        F.sum(col("total_earned")).over(W.partitionBy(col("emp no"), col("title")))
        /
        F.sum(col("years")).over(W.partitionBy(col("emp no"), col("title")))
        
    )
    .withColumn(
        "avg_per_role",
        F.sum(col("avg_per_employee_role")).over(W.partitionBy(col("title")))
        /
        F.count(col("emp no")).over(W.partitionBy(col("title")))
    )
    .withColumn(
        "avg_per_employee",
        F.sum(col("total_earned")).over(W.partitionBy(col("emp no")))
        /
        F.sum(col("years")).over(W.partitionBy(col("emp no")))
    )
    .withColumn(
        "flag",
        when(col("avg_per_employee")<col("avg_per_role"),lit(True))
        .otherwise(lit(False))
    )
)

salary = final[["salary.*","flag"]]
    
salary.show()