In [11]:
from pyspark.sql.functions import *
spark.conf.set("spark.sql.crossJoin.enabled", "true")
from pyspark.sql.functions import col



In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
employee_details_schema = StructType([
    StructField("EmpId", IntegerType(), True),
    StructField("FirstName", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("ManagerId", IntegerType(), True),
    StructField("DateOfJoining", StringType(), True)
])

employee_salary_schema = StructType([
    StructField("EmpId", IntegerType(), True),
    StructField("Project", StringType(), True),
    StructField("Salary", IntegerType(), True)
])


In [2]:
employee_details = spark.read.csv("Employee_Details.txt",header=False,schema=employee_details_schema)

In [3]:
employee_details.show()

+-----+---------+---------+---------+-------------+
|EmpId|FirstName| lastName|ManagerId|DateOfJoining|
+-----+---------+---------+---------+-------------+
|  121|     Jon |     Snow|      321|   01/31/2014|
|  321|   Walter|    White|      986|   01/30/2015|
|  421| Kuldeep |     Rana|      876|   27/11/2016|
|  121|   Aegon |Targaryen|      321|   01/31/2014|
+-----+---------+---------+---------+-------------+



In [4]:
employee_salary = spark.read.csv("Employee_Salary.txt",header=False,schema=employee_salary_schema)

In [5]:
employee_salary.show()

+-----+-------+------+
|EmpId|Project|Salary|
+-----+-------+------+
|  121|     P1|  8000|
|  321|     P2|  1000|
|  421|     P1| 12000|
+-----+-------+------+



In [6]:
employee_details.registerTempTable("employee_details")
employee_salary.registerTempTable("employee_salary")


DataFrame[EmpId: int, Project: string, Salary: int]

In [8]:
spark.sql("select * from employee_salary").show()
spark.sql("select * from employee_details").show()


+-----+-------+------+
|EmpId|Project|Salary|
+-----+-------+------+
|  121|     P1|  8000|
|  321|     P2|  1000|
|  421|     P1| 12000|
+-----+-------+------+

+-----+---------+---------+---------+-------------+
|EmpId|FirstName| lastName|ManagerId|DateOfJoining|
+-----+---------+---------+---------+-------------+
|  121|     Jon |     Snow|      321|   01/31/2014|
|  321|   Walter|    White|      986|   01/30/2015|
|  421| Kuldeep |     Rana|      876|   27/11/2016|
|  121|   Aegon |Targaryen|      321|   01/31/2014|
+-----+---------+---------+---------+-------------+



In [262]:
# 1) Write a SQL query to fetch the count of employees working in project 'P1'
# select count(*) from employee_salary where Project = 'P1'

one=employee_salary.filter(employee_salary['Project']=='P1').count()

In [263]:
one

2

In [9]:
spark.sql("select count(*) from employee_salary where Project = 'P1'").show()


+--------+
|count(1)|
+--------+
|       2|
+--------+



In [12]:
# Write a SQL query to fetch employee_details names where salary greater than or equal 
# to 5000 and less than or equal to 10000
# select name from employee_details e1 join employee_salary e2 join on e1.EmpId = e2.EmpId
# where e2.Salary >= 5000 and e2.Salary <= 10000

two = employee_details.join(employee_salary,"EmpId").filter((col("Salary") >= 5000) & (col("Salary") <= 10000))

In [13]:
two.show() #.select("FirstName","lastName")

+-----+---------+---------+---------+-------------+-------+------+
|EmpId|FirstName| lastName|ManagerId|DateOfJoining|Project|Salary|
+-----+---------+---------+---------+-------------+-------+------+
|  121|     Jon |     Snow|      321|   01/31/2014|     P1|  8000|
|  121|   Aegon |Targaryen|      321|   01/31/2014|     P1|  8000|
+-----+---------+---------+---------+-------------+-------+------+



In [15]:
spark.sql("select name from employee_details e1 join employee_salary e2 join on e1.EmpId = e2.EmpId \
where e2.Salary >= 5000 and e2.Salary <= 10000").show()



ParseException: "\nmismatched input '.' expecting <EOF>(line 1, pos 71)\n\n== SQL ==\nselect name from employee_details e1 join employee_salary e2 join on e1.EmpId = e2.EmpId where e2.Salary >= 5000 and e2.Salary <= 10000\n-----------------------------------------------------------------------^^^\n"

In [267]:
# Write a SQL query to fetch project-wise count of employees sorted by project's count in descending order. 
three = employee_salary.groupBy("Project").count()

In [268]:
three.show()

+-------+-----+
|Project|count|
+-------+-----+
|     P2|    1|
|     P1|    2|
+-------+-----+



In [269]:
# Write a query to fetch employee names and salary records. 
# Return employee details even if the salary record is not present for the employee.

four = employee_details.join(employee_salary, "EmpId",how="left").select("FirstName","lastName","Salary")

In [270]:
four.show()

+---------+---------+------+
|FirstName| lastName|Salary|
+---------+---------+------+
|     Jon |     Snow|  8000|
|   Walter|    White|  1000|
| Kuldeep |     Rana| 12000|
|   Aegon |Targaryen|  8000|
+---------+---------+------+



In [271]:
# Write a SQL query to fetch all the Employees who are also managers from Employee_Details table.
managers = e1.select("ManagerId").rdd.map(lambda x:x.ManagerId).collect()
managers

[321, 986, 876, 321]

In [272]:
employee_details.where(col("EmpId").isin(managers)).show()

+-----+---------+--------+---------+-------------+
|EmpId|FirstName|lastName|ManagerId|DateOfJoining|
+-----+---------+--------+---------+-------------+
|  321|   Walter|   White|      986|   01/30/2015|
+-----+---------+--------+---------+-------------+



In [278]:
# Write a SQL query to fetch duplicate emp id records from a Employee_Details table.
empCount = employee_details.groupBy("EmpId").count()
empCount.show()

+-----+-----+
|EmpId|count|
+-----+-----+
|  321|    1|
|  121|    2|
|  421|    1|
+-----+-----+



In [281]:
empCount.where(col("count") > 1).select("EmpId").show()

+-----+
|EmpId|
+-----+
|  121|
+-----+



In [282]:
# Write query to delete duplicate rows in a Employee_Details table?
employee_details.show()

+-----+---------+---------+---------+-------------+
|EmpId|FirstName| lastName|ManagerId|DateOfJoining|
+-----+---------+---------+---------+-------------+
|  121|     Jon |     Snow|      321|   01/31/2014|
|  321|   Walter|    White|      986|   01/30/2015|
|  421| Kuldeep |     Rana|      876|   27/11/2016|
|  121|   Aegon |Targaryen|      321|   01/31/2014|
+-----+---------+---------+---------+-------------+



In [285]:
employee_details.dropDuplicates(['EmpId']).show()

+-----+---------+--------+---------+-------------+
|EmpId|FirstName|lastName|ManagerId|DateOfJoining|
+-----+---------+--------+---------+-------------+
|  321|   Walter|   White|      986|   01/30/2015|
|  121|     Jon |    Snow|      321|   01/31/2014|
|  421| Kuldeep |    Rana|      876|   27/11/2016|
+-----+---------+--------+---------+-------------+



In [303]:
# Write SQL query to find the 3rd highest salary from the table without using TOP/limit keyword. 
employee_salary.show()

+-----+-------+------+
|EmpId|Project|Salary|
+-----+-------+------+
|  121|     P1|  8000|
|  321|     P2|  1000|
|  421|     P1| 12000|
+-----+-------+------+



In [287]:
max_salary = employee_salary.agg(max('Salary'))

In [289]:
max_salary.show()

+-----------+
|max(Salary)|
+-----------+
|      12000|
+-----------+



In [319]:
from pyspark.sql.functions import dense_rank,row_number
from pyspark.sql.window import Window
windowSpec = Window.orderBy(employee_salary['Salary'].desc())

# .partitionBy(employee_salary['Salary'])

In [327]:
from pyspark.sql.functions import dense_rank
employee_salary_rank = employee_salary.withColumn("salaryRank", dense_rank().over(windowSpec))
employee_salary_rank.show()

+-----+-------+------+----------+
|EmpId|Project|Salary|salaryRank|
+-----+-------+------+----------+
|  421|     P1| 12000|         1|
|  121|     P1|  8000|         2|
|  321|     P2|  1000|         3|
+-----+-------+------+----------+



In [328]:
employee_salary_rank.select("Salary").where(col('salaryRank') == 3 ).show()

+------+
|Salary|
+------+
|  1000|
+------+

