In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat,lit,upper,col,max,avg,dense_rank

In [0]:
# function --> Create SparkSESSION
def create_session():
    spark_ses = SparkSession.builder\
        .master("local") \
        .appName("create DataFrame") \
        .getOrCreate()
    return spark_ses

# def create_df(spark,data,schema):
#     df1 = spark.createDataFrame(data,schema)
#     return df1


if __name__ =="__main__":

    spark = create_session()
    input_data = [
   (1, "Alice", "HR", 3000, "2021-01-10"),
   (2, "Bob", "IT", 4000, "2021-02-15"),
   (3, "Charlie", "IT", 5000, "2021-03-01"),
   (4, "David", "Finance", 6000, "2021-04-12"),
   (5, "Eva", "Finance", 3500, "2021-05-20"),
   (6, "Frank", "HR", 2800, "2021-06-10"),
   (7, "Grace", "IT", 4500, "2021-07-30"),]
    schema = ["id", "name", "dept", "salary", "joining_date"]

    df = spark.createDataFrame(input_data,schema)
    display(df)

id,name,dept,salary,joining_date
1,Alice,HR,3000,2021-01-10
2,Bob,IT,4000,2021-02-15
3,Charlie,IT,5000,2021-03-01
4,David,Finance,6000,2021-04-12
5,Eva,Finance,3500,2021-05-20
6,Frank,HR,2800,2021-06-10
7,Grace,IT,4500,2021-07-30


In [0]:
# Concatenate name and department into a single column.

new_df= df.withColumn("employee_designation", concat(col("name"), lit(","), col("dept")))
new_df.show()

+---+-------+-------+------+------------+--------------------+
| id|   name|   dept|salary|joining_date|employee_designation|
+---+-------+-------+------+------------+--------------------+
|  1|  Alice|     HR|  3000|  2021-01-10|            Alice,HR|
|  2|    Bob|     IT|  4000|  2021-02-15|              Bob,IT|
|  3|Charlie|     IT|  5000|  2021-03-01|          Charlie,IT|
|  4|  David|Finance|  6000|  2021-04-12|       David,Finance|
|  5|    Eva|Finance|  3500|  2021-05-20|         Eva,Finance|
|  6|  Frank|     HR|  2800|  2021-06-10|            Frank,HR|
|  7|  Grace|     IT|  4500|  2021-07-30|            Grace,IT|
+---+-------+-------+------+------------+--------------------+



In [0]:
# Convert all employee names to uppercase.
upper_employee = df.withColumn("upper_employee", upper(col("name")))
upper_employee.show()

+---+-------+-------+------+------------+--------------+
| id|   name|   dept|salary|joining_date|upper_employee|
+---+-------+-------+------+------------+--------------+
|  1|  Alice|     HR|  3000|  2021-01-10|         ALICE|
|  2|    Bob|     IT|  4000|  2021-02-15|           BOB|
|  3|Charlie|     IT|  5000|  2021-03-01|       CHARLIE|
|  4|  David|Finance|  6000|  2021-04-12|         DAVID|
|  5|    Eva|Finance|  3500|  2021-05-20|           EVA|
|  6|  Frank|     HR|  2800|  2021-06-10|         FRANK|
|  7|  Grace|     IT|  4500|  2021-07-30|         GRACE|
+---+-------+-------+------+------------+--------------+



In [0]:
# Find employees whose name starts with “A”.
df.filter(col("name").startswith("A")).show()

+---+-----+----+------+------------+
| id| name|dept|salary|joining_date|
+---+-----+----+------+------------+
|  1|Alice|  HR|  3000|  2021-01-10|
+---+-----+----+------+------------+



In [0]:
#  Find the highest salary in each department.
max_salary = df.groupby("dept").agg(max("salary").alias("highest_Salary"))
result_df = df.join(max_salary, (df.dept == max_salary.dept) & (df.salary == max_salary.highest_Salary), "inner") \
              .select(df["*"]).show()

+---+-------+-------+------+------------+
| id|   name|   dept|salary|joining_date|
+---+-------+-------+------+------------+
|  1|  Alice|     HR|  3000|  2021-01-10|
|  3|Charlie|     IT|  5000|  2021-03-01|
|  4|  David|Finance|  6000|  2021-04-12|
+---+-------+-------+------+------------+



In [0]:


# Count how many employees joined after March 2021.
count_2021= df.filter(df.joining_date > "2021-03").count()
print(f'no.of employees joined after marc 2021:{count_2021}')

no.of employees joined after marc 2021:5


In [0]:
# Show employees earning more than department average.
from pyspark.sql.window import Window
window_spec = Window.partitionBy("dept")
df_with_avg = df.withColumn("avg_salary", avg("salary").over(window_spec))
earn_more = df_with_avg.filter(col("salary") > col("avg_salary"))
earn_more.show()

+---+-------+-------+------+------------+----------+
| id|   name|   dept|salary|joining_date|avg_salary|
+---+-------+-------+------+------------+----------+
|  4|  David|Finance|  6000|  2021-04-12|    4750.0|
|  1|  Alice|     HR|  3000|  2021-01-10|    2900.0|
|  3|Charlie|     IT|  5000|  2021-03-01|    4500.0|
+---+-------+-------+------+------------+----------+



In [0]:
# Add a new column with a 10% bonus to each employee’s salary.
new_column = df.withColumn("bonus_salary",col('salary')*0.10 )
new_column.show()

+---+-------+-------+------+------------+------------+
| id|   name|   dept|salary|joining_date|bonus_salary|
+---+-------+-------+------+------------+------------+
|  1|  Alice|     HR|  3000|  2021-01-10|       300.0|
|  2|    Bob|     IT|  4000|  2021-02-15|       400.0|
|  3|Charlie|     IT|  5000|  2021-03-01|       500.0|
|  4|  David|Finance|  6000|  2021-04-12|       600.0|
|  5|    Eva|Finance|  3500|  2021-05-20|       350.0|
|  6|  Frank|     HR|  2800|  2021-06-10|       280.0|
|  7|  Grace|     IT|  4500|  2021-07-30|       450.0|
+---+-------+-------+------+------------+------------+



In [0]:
# Find the 2nd highest salary in the company.
window_specific = Window.orderBy(col("salary").desc())
highest_salry = df.withColumn("dense_rank", dense_rank().over(window_specific))
# highest_salry.show()
second_hight= highest_salry.filter("dense_rank = 2")
second_hight.show()

+---+-------+----+------+------------+----------+
| id|   name|dept|salary|joining_date|dense_rank|
+---+-------+----+------+------------+----------+
|  3|Charlie|  IT|  5000|  2021-03-01|         2|
+---+-------+----+------+------------+----------+



In [0]:
# 9. Count employees per department.
df.groupBy('dept').count().show()


+-------+-----+
|   dept|count|
+-------+-----+
|     HR|    2|
|     IT|    3|
|Finance|    2|
+-------+-----+



In [0]:
# 10. Find average salary per department.

df.groupBy('dept').agg(avg('salary').alias("avg_salary")).show()

+-------+----------+
|   dept|avg_salary|
+-------+----------+
|     HR|    2900.0|
|     IT|    4500.0|
|Finance|    4750.0|
+-------+----------+

