In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType

### Creating APP

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("PySpark SQL Five Question") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Reading dataset

In [162]:
#Initializing File Type and path for data train
file_type = 'text'
delimeter=','


In [163]:
def load_data(file_type):
    """input type of file "text" or "parquet" and Return pyspark dataframe"""
    if file_type =="text": # use text as file type input
        df = spark.read.option("header", "true") \
                       .option("delimeter",delimeter)\
                       .option("inferSchema", "true") \
                       .csv(path)  #path file that you want import
    else:  
        df= spark.read.parquet("example.parquet") #path file that you want import
    return df


In [164]:
# reading worker dataset
path=r'/home/hasan/DATA SET/PySpark SQL/Employee.csv'
employee = load_data(file_type)

# reading department dataset
path = r'/home/hasan/DATA SET/PySpark SQL/department.csv'
department = load_data(file_type)

### converting to DateType

In [165]:
employee = employee.withColumn("HireDate", employee["HireDate"].cast(DateType()))

### Initializing dataset

In [166]:
employee.show()

+-------+--------+--------+-----------------+-------+----------+------+----------+--------+
|EmpCode|EmpFName|EmpLName|              Job|Manager|  HireDate|Salary|Commission|DEPTCODE|
+-------+--------+--------+-----------------+-------+----------+------+----------+--------+
|   9369|    TONY|   STARK|SOFTWARE ENGINEER|   7902|1980-12-17|  2800|         0|      20|
|   9499|     TIM|   ADOLF|         SALESMAN|   7698|1981-02-20|  1600|       300|      30|
|   9566|     KIM|  JARVIS|          MANAGER|   7839|1981-04-02|  3570|         0|      20|
|   9654|     SAM|   MILES|         SALESMAN|   7698|1981-09-28|  1250|      1400|      30|
|   9782|   KEVIN|    HILL|          MANAGER|   7839|1981-06-09|  2940|         0|      10|
|   9788|  CONNIE|   SMITH|          ANALYST|   7566|1982-12-09|  3000|         0|      20|
|   9839|  ALFRED| KINSLEY|        PRESIDENT|   7566|1981-11-17|  5000|         0|      10|
|   9844|    PAUL| TIMOTHY|         SALESMAN|   7698|1981-09-08|  1500|         

In [167]:
employee.printSchema()

root
 |-- EmpCode: integer (nullable = true)
 |-- EmpFName: string (nullable = true)
 |-- EmpLName: string (nullable = true)
 |-- Job: string (nullable = true)
 |-- Manager: integer (nullable = true)
 |-- HireDate: date (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Commission: integer (nullable = true)
 |-- DEPTCODE: integer (nullable = true)



In [168]:
department.show()

+--------+---------+----------+
|DEPTCODE| DeptName|  LOCATION|
+--------+---------+----------+
|      10|  FINANCE| EDINBURGH|
|      20| SOFTWARE|PADDINGTON|
|      30|    SALES| MAIDSTONE|
|      40|MARKETING|DARLINGTON|
|      50|    ADMIN|BIRMINGHAM|
+--------+---------+----------+



In [169]:
department.printSchema()

root
 |-- DEPTCODE: integer (nullable = true)
 |-- DeptName: string (nullable = true)
 |-- LOCATION: string (nullable = true)



### DataFrame as SQL

In [170]:
employee.createOrReplaceTempView('employee_data')
department.createOrReplaceTempView('department_data')

### SQL Question and Answer

In [171]:
#1 Create a query that displays EMPFNAME, EMPLNAME, DEPTCODE, DEPTNAME, LOCATION from EMPLOYEE, and 
#DEPARTMENT tables. Make sure the results are in the ascending order based on the EMPFNAME and LOCATION of the department.
spark.sql(" select E.EmpFName, E.EmpLName, E.DEPTCODE, D.DeptName, D.LOCATION from  employee_data E, department_data D where E.DEPTCODE=D.DEPTCODE order by EmpFName, LOCATION ").show()

+--------+--------+--------+---------+----------+
|EmpFName|EmpLName|DEPTCODE| DeptName|  LOCATION|
+--------+--------+--------+---------+----------+
|  ALFRED| KINSLEY|      10|  FINANCE| EDINBURGH|
|  ANDREW|FAULKNER|      10|  FINANCE| EDINBURGH|
|  ATHENA|  WILSON|      50|    ADMIN|BIRMINGHAM|
|   BELLA|    SWAN|      30|    SALES| MAIDSTONE|
|  CONNIE|   SMITH|      20| SOFTWARE|PADDINGTON|
|JENNIFER|  HUETTE|      50|    ADMIN|BIRMINGHAM|
|    JOHN|  ASGHAR|      20| SOFTWARE|PADDINGTON|
|   KAREN|MATTHEWS|      20| SOFTWARE|PADDINGTON|
|   KEVIN|    HILL|      10|  FINANCE| EDINBURGH|
|     KIM|  JARVIS|      20| SOFTWARE|PADDINGTON|
|   MADII| HIMBURY|      40|MARKETING|DARLINGTON|
|    PAUL| TIMOTHY|      30|    SALES| MAIDSTONE|
|    ROSE| SUMMERS|      20| SOFTWARE|PADDINGTON|
|     SAM|   MILES|      30|    SALES| MAIDSTONE|
|     TIM|   ADOLF|      30|    SALES| MAIDSTONE|
|    TONY|   STARK|      20| SOFTWARE|PADDINGTON|
|   WENDY|   SHAWN|      30|    SALES| MAIDSTONE|


In [172]:
#2 Display EMPFNAME and “TOTAL SALARY” for each employee
spark.sql(" select EmpFName, result1.Total_salary from employee_data, (select sum(Salary+Commission) as Total_salary from employee_data group by EmpCode) result1 ").show()

+--------+------------+
|EmpFName|Total_salary|
+--------+------------+
|    TONY|        2950|
|    TONY|        5100|
|    TONY|        3570|
|    TONY|        2940|
|    TONY|        2650|
|    TONY|        3420|
|    TONY|        3000|
|    TONY|        1500|
|    TONY|        2800|
|    TONY|        5000|
|    TONY|        1900|
|    TONY|        3300|
|    TONY|         500|
|    TONY|        2200|
|    TONY|        3100|
|    TONY|        7100|
|    TONY|        3000|
|     TIM|        2950|
|     TIM|        5100|
|     TIM|        3570|
+--------+------------+
only showing top 20 rows



In [173]:
#3 Display MAX and 2nd MAX SALARY from the EMPLOYEE table. 
spark.sql(" select Salary from employee_data order by Salary desc limit 2 ").show()

+------+
|Salary|
+------+
|  7000|
|  5000|
+------+



In [174]:
# 3 Display 2nd MAX SALARY from the EMPLOYEE table.
spark.sql(" select max(Salary) as Second_Max from employee_data where Salary not in (select max(Salary) from employee_data) ").show()

+----------+
|Second_Max|
+----------+
|      5000|
+----------+



In [175]:
#3 Display MAX and 2nd MAX SALARY from the EMPLOYEE table.
spark.sql(" select \
          (select max(Salary) as MaxSalary from employee_data) \
          union \
          (select max(Salary) as SecondMax from employee_data where Salary not in (select max(Salary) from employee_data)) ").show()

+----------------+
|scalarsubquery()|
+----------------+
|            5000|
|            7000|
+----------------+



In [176]:
#4 Display the TOTAL SALARY drawn by an analyst working in dept no 20
spark.sql(" select sum(Salary+Commission) as Total_Salary from employee_data where Job='ANALYST'  and DEPTCODE=20 ").show()

+------------+
|Total_Salary|
+------------+
|        3000|
+------------+



In [178]:
#5 Compute average, minimum and maximum salaries of the group of employees having the job of ANALYST.
spark.sql(" select avg(Salary) as average, min(Salary) as minimum, max(Salary) as maximum from employee_data where Job='ANALYST' ").show()

+-------+-------+-------+
|average|minimum|maximum|
+-------+-------+-------+
| 4000.0|   2000|   7000|
+-------+-------+-------+

