<a href="https://colab.research.google.com/github/Nivedha-M/NIVEDHA/blob/master/WINDOW_FUNCTIONS_IN_SQL_using_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Start (or attach to) the Spark session used by every cell in this notebook.
spark = SparkSession.builder.appName('employee').getOrCreate()

# Load the employee CSV (first row is the header, column types inferred), then
# force SALARY to an int column so the window queries order/compare numbers.
df = (
    spark.read
    .csv('employees_few.csv', header=True, inferSchema=True)
    .withColumn('SALARY', col('SALARY').cast('int'))
)

# Register the frame so the SQL cells can query it as `employee_table`.
df.createOrReplaceTempView('employee_table')


In [2]:
# 1) Highest salary per department, shown next to every employee.
#    MAX is used as a window aggregate (OVER ... PARTITION BY) rather than with
#    GROUP BY, so no rows are collapsed: each employee row simply gains a
#    max_salary column holding the top SALARY within its DEPARTMENT_ID.
max_salary = spark.sql(
    '''
select *, max(SALARY) over(partition by DEPARTMENT_ID) as max_salary from employee_table
'''
)
max_salary.show()



+-----------+----------+-------------+------+----------+
|EMPLOYEE_ID|FIRST_NAME|DEPARTMENT_ID|SALARY|max_salary|
+-----------+----------+-------------+------+----------+
|        198|    Donald|        Admin|  2600|      4400|
|        199|   Douglas|        Admin|  2600|      4400|
|        200|  Jennifer|        Admin|  4400|      4400|
|        204|   Hermann|      Finance| 10000|     24000|
|        205|   Shelley|      Finance| 12008|     24000|
|        206|   William|      Finance|  8300|     24000|
|        100|    Steven|      Finance| 24000|     24000|
|        101|     Neena|           HR| 17000|     17000|
|        102|       Lex|           HR| 17000|     17000|
|        103| Alexander|           HR|  9000|     17000|
|        104|     Bruce|           HR|  6000|     17000|
|        201|   Michael|           IT| 13000|     13000|
|        202|       Pat|           IT|  6000|     13000|
|        203|     Susan|           IT|  6500|     13000|
+-----------+----------+-------------+------+----------+

In [3]:

# ROW_NUMBER: number the employees 1..n inside each department, lowest salary first.
# SALARY alone contains ties (e.g. two Admin employees at 2600, two HR at 17000),
# and row_number() gives tied rows an arbitrary relative order, so the numbering
# could change between runs. EMPLOYEE_ID is added as a tie-breaker to make the
# result deterministic.
rw_num = spark.sql('''
select *,
row_number() over(partition by DEPARTMENT_ID order by SALARY, EMPLOYEE_ID) as rw_num from employee_table
''')
rw_num.show()

+-----------+----------+-------------+------+------+
|EMPLOYEE_ID|FIRST_NAME|DEPARTMENT_ID|SALARY|rw_num|
+-----------+----------+-------------+------+------+
|        198|    Donald|        Admin|  2600|     1|
|        199|   Douglas|        Admin|  2600|     2|
|        200|  Jennifer|        Admin|  4400|     3|
|        206|   William|      Finance|  8300|     1|
|        204|   Hermann|      Finance| 10000|     2|
|        205|   Shelley|      Finance| 12008|     3|
|        100|    Steven|      Finance| 24000|     4|
|        104|     Bruce|           HR|  6000|     1|
|        103| Alexander|           HR|  9000|     2|
|        101|     Neena|           HR| 17000|     3|
|        102|       Lex|           HR| 17000|     4|
|        202|       Pat|           IT|  6000|     1|
|        203|     Susan|           IT|  6500|     2|
|        201|   Michael|           IT| 13000|     3|
+-----------+----------+-------------+------+------+



In [4]:

# RANK vs DENSE_RANK inside each department, ordered by ascending salary.
# Tied salaries share the same position. After a tie, rank() skips ahead
# (1, 1, 3), while dense_rank() continues with the next integer (1, 1, 2).
rn_func = spark.sql(
    '''
select *,
rank() over(partition by DEPARTMENT_ID order by SALARY)as rank_func,
dense_rank() over(partition by DEPARTMENT_ID order by SALARY) as dense_func
from employee_table
'''
)
rn_func.show()

+-----------+----------+-------------+------+---------+----------+
|EMPLOYEE_ID|FIRST_NAME|DEPARTMENT_ID|SALARY|rank_func|dense_func|
+-----------+----------+-------------+------+---------+----------+
|        198|    Donald|        Admin|  2600|        1|         1|
|        199|   Douglas|        Admin|  2600|        1|         1|
|        200|  Jennifer|        Admin|  4400|        3|         2|
|        206|   William|      Finance|  8300|        1|         1|
|        204|   Hermann|      Finance| 10000|        2|         2|
|        205|   Shelley|      Finance| 12008|        3|         3|
|        100|    Steven|      Finance| 24000|        4|         4|
|        104|     Bruce|           HR|  6000|        1|         1|
|        103| Alexander|           HR|  9000|        2|         2|
|        101|     Neena|           HR| 17000|        3|         3|
|        102|       Lex|           HR| 17000|        3|         3|
|        202|       Pat|           IT|  6000|        1|       

In [6]:
# SCENARIO-BASED QUESTIONS

# Fetch the first 2 employees to join each department.
# Assumption: EMPLOYEE_IDs are handed out in joining order, so a lower ID means
# the employee joined earlier (IDs presumed unique -- confirm with the data owner).
# row_number() (not rank) is used, so each department yields at most two rows.
fetch_frst_2_emp = spark.sql('''
select * from(
  select e.*,
  row_number() over(partition by DEPARTMENT_ID order by EMPLOYEE_ID) as rw_num from employee_table e) x
where x.rw_num<3

''')
fetch_frst_2_emp.show()

+-----------+----------+-------------+------+------+
|EMPLOYEE_ID|FIRST_NAME|DEPARTMENT_ID|SALARY|rw_num|
+-----------+----------+-------------+------+------+
|        198|    Donald|        Admin|  2600|     1|
|        199|   Douglas|        Admin|  2600|     2|
|        100|    Steven|      Finance| 24000|     1|
|        204|   Hermann|      Finance| 10000|     2|
|        101|     Neena|           HR| 17000|     1|
|        102|       Lex|           HR| 17000|     2|
|        201|   Michael|           IT| 13000|     1|
|        202|       Pat|           IT|  6000|     2|
+-----------+----------+-------------+------+------+



In [7]:

# Top 3 earners in each department (highest salary first).
# rank() lets tied salaries share a position, so every employee ranked 1-3 is
# returned. With ties, a department can therefore contribute more than three rows
# (e.g. HR: two employees at rank 1, then one at rank 3). Use row_number() for
# exactly three rows, or dense_rank() for the three highest distinct salaries.
top_3_max_sal = spark.sql('''
select * from(
  select *,
  rank() over(partition by DEPARTMENT_ID order by SALARY desc) as rnk from employee_table) x
where x.rnk<4

''')
top_3_max_sal.show()

+-----------+----------+-------------+------+---+
|EMPLOYEE_ID|FIRST_NAME|DEPARTMENT_ID|SALARY|rnk|
+-----------+----------+-------------+------+---+
|        200|  Jennifer|        Admin|  4400|  1|
|        198|    Donald|        Admin|  2600|  2|
|        199|   Douglas|        Admin|  2600|  2|
|        100|    Steven|      Finance| 24000|  1|
|        205|   Shelley|      Finance| 12008|  2|
|        204|   Hermann|      Finance| 10000|  3|
|        101|     Neena|           HR| 17000|  1|
|        102|       Lex|           HR| 17000|  1|
|        103| Alexander|           HR|  9000|  3|
|        201|   Michael|           IT| 13000|  1|
|        203|     Susan|           IT|  6500|  2|
|        202|       Pat|           IT|  6000|  3|
+-----------+----------+-------------+------+---+



In [8]:

# Label each employee's salary as Higher / lower / Equal compared with the
# previous employee in the same department, where "previous" is the row just
# before it when ordered by EMPLOYEE_ID (lag over that ordering).
#
# The NULL check runs first and "Equal" requires an explicit equality test.
# SALARY is cast to int when the data is loaded, which (under Spark's default,
# non-ANSI casting) can yield NULL. A NULL SALARY now gets a NULL label instead
# of falling through every comparison into a catch-all "Equal".
# String literals use single quotes (standard SQL). Double quotes are parsed as
# identifiers when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
# NOTE(review): lag() also returns NULL when the previous employee exists but
# has a NULL SALARY, so that case is still reported as "No previous employee".
salary_high_lower = spark.sql('''
select *,
case
     when x.previous_salary is null then 'No previous employee'
     when x.SALARY > x.previous_salary then 'Higher'
     when x.SALARY < x.previous_salary then 'lower'
     when x.SALARY = x.previous_salary then 'Equal'
end as salary_comparison
from(
  select *,
  lag(SALARY) over(partition by DEPARTMENT_ID order by EMPLOYEE_ID) as previous_salary
  from employee_table) x
''')

salary_high_lower.show()


+-----------+----------+-------------+------+---------------+--------------------+
|EMPLOYEE_ID|FIRST_NAME|DEPARTMENT_ID|SALARY|previous_salary|   salary_comparison|
+-----------+----------+-------------+------+---------------+--------------------+
|        198|    Donald|        Admin|  2600|           NULL|No previous employee|
|        199|   Douglas|        Admin|  2600|           2600|               Equal|
|        200|  Jennifer|        Admin|  4400|           2600|              Higher|
|        100|    Steven|      Finance| 24000|           NULL|No previous employee|
|        204|   Hermann|      Finance| 10000|          24000|               lower|
|        205|   Shelley|      Finance| 12008|          10000|              Higher|
|        206|   William|      Finance|  8300|          12008|               lower|
|        101|     Neena|           HR| 17000|           NULL|No previous employee|
|        102|       Lex|           HR| 17000|          17000|               Equal|
|   