In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz

# Unzip the Spark file to the current folder
!tar xf spark-3.0.3-bin-hadoop3.2.tgz

# Install findspark
!pip install -q findspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

# Import SparkSession
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


In [2]:
from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

# Reuse the SparkSession started in the setup cell. getOrCreate() hands back that existing
# session, so builder options such as appName() would not rename the already-running app.
spark = SparkSession.builder.getOrCreate()

# Employee schema. Every column is nullable: the president has no manager_id and most
# employees have no commission. hire_date is a real DateType (not a string) so date
# comparisons and arithmetic work without relying on implicit casts.
schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("emp_name", StringType(), True),
    StructField("job_name", StringType(), True),
    StructField("manager_id", IntegerType(), True),
    StructField("hire_date", DateType(), True),
    StructField("salary", FloatType(), True),
    StructField("commission", FloatType(), True),
    StructField("dep_id", IntegerType(), True)
])

# Raw employee records; hire dates are written as ISO-8601 (YYYY-MM-DD) strings.
data = [
    (68319, "KAYLING", "PRESIDENT", None, "1991-11-18", 6000.00, None, 1001),
    (66928, "BLAZE", "MANAGER", 68319, "1991-05-01", 2750.00, None, 3001),
    (67832, "CLARE", "MANAGER", 68319, "1991-06-09", 2550.00, None, 1001),
    (65646, "JONAS", "MANAGER", 68319, "1991-04-02", 2957.00, None, 2001),
    (67858, "SCARLET", "ANALYST", 65646, "1997-04-19", 3100.00, None, 2001),
    (69062, "FRANK", "ANALYST", 65646, "1991-12-03", 3100.00, None, 2001),
    (63679, "SANDRINE", "CLERK", 69062, "1990-12-18", 900.00, None, 2001),
    (64989, "ADELYN", "SALESMAN", 66928, "1991-02-20", 1700.00, 400.00, 3001),
    (65271, "WADE", "SALESMAN", 66928, "1991-02-22", 1350.00, 600.00, 3001),
    (66564, "MADDEN", "SALESMAN", 66928, "1991-09-28", 1350.00, 1500.00, 3001),
    (68454, "TUCKER", "SALESMAN", 66928, "1991-09-08", 1600.00, 0.00, 3001),
    (68736, "ADNRES", "CLERK", 67858, "1997-05-23", 1200.00, None, 2001),
    (69000, "JULIUS", "CLERK", 66928, "1991-12-03", 1050.00, None, 3001),
    (69324, "MARKER", "CLERK", 67832, "1992-01-23", 1400.00, None, 1001)
]

# A DateType column needs datetime.date values, so parse the ISO strings before building
# the frame. `data` itself is left untouched.
emp_rows = [
    (emp_id, emp_name, job_name, manager_id, date.fromisoformat(hire_date), salary, commission, dep_id)
    for emp_id, emp_name, job_name, manager_id, hire_date, salary, commission, dep_id in data
]

# Create DataFrame
emp_df = spark.createDataFrame(emp_rows, schema)

# Show DataFrame
emp_df.show()

# Department schema: each department is identified by dep_id, which employees.dep_id references.
department_schema = StructType([
    StructField("dep_id", IntegerType(), True),
    StructField("dep_name", StringType(), True),
    StructField("dep_location", StringType(), True)
])

# Create a list of tuples containing department data
department_data = [
    (1001, "FINANCE", "SYDNEY"),
    (2001, "AUDIT", "MELBOURNE"),
    (3001, "MARKETING", "PERTH"),
    (4001, "PRODUCTION", "BRISBANE")
]
dep_df = spark.createDataFrame(department_data, department_schema)

# Show Department DataFrame
dep_df.show()

+------+--------+---------+----------+----------+------+----------+------+
|emp_id|emp_name| job_name|manager_id| hire_date|salary|commission|dep_id|
+------+--------+---------+----------+----------+------+----------+------+
| 68319| KAYLING|PRESIDENT|      null|1991-11-18|6000.0|      null|  1001|
| 66928|   BLAZE|  MANAGER|     68319|1991-05-01|2750.0|      null|  3001|
| 67832|   CLARE|  MANAGER|     68319|1991-06-09|2550.0|      null|  1001|
| 65646|   JONAS|  MANAGER|     68319|1991-04-02|2957.0|      null|  2001|
| 67858| SCARLET|  ANALYST|     65646|1997-04-19|3100.0|      null|  2001|
| 69062|   FRANK|  ANALYST|     65646|1991-12-03|3100.0|      null|  2001|
| 63679|SANDRINE|    CLERK|     69062|1990-12-18| 900.0|      null|  2001|
| 64989|  ADELYN| SALESMAN|     66928|1991-02-20|1700.0|     400.0|  3001|
| 65271|    WADE| SALESMAN|     66928|1991-02-22|1350.0|     600.0|  3001|
| 66564|  MADDEN| SALESMAN|     66928|1991-09-28|1350.0|    1500.0|  3001|
| 68454|  TUCKER| SALESMA

Task 1: Retrieve employees' names along with their department name.

* Create temporary views for both DataFrames so they can be queried with Spark SQL.

In [3]:
# Expose each DataFrame to Spark SQL under a table-like name for the queries that follow.
for view_name, frame in (("employees", emp_df), ("departments", dep_df)):
    frame.createOrReplaceTempView(view_name)

In [5]:
# Inner join on dep_id: pair each employee's name with the name of their department.
query1 = (
    "SELECT emp_name AS EmployeeName, dep_name AS DepartmentName "
    "FROM employees JOIN departments "
    "ON employees.dep_id = departments.dep_id"
)
result1 = spark.sql(query1)
result1.show()


+------------+--------------+
|EmployeeName|DepartmentName|
+------------+--------------+
|       BLAZE|     MARKETING|
|      ADELYN|     MARKETING|
|        WADE|     MARKETING|
|      MADDEN|     MARKETING|
|      TUCKER|     MARKETING|
|      JULIUS|     MARKETING|
|     KAYLING|       FINANCE|
|       CLARE|       FINANCE|
|      MARKER|       FINANCE|
|       JONAS|         AUDIT|
|     SCARLET|         AUDIT|
|       FRANK|         AUDIT|
|    SANDRINE|         AUDIT|
|      ADNRES|         AUDIT|
+------------+--------------+



Task 2: Display the details of all employees who have managers, along with the names of their respective managers.

In [17]:
# Self-join: each employee row (e) is paired with the row of its manager (m).
# The inner equi-join already drops employees whose manager_id is NULL (NULL never
# compares equal to anything), so no separate "IS NOT NULL" filter is needed.
query2 = """
SELECT
    e.emp_id     AS EmployeeID,
    e.emp_name   AS EmployeeName,
    e.job_name   AS Job,
    e.hire_date  AS HireDate,
    e.salary     AS Salary,
    e.commission AS Commission,
    e.dep_id     AS DepartmentID,
    m.emp_name   AS ManagerName
FROM employees e
JOIN employees m ON e.manager_id = m.emp_id
"""
result2 = spark.sql(query2)
result2.show()


+----------+------------+--------+----------+------+-----------+------------+-----------+
|EmployeeID|EmployeeName|     Job|  HireDate|Salary|Commisssion|DepartmentID|ManagerName|
+----------+------------+--------+----------+------+-----------+------------+-----------+
|     69324|      MARKER|   CLERK|1992-01-23|1400.0|       null|        1001|      CLARE|
|     64989|      ADELYN|SALESMAN|1991-02-20|1700.0|      400.0|        3001|      BLAZE|
|     65271|        WADE|SALESMAN|1991-02-22|1350.0|      600.0|        3001|      BLAZE|
|     66564|      MADDEN|SALESMAN|1991-09-28|1350.0|     1500.0|        3001|      BLAZE|
|     68454|      TUCKER|SALESMAN|1991-09-08|1600.0|        0.0|        3001|      BLAZE|
|     69000|      JULIUS|   CLERK|1991-12-03|1050.0|       null|        3001|      BLAZE|
|     63679|    SANDRINE|   CLERK|1990-12-18| 900.0|       null|        2001|      FRANK|
|     67858|     SCARLET| ANALYST|1997-04-19|3100.0|       null|        2001|      JONAS|
|     6906

Task 3: Display the details of all employees, including those who don't have a manager, along with the name of their manager if they have one.

In [21]:
# LEFT JOIN keeps every employee. ManagerName is NULL when the employee has no
# manager_id or when their manager_id matches no emp_id.
query3 = """
SELECT
    e.emp_id     AS EmployeeID,
    e.emp_name   AS EmployeeName,
    e.job_name   AS Job,
    e.hire_date  AS HireDate,
    e.salary     AS Salary,
    e.commission AS Commission,
    e.dep_id     AS DepartmentID,
    m.emp_name   AS ManagerName
FROM employees e
LEFT JOIN employees m ON e.manager_id = m.emp_id
"""
result3 = spark.sql(query3)
result3.show()


+----------+------------+---------+----------+------+-----------+------------+-----------+
|EmployeeID|EmployeeName|      Job|  HireDate|Salary|Commisssion|DepartmentID|ManagerName|
+----------+------------+---------+----------+------+-----------+------------+-----------+
|     68319|     KAYLING|PRESIDENT|1991-11-18|6000.0|       null|        1001|       null|
|     69324|      MARKER|    CLERK|1992-01-23|1400.0|       null|        1001|      CLARE|
|     64989|      ADELYN| SALESMAN|1991-02-20|1700.0|      400.0|        3001|      BLAZE|
|     65271|        WADE| SALESMAN|1991-02-22|1350.0|      600.0|        3001|      BLAZE|
|     66564|      MADDEN| SALESMAN|1991-09-28|1350.0|     1500.0|        3001|      BLAZE|
|     68454|      TUCKER| SALESMAN|1991-09-08|1600.0|        0.0|        3001|      BLAZE|
|     69000|      JULIUS|    CLERK|1991-12-03|1050.0|       null|        3001|      BLAZE|
|     63679|    SANDRINE|    CLERK|1990-12-18| 900.0|       null|        2001|      FRANK|

Task 4: Display the details of all employees who do not have any manager.

In [24]:
# Employees whose manager_id is NULL, i.e. who report to nobody.
query4 = """
SELECT
    emp_id     AS EmployeeID,
    emp_name   AS EmployeeName,
    job_name   AS Job,
    hire_date  AS HireDate,
    salary     AS Salary,
    commission AS Commission,
    dep_id     AS DepartmentID
FROM employees
WHERE manager_id IS NULL
"""
result4 = spark.sql(query4)
result4.show()


+----------+------------+---------+----------+------+-----------+------------+
|EmployeeID|EmployeeName|      Job|  HireDate|Salary|Commisssion|DepartmentID|
+----------+------------+---------+----------+------+-----------+------------+
|     68319|     KAYLING|PRESIDENT|1991-11-18|6000.0|       null|        1001|
+----------+------------+---------+----------+------+-----------+------------+



Task 5: Show the details of the manager who has the largest number of employees working under them.

In [39]:
# Count each manager's direct reports, then keep EVERY manager whose count equals the
# maximum. The previous "ORDER BY ... LIMIT 1" silently returned one arbitrary manager
# when several were tied for the top count. ORDER BY makes the output order deterministic.
query5 = """
WITH ReportCounts AS (
    SELECT manager_id, COUNT(*) AS NumberOfEmployees
    FROM employees
    WHERE manager_id IS NOT NULL
    GROUP BY manager_id
)
SELECT
    e.emp_id AS EmployeeID,
    e.emp_name AS EmployeeName,
    e.job_name AS Job,
    e.manager_id AS Manager,
    e.hire_date AS HireDate,
    e.salary AS Salary,
    e.commission AS Commission,
    e.dep_id AS DepartmentID,
    m.NumberOfEmployees AS NumberOfEmployees
FROM employees e
JOIN ReportCounts m ON e.emp_id = m.manager_id
WHERE m.NumberOfEmployees = (SELECT MAX(NumberOfEmployees) FROM ReportCounts)
ORDER BY e.emp_id
"""
result5 = spark.sql(query5)
result5.show()


+----------+------------+-------+-------+----------+------+----------+------------+-----------------+
|EmployeeID|EmployeeName|    Job|Manager|  HireDate|Salary|Commission|DepartmentID|NumberOfEmployees|
+----------+------------+-------+-------+----------+------+----------+------------+-----------------+
|     66928|       BLAZE|MANAGER|  68319|1991-05-01|2750.0|      null|        3001|                5|
+----------+------------+-------+-------+----------+------+----------+------------+-----------------+

