In [0]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Bare last expression so the notebook displays the session object.
spark

In [0]:
# Employee master data: one (Name, Department, Salary) tuple per employee.
columns = ["Name", "Department", "Salary"]

data = [
    # HR
    ("Ananya", "HR", 52000),
    # Engineering
    ("Rahul", "Engineering", 65000),
    ("Priya", "Engineering", 60000),
    # Marketing
    ("Zoya", "Marketing", 48000),
    # HR
    ("Karan", "HR", 53000),
    # Engineering
    ("Naveen", "Engineering", 70000),
    # Marketing
    ("Fatima", "Marketing", 45000),
]

# Column types are inferred by Spark from the Python values in `data`.
df_emp = spark.createDataFrame(data, schema=columns)

In [0]:
# Performance reviews: one (Name, Year, Rating) tuple per employee.
columns_perf = ["Name", "Year", "Rating"]

performance = [
    ("Ananya", 2023, 4.5),
    ("Rahul", 2023, 4.9),
    ("Priya", 2023, 4.3),
    ("Zoya", 2023, 3.8),
    ("Karan", 2023, 4.1),
    ("Naveen", 2023, 4.7),
    ("Fatima", 2023, 3.9),
]

# Name is the key shared with df_emp (used for the join further down).
df_perf = spark.createDataFrame(performance, schema=columns_perf)

In [0]:
# 1. Average salary per department.
from pyspark.sql.functions import avg

(
    df_emp
    .groupBy("Department")
    .agg(avg("Salary").alias("Avg_Salary"))  # one row per department
    .show()
)


+-----------+----------+
| Department|Avg_Salary|
+-----------+----------+
|         HR|   52500.0|
|Engineering|   65000.0|
|  Marketing|   46500.0|
+-----------+----------+



In [0]:
# 2. Number of employees in each department.
from pyspark.sql.functions import count

(
    df_emp
    .groupBy("Department")
    .agg(count("*").alias("Employee_Count"))  # count("*") counts every row in the group
    .show()
)


+-----------+--------------+
| Department|Employee_Count|
+-----------+--------------+
|         HR|             2|
|Engineering|             3|
|  Marketing|             2|
+-----------+--------------+



In [0]:
# 3. Maximum and minimum salary in Engineering.
# The functions module is imported under an alias rather than via
# `from pyspark.sql.functions import max, min`: the bare names would replace
# Python's built-in max()/min() in the notebook namespace for every later cell.
from pyspark.sql import functions as F

(
    df_emp
    .filter(df_emp.Department == "Engineering")
    # No groupBy: aggregate over all remaining rows, producing a single row.
    .agg(
        F.max("Salary").alias("Max_Salary"),
        F.min("Salary").alias("Min_Salary"),
    )
    .show()
)


+----------+----------+
|Max_Salary|Min_Salary|
+----------+----------+
|     70000|     60000|
+----------+----------+



In [0]:
# 4. Inner join of the employee and performance DataFrames on Name.
# Joining on a column name (rather than an expression) keeps a single Name column
# in the result, as the output below shows.
df_joined = df_emp.join(df_perf, "Name", "inner")
df_joined.show()


+------+-----------+------+----+------+
|  Name| Department|Salary|Year|Rating|
+------+-----------+------+----+------+
|Ananya|         HR| 52000|2023|   4.5|
|Fatima|  Marketing| 45000|2023|   3.9|
| Karan|         HR| 53000|2023|   4.1|
|Naveen|Engineering| 70000|2023|   4.7|
| Priya|Engineering| 60000|2023|   4.3|
| Rahul|Engineering| 65000|2023|   4.9|
|  Zoya|  Marketing| 48000|2023|   3.8|
+------+-----------+------+----+------+



In [0]:
# 5. Each employee's salary next to their performance rating.
df_joined.select(["Name", "Salary", "Rating"]).show()


+------+------+------+
|  Name|Salary|Rating|
+------+------+------+
|Ananya| 52000|   4.5|
|Fatima| 45000|   3.9|
| Karan| 53000|   4.1|
|Naveen| 70000|   4.7|
| Priya| 60000|   4.3|
| Rahul| 65000|   4.9|
|  Zoya| 48000|   3.8|
+------+------+------+



In [0]:
# 6. Employees whose rating is above 4.5 AND whose salary is above 60000.
# Both comparisons are strict, so a rating of exactly 4.5 or a salary of exactly
# 60000 is excluded.
df_joined.where(
    (df_joined["Rating"] > 4.5) & (df_joined["Salary"] > 60000)
).show()


+------+-----------+------+----+------+
|  Name| Department|Salary|Year|Rating|
+------+-----------+------+----+------+
|Naveen|Engineering| 70000|2023|   4.7|
| Rahul|Engineering| 65000|2023|   4.9|
+------+-----------+------+----+------+



In [0]:
# 7. Rank employees by salary within each department (highest salary = rank 1).
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# One window per department, rows ordered from highest to lowest salary.
window_spec = (
    Window
    .partitionBy("Department")
    .orderBy(df_emp.Salary.desc())
)

# rank() is evaluated over the department window defined above.
df_ranked = df_emp.withColumn(
    "Rank",
    rank().over(window_spec),
)
df_ranked.show()


+------+-----------+------+----+
|  Name| Department|Salary|Rank|
+------+-----------+------+----+
|Naveen|Engineering| 70000|   1|
| Rahul|Engineering| 65000|   2|
| Priya|Engineering| 60000|   3|
| Karan|         HR| 53000|   1|
|Ananya|         HR| 52000|   2|
|  Zoya|  Marketing| 48000|   1|
|Fatima|  Marketing| 45000|   2|
+------+-----------+------+----+



In [0]:
# 8. Calculate cumulative (running) salary in each department.
# The functions module is imported under an alias rather than via
# `from pyspark.sql.functions import sum`, which would replace Python's built-in
# sum() in the notebook namespace for every later cell.
from pyspark.sql import functions as F
from pyspark.sql.window import Window  # imported here so the cell does not rely on an earlier one

# Per department, ordered by ascending salary; the frame runs from the first row
# of the partition up to and including the current row.
window_spec_cum = (
    Window
    .partitionBy("Department")
    .orderBy("Salary")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_cumsum = df_emp.withColumn(
    "CumulativeSalary",
    F.sum("Salary").over(window_spec_cum),
)
df_cumsum.show()


+------+-----------+------+----------------+
|  Name| Department|Salary|CumulativeSalary|
+------+-----------+------+----------------+
| Priya|Engineering| 60000|           60000|
| Rahul|Engineering| 65000|          125000|
|Naveen|Engineering| 70000|          195000|
|Ananya|         HR| 52000|           52000|
| Karan|         HR| 53000|          105000|
|Fatima|  Marketing| 45000|           45000|
|  Zoya|  Marketing| 48000|           93000|
+------+-----------+------+----------------+



In [0]:
# 9. Add a JoinDate column with a date between 2020 and 2023 for each employee.
# The dates are fixed per employee (not randomly generated), so the column is
# identical on every run.
from pyspark.sql.functions import expr

# (employee name, join date) pairs, in the order the CASE branches are emitted.
join_dates = [
    ("Ananya", "2020-06-01"),
    ("Rahul", "2021-03-15"),
    ("Priya", "2022-01-20"),
    ("Zoya", "2023-07-12"),
    ("Karan", "2020-11-05"),
    ("Naveen", "2021-12-25"),
    ("Fatima", "2022-08-09"),
]

# Assemble a SQL simple-CASE expression on Name, one WHEN branch per employee.
# There is no ELSE branch.
when_clauses = " ".join(
    f"WHEN '{name}' THEN DATE('{joined}')" for name, joined in join_dates
)
df_dates = df_emp.withColumn("JoinDate", expr(f"CASE Name {when_clauses} END"))
df_dates.show()


+------+-----------+------+----------+
|  Name| Department|Salary|  JoinDate|
+------+-----------+------+----------+
|Ananya|         HR| 52000|2020-06-01|
| Rahul|Engineering| 65000|2021-03-15|
| Priya|Engineering| 60000|2022-01-20|
|  Zoya|  Marketing| 48000|2023-07-12|
| Karan|         HR| 53000|2020-11-05|
|Naveen|Engineering| 70000|2021-12-25|
|Fatima|  Marketing| 45000|2022-08-09|
+------+-----------+------+----------+



In [0]:
# 10. Add column YearsWithCompany: completed years between JoinDate and today.
# months_between() works on calendar months, so the value increases exactly on
# the anniversary. The earlier datediff(...) / 365 treated every year as 365
# days and could report one year too many just before an anniversary once a
# leap day had been crossed.
from pyspark.sql.functions import current_date, datediff, months_between  # datediff kept importable; no longer used in this cell

df_years = df_dates.withColumn(
    "YearsWithCompany",
    # Fractional months / 12 -> fractional years; the cast drops the fraction.
    (months_between(current_date(), "JoinDate") / 12).cast("int"),
)
df_years.show()


+------+-----------+------+----------+----------------+
|  Name| Department|Salary|  JoinDate|YearsWithCompany|
+------+-----------+------+----------+----------------+
|Ananya|         HR| 52000|2020-06-01|               5|
| Rahul|Engineering| 65000|2021-03-15|               4|
| Priya|Engineering| 60000|2022-01-20|               3|
|  Zoya|  Marketing| 48000|2023-07-12|               1|
| Karan|         HR| 53000|2020-11-05|               4|
|Naveen|Engineering| 70000|2021-12-25|               3|
|Fatima|  Marketing| 45000|2022-08-09|               2|
+------+-----------+------+----------+----------------+



In [0]:
# 11. Write the full employee DataFrame to CSV, including a header row.
# mode("overwrite") replaces any existing output at the path, so the cell can be re-run.
# NOTE(review): the read-back cell uses "dbfs:/tmp/employee_data_csv" - presumably
# the same location on Databricks; confirm if this runs elsewhere.
(
    df_emp.write
    .mode("overwrite")
    .option("header", "true")
    .csv("/tmp/employee_data_csv")
)


In [0]:
# 12. Save the joined employee/performance DataFrame as Parquet.
# mode("overwrite") replaces any existing output at the path, so the cell can be re-run.
(
    df_joined.write
    .mode("overwrite")
    .parquet("/tmp/employee_performance_parquet")
)


In [0]:
# Read the CSV export back to check the write.
# CSV carries no type information: without inferSchema every column, including
# Salary, is loaded as a string. inferSchema asks Spark to derive the column
# types from the data instead.
df_read = (
    spark.read
    .option("header", "true")       # first line holds the column names
    .option("inferSchema", "true")  # restore numeric type for Salary
    .csv("dbfs:/tmp/employee_data_csv")
)
df_read.show()


+------+-----------+------+
|  Name| Department|Salary|
+------+-----------+------+
| Rahul|Engineering| 65000|
| Priya|Engineering| 60000|
|Naveen|Engineering| 70000|
|Fatima|  Marketing| 45000|
|  Zoya|  Marketing| 48000|
| Karan|         HR| 53000|
|Ananya|         HR| 52000|
+------+-----------+------+



In [0]:
# Read the Parquet export back to check the write.
# Parquet files store their own schema, so no reader options are needed; the
# "header" option is a CSV setting and has been removed here.
# Note: this reassigns df_read, replacing the DataFrame loaded from CSV.
df_read = spark.read.parquet("dbfs:/tmp/employee_performance_parquet")
df_read.show()

+------+-----------+------+----+------+
|  Name| Department|Salary|Year|Rating|
+------+-----------+------+----+------+
|Ananya|         HR| 52000|2023|   4.5|
|Fatima|  Marketing| 45000|2023|   3.9|
| Karan|         HR| 53000|2023|   4.1|
|Naveen|Engineering| 70000|2023|   4.7|
| Priya|Engineering| 60000|2023|   4.3|
| Rahul|Engineering| 65000|2023|   4.9|
|  Zoya|  Marketing| 48000|2023|   3.8|
+------+-----------+------+----+------+

