In [None]:
# Table: Employee

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| Id           | int     |
| Name         | varchar |
| Salary       | int     |
| DepartmentId | int     |
+--------------+---------+
Id is the primary key for this table.
Each row contains the ID, name, salary, and department of one employee.
 

Table: Department

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| Id          | int     |
| Name        | varchar |
+-------------+---------+
Id is the primary key for this table.
Each row contains the ID and the name of one department.
 

A company's executives are interested in seeing who earns the most money in each of the company's departments. 
A high earner in a department is an employee who has a salary in the top three unique salaries for that department.

Write an SQL query to find the employees who are high earners in each of the departments.

Return the result table in any order.

The query result format is in the following example:

 

Employee table:
+----+-------+--------+--------------+
| Id | Name  | Salary | DepartmentId |
+----+-------+--------+--------------+
| 1  | Joe   | 85000  | 1            |
| 2  | Henry | 80000  | 2            |
| 3  | Sam   | 60000  | 2            |
| 4  | Max   | 90000  | 1            |
| 5  | Janet | 69000  | 1            |
| 6  | Randy | 85000  | 1            |
| 7  | Will  | 70000  | 1            |
+----+-------+--------+--------------+

Department table:
+----+-------+
| Id | Name  |
+----+-------+
| 1  | IT    |
| 2  | Sales |
+----+-------+

Result table:
+------------+----------+--------+
| Department | Employee | Salary |
+------------+----------+--------+
| IT         | Max      | 90000  |
| IT         | Joe      | 85000  |
| IT         | Randy    | 85000  |
| IT         | Will     | 70000  |
| Sales      | Henry    | 80000  |
| Sales      | Sam      | 60000  |
+------------+----------+--------+

In the IT department:
- Max earns the highest unique salary
- Both Randy and Joe earn the second-highest unique salary
- Will earns the third-highest unique salary

In the Sales department:
- Henry earns the highest salary
- Sam earns the second-highest salary
- There is no third-highest salary as there are only two employees

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName("dept_top_three_sal").master("local[*]").getOrCreate()

emp_schema = ["id", "name", "salary", "department"]
dept_schema = ["id", "name"]

emp_data = [(1, "Joe", 85000, 1), (2, "Henry", 80000, 2), (3, "Sam", 60000, 2), (4, "Max", 90000, 1), 
            (5, "Janet", 69000, 1), (6, "Randy", 85000, 1), (7, "Will", 70000, 1)]

dept_data = [(1, "IT"), (2, "Sales")]

emp_df = spark.createDataFrame(emp_data, emp_schema)
dept_df = spark.createDataFrame(dept_data, dept_schema)

emp_df.show()
dept_df.show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1|  Joe| 85000|         1|
|  2|Henry| 80000|         2|
|  3|  Sam| 60000|         2|
|  4|  Max| 90000|         1|
|  5|Janet| 69000|         1|
|  6|Randy| 85000|         1|
|  7| Will| 70000|         1|
+---+-----+------+----------+

+---+-----+
| id| name|
+---+-----+
|  1|   IT|
|  2|Sales|
+---+-----+



In [4]:
# join both the df on department id key
from pyspark.sql.functions import broadcast, col

emp_df = emp_df.withColumnRenamed("id", "emp_id").withColumnRenamed("department", "dept_id")
dept_df = dept_df.withColumnRenamed("name", "department")

emp_df.show()
dept_df.show()

joined_df = emp_df.join(broadcast(dept_df), emp_df.dept_id == dept_df.id, "inner").drop("emp_id", "dept_id", "id")

joined_df.show()

+------+-----+------+-------+
|emp_id| name|salary|dept_id|
+------+-----+------+-------+
|     1|  Joe| 85000|      1|
|     2|Henry| 80000|      2|
|     3|  Sam| 60000|      2|
|     4|  Max| 90000|      1|
|     5|Janet| 69000|      1|
|     6|Randy| 85000|      1|
|     7| Will| 70000|      1|
+------+-----+------+-------+

+---+----------+
| id|department|
+---+----------+
|  1|        IT|
|  2|     Sales|
+---+----------+

+-----+------+----------+
| name|salary|department|
+-----+------+----------+
|  Joe| 85000|        IT|
|Henry| 80000|     Sales|
|  Sam| 60000|     Sales|
|  Max| 90000|        IT|
|Janet| 69000|        IT|
|Randy| 85000|        IT|
| Will| 70000|        IT|
+-----+------+----------+



In [7]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, dense_rank

windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())

joined_df = joined_df.withColumn("salary_rank", dense_rank().over(windowSpec))

joined_df.show()

joined_df.filter("salary_rank <= 3").drop("salary_rank").orderBy("department").select("department", "name", "salary").show()




+-----+------+----------+-----------+
| name|salary|department|salary_rank|
+-----+------+----------+-----------+
|Henry| 80000|     Sales|          1|
|  Sam| 60000|     Sales|          2|
|  Max| 90000|        IT|          1|
|  Joe| 85000|        IT|          2|
|Randy| 85000|        IT|          2|
| Will| 70000|        IT|          3|
|Janet| 69000|        IT|          4|
+-----+------+----------+-----------+

+----------+-----+------+
|department| name|salary|
+----------+-----+------+
|        IT|  Max| 90000|
|        IT|  Joe| 85000|
|        IT|Randy| 85000|
|        IT| Will| 70000|
|     Sales|Henry| 80000|
|     Sales|  Sam| 60000|
+----------+-----+------+

