# Big Data Challenge

Considering the 3 datasets (attached in the pdf), write PySpark statements to perform the required operations:

1. Create 3 data frames with the above data
2. Rename the columns using capital letters and replace '_' with a space
3. Format birth_date as 01.Jan.2021
4. Add a new column in employeeData where you compute the company email address by the following rule: ``[first 2 letters of first_name]`` ``[last_name]@company.com``
5. Calculate the average salary for each job role
6. Add a flag (set value to True) in salaryData if the average salary of the person is lower than the average salary for their job role 

## Data

The first step is to create three data frames from the proposed data. They are built with [Pandas](https://pandas.pydata.org/) first and converted to Spark DataFrames further down.

In [1]:
#employee values
employeeColumn = ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"]
employeeData = [
["10001","1953-09-02","Georgi","Facello","M","1986-06-26"],
["10002","1964-06-02","Bezalel","Simmel","F","1985-11-21"],
["10003","1959-12-03","Parto","Bamford","M","1986-08-28"],
["10004","1954-05-01","Chirstian","Koblick","M","1986-12-01"],
["10005","1955-01-21","Kyoichi","Maliniak","M","1989-09-12"]
] 

In [2]:
#Job values
jobColumn = ["emp_no", "title", "from_date" , "to_date"] 
jobData = [
["10001","Senior Engineer","1986-06-26","9999-01-01"],
["10002","Staff","1996-08-03","9999-01-01"],
["10003","Senior Engineer","1995-12-03","9999-01-01"],
["10004","Senior Engineer","1995-12-01","9999-01-01"],
["10005","Senior Staff","1996-09-12","9999-01-01"]
] 

In [3]:
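# Salary values
# Note: the second column holds the salary amount; it is named "title" here
# and renamed to "salary" before the SQL step
salaryColumn = ["emp_no", "title", "from_date", "to_date"]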
salaryData= [
["10001","66074","1988-06-25","1989-06-25"], ["10001","62102","1987-06-26","1988-06-25"],
["10001","60117","1986-06-26","1987-06-26"] , ["10002","72527","2001-08-02","9999-01-01"],
["10002","71963","2000-08-02","2001-08-02"], ["10002","69366","1999-08-03","2000-08-02"] ,
["10003","43311","2001-12-01","9999-01-01"], ["10003","43699","2000-12-01","2001-12-01"],
["10003","43478","1999-12-02","2000-12-01"] ,
["10004","74057","2001-11-27","9999-01-01"], ["10004","70698","2000-11-27","2001-11-27"],
["10004","69722","1999-11-28","2000-11-27"],
["10005","94692","2001-09-09","9999-01-01"], ["10005","91453","2000-09-09","2001-09-09"],
["10005","90531","1999-09-10","2000-09-09"]
] 

In [4]:
import pandas as pd
employee = pd.DataFrame(data = employeeData, columns = employeeColumn)
jobs = pd.DataFrame(data = jobData, columns = jobColumn)
salary = pd.DataFrame(data = salaryData, columns = salaryColumn)

In [5]:
employee.head(5)

| | emp_no | birth_date | first_name | last_name | gender | hire_date |
|---|---|---|---|---|---|---|
| 0 | 10001 | 1953-09-02 | Georgi | Facello | M | 1986-06-26 |
| 1 | 10002 | 1964-06-02 | Bezalel | Simmel | F | 1985-11-21 |
| 2 | 10003 | 1959-12-03 | Parto | Bamford | M | 1986-08-28 |
| 3 | 10004 | 1954-05-01 | Chirstian | Koblick | M | 1986-12-01 |
| 4 | 10005 | 1955-01-21 | Kyoichi | Maliniak | M | 1989-09-12 |


In [6]:
jobs.head(5)

| | emp_no | title | from_date | to_date |
|---|---|---|---|---|
| 0 | 10001 | Senior Engineer | 1986-06-26 | 9999-01-01 |
| 1 | 10002 | Staff | 1996-08-03 | 9999-01-01 |
| 2 | 10003 | Senior Engineer | 1995-12-03 | 9999-01-01 |
| 3 | 10004 | Senior Engineer | 1995-12-01 | 9999-01-01 |
| 4 | 10005 | Senior Staff | 1996-09-12 | 9999-01-01 |


In [7]:
salary.head(5)

| | emp_no | title | from_date | to_date |
|---|---|---|---|---|
| 0 | 10001 | 66074 | 1988-06-25 | 1989-06-25 |
| 1 | 10001 | 62102 | 1987-06-26 | 1988-06-25 |
| 2 | 10001 | 60117 | 1986-06-26 | 1987-06-26 |
| 3 | 10002 | 72527 | 2001-08-02 | 9999-01-01 |
| 4 | 10002 | 71963 | 2000-08-02 | 2001-08-02 |


## PySpark session

With the data frames created, the next step is to load them into a Spark session. Spark distributes both the data and the computation across the nodes of a cluster.

Each node works on its own subset of the data and carries out its share of the calculations, so data processing and computation run in parallel across the cluster. Parallel computation can make certain types of programming tasks much faster (ref: [DataCamp](https://www.datacamp.com/)).

In [8]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

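# Create my_spark. Builder options must be set before getOrCreate();
# calling config() on the builder afterwards does not change the running session.
my_spark = (
    SparkSession.builder
    # Only takes effect when spark.memory.offHeap.enabled is also true
    .config("spark.memory.offHeap.size", "5g")
    .getOrCreate()
)
# Use Arrow for conversions between Pandas and Spark
my_spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")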
# Print my_spark
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x000001CD7D9C5250>


## Create a DataFrame in SparkSession

After creating the Spark session, the next step is to convert the Pandas data frames to Spark DataFrames.

In [9]:
# Create Spark DataFrames from the Pandas employee, jobs and salary frames
employee_temp = my_spark.createDataFrame(employee)
jobs_temp = my_spark.createDataFrame(jobs)
salary_temp = my_spark.createDataFrame(salary)


Rename the columns using capital letters and replace "_" with a space. The loop below only replaces the underscores; the column names keep their original case:

In [11]:
for i in employee_temp.columns: 
    employee_temp = employee_temp.withColumnRenamed(i, i.replace("_", " "))
    
employee_temp.printSchema()

root
 |-- emp no: string (nullable = true)
 |-- birth date: string (nullable = true)
 |-- first name: string (nullable = true)
 |-- last name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire date: string (nullable = true)



In [12]:
employee_temp.show()

+------+----------+----------+---------+------+----------+
|emp no|birth date|first name|last name|gender| hire date|
+------+----------+----------+---------+------+----------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12|
+------+----------+----------+---------+------+----------+
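
The task statement also asks for capital letters, which the loop above does not apply. A minimal sketch of one way to build such names, assuming "capital letters" means fully upper-cased names; `to_display_name` is a hypothetical helper and not part of the original notebook:

```python
def to_display_name(column_name: str) -> str:
    """Replace underscores with spaces and upper-case the result."""
    return column_name.replace("_", " ").upper()


# Same column list as employeeColumn in the Data section
employee_columns = ["emp_no", "birth_date", "first_name",
                    "last_name", "gender", "hire_date"]

new_names = [to_display_name(name) for name in employee_columns]
print(new_names)
```

With a Spark DataFrame, the list could be passed to `toDF`, for example `employee_temp.toDF(*new_names)`, which renames all columns in one call. The later cells would then have to refer to `BIRTH DATE`, `FIRST NAME` and so on instead of the lower-case names.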



Format the birth date as ``01.Jan.2021``:

In [13]:
from pyspark.sql.functions import col, date_format
# "birth date" is a string column; date_format casts it before applying the pattern
employee_temp = employee_temp.withColumn("birth date",
    date_format(col("birth date"), "dd.MMM.yyyy"))
employee_temp.show()

+------+-----------+----------+---------+------+----------+
|emp no| birth date|first name|last name|gender| hire date|
+------+-----------+----------+---------+------+----------+
| 10001|02.Sep.1953|    Georgi|  Facello|     M|1986-06-26|
| 10002|02.Jun.1964|   Bezalel|   Simmel|     F|1985-11-21|
| 10003|03.Dec.1959|     Parto|  Bamford|     M|1986-08-28|
| 10004|01.May.1954| Chirstian|  Koblick|     M|1986-12-01|
| 10005|21.Jan.1955|   Kyoichi| Maliniak|     M|1989-09-12|
+------+-----------+----------+---------+------+----------+
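
As a cross-check of the pattern, the same conversion can be sketched in plain Python. This assumes that `%d.%b.%Y` is the `strftime` counterpart of the Spark pattern `dd.MMM.yyyy` and that the default (English) locale is active, since month abbreviations are locale dependent; `format_birth_date` is a hypothetical helper:

```python
from datetime import datetime


def format_birth_date(iso_date: str) -> str:
    """Convert 'YYYY-MM-DD' into 'DD.Mon.YYYY'."""
    parsed = datetime.strptime(iso_date, "%Y-%m-%d")
    return parsed.strftime("%d.%b.%Y")


# Birth dates of the first two employees in the sample data
for value in ["1953-09-02", "1964-06-02"]:
    print(value, "->", format_birth_date(value))
```

The printed values can be compared with the `birth date` column shown above.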



Add a new column in employeeData where you compute the company email address by the following rule: ``[first 2 letters of first_name][last_name]@company.com``

In [14]:
from pyspark.sql.functions import concat, lit, lower
# Build the address: first two letters of the first name + last name, both lower-cased
employee_temp = employee_temp.withColumn("email", 
                                         concat(lower(employee_temp["first name"].substr(1, 2)),
                                                lower(employee_temp["last name"]),
                                                lit('@company.com')))

employee_temp.show()

+------+-----------+----------+---------+------+----------+--------------------+
|emp no| birth date|first name|last name|gender| hire date|               email|
+------+-----------+----------+---------+------+----------+--------------------+
| 10001|02.Sep.1953|    Georgi|  Facello|     M|1986-06-26|gefacello@company...|
| 10002|02.Jun.1964|   Bezalel|   Simmel|     F|1985-11-21|besimmel@company.com|
| 10003|03.Dec.1959|     Parto|  Bamford|     M|1986-08-28|pabamford@company...|
| 10004|01.May.1954| Chirstian|  Koblick|     M|1986-12-01|chkoblick@company...|
| 10005|21.Jan.1955|   Kyoichi| Maliniak|     M|1989-09-12|kymaliniak@compan...|
+------+-----------+----------+---------+------+----------+--------------------+
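
Most addresses in the output above are cut off at 20 characters. A small plain-Python sketch of the same rule shows the full values; `company_email` is a hypothetical helper, and lower-casing the result follows the notebook's code rather than the task statement:

```python
def company_email(first_name: str, last_name: str,
                  domain: str = "company.com") -> str:
    """First two letters of the first name + last name, lower-cased."""
    local_part = (first_name[:2] + last_name).lower()
    return f"{local_part}@{domain}"


# (first_name, last_name) pairs from the sample employee data
employees = [("Georgi", "Facello"), ("Bezalel", "Simmel"),
             ("Parto", "Bamford"), ("Chirstian", "Koblick"),
             ("Kyoichi", "Maliniak")]

for first, last in employees:
    print(company_email(first, last))
```

In Spark itself, `employee_temp.show(truncate=False)` prints the column without shortening it.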



## SQL statements

Calculate the average salary for each job role

In [15]:
# The salary amount was loaded under the name "title"; give both columns descriptive names
salary_temp = salary_temp.withColumnRenamed("title", "salary")
jobs_temp = jobs_temp.withColumnRenamed("title", "jobs")


In [22]:
# Print the tables in the catalog
for i in my_spark.catalog.listTables():
    print(i)

Table(name='employee_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)
Table(name='jobs_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)
Table(name='salary_avg_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)
Table(name='salary_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)


In [16]:
salary_temp.show(3)

+------+------+----------+----------+
|emp_no|salary| from_date|   to_date|
+------+------+----------+----------+
| 10001| 66074|1988-06-25|1989-06-25|
| 10001| 62102|1987-06-26|1988-06-25|
| 10001| 60117|1986-06-26|1987-06-26|
+------+------+----------+----------+
only showing top 3 rows



In [17]:
jobs_temp.show(3)

+------+---------------+----------+----------+
|emp_no|           jobs| from_date|   to_date|
+------+---------------+----------+----------+
| 10001|Senior Engineer|1986-06-26|9999-01-01|
| 10002|          Staff|1996-08-03|9999-01-01|
| 10003|Senior Engineer|1995-12-03|9999-01-01|
+------+---------------+----------+----------+
only showing top 3 rows



In [18]:
# Register the DataFrames as temporary views so they can be queried with SQL
employee_temp.createOrReplaceTempView("employee_temp")
jobs_temp.createOrReplaceTempView("jobs_temp")
salary_temp.createOrReplaceTempView("salary_temp")

In [19]:
query = """SELECT jt.jobs, ROUND(AVG(salary),2) as Salary 
            FROM salary_temp st, jobs_temp jt 
            WHERE st.emp_no = jt.emp_no
            GROUP BY jt.jobs
            ORDER BY Salary DESC"""
# Run the query once and reuse the result
salary_avg_temp = my_spark.sql(query)
salary_avg_temp.show()

# Create a view of salary_avg_temp
salary_avg_temp.createOrReplaceTempView("salary_avg_temp")

+---------------+--------+
|           jobs|  Salary|
+---------------+--------+
|   Senior Staff|92225.33|
|          Staff|71285.33|
|Senior Engineer|59250.89|
+---------------+--------+
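
The averages can be sanity-checked without Spark. The sketch below repeats the join and the grouping in plain Python on the same sample rows; `average_by_role` is a hypothetical helper and the salaries are typed as integers here for convenience:

```python
from collections import defaultdict

# emp_no -> job title, taken from jobData
job_by_emp = {"10001": "Senior Engineer", "10002": "Staff",
              "10003": "Senior Engineer", "10004": "Senior Engineer",
              "10005": "Senior Staff"}

# (emp_no, salary) pairs, taken from salaryData
salaries = [("10001", 66074), ("10001", 62102), ("10001", 60117),
            ("10002", 72527), ("10002", 71963), ("10002", 69366),
            ("10003", 43311), ("10003", 43699), ("10003", 43478),
            ("10004", 74057), ("10004", 70698), ("10004", 69722),
            ("10005", 94692), ("10005", 91453), ("10005", 90531)]


def average_by_role(salary_rows, roles):
    """Average all salary rows per job role, rounded to 2 decimals."""
    totals = defaultdict(lambda: [0, 0])  # role -> [sum, row count]
    for emp_no, amount in salary_rows:
        entry = totals[roles[emp_no]]
        entry[0] += amount
        entry[1] += 1
    return {role: round(total / count, 2)
            for role, (total, count) in totals.items()}


role_avg = average_by_role(salaries, job_by_emp)
print(role_avg)
```

Like the SQL query, this averages over salary rows rather than over employees, so an employee with more salary records would weigh more heavily in the role average.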



Add a flag (set value to True) in salaryData if the average salary of the person is lower than the
average salary for their job role 

In [21]:
query = """SELECT st.emp_no, FIRST(jt.jobs) as Job,
                  ROUND(AVG(st.salary),2) as avg_salary,
                  IF(ROUND(AVG(st.salary),2) < FIRST(sat.Salary)  , "True", "False") as flag_salary  
            FROM salary_temp st, jobs_temp jt, salary_avg_temp sat
            WHERE st.emp_no = jt.emp_no AND sat.jobs = jt.jobs
            GROUP BY st.emp_no
"""

my_spark.sql(query).show()

+------+---------------+----------+-----------+
|emp_no|            Job|avg_salary|flag_salary|
+------+---------------+----------+-----------+
| 10001|Senior Engineer|  62764.33|      False|
| 10002|          Staff|  71285.33|      False|
| 10003|Senior Engineer|   43496.0|       True|
| 10004|Senior Engineer|  71492.33|      False|
| 10005|   Senior Staff|  92225.33|      False|
+------+---------------+----------+-----------+
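
The flag logic can be checked the same way. This plain-Python sketch assumes the same sample rows and compares unrounded averages, whereas the SQL query rounds the employee average and the stored role average to two decimals before comparing; on this data both variants agree:

```python
from statistics import mean

# emp_no -> job title, taken from jobData
job_by_emp = {"10001": "Senior Engineer", "10002": "Staff",
              "10003": "Senior Engineer", "10004": "Senior Engineer",
              "10005": "Senior Staff"}

# emp_no -> salary history, taken from salaryData
salaries_by_emp = {"10001": [66074, 62102, 60117],
                   "10002": [72527, 71963, 69366],
                   "10003": [43311, 43699, 43478],
                   "10004": [74057, 70698, 69722],
                   "10005": [94692, 91453, 90531]}

# Collect every salary row of the employees holding each role
rows_by_role = {}
for emp_no, amounts in salaries_by_emp.items():
    rows_by_role.setdefault(job_by_emp[emp_no], []).extend(amounts)
role_avg = {role: mean(rows) for role, rows in rows_by_role.items()}

# True when the employee's own average is below the role average
flags = {emp_no: mean(amounts) < role_avg[job_by_emp[emp_no]]
         for emp_no, amounts in salaries_by_emp.items()}
print(flags)
```

A role held by a single employee, such as Staff or Senior Staff in this sample, can never be flagged, because that employee's average is the role average.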

