<a href="https://colab.research.google.com/github/firojahmed1313/MlAITR/blob/main/pySpark/test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession

### Create a Spark Session

In [2]:
# Start (or reuse) a Spark session named "Example" for the whole notebook.
spark = (
    SparkSession.builder
    .appName("Example")
    .getOrCreate()
)

### select

In [3]:
# Sample employee records: (id, name, age, department, salary).
data = [
    (1, "Alice", 30, "HR", 5000),
    (2, "Bob", 25, "IT", 6000),
    (3, "Charlie", 35, "Finance", 7000),
    (4, "David", 40, "HR", 8000),
    (5, "Eve", 34, "Finance", 7000),
]

# DDL-style schema string giving each column's name and type.
schema = "id INT, name STRING, age INT, department STRING, salary INT"

# Build the DataFrame from the rows and the schema, then display it.
df = spark.createDataFrame(data, schema=schema)
df.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 30|        HR|  5000|
|  2|    Bob| 25|        IT|  6000|
|  3|Charlie| 35|   Finance|  7000|
|  4|  David| 40|        HR|  8000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



In [4]:
 df_selected = df.select("name", "age")
 df_selected.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 30|
|    Bob| 25|
|Charlie| 35|
|  David| 40|
|    Eve| 34|
+-------+---+



### alias()

In [5]:
from pyspark.sql.functions import col

In [6]:
# Select two columns and rename each one with alias().
df_alias = df.select(
    col("name").alias("employee_name"),
    col("salary").alias("monthly_salary"),
)
df_alias.show()

+-------------+--------------+
|employee_name|monthly_salary|
+-------------+--------------+
|        Alice|          5000|
|          Bob|          6000|
|      Charlie|          7000|
|        David|          8000|
|          Eve|          7000|
+-------------+--------------+



In [7]:
# "*" expands to every column, so df_all has the same columns as df.
df_all = df.select("*")  # Selects all columns
df_all.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 30|        HR|  5000|
|  2|    Bob| 25|        IT|  6000|
|  3|Charlie| 35|   Finance|  7000|
|  4|  David| 40|        HR|  8000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



### filter()

In [8]:
# Keep only the rows whose salary is strictly greater than 6000.
df_filtered = df.filter(col("salary") > 6000)
df_filtered.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  3|Charlie| 35|   Finance|  7000|
|  4|  David| 40|        HR|  8000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



In [9]:
# Combine two conditions with & (logical AND). Each comparison is wrapped in
# its own parentheses because & binds tighter than > in Python.
df_filtered = df.filter((col("age") > 30) & (col("salary") > 6000))
df_filtered.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  3|Charlie| 35|   Finance|  7000|
|  4|  David| 40|        HR|  8000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



In [10]:
# Combine two conditions with | (logical OR): age above 35, or department "IT".
df_filtered = df.filter((col("age") > 35) | (col("department") == "IT"))
df_filtered.show()

+---+-----+---+----------+------+
| id| name|age|department|salary|
+---+-----+---+----------+------+
|  2|  Bob| 25|        IT|  6000|
|  4|David| 40|        HR|  8000|
+---+-----+---+----------+------+



In [11]:
# ~ negates a condition: keep every row whose department is not "HR".
df_filtered = df.filter(~(col("department") == "HR"))
df_filtered.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  2|    Bob| 25|        IT|  6000|
|  3|Charlie| 35|   Finance|  7000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



In [12]:
# SQL LIKE pattern match; % is the wildcard, so this keeps departments containing "Finance".
df_filtered = df.filter(col("department").like("%Finance%"))
df_filtered.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  3|Charlie| 35|   Finance|  7000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



In [13]:
# isin() keeps rows whose department equals any of the listed values.
df_filtered = df.filter(col("department").isin("HR", "Finance"))
df_filtered.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 30|        HR|  5000|
|  3|Charlie| 35|   Finance|  7000|
|  4|  David| 40|        HR|  8000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



In [14]:
# Keep rows whose name begins with "A".
df_filtered = df.filter(col("name").startswith("A"))
df_filtered.show()

+---+-----+---+----------+------+
| id| name|age|department|salary|
+---+-----+---+----------+------+
|  1|Alice| 30|        HR|  5000|
+---+-----+---+----------+------+



In [15]:
# Keep rows whose name ends with "A". The match is case-sensitive and no name
# in this data ends with an uppercase "A", so the result is empty.
df_filtered = df.filter(col("name").endswith("A"))
df_filtered.show()

+---+----+---+----------+------+
| id|name|age|department|salary|
+---+----+---+----------+------+
+---+----+---+----------+------+



In [16]:
# Keep rows where salary is not null; this data has no null salaries, so all rows remain.
df_filtered = df.filter(col("salary").isNotNull())
df_filtered.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 30|        HR|  5000|
|  2|    Bob| 25|        IT|  6000|
|  3|Charlie| 35|   Finance|  7000|
|  4|  David| 40|        HR|  8000|
|  5|    Eve| 34|   Finance|  7000|
+---+-------+---+----------+------+



### Type Casting

In [17]:
# Select three columns and cast each to the given type; the other columns
# (name, department) are not carried over.
df_casted = df.select(
    col("id").cast("int"),
    col("age").cast("int"),
    col("salary").cast("double"),
)
df_casted.show()

+---+---+------+
| id|age|salary|
+---+---+------+
|  1| 30|5000.0|
|  2| 25|6000.0|
|  3| 35|7000.0|
|  4| 40|8000.0|
|  5| 34|7000.0|
+---+---+------+



In [18]:
from pyspark.sql.types import DecimalType
# Cast columns in place with withColumn(): all columns are kept, and salary
# becomes a decimal with precision 10 and scale 2.
df_casted = (
    df
    .withColumn("age", col("age").cast("int"))
    .withColumn("salary", col("salary").cast(DecimalType(10, 2)))
)
df_casted.show()

+---+-------+---+----------+-------+
| id|   name|age|department| salary|
+---+-------+---+----------+-------+
|  1|  Alice| 30|        HR|5000.00|
|  2|    Bob| 25|        IT|6000.00|
|  3|Charlie| 35|   Finance|7000.00|
|  4|  David| 40|        HR|8000.00|
|  5|    Eve| 34|   Finance|7000.00|
+---+-------+---+----------+-------+



### withColumn() – Add or Modify Columns

In [19]:
# Overwrite the existing 'salary' column with the constant value 5000.
# NOTE(review): 'salary' already exists in df, so this replaces the original
# salaries rather than adding a column. Because `df` itself is rebound, later
# cells that use this `df` see salary == 5000 for every row.
from pyspark.sql.functions import lit
df = df.withColumn("salary", lit(5000))
df.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 30|        HR|  5000|
|  2|    Bob| 25|        IT|  5000|
|  3|Charlie| 35|   Finance|  5000|
|  4|  David| 40|        HR|  5000|
|  5|    Eve| 34|   Finance|  5000|
+---+-------+---+----------+------+



In [20]:
 # Increase age by 5 years
# NOTE(review): this rebinds `df`, so the cell is not idempotent: each re-run
# adds another 5 years to age.
df = df.withColumn("age", col("age") + 5)
df.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 35|        HR|  5000|
|  2|    Bob| 30|        IT|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
|  5|    Eve| 39|   Finance|  5000|
+---+-------+---+----------+------+



In [21]:
 from pyspark.sql.functions import when
 df_status = df.withColumn("status", when(col("salary") >= 6500, "Good").otherwise("Bad"))
 df_status.show()

+---+-------+---+----------+------+------+
| id|   name|age|department|salary|status|
+---+-------+---+----------+------+------+
|  1|  Alice| 35|        HR|  5000|   Bad|
|  2|    Bob| 30|        IT|  5000|   Bad|
|  3|Charlie| 40|   Finance|  5000|   Bad|
|  4|  David| 45|        HR|  5000|   Bad|
|  5|    Eve| 39|   Finance|  5000|   Bad|
+---+-------+---+----------+------+------+



### Drop

In [22]:
# drop() takes one or more column names; the result has the remaining columns only.
f = df.drop("age", "name")
f.show()

+---+----------+------+
| id|department|salary|
+---+----------+------+
|  1|        HR|  5000|
|  2|        IT|  5000|
|  3|   Finance|  5000|
|  4|        HR|  5000|
|  5|   Finance|  5000|
+---+----------+------+



In [23]:
# Same drop as above, with the column names held in a list and unpacked with *.
columns_to_drop = ["age", "name"]
f = df.drop(*columns_to_drop)
f.show()

+---+----------+------+
| id|department|salary|
+---+----------+------+
|  1|        HR|  5000|
|  2|        IT|  5000|
|  3|   Finance|  5000|
|  4|        HR|  5000|
|  5|   Finance|  5000|
+---+----------+------+



In [24]:
# Drop rows containing null values; df has none at this point, so all five rows remain.
df_cleaned = df.dropna()
df_cleaned.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 35|        HR|  5000|
|  2|    Bob| 30|        IT|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
|  5|    Eve| 39|   Finance|  5000|
+---+-------+---+----------+------+



In [25]:
# Restrict the null check to the "age" column only.
df_cleaned = df.dropna(subset=["age"])
df_cleaned.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 35|        HR|  5000|
|  2|    Bob| 30|        IT|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
|  5|    Eve| 39|   Finance|  5000|
+---+-------+---+----------+------+



### dropDuplicates

In [26]:
# Remove rows that are duplicated across all columns; this data has none,
# so all five rows come back (row order in the output may differ).
df_unique = df.dropDuplicates()
df_unique.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 35|        HR|  5000|
|  2|    Bob| 30|        IT|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  5|    Eve| 39|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
+---+-------+---+----------+------+



In [27]:
# Treat rows as duplicates when they share the same (name, age) pair.
df_unique = df.dropDuplicates(["name", "age"])
df_unique.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 35|        HR|  5000|
|  2|    Bob| 30|        IT|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
|  5|    Eve| 39|   Finance|  5000|
+---+-------+---+----------+------+



### orderBy

In [28]:
# Sort by age; with no direction given the output is in ascending order.
df_sorted = df.orderBy("age")
df_sorted.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  2|    Bob| 30|        IT|  5000|
|  1|  Alice| 35|        HR|  5000|
|  5|    Eve| 39|   Finance|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
+---+-------+---+----------+------+



In [29]:
# Sort by age, highest first.
df_sorted = df.orderBy(col("age").desc())
df_sorted.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  4|  David| 45|        HR|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  5|    Eve| 39|   Finance|  5000|
|  1|  Alice| 35|        HR|  5000|
|  2|    Bob| 30|        IT|  5000|
+---+-------+---+----------+------+



In [30]:
# Sort by age ascending, using name descending as the second sort key.
df_sorted = df.orderBy(col("age").asc(), col("name").desc())
df_sorted.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  2|    Bob| 30|        IT|  5000|
|  1|  Alice| 35|        HR|  5000|
|  5|    Eve| 39|   Finance|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
+---+-------+---+----------+------+



In [31]:
# sort() called with the same arguments as the orderBy() cell above; it yields the same output.
df_sorted = df.sort(col("age").asc(), col("name").desc())
df_sorted.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  2|    Bob| 30|        IT|  5000|
|  1|  Alice| 35|        HR|  5000|
|  5|    Eve| 39|   Finance|  5000|
|  3|Charlie| 40|   Finance|  5000|
|  4|  David| 45|        HR|  5000|
+---+-------+---+----------+------+



### limit

In [32]:
# Keep at most 2 rows.
# NOTE(review): no ordering is applied first, so which 2 rows come back is
# presumably not guaranteed — confirm before relying on it.
df_limited = df.limit(2)
df_limited.show()

+---+-----+---+----------+------+
| id| name|age|department|salary|
+---+-----+---+----------+------+
|  1|Alice| 35|        HR|  5000|
|  2|  Bob| 30|        IT|  5000|
+---+-----+---+----------+------+



In [33]:
# Top-N pattern: sort by age descending, then keep the first 2 rows.
df_top = df.orderBy(col("age").desc()).limit(2)
df_top.show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  4|  David| 45|        HR|  5000|
|  3|Charlie| 40|   Finance|  5000|
+---+-------+---+----------+------+



### union

In [34]:
# Two small frames that share one column list, in the same order.
schema = ["id", "name", "age"]
data1 = [(1, "Alice", 30), (2, "Bob", 25)]
data2 = [(3, "Charlie", 35), (4, "David", 40)]

df1 = spark.createDataFrame(data1, schema=schema)
df2 = spark.createDataFrame(data2, schema=schema)

# Append the rows of df2 to the rows of df1.
df_union = df1.union(df2)
df_union.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  2|    Bob| 25|
|  3|Charlie| 35|
|  4|  David| 40|
+---+-------+---+



In [35]:
# Rows with the same columns as df1, but listed in a different order.
data3 = [(5, 50, "Eve"), (6, 45, "Frank")]
schema_different = ["id", "age", "name"]  # Age and Name are swapped
# Create DataFrame
df3 = spark.createDataFrame(data3, schema=schema_different)
# unionByName() lines columns up by name rather than by position, so the
# swapped order still lands in the right columns (see the output).
# NOTE(review): allowMissingColumns=True looks unnecessary here because df1
# and df3 have the same column names — confirm.
df_union_by_name = df1.unionByName(df3,allowMissingColumns=True)
df_union_by_name.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 30|
|  2|  Bob| 25|
|  5|  Eve| 50|
|  6|Frank| 45|
+---+-----+---+



In [36]:
from pyspark.sql.functions import count

### Agg

In [37]:
# Count the rows in each department and name the result column "employee_count".
df_grouped = df.groupBy("department").agg(count("*").alias("employee_count"))
df_grouped.show()

+----------+--------------+
|department|employee_count|
+----------+--------------+
|        HR|             2|
|        IT|             1|
|   Finance|             2|
+----------+--------------+



In [38]:
# NOTE(review): importing sum, max and min by bare name shadows Python's
# built-in sum(), max() and min() in every cell that runs after this one.
from pyspark.sql.functions import sum, avg, max, min
# Compute several aggregates per department in a single agg() call.
# Salary was set to 5000 for every row earlier in the notebook, which is why
# avg, max and min are all 5000 in the output.
df_grouped = df.groupBy("department").agg(
 count("*").alias("employee_count"),
 sum("salary").alias("total_salary"),
 avg("salary").alias("avg_salary"),
 max("salary").alias("max_salary"),
 min("salary").alias("min_salary")
 )
df_grouped.show()

+----------+--------------+------------+----------+----------+----------+
|department|employee_count|total_salary|avg_salary|max_salary|min_salary|
+----------+--------------+------------+----------+----------+----------+
|        HR|             2|       10000|    5000.0|      5000|      5000|
|        IT|             1|        5000|    5000.0|      5000|      5000|
|   Finance|             2|       10000|    5000.0|      5000|      5000|
+----------+--------------+------------+----------+----------+----------+



In [39]:
# Group on two columns and count the rows in each (department, salary) pair.
df_grouped = df.groupBy("department", "salary").count()
df_grouped.show()

+----------+------+-----+
|department|salary|count|
+----------+------+-----+
|        IT|  5000|    1|
|        HR|  5000|    2|
|   Finance|  5000|    2|
+----------+------+-----+



### Split

In [40]:
# Single-column data set of full names; each row is a 1-tuple.
# Note: this rebinds data2, schema and df, replacing the employee frame used above.
data2 = [("John Doe",), ("Alice Johnson",), ("Bob Smith",)]
schema = ["full_name"]
# Create DataFrame
df = spark.createDataFrame(data2, schema=schema)
df.show()

+-------------+
|    full_name|
+-------------+
|     John Doe|
|Alice Johnson|
|    Bob Smith|
+-------------+



In [41]:
from pyspark.sql.functions import split
# Split full_name on a single space into an array column called name_parts.
df_split = df.withColumn("name_parts", split(col("full_name"), " "))
df_split.show(truncate=False)

+-------------+----------------+
|full_name    |name_parts      |
+-------------+----------------+
|John Doe     |[John, Doe]     |
|Alice Johnson|[Alice, Johnson]|
|Bob Smith    |[Bob, Smith]    |
+-------------+----------------+



In [42]:
# Index 0 of the name_parts array becomes the first_name column.
df_split = df_split.withColumn("first_name", col("name_parts")[0])
df_split.show(truncate=False)

+-------------+----------------+----------+
|full_name    |name_parts      |first_name|
+-------------+----------------+----------+
|John Doe     |[John, Doe]     |John      |
|Alice Johnson|[Alice, Johnson]|Alice     |
|Bob Smith    |[Bob, Smith]    |Bob       |
+-------------+----------------+----------+



### explode

In [43]:
from pyspark.sql.functions import explode
# explode() emits one output row per element of the name_parts array; the
# array column itself is dropped afterwards.
df_exploded = df_split.withColumn("word", explode(df_split.name_parts)).drop("name_parts")
# Show the result here: df_exploded is reassigned in a later cell, so without
# this the example would never be displayed.
df_exploded.show()

In [44]:
# Each row carries an id, a name and a list of skills.
schema = ["id", "name", "skills"]
data3 = [
    (1, "John Doe", ["Python", "Java", "SQL"]),
    (2, "Alice Johnson", ["Scala", "C++"]),
    (3, "Bob Smith", ["JavaScript", "Python", "Go"]),
]

# Build the DataFrame and print it without truncating the array column.
df = spark.createDataFrame(data3, schema=schema)
df.show(truncate=False)

+---+-------------+------------------------+
|id |name         |skills                  |
+---+-------------+------------------------+
|1  |John Doe     |[Python, Java, SQL]     |
|2  |Alice Johnson|[Scala, C++]            |
|3  |Bob Smith    |[JavaScript, Python, Go]|
+---+-------------+------------------------+



In [45]:
# One output row per skill: explode the skills array into "skill", then drop the array column.
df_exploded = df.withColumn("skill", explode(df.skills)).drop("skills")
df_exploded.show()

+---+-------------+----------+
| id|         name|     skill|
+---+-------------+----------+
|  1|     John Doe|    Python|
|  1|     John Doe|      Java|
|  1|     John Doe|       SQL|
|  2|Alice Johnson|     Scala|
|  2|Alice Johnson|       C++|
|  3|    Bob Smith|JavaScript|
|  3|    Bob Smith|    Python|
|  3|    Bob Smith|        Go|
+---+-------------+----------+



### array_contains

In [46]:

from pyspark.sql.functions import array_contains
# Add a boolean column that is true when the skills array contains "Python".
df_contains = df.withColumn("has_python", array_contains(col("skills"), "Python"))
df_contains.show()

+---+-------------+--------------------+----------+
| id|         name|              skills|has_python|
+---+-------------+--------------------+----------+
|  1|     John Doe| [Python, Java, SQL]|      true|
|  2|Alice Johnson|        [Scala, C++]|     false|
|  3|    Bob Smith|[JavaScript, Pyth...|      true|
+---+-------------+--------------------+----------+



In [47]:
# array_contains() used directly inside select(), aliased as "knows_java".
df.select("name", array_contains(col("skills"), "Java").alias("knows_java")).show()

+-------------+----------+
|         name|knows_java|
+-------------+----------+
|     John Doe|      true|
|Alice Johnson|     false|
|    Bob Smith|     false|
+-------------+----------+



In [48]:

from pyspark.sql.functions import expr
# expr() takes a SQL expression string; here two array_contains checks are joined with OR.
df_multiple = df.withColumn("has_python_or_scala", expr("array_contains(skills, 'Python') OR array_contains(skills, 'Scala')"))
df_multiple.show()

+---+-------------+--------------------+-------------------+
| id|         name|              skills|has_python_or_scala|
+---+-------------+--------------------+-------------------+
|  1|     John Doe| [Python, Java, SQL]|               true|
|  2|Alice Johnson|        [Scala, C++]|               true|
|  3|    Bob Smith|[JavaScript, Pyth...|               true|
+---+-------------+--------------------+-------------------+



### collect_list

In [49]:
from pyspark.sql.functions import collect_list

In [50]:
# Long-format data: one row per (person, skill) pair, so ids and names repeat.
data = [
    (1, "Alice", "Python"),
    (1, "Alice", "Java"),
    (2, "Bob", "Scala"),
    (2, "Bob", "Python"),
    (2, "Bob", "Go"),
    (3, "Charlie", "JavaScript"),
    (3, "Charlie", "C++")
]
schema = ["id", "name", "skill"]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

df.show()

+---+-------+----------+
| id|   name|     skill|
+---+-------+----------+
|  1|  Alice|    Python|
|  1|  Alice|      Java|
|  2|    Bob|     Scala|
|  2|    Bob|    Python|
|  2|    Bob|        Go|
|  3|Charlie|JavaScript|
|  3|Charlie|       C++|
+---+-------+----------+



In [51]:
 df_grouped = df.groupBy("id", "name").agg(collect_list("skill").alias("skills"))
 df_grouped.show(truncate=False)

+---+-------+-------------------+
|id |name   |skills             |
+---+-------+-------------------+
|2  |Bob    |[Scala, Python, Go]|
|1  |Alice  |[Python, Java]     |
|3  |Charlie|[JavaScript, C++]  |
+---+-------+-------------------+



In [52]:
# Long-format marks: one row per (student, subject) with the mark obtained.
data = [
    (1, "Alice", "Math", 85),
    (1, "Alice", "Science", 90),
    (1, "Alice", "English", 78),
    (2, "Bob", "Math", 92),
    (2, "Bob", "Science", 88),
    (2, "Bob", "English", 81),
    (3, "Charlie", "Math", 75),
    (3, "Charlie", "Science", 95),
    (3, "Charlie", "English", 85)
]
schema = ["id", "name", "subject", "marks"]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

df.show()

+---+-------+-------+-----+
| id|   name|subject|marks|
+---+-------+-------+-----+
|  1|  Alice|   Math|   85|
|  1|  Alice|Science|   90|
|  1|  Alice|English|   78|
|  2|    Bob|   Math|   92|
|  2|    Bob|Science|   88|
|  2|    Bob|English|   81|
|  3|Charlie|   Math|   75|
|  3|Charlie|Science|   95|
|  3|Charlie|English|   85|
+---+-------+-------+-----+



### pivot

In [53]:
# Pivot to wide format: one row per (id, name), one column per distinct
# subject, each cell holding the sum of marks for that student and subject.
df_pivot = df.groupBy("id", "name").pivot("subject").sum("marks")
df_pivot.show()

+---+-------+-------+----+-------+
| id|   name|English|Math|Science|
+---+-------+-------+----+-------+
|  3|Charlie|     85|  75|     95|
|  2|    Bob|     81|  92|     88|
|  1|  Alice|     78|  85|     90|
+---+-------+-------+----+-------+



### Join

In [54]:
# Left DataFrame (id, name, marks); ids 1-4.
data1 = [(1, "Alice", 85), (2, "Bob", 78), (3, "Charlie", 92), (4, "David", 65)]
columns1 = ["id", "name", "marks"]
df1 = spark.createDataFrame(data1, columns1)

# Right DataFrame (Courses); ids 1, 2, 3 and 5. Id 4 exists only on the left
# and id 5 only on the right, which is what makes the join types below differ.
data2 = [(1, "Math"), (2, "Science"), (3, "English"), (5, "History")]
columns2 = ["id", "course"]
df2 = spark.createDataFrame(data2, columns2)

# Show DataFrames
df1.show()
df2.show()

+---+-------+-----+
| id|   name|marks|
+---+-------+-----+
|  1|  Alice|   85|
|  2|    Bob|   78|
|  3|Charlie|   92|
|  4|  David|   65|
+---+-------+-----+

+---+-------+
| id| course|
+---+-------+
|  1|   Math|
|  2|Science|
|  3|English|
|  5|History|
+---+-------+



In [55]:
# Inner join on id: only ids present in both frames (1, 2, 3) are kept.
df_inner = df1.join(df2, on="id", how="inner")
df_inner.show()

+---+-------+-----+-------+
| id|   name|marks| course|
+---+-------+-----+-------+
|  1|  Alice|   85|   Math|
|  2|    Bob|   78|Science|
|  3|Charlie|   92|English|
+---+-------+-----+-------+



In [56]:
# Left join: every row of df1 is kept; id 4 has no match in df2, so its course is NULL.
df_left = df1.join(df2, on="id", how="left")
df_left.show()

+---+-------+-----+-------+
| id|   name|marks| course|
+---+-------+-----+-------+
|  1|  Alice|   85|   Math|
|  2|    Bob|   78|Science|
|  3|Charlie|   92|English|
|  4|  David|   65|   NULL|
+---+-------+-----+-------+



In [57]:
# Right join: every row of df2 is kept; id 5 has no match in df1, so name and marks are NULL.
df_right = df1.join(df2, on="id", how="right")
df_right.show()

+---+-------+-----+-------+
| id|   name|marks| course|
+---+-------+-----+-------+
|  1|  Alice|   85|   Math|
|  2|    Bob|   78|Science|
|  5|   NULL| NULL|History|
|  3|Charlie|   92|English|
+---+-------+-----+-------+



In [58]:
# Full outer join: rows from both frames are kept, with NULLs where one side has no match.
df_full = df1.join(df2, on="id", how="full")
df_full.show()

+---+-------+-----+-------+
| id|   name|marks| course|
+---+-------+-----+-------+
|  1|  Alice|   85|   Math|
|  2|    Bob|   78|Science|
|  3|Charlie|   92|English|
|  4|  David|   65|   NULL|
|  5|   NULL| NULL|History|
+---+-------+-----+-------+



In [59]:
# Left anti join: rows of df1 whose id has no match in df2 (only id 4), with df1's columns only.
df_anti = df1.join(df2, on="id", how="left_anti")
df_anti.show()

+---+-----+-----+
| id| name|marks|
+---+-----+-----+
|  4|David|   65|
+---+-----+-----+



### String

In [60]:
# Deliberately messy strings for the string-function examples: padded names,
# mixed case, a None name, and comma-separated skills with stray spaces.
data = [
    (1, "  Alice  ", "PYSPARK,SQL"),
    (2, "BOB", "Java,Python"),
    (3, "Charlie", " ML,AI "),
    (4, None, "Spark, Hadoop")
]
schema = ["id", "name", "skills"]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

+---+---------+-------------+
| id|     name|       skills|
+---+---------+-------------+
|  1|  Alice  |  PYSPARK,SQL|
|  2|      BOB|  Java,Python|
|  3|  Charlie|       ML,AI |
|  4|     NULL|Spark, Hadoop|
+---+---------+-------------+



In [61]:
from pyspark.sql.functions import upper, lower
# Add upper- and lower-cased copies of name; a NULL name stays NULL and
# surrounding spaces are left untouched.
df_transformed = (
    df
    .withColumn("name_upper", upper(col("name")))
    .withColumn("name_lower", lower(col("name")))
)
df_transformed.show()

+---+---------+-------------+----------+----------+
| id|     name|       skills|name_upper|name_lower|
+---+---------+-------------+----------+----------+
|  1|  Alice  |  PYSPARK,SQL|   ALICE  |   alice  |
|  2|      BOB|  Java,Python|       BOB|       bob|
|  3|  Charlie|       ML,AI |   CHARLIE|   charlie|
|  4|     NULL|Spark, Hadoop|      NULL|      NULL|
+---+---------+-------------+----------+----------+



In [62]:
from pyspark.sql.functions import trim, ltrim, rtrim
# Strip whitespace from name: trim() on both sides, ltrim() on the left only,
# rtrim() on the right only.
df_trimmed = (
    df
    .withColumn("name_trim", trim(col("name")))
    .withColumn("name_ltrim", ltrim(col("name")))
    .withColumn("name_rtrim", rtrim(col("name")))
)
df_trimmed.show()

+---+---------+-------------+---------+----------+----------+
| id|     name|       skills|name_trim|name_ltrim|name_rtrim|
+---+---------+-------------+---------+----------+----------+
|  1|  Alice  |  PYSPARK,SQL|    Alice|   Alice  |     Alice|
|  2|      BOB|  Java,Python|      BOB|       BOB|       BOB|
|  3|  Charlie|       ML,AI |  Charlie|   Charlie|   Charlie|
|  4|     NULL|Spark, Hadoop|     NULL|      NULL|      NULL|
+---+---------+-------------+---------+----------+----------+



In [63]:
 from pyspark.sql.functions import length
 df_length = df.withColumn("name_length", length(col("name")))
 df_length.show()

+---+---------+-------------+-----------+
| id|     name|       skills|name_length|
+---+---------+-------------+-----------+
|  1|  Alice  |  PYSPARK,SQL|          9|
|  2|      BOB|  Java,Python|          3|
|  3|  Charlie|       ML,AI |          7|
|  4|     NULL|Spark, Hadoop|       NULL|
+---+---------+-------------+-----------+



In [64]:
from pyspark.sql.functions import split
# Split the skills string on "," into an array. Whitespace around the commas
# is kept in the elements (e.g. " ML", "AI ", " Hadoop" in the output).
df_split = df.withColumn("skills_array", split(col("skills"), ","))
df_split.show(truncate=False)

+---+---------+-------------+----------------+
|id |name     |skills       |skills_array    |
+---+---------+-------------+----------------+
|1  |  Alice  |PYSPARK,SQL  |[PYSPARK, SQL]  |
|2  |BOB      |Java,Python  |[Java, Python]  |
|3  |Charlie  | ML,AI       |[ ML, AI ]      |
|4  |NULL     |Spark, Hadoop|[Spark,  Hadoop]|
+---+---------+-------------+----------------+



In [65]:
from pyspark.sql.functions import regexp_replace
# Replace every "," in skills with "/"; the first pattern argument is a regular expression.
df_replace = df.withColumn("clean_skills", regexp_replace(col("skills"), ",", "/"))
df_replace.show(truncate=False)

+---+---------+-------------+-------------+
|id |name     |skills       |clean_skills |
+---+---------+-------------+-------------+
|1  |  Alice  |PYSPARK,SQL  |PYSPARK/SQL  |
|2  |BOB      |Java,Python  |Java/Python  |
|3  |Charlie  | ML,AI       | ML/AI       |
|4  |NULL     |Spark, Hadoop|Spark/ Hadoop|
+---+---------+-------------+-------------+



### Date

In [66]:
# Dates and timestamps stored as plain strings; later cells parse them into date/timestamp types.
data = [
    (1, "2024-03-10", "2025-07-15 14:30:00"),
    (2, "2023-05-20", "2024-11-25 09:15:45"),
    (3, "2022-12-01", "2023-06-10 18:45:30")
]
schema = ["id", "date_str", "timestamp_str"]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

+---+----------+-------------------+
| id|  date_str|      timestamp_str|
+---+----------+-------------------+
|  1|2024-03-10|2025-07-15 14:30:00|
|  2|2023-05-20|2024-11-25 09:15:45|
|  3|2022-12-01|2023-06-10 18:45:30|
+---+----------+-------------------+



In [67]:
from pyspark.sql.functions import to_date, to_timestamp
# Parse the string columns into typed "date" and "timestamp" columns using
# explicit format patterns.
df_converted = (
    df
    .withColumn("date", to_date(col("date_str"), "yyyy-MM-dd"))
    .withColumn("timestamp", to_timestamp(col("timestamp_str"), "yyyy-MM-dd HH:mm:ss"))
)
df_converted.show()

+---+----------+-------------------+----------+-------------------+
| id|  date_str|      timestamp_str|      date|          timestamp|
+---+----------+-------------------+----------+-------------------+
|  1|2024-03-10|2025-07-15 14:30:00|2024-03-10|2025-07-15 14:30:00|
|  2|2023-05-20|2024-11-25 09:15:45|2023-05-20|2024-11-25 09:15:45|
|  3|2022-12-01|2023-06-10 18:45:30|2022-12-01|2023-06-10 18:45:30|
+---+----------+-------------------+----------+-------------------+



In [68]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second
# Pull individual fields out of the parsed columns: year/month/day come from
# "date", hour/minute come from "timestamp".
df_extracted = (
    df_converted
    .withColumn("year", year(col("date")))
    .withColumn("month", month(col("date")))
    .withColumn("day", dayofmonth(col("date")))
    .withColumn("hour", hour(col("timestamp")))
    .withColumn("minute", minute(col("timestamp")))
)
df_extracted.show()

+---+----------+-------------------+----------+-------------------+----+-----+---+----+------+
| id|  date_str|      timestamp_str|      date|          timestamp|year|month|day|hour|minute|
+---+----------+-------------------+----------+-------------------+----+-----+---+----+------+
|  1|2024-03-10|2025-07-15 14:30:00|2024-03-10|2025-07-15 14:30:00|2024|    3| 10|  14|    30|
|  2|2023-05-20|2024-11-25 09:15:45|2023-05-20|2024-11-25 09:15:45|2023|    5| 20|   9|    15|
|  3|2022-12-01|2023-06-10 18:45:30|2022-12-01|2023-06-10 18:45:30|2022|   12|  1|  18|    45|
+---+----------+-------------------+----------+-------------------+----+-----+---+----+------+



In [69]:
from pyspark.sql.functions import datediff, date_add, date_sub
# Date arithmetic: days from "date" to "timestamp", plus the date shifted
# 10 days forward and 10 days back.
df_dates = (
    df_converted
    .withColumn("date_diff", datediff(col("timestamp"), col("date")))
    .withColumn("plus_10_days", date_add(col("date"), 10))
    .withColumn("minus_10_days", date_sub(col("date"), 10))
)
df_dates.show()

+---+----------+-------------------+----------+-------------------+---------+------------+-------------+
| id|  date_str|      timestamp_str|      date|          timestamp|date_diff|plus_10_days|minus_10_days|
+---+----------+-------------------+----------+-------------------+---------+------------+-------------+
|  1|2024-03-10|2025-07-15 14:30:00|2024-03-10|2025-07-15 14:30:00|      492|  2024-03-20|   2024-02-29|
|  2|2023-05-20|2024-11-25 09:15:45|2023-05-20|2024-11-25 09:15:45|      555|  2023-05-30|   2023-05-10|
|  3|2022-12-01|2023-06-10 18:45:30|2022-12-01|2023-06-10 18:45:30|      191|  2022-12-11|   2022-11-21|
+---+----------+-------------------+----------+-------------------+---------+------------+-------------+



### Handle Null Values

In [70]:
# Every row has exactly one missing value (None) in name, age or salary.
data = [
    (1, "Alice", 25, None),
    (2, None, 30, 50000),
    (3, "Charlie", None, 60000),
    (4, "David", 35, None)
]
schema = ["id", "name", "age", "salary"]

df = spark.createDataFrame(data, schema=schema)
df.show()

+---+-------+----+------+
| id|   name| age|salary|
+---+-------+----+------+
|  1|  Alice|  25|  NULL|
|  2|   NULL|  30| 50000|
|  3|Charlie|NULL| 60000|
|  4|  David|  35|  NULL|
+---+-------+----+------+



In [71]:
# Drop rows containing a null. Every row in this data has one, so the result is empty.
df_drop = df.dropna()
df_drop.show()

+---+----+---+------+
| id|name|age|salary|
+---+----+---+------+
+---+----+---+------+



In [72]:
# Fill nulls per column: age -> 0, salary -> 40000. name is not listed, so its null stays.
df_filled = df.fillna({"age": 0, "salary": 40000})
df_filled.show()

+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|  1|  Alice| 25| 40000|
|  2|   NULL| 30| 50000|
|  3|Charlie|  0| 60000|
|  4|  David| 35| 40000|
+---+-------+---+------+



In [73]:
 df_null_salary = df.filter(col("salary").isNull())
 df_null_salary.show()

+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  1|Alice| 25|  NULL|
|  4|David| 35|  NULL|
+---+-----+---+------+



In [74]:
 df_not_null_salary = df.filter(col("salary").isNotNull())
 df_not_null_salary.show()

+---+-------+----+------+
| id|   name| age|salary|
+---+-------+----+------+
|  2|   NULL|  30| 50000|
|  3|Charlie|NULL| 60000|
+---+-------+----+------+



In [75]:
# NOTE(review): left disabled — presumably because DataFrame.na.replace()
# does not accept None as the value to replace (confirm against the PySpark
# docs). Nulls are normally filled with fillna(), e.g.
# df.fillna("Unknown", subset=["name"]). Either enable a working version or
# delete this cell.
# df_replace = df.na.replace(None, "Unknown", subset=["name"])
# df_replace.show()


In [76]:
from pyspark.sql.functions import coalesce
# coalesce() takes the first non-null argument: the existing salary, or the literal 40000.
# `lit` comes from the import in the earlier withColumn() section.
df_coalesce = df.withColumn("salary", coalesce(col("salary"), lit(40000)))
df_coalesce.show()

+---+-------+----+------+
| id|   name| age|salary|
+---+-------+----+------+
|  1|  Alice|  25| 40000|
|  2|   NULL|  30| 50000|
|  3|Charlie|NULL| 60000|
|  4|  David|  35| 40000|
+---+-------+----+------+



### Window Function

In [77]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, lag, lead, sum, avg

In [78]:


# Employee sample for the window-function examples: three departments with
# distinct salaries inside each one.
schema = ["id", "name", "department", "salary"]
data = [
    # Sales
    (1, "Alice", "Sales", 5000),
    (2, "Bob", "Sales", 6000),
    (3, "Charlie", "Sales", 7000),
    # HR
    (4, "David", "HR", 4000),
    (5, "Eve", "HR", 4500),
    (6, "Frank", "HR", 4800),
    # IT
    (7, "Grace", "IT", 8000),
    (8, "Hank", "IT", 9000),
]

df = spark.createDataFrame(data=data, schema=schema)
df.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|     Sales|  5000|
|  2|    Bob|     Sales|  6000|
|  3|Charlie|     Sales|  7000|
|  4|  David|        HR|  4000|
|  5|    Eve|        HR|  4500|
|  6|  Frank|        HR|  4800|
|  7|  Grace|        IT|  8000|
|  8|   Hank|        IT|  9000|
+---+-------+----------+------+



In [79]:
# One window per department, highest salary first; reused by the cells below.
salary_desc = col("salary").desc()
window_spec = Window.partitionBy("department").orderBy(salary_desc)

# Sequential position (1, 2, 3, ...) of each row inside its department window.
df_row_number = df.select("*", row_number().over(window_spec).alias("row_number"))
df_row_number.show()

+---+-------+----------+------+----------+
| id|   name|department|salary|row_number|
+---+-------+----------+------+----------+
|  6|  Frank|        HR|  4800|         1|
|  5|    Eve|        HR|  4500|         2|
|  4|  David|        HR|  4000|         3|
|  8|   Hank|        IT|  9000|         1|
|  7|  Grace|        IT|  8000|         2|
|  3|Charlie|     Sales|  7000|         1|
|  2|    Bob|     Sales|  6000|         2|
|  1|  Alice|     Sales|  5000|         3|
+---+-------+----------+------+----------+



In [80]:
 # rank() over the per-department, salary-descending window.
 rank_col = rank().over(window_spec)
 df_rank = df.withColumn("rank", rank_col)
 df_rank.show()

+---+-------+----------+------+----+
| id|   name|department|salary|rank|
+---+-------+----------+------+----+
|  6|  Frank|        HR|  4800|   1|
|  5|    Eve|        HR|  4500|   2|
|  4|  David|        HR|  4000|   3|
|  8|   Hank|        IT|  9000|   1|
|  7|  Grace|        IT|  8000|   2|
|  3|Charlie|     Sales|  7000|   1|
|  2|    Bob|     Sales|  6000|   2|
|  1|  Alice|     Sales|  5000|   3|
+---+-------+----------+------+----+



In [81]:
# dense_rank() over the same window; with no tied salaries in the sample it
# produces the same numbers as rank().
dense_rank_col = dense_rank().over(window_spec)
df_dense_rank = df.withColumn("dense_rank", dense_rank_col)
df_dense_rank.show()

+---+-------+----------+------+----------+
| id|   name|department|salary|dense_rank|
+---+-------+----------+------+----------+
|  6|  Frank|        HR|  4800|         1|
|  5|    Eve|        HR|  4500|         2|
|  4|  David|        HR|  4000|         3|
|  8|   Hank|        IT|  9000|         1|
|  7|  Grace|        IT|  8000|         2|
|  3|Charlie|     Sales|  7000|         1|
|  2|    Bob|     Sales|  6000|         2|
|  1|  Alice|     Sales|  5000|         3|
+---+-------+----------+------+----------+



In [82]:
# NOTE: window_spec is ordered, so sum() accumulates from the first row of the
# department up to the current row. "total_salary" is therefore a running total
# (4800, 9300, 13300 for HR), not one department-wide total repeated on each row.
# `sum` here is the Spark column function imported above, not the Python builtin.
running_salary = sum("salary").over(window_spec)
df_sum = df.withColumn("total_salary", running_salary)
df_sum.show()

+---+-------+----------+------+------------+
| id|   name|department|salary|total_salary|
+---+-------+----------+------+------------+
|  6|  Frank|        HR|  4800|        4800|
|  5|    Eve|        HR|  4500|        9300|
|  4|  David|        HR|  4000|       13300|
|  8|   Hank|        IT|  9000|        9000|
|  7|  Grace|        IT|  8000|       17000|
|  3|Charlie|     Sales|  7000|        7000|
|  2|    Bob|     Sales|  6000|       13000|
|  1|  Alice|     Sales|  5000|       18000|
+---+-------+----------+------+------------+



In [83]:
# Salary of the preceding row in the same department window; the first row of
# each department has no predecessor and therefore gets NULL.
prev_salary_col = lag("salary", 1).over(window_spec)
df_lag = df.withColumn("prev_salary", prev_salary_col)
df_lag.show()

+---+-------+----------+------+-----------+
| id|   name|department|salary|prev_salary|
+---+-------+----------+------+-----------+
|  6|  Frank|        HR|  4800|       NULL|
|  5|    Eve|        HR|  4500|       4800|
|  4|  David|        HR|  4000|       4500|
|  8|   Hank|        IT|  9000|       NULL|
|  7|  Grace|        IT|  8000|       9000|
|  3|Charlie|     Sales|  7000|       NULL|
|  2|    Bob|     Sales|  6000|       7000|
|  1|  Alice|     Sales|  5000|       6000|
+---+-------+----------+------+-----------+



### UDF (User-Defined Function)

In [84]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, FloatType

In [85]:
def to_upper(name):
    """Return ``name`` converted to upper case.

    Args:
        name: The string to convert, or ``None``. When used as a Spark UDF a
            NULL cell arrives here as ``None``.

    Returns:
        The upper-cased string, or ``None`` when ``name`` is ``None`` so that
        NULL inputs stay NULL instead of raising ``AttributeError``.
    """
    if name is None:
        return None
    return name.upper()
# Wrap the plain Python function as a Spark UDF that returns a string column.
to_upper_udf = udf(to_upper, returnType=StringType())

# Apply the UDF to the "name" column and append the result as "name_upper".
name_upper_col = to_upper_udf(df["name"]).alias("name_upper")
df_upper = df.select("*", name_upper_col)
df_upper.show()

+---+-------+----------+------+----------+
| id|   name|department|salary|name_upper|
+---+-------+----------+------+----------+
|  1|  Alice|     Sales|  5000|     ALICE|
|  2|    Bob|     Sales|  6000|       BOB|
|  3|Charlie|     Sales|  7000|   CHARLIE|
|  4|  David|        HR|  4000|     DAVID|
|  5|    Eve|        HR|  4500|       EVE|
|  6|  Frank|        HR|  4800|     FRANK|
|  7|  Grace|        IT|  8000|     GRACE|
|  8|   Hank|        IT|  9000|      HANK|
+---+-------+----------+------+----------+



### Data Write

In [89]:
# Write the frame as CSV with a header row, replacing any previous output.
# NOTE: Spark treats the path as an output directory (part files are written
# inside it), despite the ".csv" suffix.
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .csv("output/csv_folder/roj.csv")
)

In [93]:
# Read the CSV output back, using the first line as column names and letting
# Spark infer the column types from the data.
df_csv = spark.read.csv(
    "output/csv_folder/roj.csv",
    header=True,
    inferSchema=True,
)

df_csv.show(5)
df_csv.printSchema()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|     Sales|  5000|
|  2|    Bob|     Sales|  6000|
|  3|Charlie|     Sales|  7000|
|  4|  David|        HR|  4000|
|  5|    Eve|        HR|  4500|
+---+-------+----------+------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)

