In [0]:
df = spark.read.option('header','true').csv('/FileStore/tables/emp.txt')

In [0]:
# dbutils.fs.rm('/FileStore/tables/emp.txt')

In [0]:
df.show()

+------+------+-------+------+
|emp_id|  name|   dept|salary|
+------+------+-------+------+
|   101| rohan|     IT|  4000|
|   102| mohan|  admin|  3000|
|   103| rohit|finance|  5000|
|   104|  rana|     HR|  3500|
|   105| sohan|     IT|  6000|
|   106|mohani|  admin|  3500|
|   107| sohit|finance|  5500|
|   108|  kana|     HR|  4500|
|   109|rohani|     IT|  3000|
|   110| mohit|  admin|  3800|
|   111| lohit|finance|  6000|
|   112|  mana|     HR|  3300|
|   113| ruhan|     IT|  4400|
|   114|mukesh|  admin|  6000|
|   115| rohan|finance|  5050|
|   116|   ram|     HR|  8500|
|   117|  roja|     IT|  4800|
|   118|  mojo|  admin|  7000|
|   119| rakul|finance|  5800|
|   120|  raju|     HR|  6500|
+------+------+-------+------+
only showing top 20 rows



In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import *

In [0]:
df.createOrReplaceTempView('df')

In [0]:
%sql select * from df

emp_id,name,dept,salary
101,rohan,IT,4000
102,mohan,admin,3000
103,rohit,finance,5000
104,rana,HR,3500
105,sohan,IT,6000
106,mohani,admin,3500
107,sohit,finance,5500
108,kana,HR,4500
109,rohani,IT,3000
110,mohit,admin,3800


####Max salary from each department

In [0]:
%sql select dept,max(salary)
from df
group by dept

dept,max(salary)
HR,8500
IT,6000
admin,7000
finance,6000


In [0]:
df.groupBy('dept').agg(max('salary')).show()

+-------+-----------+
|   dept|max(salary)|
+-------+-----------+
|     HR|       8500|
|     IT|       6000|
|  admin|       7000|
|finance|       6000|
+-------+-----------+



####max salary over whole table as single window

In [0]:
%sql select df.*,
max(salary) over() as max_salary
from df

emp_id,name,dept,salary,max_salary
101,rohan,IT,4000,8500
102,mohan,admin,3000,8500
103,rohit,finance,5000,8500
104,rana,HR,3500,8500
105,sohan,IT,6000,8500
106,mohani,admin,3500,8500
107,sohit,finance,5500,8500
108,kana,HR,4500,8500
109,rohani,IT,3000,8500
110,mohit,admin,3800,8500


####max salary in particular dept

In [0]:
%sql select df.*,
max(salary) over(partition by dept) as max_salary_in_dept from df

emp_id,name,dept,salary,max_salary_in_dept
104,rana,HR,3500,8500
108,kana,HR,4500,8500
112,mana,HR,3300,8500
116,ram,HR,8500,8500
120,raju,HR,6500,8500
124,romeo,HR,3300,8500
101,rohan,IT,4000,6000
105,sohan,IT,6000,6000
109,rohani,IT,3000,6000
113,ruhan,IT,4400,6000


In [0]:
windowSpec = Window.partitionBy('dept')

df.withColumn('max_salary_in_dept',max('salary').over(windowSpec)).show()

+------+------+-------+------+------------------+
|emp_id|  name|   dept|salary|max_salary_in_dept|
+------+------+-------+------+------------------+
|   104|  rana|     HR|  3500|              8500|
|   108|  kana|     HR|  4500|              8500|
|   112|  mana|     HR|  3300|              8500|
|   116|   ram|     HR|  8500|              8500|
|   120|  raju|     HR|  6500|              8500|
|   124| romeo|     HR|  3300|              8500|
|   101| rohan|     IT|  4000|              6000|
|   105| sohan|     IT|  6000|              6000|
|   109|rohani|     IT|  3000|              6000|
|   113| ruhan|     IT|  4400|              6000|
|   117|  roja|     IT|  4800|              6000|
|   121|ramesh|     IT|  5400|              6000|
|   102| mohan|  admin|  3000|              7000|
|   106|mohani|  admin|  3500|              7000|
|   110| mohit|  admin|  3800|              7000|
|   114|mukesh|  admin|  6000|              7000|
|   118|  mojo|  admin|  7000|              7000|


####row number on table as one window

In [0]:
%sql select df.*,
row_number() over(order by emp_id) as rn from df

emp_id,name,dept,salary,rn
101,rohan,IT,4000,1
102,mohan,admin,3000,2
103,rohit,finance,5000,3
104,rana,HR,3500,4
105,sohan,IT,6000,5
106,mohani,admin,3500,6
107,sohit,finance,5500,7
108,kana,HR,4500,8
109,rohani,IT,3000,9
110,mohit,admin,3800,10


In [0]:
windowSpec = Window.orderBy('emp_id')
df.select('*',row_number().over(windowSpec).alias('rn')).show()

+------+------+-------+------+---+
|emp_id|  name|   dept|salary| rn|
+------+------+-------+------+---+
|   101| rohan|     IT|  4000|  1|
|   102| mohan|  admin|  3000|  2|
|   103| rohit|finance|  5000|  3|
|   104|  rana|     HR|  3500|  4|
|   105| sohan|     IT|  6000|  5|
|   106|mohani|  admin|  3500|  6|
|   107| sohit|finance|  5500|  7|
|   108|  kana|     HR|  4500|  8|
|   109|rohani|     IT|  3000|  9|
|   110| mohit|  admin|  3800| 10|
|   111| lohit|finance|  6000| 11|
|   112|  mana|     HR|  3300| 12|
|   113| ruhan|     IT|  4400| 13|
|   114|mukesh|  admin|  6000| 14|
|   115| rohan|finance|  5050| 15|
|   116|   ram|     HR|  8500| 16|
|   117|  roja|     IT|  4800| 17|
|   118|  mojo|  admin|  7000| 18|
|   119| rakul|finance|  5800| 19|
|   120|  raju|     HR|  6500| 20|
+------+------+-------+------+---+
only showing top 20 rows



####row_number by dept

In [0]:
%sql select *,
row_number() over(partition by dept order by emp_id) as rn from df

emp_id,name,dept,salary,rn
104,rana,HR,3500,1
108,kana,HR,4500,2
112,mana,HR,3300,3
116,ram,HR,8500,4
120,raju,HR,6500,5
124,romeo,HR,3300,6
101,rohan,IT,4000,1
105,sohan,IT,6000,2
109,rohani,IT,3000,3
113,ruhan,IT,4400,4


In [0]:
windowSpec = Window.partitionBy('dept').orderBy('emp_id')
df.select('*',row_number().over(windowSpec).alias('rn')).show()

+------+------+-------+------+---+
|emp_id|  name|   dept|salary| rn|
+------+------+-------+------+---+
|   104|  rana|     HR|  3500|  1|
|   108|  kana|     HR|  4500|  2|
|   112|  mana|     HR|  3300|  3|
|   116|   ram|     HR|  8500|  4|
|   120|  raju|     HR|  6500|  5|
|   124| romeo|     HR|  3300|  6|
|   101| rohan|     IT|  4000|  1|
|   105| sohan|     IT|  6000|  2|
|   109|rohani|     IT|  3000|  3|
|   113| ruhan|     IT|  4400|  4|
|   117|  roja|     IT|  4800|  5|
|   121|ramesh|     IT|  5400|  6|
|   102| mohan|  admin|  3000|  1|
|   106|mohani|  admin|  3500|  2|
|   110| mohit|  admin|  3800|  3|
|   114|mukesh|  admin|  6000|  4|
|   118|  mojo|  admin|  7000|  5|
|   122| mukul|  admin|  3800|  6|
|   103| rohit|finance|  5000|  1|
|   107| sohit|finance|  5500|  2|
+------+------+-------+------+---+
only showing top 20 rows



####row_number by dept only fetch top 2

In [0]:
%sql
select * from (select *,
row_number() over(partition by dept order by emp_id) as rn from df) where rn <3

emp_id,name,dept,salary,rn
104,rana,HR,3500,1
108,kana,HR,4500,2
101,rohan,IT,4000,1
105,sohan,IT,6000,2
102,mohan,admin,3000,1
106,mohani,admin,3500,2
103,rohit,finance,5000,1
107,sohit,finance,5500,2


In [0]:
windowSpec = Window.partitionBy('dept').orderBy('emp_id')
df.select('*',row_number().over(windowSpec).alias('rn')).where('rn<3').show()

+------+------+-------+------+---+
|emp_id|  name|   dept|salary| rn|
+------+------+-------+------+---+
|   104|  rana|     HR|  3500|  1|
|   108|  kana|     HR|  4500|  2|
|   101| rohan|     IT|  4000|  1|
|   105| sohan|     IT|  6000|  2|
|   102| mohan|  admin|  3000|  1|
|   106|mohani|  admin|  3500|  2|
|   103| rohit|finance|  5000|  1|
|   107| sohit|finance|  5500|  2|
+------+------+-------+------+---+



####top 2 salaries in each department

In [0]:
%sql

select * from (select *,
rank() over(partition by dept order by salary desc) as rnk from df) where rnk<3

emp_id,name,dept,salary,rnk
116,ram,HR,8500,1
120,raju,HR,6500,2
105,sohan,IT,6000,1
121,ramesh,IT,5400,2
118,mojo,admin,7000,1
114,mukesh,admin,6000,2
111,lohit,finance,6000,1
123,rik,finance,5900,2


In [0]:
windowSpec = Window.partitionBy('dept').orderBy(col('salary').desc())
df.select('*',rank().over(windowSpec).alias('rnk')).where('rnk<3').show()

+------+------+-------+------+---+
|emp_id|  name|   dept|salary|rnk|
+------+------+-------+------+---+
|   116|   ram|     HR|  8500|  1|
|   120|  raju|     HR|  6500|  2|
|   105| sohan|     IT|  6000|  1|
|   121|ramesh|     IT|  5400|  2|
|   118|  mojo|  admin|  7000|  1|
|   114|mukesh|  admin|  6000|  2|
|   111| lohit|finance|  6000|  1|
|   123|   rik|finance|  5900|  2|
+------+------+-------+------+---+



####compare rank,row_number,dense_rank

In [0]:
%sql
select *,
row_number() over(partition by dept order by salary desc ) as rn,
rank() over(partition by dept order by salary desc) as rnk,
dense_rank() over(partition by dept order by salary desc) as dense_rnk from df

emp_id,name,dept,salary,rn,rnk,dense_rnk
116,ram,HR,8500,1,1,1
120,raju,HR,6500,2,2,2
108,kana,HR,4500,3,3,3
104,rana,HR,3500,4,4,4
112,mana,HR,3300,5,5,5
124,romeo,HR,3300,6,5,5
105,sohan,IT,6000,1,1,1
121,ramesh,IT,5400,2,2,2
117,roja,IT,4800,3,3,3
113,ruhan,IT,4400,4,4,4


In [0]:
windowSpec = Window.partitionBy('dept').orderBy(df['salary'].desc())
df.select('*',row_number().over(windowSpec).alias('rn'),rank().over(windowSpec).alias('rnk'),dense_rank().over(windowSpec).alias('dense_rnk')).show()

+------+------+-------+------+---+---+---------+
|emp_id|  name|   dept|salary| rn|rnk|dense_rnk|
+------+------+-------+------+---+---+---------+
|   116|   ram|     HR|  8500|  1|  1|        1|
|   120|  raju|     HR|  6500|  2|  2|        2|
|   108|  kana|     HR|  4500|  3|  3|        3|
|   104|  rana|     HR|  3500|  4|  4|        4|
|   112|  mana|     HR|  3300|  5|  5|        5|
|   124| romeo|     HR|  3300|  6|  5|        5|
|   105| sohan|     IT|  6000|  1|  1|        1|
|   121|ramesh|     IT|  5400|  2|  2|        2|
|   117|  roja|     IT|  4800|  3|  3|        3|
|   113| ruhan|     IT|  4400|  4|  4|        4|
|   101| rohan|     IT|  4000|  5|  5|        5|
|   109|rohani|     IT|  3000|  6|  6|        6|
|   118|  mojo|  admin|  7000|  1|  1|        1|
|   114|mukesh|  admin|  6000|  2|  2|        2|
|   110| mohit|  admin|  3800|  3|  3|        3|
|   122| mukul|  admin|  3800|  4|  3|        3|
|   106|mohani|  admin|  3500|  5|  5|        4|
|   102| mohan|  adm

####rowsBetween in Pyspark

In [0]:
windowSpec = Window.partitionBy('dept').orderBy('salary').rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn('sum_till',sum('salary').over(windowSpec)).show()

+------+------+-------+------+--------+
|emp_id|  name|   dept|salary|sum_till|
+------+------+-------+------+--------+
|   112|  mana|     HR|  3300|  3300.0|
|   124| romeo|     HR|  3300|  6600.0|
|   104|  rana|     HR|  3500| 10100.0|
|   108|  kana|     HR|  4500| 14600.0|
|   120|  raju|     HR|  6500| 21100.0|
|   116|   ram|     HR|  8500| 29600.0|
|   109|rohani|     IT|  3000|  3000.0|
|   101| rohan|     IT|  4000|  7000.0|
|   113| ruhan|     IT|  4400| 11400.0|
|   117|  roja|     IT|  4800| 16200.0|
|   121|ramesh|     IT|  5400| 21600.0|
|   105| sohan|     IT|  6000| 27600.0|
|   102| mohan|  admin|  3000|  3000.0|
|   106|mohani|  admin|  3500|  6500.0|
|   110| mohit|  admin|  3800| 10300.0|
|   122| mukul|  admin|  3800| 14100.0|
|   114|mukesh|  admin|  6000| 20100.0|
|   118|  mojo|  admin|  7000| 27100.0|
|   103| rohit|finance|  5000|  5000.0|
|   115| rohan|finance|  5050| 10050.0|
+------+------+-------+------+--------+
only showing top 20 rows



####Lag/Lead

In [0]:
%sql
select *,
lag(salary,1,0) over(partition by dept order by emp_id) as prev_emp,
lead(salary,1,0) over(partition by dept order by emp_id) as nxt_emp
from df

emp_id,name,dept,salary,prev_emp,nxt_emp
104,rana,HR,3500,0,4500
108,kana,HR,4500,3500,3300
112,mana,HR,3300,4500,8500
116,ram,HR,8500,3300,6500
120,raju,HR,6500,8500,3300
124,romeo,HR,3300,6500,0
101,rohan,IT,4000,0,6000
105,sohan,IT,6000,4000,3000
109,rohani,IT,3000,6000,4400
113,ruhan,IT,4400,3000,4800


In [0]:
windowSpec = Window.partitionBy('dept').orderBy('emp_id')


df.withColumn('prev_emp',lag('salary',1,0).over(windowSpec))\
  .withColumn('nxt_emp',lead('salary',1,0).over(windowSpec))\
  .show()

+------+------+-------+------+--------+-------+
|emp_id|  name|   dept|salary|prev_emp|nxt_emp|
+------+------+-------+------+--------+-------+
|   104|  rana|     HR|  3500|       0|   4500|
|   108|  kana|     HR|  4500|    3500|   3300|
|   112|  mana|     HR|  3300|    4500|   8500|
|   116|   ram|     HR|  8500|    3300|   6500|
|   120|  raju|     HR|  6500|    8500|   3300|
|   124| romeo|     HR|  3300|    6500|      0|
|   101| rohan|     IT|  4000|       0|   6000|
|   105| sohan|     IT|  6000|    4000|   3000|
|   109|rohani|     IT|  3000|    6000|   4400|
|   113| ruhan|     IT|  4400|    3000|   4800|
|   117|  roja|     IT|  4800|    4400|   5400|
|   121|ramesh|     IT|  5400|    4800|      0|
|   102| mohan|  admin|  3000|       0|   3500|
|   106|mohani|  admin|  3500|    3000|   3800|
|   110| mohit|  admin|  3800|    3500|   6000|
|   114|mukesh|  admin|  6000|    3800|   7000|
|   118|  mojo|  admin|  7000|    6000|   3800|
|   122| mukul|  admin|  3800|    7000| 

####salary of the current employee is higher or lower than prevision employee

In [0]:
%sql
select *,
lag(salary) over(partition by dept order by emp_id) as prev_salary,
case
when salary < lag(salary) over(partition by dept order by emp_id) then 'lower'
when salary > lag(salary) over(partition by dept order by emp_id) then 'higher'
when salary = lag(salary) over(partition by dept order by emp_id) then 'equal'
end as salary_compare
from df

emp_id,name,dept,salary,prev_salary,salary_compare
104,rana,HR,3500,,
108,kana,HR,4500,3500.0,higher
112,mana,HR,3300,4500.0,lower
116,ram,HR,8500,3300.0,higher
120,raju,HR,6500,8500.0,lower
124,romeo,HR,3300,6500.0,lower
101,rohan,IT,4000,,
105,sohan,IT,6000,4000.0,higher
109,rohani,IT,3000,6000.0,lower
113,ruhan,IT,4400,3000.0,higher


In [0]:
windowSpec = Window.partitionBy('dept').orderBy('emp_id')

df.withColumn('salary_compare',
             when((lag('salary').over(windowSpec)) < col('salary'),lit('higher')).\
             when((lag('salary').over(windowSpec)) > col('salary'),lit('lower')).\
             when((lag('salary').over(windowSpec))== col('salary'),lit('same'))
             ).show()

+------+------+-------+------+--------------+
|emp_id|  name|   dept|salary|salary_compare|
+------+------+-------+------+--------------+
|   104|  rana|     HR|  3500|          null|
|   108|  kana|     HR|  4500|        higher|
|   112|  mana|     HR|  3300|         lower|
|   116|   ram|     HR|  8500|        higher|
|   120|  raju|     HR|  6500|         lower|
|   124| romeo|     HR|  3300|         lower|
|   101| rohan|     IT|  4000|          null|
|   105| sohan|     IT|  6000|        higher|
|   109|rohani|     IT|  3000|         lower|
|   113| ruhan|     IT|  4400|        higher|
|   117|  roja|     IT|  4800|        higher|
|   121|ramesh|     IT|  5400|        higher|
|   102| mohan|  admin|  3000|          null|
|   106|mohani|  admin|  3500|        higher|
|   110| mohit|  admin|  3800|        higher|
|   114|mukesh|  admin|  6000|        higher|
|   118|  mojo|  admin|  7000|        higher|
|   122| mukul|  admin|  3800|         lower|
|   103| rohit|finance|  5000|    

####Using CTE : Common table expression

In [0]:
%sql 
with avg_salary(avg_sal) as
  (select cast(avg(salary) as int) from df)

select * from df d,avg_salary a
where d.salary > a.avg_sal



emp_id,name,dept,salary,avg_sal
103,rohit,finance,5000,4897
105,sohan,IT,6000,4897
107,sohit,finance,5500,4897
111,lohit,finance,6000,4897
114,mukesh,admin,6000,4897
115,rohan,finance,5050,4897
116,ram,HR,8500,4897
118,mojo,admin,7000,4897
119,rakul,finance,5800,4897
120,raju,HR,6500,4897


#####find the department who's salary is better than the average salary across all departments.

* total salary of each dept. -- total_salary
* avg salary w.r.t each dept. -- avg_salary
* find the department where total_salary > avg_salary

In [0]:
%sql select d.dept,sum(salary) as tsalary from df d group by d.dept

dept,tsalary
finance,33250.0
HR,29600.0
IT,27600.0
admin,27100.0


In [0]:
%sql select cast(avg(tsalary) as int) as avg_sal from (select d.dept,sum(salary) as tsalary from df d group by d.dept) s

avg_sal
29387


In [0]:
%sql
with total_salary(dept,tsalary) as
    (select d.dept,sum(salary) as tsalary from df d group by d.dept),
    avg_salary(avg_sal) as
    (select cast(avg(tsalary) as int) as avg_sal from total_salary)
    
select * from total_salary t 
         join avg_salary a
    on t.tsalary>a.avg_sal

dept,tsalary,avg_sal
finance,33250.0,29387
HR,29600.0,29387
