In [122]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from datetime import date

spark = SparkSession.builder.appName("Practice").getOrCreate()

# Define Schema
schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("emp_name", StringType(), True),
    StructField("dept", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("join_date", DateType(), True)
])

# Sample Data
data = [
    (1, "Amit", "IT", 70000, date(2022,1,10)),
    (2, "Rahul", "IT", 60000, date(2021,5,12)),
    (3, "Priya", "IT", 60000, date(2023,3,15)),
    (4, "Neha", "HR", 50000, date(2020,7,19)),
    (5, "Karan", "HR", 80000, date(2019,11,1)),
    (6, "Rohit", "Finance", 90000, date(2018,4,23)),
    (7, "Simran", "Finance", 40000, date(2022,8,30)),
    (8, "Arjun", "IT", 75000, date(2020,9,14))
]

employee = spark.createDataFrame(data, schema)

employee.show()
employee.printSchema()

+------+--------+-------+------+----------+
|emp_id|emp_name|   dept|salary| join_date|
+------+--------+-------+------+----------+
|     1|    Amit|     IT| 70000|2022-01-10|
|     2|   Rahul|     IT| 60000|2021-05-12|
|     3|   Priya|     IT| 60000|2023-03-15|
|     4|    Neha|     HR| 50000|2020-07-19|
|     5|   Karan|     HR| 80000|2019-11-01|
|     6|   Rohit|Finance| 90000|2018-04-23|
|     7|  Simran|Finance| 40000|2022-08-30|
|     8|   Arjun|     IT| 75000|2020-09-14|
+------+--------+-------+------+----------+

root
 |-- emp_id: integer (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- join_date: date (nullable = true)



In [123]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

order_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("amount", IntegerType(), True)
])

order_data = [
    (101, 1, "Laptop", 50000),
    (102, 2, "Mobile", 20000),
    (103, 1, "Keyboard", 2000),
    (104, 3, "Laptop", 50000),
    (105, 2, "Mouse", 1000),
    (106, 3, "Monitor", 10000)
]

orders = spark.createDataFrame(order_data, ["order_id", "emp_id", "product", "amount"])

orders.show()

+--------+------+--------+------+
|order_id|emp_id| product|amount|
+--------+------+--------+------+
|     101|     1|  Laptop| 50000|
|     102|     2|  Mobile| 20000|
|     103|     1|Keyboard|  2000|
|     104|     3|  Laptop| 50000|
|     105|     2|   Mouse|  1000|
|     106|     3| Monitor| 10000|
+--------+------+--------+------+



In [124]:
employee.show(),orders.show()

+------+--------+-------+------+----------+
|emp_id|emp_name|   dept|salary| join_date|
+------+--------+-------+------+----------+
|     1|    Amit|     IT| 70000|2022-01-10|
|     2|   Rahul|     IT| 60000|2021-05-12|
|     3|   Priya|     IT| 60000|2023-03-15|
|     4|    Neha|     HR| 50000|2020-07-19|
|     5|   Karan|     HR| 80000|2019-11-01|
|     6|   Rohit|Finance| 90000|2018-04-23|
|     7|  Simran|Finance| 40000|2022-08-30|
|     8|   Arjun|     IT| 75000|2020-09-14|
+------+--------+-------+------+----------+

+--------+------+--------+------+
|order_id|emp_id| product|amount|
+--------+------+--------+------+
|     101|     1|  Laptop| 50000|
|     102|     2|  Mobile| 20000|
|     103|     1|Keyboard|  2000|
|     104|     3|  Laptop| 50000|
|     105|     2|   Mouse|  1000|
|     106|     3| Monitor| 10000|
+--------+------+--------+------+



(None, None)

In [125]:
employee.rdd.getNumPartitions()

8

In [126]:
from pyspark.sql.functions import sum
from pyspark.sql.window import Window

window_range = Window.partitionBy("dept").orderBy("salary")

window_rows = Window.partitionBy("dept") \
                    .orderBy("salary") \
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

employee.withColumn("range_sum", sum("salary").over(window_range)) \
        .withColumn("rows_sum", sum("salary").over(window_rows)) \
        .show()

+------+--------+-------+------+----------+---------+--------+
|emp_id|emp_name|   dept|salary| join_date|range_sum|rows_sum|
+------+--------+-------+------+----------+---------+--------+
|     4|    Neha|     HR| 50000|2020-07-19|    50000|   50000|
|     5|   Karan|     HR| 80000|2019-11-01|   130000|  130000|
|     7|  Simran|Finance| 40000|2022-08-30|    40000|   40000|
|     6|   Rohit|Finance| 90000|2018-04-23|   130000|  130000|
|     2|   Rahul|     IT| 60000|2021-05-12|   120000|   60000|
|     3|   Priya|     IT| 60000|2023-03-15|   120000|  120000|
|     1|    Amit|     IT| 70000|2022-01-10|   190000|  190000|
|     8|   Arjun|     IT| 75000|2020-09-14|   265000|  265000|
+------+--------+-------+------+----------+---------+--------+



In [127]:
from pyspark.sql.functions import col,dense_rank,row_number

window = Window.partitionBy('dept').orderBy(col('salary').desc())
highest_sal_per_dept = employee.withColumn('rnk',row_number().over(window)).filter(col('rnk') == 3).select('emp_name','dept','salary')
highest_sal_per_dept.show()

highest_sal_per_dept.rdd.getNumPartitions()

+--------+----+------+
|emp_name|dept|salary|
+--------+----+------+
|   Rahul|  IT| 60000|
+--------+----+------+



200

In [128]:
from pyspark.sql.functions import max, when

result = employee \
    .withColumn("rn", row_number().over(window)) \
    .groupBy("dept") \
    .agg(
        max(when(col("rn") == 2, col("salary"))).alias("third_salary")
    )

result.show()

result.rdd.getNumPartitions()
spark.conf.get("spark.sql.shuffle.partitions")

+-------+------------+
|   dept|third_salary|
+-------+------------+
|     HR|       50000|
|Finance|       40000|
|     IT|       70000|
+-------+------------+



'200'

In [129]:
from pyspark.sql.functions import sum, col

result = employee \
    .groupBy("dept") \
    .agg(sum("salary").alias("total_salary")) \
    .orderBy(col("total_salary").desc()) \
    .limit(1)

result.show()

+----+------------+
|dept|total_salary|
+----+------------+
|  IT|      265000|
+----+------------+



In [130]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg

window = Window.partitionBy("dept")

result = employee \
    .withColumn("dept_avg", avg("salary").over(window)) \
    .filter(col("salary") > col("dept_avg")) \
    .select("emp_name", "dept", "salary", "dept_avg")

result.show()
result.rdd.getNumPartitions()

+--------+-------+------+--------+
|emp_name|   dept|salary|dept_avg|
+--------+-------+------+--------+
|   Karan|     HR| 80000| 65000.0|
|   Rohit|Finance| 90000| 65000.0|
|    Amit|     IT| 70000| 66250.0|
|   Arjun|     IT| 75000| 66250.0|
+--------+-------+------+--------+



200

In [131]:
spark.conf.set("spark.sql.adaptive.enabled", "False")
#spark.conf.set("spark.sql.shuffle.partitions", "200")

In [135]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(100000)

print("Initial partitions:", df.rdd.getNumPartitions())

df2 = df.groupBy((df.id % 5)).count()
print(df2.rdd.getNumPartitions())


print("After groupBy partitions:", df2.rdd.getNumPartitions())
df.explain(True)

Initial partitions: 8
200
After groupBy partitions: 200
== Parsed Logical Plan ==
Range (0, 100000, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Range (0, 100000, step=1, splits=Some(8))

== Optimized Logical Plan ==
Range (0, 100000, step=1, splits=Some(8))

== Physical Plan ==
*(1) Range (0, 100000, step=1, splits=8)



In [133]:
spark.conf.get("spark.sql.adaptive.enabled")

'False'