<a href="https://colab.research.google.com/github/Indresh0007/PySpark-Indresh/blob/main/Advance.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Window functions

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Obtain the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("AdvancedOps")
    .getOrCreate()
)

# Sample rows: (id, name, salary, list of subjects, address as a dict).
data = [
    (1, "Alice", 2000, ["math", "science"], dict(city="NYC", zip="10001")),
    (2, "Bob", 1500, ["english"], dict(city="SF", zip="94105")),
    (3, "Charlie", 2200, ["math", "history", "science"], dict(city="NYC", zip="10001")),
    (4, "David", 1200, ["art"], dict(city="LA", zip="90001")),
]

df = spark.createDataFrame(
    data,
    schema=["id", "name", "salary", "subjects", "address"],
)
# truncate=False prints the subjects and address columns in full.
df.show(truncate=False)

+---+-------+------+------------------------+---------------------------+
|id |name   |salary|subjects                |address                    |
+---+-------+------+------------------------+---------------------------+
|1  |Alice  |2000  |[math, science]         |{zip -> 10001, city -> NYC}|
|2  |Bob    |1500  |[english]               |{zip -> 94105, city -> SF} |
|3  |Charlie|2200  |[math, history, science]|{zip -> 10001, city -> NYC}|
|4  |David  |1200  |[art]                   |{zip -> 90001, city -> LA} |
+---+-------+------+------------------------+---------------------------+



In [2]:
from pyspark.sql.window import Window
# One partition per city (read from the address column), rows ordered by
# ascending salary inside each partition.
window_spec = (
    Window
    .partitionBy("address.city")
    .orderBy("salary")
)
rank_in_city = rank().over(window_spec)
df.withColumn("rank", rank_in_city).show()

+---+-------+------+--------------------+--------------------+----+
| id|   name|salary|            subjects|             address|rank|
+---+-------+------+--------------------+--------------------+----+
|  4|  David|  1200|               [art]|{zip -> 90001, ci...|   1|
|  1|  Alice|  2000|     [math, science]|{zip -> 10001, ci...|   1|
|  3|Charlie|  2200|[math, history, s...|{zip -> 10001, ci...|   2|
|  2|    Bob|  1500|           [english]|{zip -> 94105, ci...|   1|
+---+-------+------+--------------------+--------------------+----+



In [3]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, max, sum, avg

# Employee Data
# Employee records, one tuple per row, in the order given by `columns`.
columns = ["EmpID", "Name", "Department", "Salary"]
data = [
    (1, "John", "Sales", 3000),
    (2, "Jane", "Finance", 4000),
    (3, "Mike", "Sales", 3500),
    (4, "Alice", "Finance", 3800),
    (5, "Bob", "IT", 4500),
    (6, "Tom", "Sales", 3700),
    (7, "Jerry", "Finance", 4200),
    (8, "Sam", "IT", 4700),
    (9, "Steve", "Sales", 3100),
    (10, "Rachel", "IT", 4600),
]
df = spark.createDataFrame(data, schema=columns)

df.show()

+-----+------+----------+------+
|EmpID|  Name|Department|Salary|
+-----+------+----------+------+
|    1|  John|     Sales|  3000|
|    2|  Jane|   Finance|  4000|
|    3|  Mike|     Sales|  3500|
|    4| Alice|   Finance|  3800|
|    5|   Bob|        IT|  4500|
|    6|   Tom|     Sales|  3700|
|    7| Jerry|   Finance|  4200|
|    8|   Sam|        IT|  4700|
|    9| Steve|     Sales|  3100|
|   10|Rachel|        IT|  4600|
+-----+------+----------+------+



In [8]:
# Window: one partition per department, highest salary first.
salary_desc = col("Salary").desc()
window_spec = Window.partitionBy("Department").orderBy(salary_desc)

# Attach each employee's rank within their department and display it.
ranked = df.withColumn("Rank", rank().over(window_spec))
ranked.show()

+-----+------+----------+------+----+
|EmpID|  Name|Department|Salary|Rank|
+-----+------+----------+------+----+
|    7| Jerry|   Finance|  4200|   1|
|    2|  Jane|   Finance|  4000|   2|
|    4| Alice|   Finance|  3800|   3|
|    8|   Sam|        IT|  4700|   1|
|   10|Rachel|        IT|  4600|   2|
|    5|   Bob|        IT|  4500|   3|
|    6|   Tom|     Sales|  3700|   1|
|    3|  Mike|     Sales|  3500|   2|
|    9| Steve|     Sales|  3100|   3|
|    1|  John|     Sales|  3000|   4|
+-----+------+----------+------+----+



In [10]:
# No ordering on this window, so the aggregate covers the whole department.
window_spec = Window.partitionBy("Department")
# `max` is the Spark column function imported above, not the Python builtin.
dept_max = max("Salary").over(window_spec)
df.withColumn("MaxSalaryDept", dept_max).show()

+-----+------+----------+------+-------------+
|EmpID|  Name|Department|Salary|MaxSalaryDept|
+-----+------+----------+------+-------------+
|    2|  Jane|   Finance|  4000|         4200|
|    4| Alice|   Finance|  3800|         4200|
|    7| Jerry|   Finance|  4200|         4200|
|    5|   Bob|        IT|  4500|         4700|
|    8|   Sam|        IT|  4700|         4700|
|   10|Rachel|        IT|  4600|         4700|
|    1|  John|     Sales|  3000|         3700|
|    3|  Mike|     Sales|  3500|         3700|
|    6|   Tom|     Sales|  3700|         3700|
|    9| Steve|     Sales|  3100|         3700|
+-----+------+----------+------+-------------+



In [20]:
# Department-wide window (no ordering): max() is computed over all rows of
# the department and repeated on every row.
window_spec = Window.partitionBy("Department")

# Keep only the department and its maximum, then collapse the repeated rows.
# The chain is wrapped in parentheses: a backslash continuation must be the
# last character on its line, so "\.distinct()" on the same line is a
# SyntaxError.
(
    df.withColumn("MaxSalaryDept", max("Salary").over(window_spec))
    .select("Department", "MaxSalaryDept")
    .distinct()
    .show()
)

+----------+-------------+
|Department|MaxSalaryDept|
+----------+-------------+
|   Finance|         4200|
|        IT|         4700|
|     Sales|         3700|
+----------+-------------+



In [22]:
# Same per-department maximum, computed with a plain aggregation instead of
# a window function.
dept_max_df = df.groupBy("Department").agg(
    max("Salary").alias("MaxSalaryDept")
)
dept_max_df.show()

+----------+-------------+
|Department|MaxSalaryDept|
+----------+-------------+
|     Sales|         3700|
|   Finance|         4200|
|        IT|         4700|
+----------+-------------+



In [33]:
# Running total: within each department, ordered by ascending salary, sum
# every row from the start of the partition up to the current row.
window_spec = (
    Window
    .partitionBy("Department")
    .orderBy("Salary")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
running_total = sum("Salary").over(window_spec)
df.withColumn("CumulativeSalary", running_total).show()

+-----+------+----------+------+----------------+
|EmpID|  Name|Department|Salary|CumulativeSalary|
+-----+------+----------+------+----------------+
|    4| Alice|   Finance|  3800|            3800|
|    2|  Jane|   Finance|  4000|            7800|
|    7| Jerry|   Finance|  4200|           12000|
|    5|   Bob|        IT|  4500|            4500|
|   10|Rachel|        IT|  4600|            9100|
|    8|   Sam|        IT|  4700|           13800|
|    1|  John|     Sales|  3000|            3000|
|    9| Steve|     Sales|  3100|            6100|
|    3|  Mike|     Sales|  3500|            9600|
|    6|   Tom|     Sales|  3700|           13300|
+-----+------+----------+------+----------------+



In [34]:
# Base window: one partition per department, rows ordered by ascending salary.
window_base=Window.partitionBy("Department").orderBy("Salary")
# Same window with the frame limited to the previous row and the current row.
# NOTE(review): window_current is never used below — confirm whether the sum
# was meant to run over it (that would give a two-row rolling sum instead).
window_current=window_base.rowsBetween(-1,0)
# The sum is evaluated over window_base, not window_current.
df.withColumn("CumulativeSalary", sum("Salary").over(window_base)).show()

+-----+------+----------+------+----------------+
|EmpID|  Name|Department|Salary|CumulativeSalary|
+-----+------+----------+------+----------------+
|    4| Alice|   Finance|  3800|            3800|
|    2|  Jane|   Finance|  4000|            7800|
|    7| Jerry|   Finance|  4200|           12000|
|    5|   Bob|        IT|  4500|            4500|
|   10|Rachel|        IT|  4600|            9100|
|    8|   Sam|        IT|  4700|           13800|
|    1|  John|     Sales|  3000|            3000|
|    9| Steve|     Sales|  3100|            6100|
|    3|  Mike|     Sales|  3500|            9600|
|    6|   Tom|     Sales|  3700|           13300|
+-----+------+----------+------+----------------+



In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Obtain the Spark session for the UDF examples that follow.
spark = (
    SparkSession.builder
    .appName("AdvancedOps")
    .getOrCreate()
)

# Sample rows: (id, name, salary, list of subjects, address as a dict).
data = [
    (1, "Alice", 2000, ["math", "science"], dict(city="NYC", zip="10001")),
    (2, "Bob", 1500, ["english"], dict(city="SF", zip="94105")),
    (3, "Charlie", 2200, ["math", "history", "science"], dict(city="NYC", zip="10001")),
    (4, "David", 1200, ["art"], dict(city="LA", zip="90001")),
]

df = spark.createDataFrame(
    data,
    schema=["id", "name", "salary", "subjects", "address"],
)
# truncate=False prints the subjects and address columns in full.
df.show(truncate=False)

+---+-------+------+------------------------+---------------------------+
|id |name   |salary|subjects                |address                    |
+---+-------+------+------------------------+---------------------------+
|1  |Alice  |2000  |[math, science]         |{zip -> 10001, city -> NYC}|
|2  |Bob    |1500  |[english]               |{zip -> 94105, city -> SF} |
|3  |Charlie|2200  |[math, history, science]|{zip -> 10001, city -> NYC}|
|4  |David  |1200  |[art]                   |{zip -> 90001, city -> LA} |
+---+-------+------+------------------------+---------------------------+



In [37]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

@udf(IntegerType())
def subject_count(subjects):
    """Return the number of entries in ``subjects``.

    Registered as a Python UDF with an integer return type.

    Args:
        subjects: the value of one array cell, or None when the cell is null.

    Returns:
        The length of ``subjects``, or None when ``subjects`` is None.
    """
    # A null cell reaches Python as None, and len(None) would raise TypeError.
    if subjects is None:
        return None
    return len(subjects)

# Add the per-row subject count computed by the UDF and display the result.
df.withColumn(
    "subject_count",
    subject_count(col("subjects")),
).show()

+---+-------+------+--------------------+--------------------+-------------+
| id|   name|salary|            subjects|             address|subject_count|
+---+-------+------+--------------------+--------------------+-------------+
|  1|  Alice|  2000|     [math, science]|{zip -> 10001, ci...|            2|
|  2|    Bob|  1500|           [english]|{zip -> 94105, ci...|            1|
|  3|Charlie|  2200|[math, history, s...|{zip -> 10001, ci...|            3|
|  4|  David|  1200|               [art]|{zip -> 90001, ci...|            1|
+---+-------+------+--------------------+--------------------+-------------+



In [39]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def multiply_by_two(s: pd.Series) -> pd.Series:
    """Return ``s`` with every value doubled.

    Registered as a pandas UDF whose declared return type is double.
    """
    doubled = s.mul(2)
    return doubled

# Add the doubled salary computed by the pandas UDF and display the result.
df.withColumn(
    "salary_doubled",
    multiply_by_two(col("salary")),
).show()


+---+-------+------+--------------------+--------------------+--------------+
| id|   name|salary|            subjects|             address|salary_doubled|
+---+-------+------+--------------------+--------------------+--------------+
|  1|  Alice|  2000|     [math, science]|{zip -> 10001, ci...|        4000.0|
|  2|    Bob|  1500|           [english]|{zip -> 94105, ci...|        3000.0|
|  3|Charlie|  2200|[math, history, s...|{zip -> 10001, ci...|        4400.0|
|  4|  David|  1200|               [art]|{zip -> 90001, ci...|        2400.0|
+---+-------+------+--------------------+--------------------+--------------+



In [40]:
from pyspark.sql import SparkSession

# Obtain the Spark session for the use-case exercises.
spark = (
    SparkSession.builder
    .appName("PySpark Use Case Activities")
    .getOrCreate()
)

from pyspark.sql.functions import col, rank, avg, udf, pandas_udf
from pyspark.sql.window import Window
import pandas as pd

# Employees: (id, name, department, salary).
employees_data = [
    (1, "Alice", "HR", 3000),
    (2, "Bob", "IT", 4000),
    (3, "Cathy", "HR", 3500),
    (4, "David", "IT", 4500),
    (5, "Eve", "Finance", 5000),
    (6, "Frank", "Finance", 4800),
]
employees_df = spark.createDataFrame(
    employees_data,
    schema=["id", "name", "department", "salary"],
)
employees_df.show()

# Departments: (department, location); "department" is shared with employees.
departments_data = [
    ("HR", "New York"),
    ("IT", "San Francisco"),
    ("Finance", "Chicago"),
]
departments_df = spark.createDataFrame(
    departments_data,
    schema=["department", "location"],
)
departments_df.show()

+---+-----+----------+------+
| id| name|department|salary|
+---+-----+----------+------+
|  1|Alice|        HR|  3000|
|  2|  Bob|        IT|  4000|
|  3|Cathy|        HR|  3500|
|  4|David|        IT|  4500|
|  5|  Eve|   Finance|  5000|
|  6|Frank|   Finance|  4800|
+---+-----+----------+------+

+----------+-------------+
|department|     location|
+----------+-------------+
|        HR|     New York|
|        IT|San Francisco|
|   Finance|      Chicago|
+----------+-------------+



In [41]:
# One partition per department, highest salary first.
salary_desc = col("salary").desc()
window_spec = Window.partitionBy("department").orderBy(salary_desc)

# Rank every employee inside their department.
ranked_df = employees_df.withColumn(
    "rank",
    rank().over(window_spec),
)

# Display the ranked employees.
ranked_df.show()

+---+-----+----------+------+----+
| id| name|department|salary|rank|
+---+-----+----------+------+----+
|  5|  Eve|   Finance|  5000|   1|
|  6|Frank|   Finance|  4800|   2|
|  3|Cathy|        HR|  3500|   1|
|  1|Alice|        HR|  3000|   2|
|  4|David|        IT|  4500|   1|
|  2|  Bob|        IT|  4000|   2|
+---+-----+----------+------+----+



In [42]:
# Unordered department window: avg() covers every row of the department.
window_spec = Window.partitionBy("department")

# Keep the DataFrame in avg_salary_df and display it in a separate call:
# show() only prints and returns None, so chaining it into the assignment
# would leave avg_salary_df set to None. The column is referenced as "salary"
# to match the schema of employees_df.
avg_salary_df = employees_df.withColumn(
    "avg_salary",
    avg("salary").over(window_spec),
)
avg_salary_df.show()

+---+-----+----------+------+----------+
| id| name|department|salary|avg_salary|
+---+-----+----------+------+----------+
|  5|  Eve|   Finance|  5000|    4900.0|
|  6|Frank|   Finance|  4800|    4900.0|
|  1|Alice|        HR|  3000|    3250.0|
|  3|Cathy|        HR|  3500|    3250.0|
|  2|  Bob|        IT|  4000|    4250.0|
|  4|David|        IT|  4500|    4250.0|
+---+-----+----------+------+----------+



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rank, avg, udf, pandas_udf
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, DoubleType
import pandas as pd

spark = (
    SparkSession.builder
    .appName("PySpark Practice")
    .getOrCreate()
)

# Employees Data: (id, name, department, salary)
employees_data = [
    (1, "Alice", "HR", 3000),
    (2, "Bob", "IT", 4000),
    (3, "Cathy", "HR", 3500),
    (4, "David", "IT", 4500),
    (5, "Eve", "Finance", 5000),
    (6, "Frank", "Finance", 4800),
]
employees_df = spark.createDataFrame(
    employees_data,
    schema=["id", "name", "department", "salary"],
)

# Departments Data: (department, location)
departments_data = [
    ("HR", "New York"),
    ("IT", "San Francisco"),
    ("Finance", "Chicago"),
]
departments_df = spark.createDataFrame(
    departments_data,
    schema=["department", "location"],
)

# Display both inputs before transforming them.
employees_df.show()
departments_df.show()
### 1️⃣ Window Functions
# Rank by descending salary inside each department, then add the department
# average computed over an unordered window.
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
employees_ranked = employees_df.withColumn("rank", rank().over(window_spec))
dept_avg = avg("salary").over(Window.partitionBy("department"))
employees_avg = employees_ranked.withColumn("avg_salary", dept_avg)
employees_avg.show()

### 2️⃣ Joins
# Both joins match on the shared "department" column.
inner_join_df = employees_df.join(departments_df, on="department", how="inner")
left_join_df = employees_df.join(departments_df, on="department", how="left")
inner_join_df.show()
left_join_df.show()

### 3️⃣ DataFrame Operations
# Rows with salary above 4000.
employees_df.filter(col("salary") > 4000).show()
# Projection of three columns.
employees_df.select("name", "department", "salary").show()
# Highest salary first.
employees_df.orderBy(col("salary").desc()).show()
# Derived column: salary multiplied by 1.1.
employees_df.withColumn("bonus_salary", col("salary") * 1.1).show()

### 4️⃣ Spark UDF for Categorization
def categorize_salary(salary):
    """Map a salary to a coarse category label.

    Args:
        salary: a numeric salary, or None for a missing value.

    Returns:
        "High" when salary > 4500, "Medium" when 3500 <= salary <= 4500,
        "Low" otherwise, and None when ``salary`` is None.
    """
    # Comparing None with a number raises TypeError, so a missing salary is
    # passed through as a missing category instead.
    if salary is None:
        return None
    if salary > 4500:
        return "High"
    if salary >= 3500:
        return "Medium"
    return "Low"

# Wrap the plain Python function as a Spark UDF that returns a string.
categorize_udf = udf(categorize_salary, StringType())
# Label each employee with their salary category and display the result.
employees_df.withColumn(
    "category",
    categorize_udf("salary"),
).show()

### 5️⃣ Pandas UDF for Normalization
@pandas_udf(DoubleType())
def normalize_salary(s: pd.Series) -> pd.Series:
    """Min-max scale the values of ``s`` into the range [0, 1].

    The minimum and maximum are taken from the Series passed in.

    NOTE(review): a Series-to-Series pandas UDF may be called once per batch
    of rows rather than once for the whole column, in which case the scaling
    is relative to each batch — confirm this is acceptable for larger data,
    or compute the min/max with Spark aggregates instead.

    Args:
        s: the values to scale.

    Returns:
        A Series of the same length: (value - min) / (max - min), or 0.0 for
        every non-null value when all values are equal.
    """
    lowest = s.min()
    spread = s.max() - lowest
    # A constant or single-row input has zero spread; dividing would give
    # 0 / 0 = NaN for every row, so map such values to 0.0 (nulls stay null).
    if spread == 0:
        return pd.Series(0.0, index=s.index).where(s.notna())
    return (s - lowest) / spread

# Add the min-max scaled salary computed by the pandas UDF and display it.
employees_df.withColumn(
    "normalized_salary",
    normalize_salary("salary"),
).show()


+---+-----+----------+------+
| id| name|department|salary|
+---+-----+----------+------+
|  1|Alice|        HR|  3000|
|  2|  Bob|        IT|  4000|
|  3|Cathy|        HR|  3500|
|  4|David|        IT|  4500|
|  5|  Eve|   Finance|  5000|
|  6|Frank|   Finance|  4800|
+---+-----+----------+------+

+----------+-------------+
|department|     location|
+----------+-------------+
|        HR|     New York|
|        IT|San Francisco|
|   Finance|      Chicago|
+----------+-------------+

+---+-----+----------+------+----+----------+
| id| name|department|salary|rank|avg_salary|
+---+-----+----------+------+----+----------+
|  5|  Eve|   Finance|  5000|   1|    4900.0|
|  6|Frank|   Finance|  4800|   2|    4900.0|
|  3|Cathy|        HR|  3500|   1|    3250.0|
|  1|Alice|        HR|  3000|   2|    3250.0|
|  4|David|        IT|  4500|   1|    4250.0|
|  2|  Bob|        IT|  4000|   2|    4250.0|
+---+-----+----------+------+----+----------+

+----------+---+-----+------+-------------+
|dep

## Complex Nested Schemas Handling


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = (
    SparkSession.builder
    .appName("AdvancedOps")
    .getOrCreate()
)

# People and their skills; Mike has an empty skills list.
columns = ["Name", "Skills"]
data = [
    ("John", ["Python", "Java"]),
    ("Jane", ["SQL", "R", "Scala"]),
    ("Mike", []),
]

df = spark.createDataFrame(data, schema=columns)
# truncate=False prints the Skills arrays in full.
df.show(truncate=False)

+----+---------------+
|Name|Skills         |
+----+---------------+
|John|[Python, Java] |
|Jane|[SQL, R, Scala]|
|Mike|[]             |
+----+---------------+



In [5]:
# One output row per element of Skills; the original array column is kept.
skill_rows = explode(df["Skills"])
df_explode = df.withColumn("skill", skill_rows)
df_explode.show()

+----+---------------+------+
|Name|         Skills| skill|
+----+---------------+------+
|John| [Python, Java]|Python|
|John| [Python, Java]|  Java|
|Jane|[SQL, R, Scala]|   SQL|
|Jane|[SQL, R, Scala]|     R|
|Jane|[SQL, R, Scala]| Scala|
+----+---------------+------+



In [6]:
# Register df under the name "people" so it can be queried with spark.sql.
df.createOrReplaceTempView("people")

In [7]:
# SQL counterpart of the explode() call: one row per (Name, skill) pair,
# read from the "people" view.
lateral_query = """
    SELECT Name, skill
    FROM people
    LATERAL VIEW EXPLODE(Skills) AS skill
"""
df_lateral = spark.sql(lateral_query)
df_lateral.show()


+----+------+
|Name| skill|
+----+------+
|John|Python|
|John|  Java|
|Jane|   SQL|
|Jane|     R|
|Jane| Scala|
+----+------+



## Pivot and Unpivot operations

In [8]:
# Monthly sales in long format: (Product, Month, Sales).
columns = ["Product", "Month", "Sales"]
data = [
    ("ProductA", "Jan", 100),
    ("ProductA", "Feb", 150),
    ("ProductA", "Mar", 120),
    ("ProductB", "Jan", 200),
    ("ProductB", "Feb", 230),
    ("ProductB", "Mar", 210),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

+--------+-----+-----+
| Product|Month|Sales|
+--------+-----+-----+
|ProductA|  Jan|  100|
|ProductA|  Feb|  150|
|ProductA|  Mar|  120|
|ProductB|  Jan|  200|
|ProductB|  Feb|  230|
|ProductB|  Mar|  210|
+--------+-----+-----+



In [9]:
# Long to wide: one row per product, one column per month holding the
# summed sales.
pivot_df = (
    df.groupBy("Product")
    .pivot("Month")
    .sum("Sales")
)
pivot_df.show()

+--------+---+---+---+
| Product|Feb|Jan|Mar|
+--------+---+---+---+
|ProductB|230|200|210|
|ProductA|150|100|120|
+--------+---+---+---+



In [14]:
from pyspark.sql.functions import sum, expr

# Build the wide (pivoted) frame first ...
wide_df = (
    df.groupBy("Product")
    .pivot("Month")
    .agg(sum("Sales"))
)
# ... then turn the three month columns back into (Month, Sales) rows with
# the SQL stack() generator.
month_sales = expr("stack(3, 'Jan', Jan, 'Feb', Feb, 'Mar', Mar) as (Month, Sales)")
unpivot_df = wide_df.select("Product", month_sales)
unpivot_df.show()

+--------+-----+-----+
| Product|Month|Sales|
+--------+-----+-----+
|ProductB|  Jan|  200|
|ProductB|  Feb|  230|
|ProductB|  Mar|  210|
|ProductA|  Jan|  100|
|ProductA|  Feb|  150|
|ProductA|  Mar|  120|
+--------+-----+-----+

