In [None]:
#Fundamental data structure of Spark. It's an immutable, fault-tolerant, distributed collection of objects.
# Cons RDD - No Built-in Optimization,Performance (Often Slower), No Schema
# Pros of Dataframe
  # DataFrames leverage Spark's Catalyst Optimizer (for logical and physical plan optimization)
       # and Tungsten execution engine (for efficient code generation and memory management).
  # Schema Awareness
  # High-Level API
  # SQL Interoperability
  # Memory Efficiency: Tungsten optimizes memory usage by using off-heap memory and columnar storage, reducing garbage collection overhead.
  # Support for Diverse Data Sources: Integrates seamlessly with a wide range of structured
       # and semi-structured data sources (Parquet, ORC, JSON, CSV, JDBC, Hive tables, etc.).

In [None]:
# Commonality (What RDDs and DataFrames Share)
  # 1. Distributed & Immutable: Both are distributed collections of data, meaning they are partitioned across a cluster and processed in parallel.
          # Once created, neither can be changed; transformations always result in a new RDD or DataFrame.
  # 2. Fault Tolerance: Both are fault-tolerant due to their lineage graph. If a partition is lost, Spark can recompute it from its ancestors.
  # 3. Lazy Evaluation: Operations on both RDDs and DataFrames are lazy. They are not executed immediately
          # but rather build a logical plan (DAG) that is executed only when an action is called (e.g., show(), count(), collect(), write()).
  # 4. In-Memory Processing: Both can leverage in-memory caching to significantly speed up iterative algorithms and repeated data access.
  # 5. Underlying Foundation: DataFrames are essentially built on top of RDDs. A DataFrame is internally represented as an RDD of Row objects.
          # You can convert a DataFrame to an RDD using .rdd and an RDD to a DataFrame using .toDF().
  # 6. Language Support: Both support APIs in Scala, Java, Python, and R.

# When to Use Which (General Guidance):
  # Use DataFrames (preferred for most cases):
      # 1. When working with structured or semi-structured data (most common scenario).
      # 2. When you need high performance and automatic optimization.
      # 3. When you prefer a SQL-like or tabular API.
      # 4. When you need to integrate with Spark SQL or other Spark libraries (MLlib, GraphX, Structured Streaming),
          # which largely work with DataFrames.
  # Use RDDs (for specific niche cases):
      # 1. When working with truly unstructured data where defining a schema is impossible or impractical.
      # 2. When you need very low-level transformations and fine-grained control that the DataFrame API doesn't directly offer
          # (e.g., highly custom serialization, specific memory layouts). This is becoming increasingly rare as
          # DataFrames and Datasets gain more features.
      # 3. When interacting with legacy Spark codebases that predominantly use RDDs.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType, ArrayType
from pyspark.sql.functions import col,lit,expr,when,avg,sum,count,countDistinct,count, \
  approx_count_distinct,min,max,lower,upper,trim,initcap,concat_ws,substring,length,like,substring,length, \
  regexp_extract,regexp_replace,current_date,date_add,date_sub,datediff,month,year,dayofmonth,to_date,to_timestamp, \
  isnull,isnotnull,isnan,explode,array_contains,size, ceil, floor,sqrt,sqrt,round, row_number, rank, dense_rank, ntile, lead,lag, \
  format_string,format_number,udf,broadcast
from pyspark.sql.window import Window
import os
import shutil

In [None]:
spark = SparkSession.builder.appName("ComprehensivePySparkExample"). getOrCreate()

In [None]:
#Manual Creating the column name and value and creating the dataframe
manual_schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("City", StringType(), True),
    StructField("Score", DoubleType(), True)
])
manual_data = [
    ("Mike", 28, "LA", 85.5),
    ("Sarah", 35, "SF", 92.0),
    ("John", 22, "LA", 78.0)
]
df_manual_schema = spark.createDataFrame(manual_data, schema=manual_schema)
print("\n--- DataFrame created with Manual Schema (df_manual_schema) ---")
df_manual_schema.printSchema()
df_manual_schema.show()


--- DataFrame created with Manual Schema (df_manual_schema) ---
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Score: double (nullable = true)

+-----+---+----+-----+
| Name|Age|City|Score|
+-----+---+----+-----+
| Mike| 28|  LA| 85.5|
|Sarah| 35|  SF| 92.0|
| John| 22|  LA| 78.0|
+-----+---+----+-----+



In [None]:
df_manual_schema.describe("Age", "Score").show()
df_manual_schema.select("Age", "Score").show()

+-------+------------------+-----------------+
|summary|               Age|            Score|
+-------+------------------+-----------------+
|  count|                 3|                3|
|   mean|28.333333333333332|85.16666666666667|
| stddev| 6.506407098647712|7.005949852327901|
|    min|                22|             78.0|
|    max|                35|             92.0|
+-------+------------------+-----------------+

+---+-----+
|Age|Score|
+---+-----+
| 28| 85.5|
| 35| 92.0|
| 22| 78.0|
+---+-----+



In [None]:
#Manual Creating the CSV file and writing it and creating the dataframe out of it

csv_file_path = "temp_data.csv"
csv_data = """id,product,price,quantity
1,Laptop,1200.00,5
2,Mouse,25.50,20
3,Keyboard,75.00,10
4,Monitor,300.00,3
5,Webcam,50.00,8
"""
with open(csv_file_path, "w") as f:
    f.write(csv_data)

products_df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
products_df.printSchema()
products_df.show()

root
 |-- id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)

+---+--------+------+--------+
| id| product| price|quantity|
+---+--------+------+--------+
|  1|  Laptop|1200.0|       5|
|  2|   Mouse|  25.5|      20|
|  3|Keyboard|  75.0|      10|
|  4| Monitor| 300.0|       3|
|  5|  Webcam|  50.0|       8|
+---+--------+------+--------+



In [None]:
data_rows = [
    ("Alice", 30, "NY", "2020-01-15", 100.50, ["apple", "banana"]),
    ("Bob", 45, "CA", "2019-03-20", 250.75, ["orange", "grape"]),
    ("Charlie", 25, "TX", "2021-07-01", 75.20, ["apple", "kiwi"]),
    ("David", 50, "NY", "2018-11-10", 300.00, ["banana"]),
    ("Eve", None, "CA", "2022-02-28", 120.00, ["grape", "melon"]),
    ("Frank", 35, None, "2020-05-05", None, ["apple", "orange"]),
    ("Grace", 28, "TX", "2021-09-12", 90.10, []),
    ("Heidi", 42, "NY", "2019-08-01", 180.30, ["banana", "kiwi", "apple"]),
    ("Ivan", 60, "CA", "2017-06-25", 400.00, ["orange"]),
    ("Judy", 33, "TX", "2020-12-01", 150.00, None)
]
columns = ["Name", "Age", "State", "EnrollmentDate", "Amount", "Items"]
df = spark.createDataFrame(data_rows, columns)
print("\n--- DataFrame created from Python list (df) with Schema Inference ---")
df.printSchema()
df.show()


--- DataFrame created from Python list (df) with Schema Inference ---
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- State: string (nullable = true)
 |-- EnrollmentDate: string (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Items: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-------+----+-----+--------------+------+--------------------+
|   Name| Age|State|EnrollmentDate|Amount|               Items|
+-------+----+-----+--------------+------+--------------------+
|  Alice|  30|   NY|    2020-01-15| 100.5|     [apple, banana]|
|    Bob|  45|   CA|    2019-03-20|250.75|     [orange, grape]|
|Charlie|  25|   TX|    2021-07-01|  75.2|       [apple, kiwi]|
|  David|  50|   NY|    2018-11-10| 300.0|            [banana]|
|    Eve|NULL|   CA|    2022-02-28| 120.0|      [grape, melon]|
|  Frank|  35| NULL|    2020-05-05|  NULL|     [apple, orange]|
|  Grace|  28|   TX|    2021-09-12|  90.1|                  []|
|  Heidi|

In [None]:
# 3 different methods of filtering from pyspark.sql.functions import col

#one column filter, group by, count example in dataframe
#processed_df = df.where(df.State == "NY")

# 2 column filters, group by, count example in dataframe "|" = OR operator
#processed_df = df.where((df.State == "NY") | (df.State == "TX")).groupBy("State").count()

# 2 column filters, group by, count example in dataframe col, isin operator and .isNull
processed_df = df.where((col("State").isin("NY", "TX")) | (col("state").isNull())).groupBy("State").count()
processed_df.show()


+-----+-----+
|State|count|
+-----+-----+
|   NY|    3|
|   TX|    3|
| NULL|    1|
+-----+-----+



In [None]:
# Creating temporaryview and converting it as dataframe
# Registering as a temporary view to use SQL
processed_df.createOrReplaceTempView("nyc_tx_state_counts")

#Running the temp view using spark.sql and saving in dataframe
sparksqldf = spark.sql("SELECT State, count FROM nyc_tx_state_counts ORDER BY count DESC")
sparksqldf.show()

# If you want to do more operations on sparksqldf, you can:
sparksqldf.filter(col("count") > 1).show()

#Running the temp view using spark.table and saving in dataframe
sparktabledf = spark.table("nyc_tx_state_counts")
sparktabledf.show()


+-----+-----+
|State|count|
+-----+-----+
|   NY|    3|
|   TX|    3|
| NULL|    1|
+-----+-----+

+-----+-----+
|State|count|
+-----+-----+
|   NY|    3|
|   TX|    3|
+-----+-----+

+-----+-----+
|State|count|
+-----+-----+
|   NY|    3|
|   TX|    3|
| NULL|    1|
+-----+-----+



In [None]:
df.show()

+-------+----+-----+--------------+------+--------------------+
|   Name| Age|State|EnrollmentDate|Amount|               Items|
+-------+----+-----+--------------+------+--------------------+
|  Alice|  30|   NY|    2020-01-15| 100.5|     [apple, banana]|
|    Bob|  45|   CA|    2019-03-20|250.75|     [orange, grape]|
|Charlie|  25|   TX|    2021-07-01|  75.2|       [apple, kiwi]|
|  David|  50|   NY|    2018-11-10| 300.0|            [banana]|
|    Eve|NULL|   CA|    2022-02-28| 120.0|      [grape, melon]|
|  Frank|  35| NULL|    2020-05-05|  NULL|     [apple, orange]|
|  Grace|  28|   TX|    2021-09-12|  90.1|                  []|
|  Heidi|  42|   NY|    2019-08-01| 180.3|[banana, kiwi, ap...|
|   Ivan|  60|   CA|    2017-06-25| 400.0|            [orange]|
|   Judy|  33|   TX|    2020-12-01| 150.0|                NULL|
+-------+----+-----+--------------+------+--------------------+



In [None]:
#Data Type Changing
#using withColumn('column change name', col(source column), change type)

df_casted = df.withColumn("Age_Double", col("Age").cast(DoubleType())) \
              .withColumn("EnrollmentDate_Date", col("EnrollmentDate").cast(DateType()))

df_casted.select("Name", "Age", "Age_Double", "EnrollmentDate", "EnrollmentDate_Date").show()

+-------+----+----------+--------------+-------------------+
|   Name| Age|Age_Double|EnrollmentDate|EnrollmentDate_Date|
+-------+----+----------+--------------+-------------------+
|  Alice|  30|      30.0|    2020-01-15|         2020-01-15|
|    Bob|  45|      45.0|    2019-03-20|         2019-03-20|
|Charlie|  25|      25.0|    2021-07-01|         2021-07-01|
|  David|  50|      50.0|    2018-11-10|         2018-11-10|
|    Eve|NULL|      NULL|    2022-02-28|         2022-02-28|
|  Frank|  35|      35.0|    2020-05-05|         2020-05-05|
|  Grace|  28|      28.0|    2021-09-12|         2021-09-12|
|  Heidi|  42|      42.0|    2019-08-01|         2019-08-01|
|   Ivan|  60|      60.0|    2017-06-25|         2017-06-25|
|   Judy|  33|      33.0|    2020-12-01|         2020-12-01|
+-------+----+----------+--------------+-------------------+



In [None]:
#Adding 3 new columns
#col(columnname) we are give "column name" and column value simply passing through  lit("columnvalue")
#new Column name, when (condition passing, value 1).otherwise equal to else statemt

df_transformation = df.withColumn("Status", lit("Active",)) \
    .withColumn("Status1", lit("Active",)) \
    .withColumn("Status2_AmountTier",when(col("Amount") > 200, "High Amount") \
    .when((col("Amount") >= 100) & (col("Amount") <= 200), "Medium Amount") \
    .otherwise("Low Amount")
    )

df_transformation = df_transformation.withColumn("Amount1", col("Amount") + 1)

# Dropping 2 Column
df_transformation = df_transformation.drop("Status1","Amount1")

# Where and Filter keywords do the same operation

#ny_df_transformation = df_transformation.where(df_transformation.State == "NY")
ny_df_transformation = df_transformation.filter(df_transformation.State == "NY")
ny_df_transformation.show()

# Distinct States
df_distinct_states = df_transformation.select("State").distinct()
df_distinct_states.show()

# Order by (orderBy, sort) (Transformation)
df_ordered = ny_df_transformation.orderBy(col("Age").desc(), col("Name").asc())
df_ordered.show()

# Group by (Transformation)
df_transformation.groupBy("State").agg(avg("Age").alias("AverageAge"),sum("Amount").alias("TotalAmount")).show()


+-----+---+-----+--------------+------+--------------------+------+------------------+
| Name|Age|State|EnrollmentDate|Amount|               Items|Status|Status2_AmountTier|
+-----+---+-----+--------------+------+--------------------+------+------------------+
|Alice| 30|   NY|    2020-01-15| 100.5|     [apple, banana]|Active|     Medium Amount|
|David| 50|   NY|    2018-11-10| 300.0|            [banana]|Active|       High Amount|
|Heidi| 42|   NY|    2019-08-01| 180.3|[banana, kiwi, ap...|Active|     Medium Amount|
+-----+---+-----+--------------+------+--------------------+------+------------------+

+-----+
|State|
+-----+
|   CA|
|   NY|
|   TX|
| NULL|
+-----+

+-----+---+-----+--------------+------+--------------------+------+------------------+
| Name|Age|State|EnrollmentDate|Amount|               Items|Status|Status2_AmountTier|
+-----+---+-----+--------------+------+--------------------+------+------------------+
|David| 50|   NY|    2018-11-10| 300.0|            [banana]|Acti

In [None]:
df_expressions = df_transformation.select(
    col("Name"),
    col("Age"),
    when(col("Age") < 30, "Young")
    .when(col("Age") >= 30, "Adult")
    .otherwise("Senior").alias("AgeGroup_When"),
    expr("Age * 1.1").alias("AgeExpr")
)
df_expressions.show()

+-------+----+-------------+-------+
|   Name| Age|AgeGroup_When|AgeExpr|
+-------+----+-------------+-------+
|  Alice|  30|        Adult|   33.0|
|    Bob|  45|        Adult|   49.5|
|Charlie|  25|        Young|   27.5|
|  David|  50|        Adult|   55.0|
|    Eve|NULL|       Senior|   NULL|
|  Frank|  35|        Adult|   38.5|
|  Grace|  28|        Young|   30.8|
|  Heidi|  42|        Adult|   46.2|
|   Ivan|  60|        Adult|   66.0|
|   Judy|  33|        Adult|   36.3|
+-------+----+-------------+-------+



In [None]:
# Create another DataFrame for joins
orders_data = [
    ("Alice", "Order_A1", 150),
    ("Bob", "Order_B1", 200),
    ("Charlie", "Order_C1", 50),
    ("Alice", "Order_A2", 75),
    ("David", "Order_D1", 100),
    ("Frank", "Order_F1", 300),
    ("Zara", "Order_Z1", 500) # Zara is not in df
]
orders_columns = ["CustomerName", "OrderId", "OrderValue"]
orders_df = spark.createDataFrame(orders_data, orders_columns)
orders_df.show(5)
df.show(5)

+------------+--------+----------+
|CustomerName| OrderId|OrderValue|
+------------+--------+----------+
|       Alice|Order_A1|       150|
|         Bob|Order_B1|       200|
|     Charlie|Order_C1|        50|
|       Alice|Order_A2|        75|
|       David|Order_D1|       100|
+------------+--------+----------+
only showing top 5 rows

+-------+----+-----+--------------+------+---------------+
|   Name| Age|State|EnrollmentDate|Amount|          Items|
+-------+----+-----+--------------+------+---------------+
|  Alice|  30|   NY|    2020-01-15| 100.5|[apple, banana]|
|    Bob|  45|   CA|    2019-03-20|250.75|[orange, grape]|
|Charlie|  25|   TX|    2021-07-01|  75.2|  [apple, kiwi]|
|  David|  50|   NY|    2018-11-10| 300.0|       [banana]|
|    Eve|NULL|   CA|    2022-02-28| 120.0| [grape, melon]|
+-------+----+-----+--------------+------+---------------+
only showing top 5 rows



In [None]:
# Inner Join: Returns rows when there is a match in both DataFrames.
inner_join_df = df.join(orders_df, df["Name"] == orders_df["CustomerName"], "inner")
#inner_join_df.show()

left_outer_join_df = df.join(orders_df, df["Name"] == orders_df["CustomerName"], "left_outer")
#left_outer_join_df.show()

right_outer_join_df = df.join(orders_df, df["Name"] == orders_df["CustomerName"], "right_outer")
#right_outer_join_df.show()

full_outer_join_df = df.join(orders_df, df["Name"] == orders_df["CustomerName"], "full_outer")
#full_outer_join_df.show()

#Returns only left table and also returns only right matching column values
left_semi_join_df = df.join(orders_df, df["Name"] == orders_df["CustomerName"], "left_semi")
#left_semi_join_df.show()

#Returns only left table and also returns only not right matching column values
left_anti_join_df = df.join(orders_df, df["Name"] == orders_df["CustomerName"], "left_anti")
#left_anti_join_df.show()

In [None]:
#---Aggregate Functions ---
df.agg(
    count("*").alias("TotalRecords"),
    count("Age").alias("NonNullAges"), # count non-null values
    sum("Amount").alias("TotalAmount"),
    avg("Amount").alias("AverageAmount"),
    min("Age").alias("MinAge"),
    max("Age").alias("MaxAge"),
    countDistinct("State").alias("DistinctStates"), # Count distinct values
    approx_count_distinct("State").alias("ApproxDistinctStates") # Approximate distinct count (faster for large datasets)
).show()

+------------+-----------+-----------+------------------+------+------+--------------+--------------------+
|TotalRecords|NonNullAges|TotalAmount|     AverageAmount|MinAge|MaxAge|DistinctStates|ApproxDistinctStates|
+------------+-----------+-----------+------------------+------+------+--------------+--------------------+
|          10|          9|    1666.85|185.20555555555555|    25|    60|             3|                   3|
+------------+-----------+-----------+------------------+------+------+--------------+--------------------+



In [None]:
# String Functions ---
string_df = df.select(
    col("Name"),
    lower(col("Name")).alias("Name_Lower"),
    upper(col("Name")).alias("Name_Upper"),
    trim(lit("  Trim Me  ")).alias("Trimmed_String"),
    concat_ws("-", col("Name"), col("State")).alias("Name_State_Concat"),
    substring(col("Name"), 1, 3).alias("Name_Substr"), # 1-based index
    length(col("Name")).alias("Name_Length"),
    col("Name").like("A%").alias("Name_Starts_With_A_Like") # Using like for pattern matching
)
string_df.show()

+-------+----------+----------+--------------+-----------------+-----------+-----------+-----------------------+
|   Name|Name_Lower|Name_Upper|Trimmed_String|Name_State_Concat|Name_Substr|Name_Length|Name_Starts_With_A_Like|
+-------+----------+----------+--------------+-----------------+-----------+-----------+-----------------------+
|  Alice|     alice|     ALICE|       Trim Me|         Alice-NY|        Ali|          5|                   true|
|    Bob|       bob|       BOB|       Trim Me|           Bob-CA|        Bob|          3|                  false|
|Charlie|   charlie|   CHARLIE|       Trim Me|       Charlie-TX|        Cha|          7|                  false|
|  David|     david|     DAVID|       Trim Me|         David-NY|        Dav|          5|                  false|
|    Eve|       eve|       EVE|       Trim Me|           Eve-CA|        Eve|          3|                  false|
|  Frank|     frank|     FRANK|       Trim Me|            Frank|        Fra|          5|        

In [None]:
# --- 8. Regex Functions ---
print("\n--- Regex Functions ---")
regex_df = df.withColumn("Email", lit("user@example.com")) \
             .withColumn("Phone", lit("123-456-7890"))

regex_df_processed = regex_df.select(
    col("Name"),
    col("Email"),

    #.+ means "match one or more of any character."
    regexp_extract(col("Email"), r"(.+)@", 1).alias("User"),      # Extracts everything before @
    #\w+ means "fetching words after the @. + incrmenting \start and end
    regexp_extract(col("Email"), r"@(\w+)\.com", 1).alias("Domain"), # Extract domain
    col("Phone"),
    regexp_replace(col("Phone"), r"-", "").alias("Phone_No_Dashes") # Remove dashes
)
regex_df_processed.show()


--- Regex Functions ---
+-------+----------------+----+-------+------------+---------------+
|   Name|           Email|User| Domain|       Phone|Phone_No_Dashes|
+-------+----------------+----+-------+------------+---------------+
|  Alice|user@example.com|user|example|123-456-7890|     1234567890|
|    Bob|user@example.com|user|example|123-456-7890|     1234567890|
|Charlie|user@example.com|user|example|123-456-7890|     1234567890|
|  David|user@example.com|user|example|123-456-7890|     1234567890|
|    Eve|user@example.com|user|example|123-456-7890|     1234567890|
|  Frank|user@example.com|user|example|123-456-7890|     1234567890|
|  Grace|user@example.com|user|example|123-456-7890|     1234567890|
|  Heidi|user@example.com|user|example|123-456-7890|     1234567890|
|   Ivan|user@example.com|user|example|123-456-7890|     1234567890|
|   Judy|user@example.com|user|example|123-456-7890|     1234567890|
+-------+----------------+----+-------+------------+---------------+



In [None]:
# --- Date Functions ---
date_df = df.select(
    col("Name"),
    col("EnrollmentDate").cast(DateType()).alias("EnrollmentDate_Date"),
    current_date().alias("CurrentDate"),
    date_add(col("EnrollmentDate"), 7).alias("Date_Plus_7_Days"),
    date_sub(col("EnrollmentDate"), 3).alias("Date_Minus_3_Days"),
    datediff(current_date(), col("EnrollmentDate")).alias("DaysSinceEnrollment"),
    month(col("EnrollmentDate")).alias("EnrollmentMonth"),
    year(col("EnrollmentDate")).alias("EnrollmentYear"),
    dayofmonth(col("EnrollmentDate")).alias("EnrollmentDay"),
    to_date(lit("2023-10-26"), "yyyy-MM-dd").alias("ParsedDate"),
    to_timestamp(lit("2023-10-26 14:30:00"), "yyyy-MM-dd HH:mm:ss").alias("ParsedTimestamp")
)
date_df.show()
date_df.printSchema()

+-------+-------------------+-----------+----------------+-----------------+-------------------+---------------+--------------+-------------+----------+-------------------+
|   Name|EnrollmentDate_Date|CurrentDate|Date_Plus_7_Days|Date_Minus_3_Days|DaysSinceEnrollment|EnrollmentMonth|EnrollmentYear|EnrollmentDay|ParsedDate|    ParsedTimestamp|
+-------+-------------------+-----------+----------------+-----------------+-------------------+---------------+--------------+-------------+----------+-------------------+
|  Alice|         2020-01-15| 2025-07-29|      2020-01-22|       2020-01-12|               2022|              1|          2020|           15|2023-10-26|2023-10-26 14:30:00|
|    Bob|         2019-03-20| 2025-07-29|      2019-03-27|       2019-03-17|               2323|              3|          2019|           20|2023-10-26|2023-10-26 14:30:00|
|Charlie|         2021-07-01| 2025-07-29|      2021-07-08|       2021-06-28|               1489|              7|          2021|        

In [None]:
# --------Null Handling (na, isnull, isnotnull, isnan) ---

# Replacing null value withthe 0 and unknown
df_filled = df.na.fill({"Age": 0, "State": "Unknown", "Amount": 0.0})
df_filled.show()

# Drop rows with any null values (dropna)
df_dropped_any_null = df.na.drop()
df_dropped_any_null.show()

# Drop rows with nulls in specific columns
df_dropped_age_state_null = df.na.drop(subset=["Age", "State"])
df_dropped_age_state_null.show()

# Replace specific values (e.g., replace 0 in Age with 1)
df_replaced = df_filled.na.replace(0, 1, "Age")
df_replaced.show()

# Check for null/NaN values using functions (isnull, isnotnull, isnan)
df_null_check = df.select(
    col("Name"),
    isnull(col("Age")).alias("IsAgeNull"),
    isnotnull(col("Age")).alias("IsAgeNotNull"),
    isnull(col("State")).alias("IsStateNull"),
    isnull(col("Amount")).alias("IsAmountNull"),
    isnan(col("Amount")).alias("IsAmountNaN") # isnan only works for numeric types
)
df_null_check.show()

+-------+---+-------+--------------+------+--------------------+
|   Name|Age|  State|EnrollmentDate|Amount|               Items|
+-------+---+-------+--------------+------+--------------------+
|  Alice| 30|     NY|    2020-01-15| 100.5|     [apple, banana]|
|    Bob| 45|     CA|    2019-03-20|250.75|     [orange, grape]|
|Charlie| 25|     TX|    2021-07-01|  75.2|       [apple, kiwi]|
|  David| 50|     NY|    2018-11-10| 300.0|            [banana]|
|    Eve|  0|     CA|    2022-02-28| 120.0|      [grape, melon]|
|  Frank| 35|Unknown|    2020-05-05|   0.0|     [apple, orange]|
|  Grace| 28|     TX|    2021-09-12|  90.1|                  []|
|  Heidi| 42|     NY|    2019-08-01| 180.3|[banana, kiwi, ap...|
|   Ivan| 60|     CA|    2017-06-25| 400.0|            [orange]|
|   Judy| 33|     TX|    2020-12-01| 150.0|                NULL|
+-------+---+-------+--------------+------+--------------------+

+-------+---+-----+--------------+------+--------------------+
|   Name|Age|State|Enrollm

In [None]:
df.show()

+-------+----+-----+--------------+------+--------------------+
|   Name| Age|State|EnrollmentDate|Amount|               Items|
+-------+----+-----+--------------+------+--------------------+
|  Alice|  30|   NY|    2020-01-15| 100.5|     [apple, banana]|
|    Bob|  45|   CA|    2019-03-20|250.75|     [orange, grape]|
|Charlie|  25|   TX|    2021-07-01|  75.2|       [apple, kiwi]|
|  David|  50|   NY|    2018-11-10| 300.0|            [banana]|
|    Eve|NULL|   CA|    2022-02-28| 120.0|      [grape, melon]|
|  Frank|  35| NULL|    2020-05-05|  NULL|     [apple, orange]|
|  Grace|  28|   TX|    2021-09-12|  90.1|                  []|
|  Heidi|  42|   NY|    2019-08-01| 180.3|[banana, kiwi, ap...|
|   Ivan|  60|   CA|    2017-06-25| 400.0|            [orange]|
|   Judy|  33|   TX|    2020-12-01| 150.0|                NULL|
+-------+----+-----+--------------+------+--------------------+



In [None]:
# Explode: Creates a new row for each element in an array column.
# If the array is null or empty, the row is not generated.
df_exploded = df.select("Name", explode("Items").alias("Item"))
print("DataFrame after exploding 'Items' array:")
df_exploded.show(5)

# array_contains: Checks if an array column contains a specific value.
df_contains_apple = df.withColumn("HasApple", array_contains(col("Items"), "apple"))
print("DataFrame with 'HasApple' column:")
df_contains_apple.show(5)

# size: Returns the size of an array or map.
df_item_count = df.withColumn("ItemCount", size(col("Items")))
print("DataFrame with 'ItemCount' column (size of Items array):")
df_item_count.show(5)

DataFrame after exploding 'Items' array:
+-------+------+
|   Name|  Item|
+-------+------+
|  Alice| apple|
|  Alice|banana|
|    Bob|orange|
|    Bob| grape|
|Charlie| apple|
+-------+------+
only showing top 5 rows

DataFrame with 'HasApple' column:
+-------+----+-----+--------------+------+---------------+--------+
|   Name| Age|State|EnrollmentDate|Amount|          Items|HasApple|
+-------+----+-----+--------------+------+---------------+--------+
|  Alice|  30|   NY|    2020-01-15| 100.5|[apple, banana]|    true|
|    Bob|  45|   CA|    2019-03-20|250.75|[orange, grape]|   false|
|Charlie|  25|   TX|    2021-07-01|  75.2|  [apple, kiwi]|    true|
|  David|  50|   NY|    2018-11-10| 300.0|       [banana]|   false|
|    Eve|NULL|   CA|    2022-02-28| 120.0| [grape, melon]|   false|
+-------+----+-----+--------------+------+---------------+--------+
only showing top 5 rows

DataFrame with 'ItemCount' column (size of Items array):
+-------+----+-----+--------------+------+-----------

In [None]:

# --- 12. Math Functions ---
print("\n--- Math Functions ---")
math_df = products_df.select(
    col("product"),
    col("price"),
    round(col("price"), 0).alias("Price_Rounded"),
    ceil(col("price")).alias("Price_Ceil"),
    floor(col("price")).alias("Price_Floor"),
    sqrt(col("price")).alias("Price_Sqrt"),
    pow(col("price"), 2).alias("Price_Squared")
)
math_df.show()



--- Math Functions ---
+--------+------+-------------+----------+-----------+------------------+-------------+
| product| price|Price_Rounded|Price_Ceil|Price_Floor|        Price_Sqrt|Price_Squared|
+--------+------+-------------+----------+-----------+------------------+-------------+
|  Laptop|1200.0|       1200.0|      1200|       1200| 34.64101615137755|    1440000.0|
|   Mouse|  25.5|         26.0|        26|         25| 5.049752469181039|       650.25|
|Keyboard|  75.0|         75.0|        75|         75| 8.660254037844387|       5625.0|
| Monitor| 300.0|        300.0|       300|        300|17.320508075688775|      90000.0|
|  Webcam|  50.0|         50.0|        50|         50|7.0710678118654755|       2500.0|
+--------+------+-------------+----------+-----------+------------------+-------------+



In [None]:
# --- Window Functions ---

#window_spec = Window.partitionBy("State").orderBy("Age")

# Add row number, rank, dense rank, and NTILE
df_window = df.withColumn("row_number", row_number().over(Window.partitionBy("State").orderBy("Age"))) \
              .withColumn("rank", rank().over(Window.partitionBy("State").orderBy("Age"))) \
              .withColumn("dense_rank", dense_rank().over(Window.partitionBy("State").orderBy("Age"))) \
              .withColumn("ntile_2", ntile(2).over(Window.partitionBy("State").orderBy("Age")))
df_window.show()

+-------+----+-----+--------------+------+--------------------+----------+----+----------+-------+
|   Name| Age|State|EnrollmentDate|Amount|               Items|row_number|rank|dense_rank|ntile_2|
+-------+----+-----+--------------+------+--------------------+----------+----+----------+-------+
|  Frank|  35| NULL|    2020-05-05|  NULL|     [apple, orange]|         1|   1|         1|      1|
|    Eve|NULL|   CA|    2022-02-28| 120.0|      [grape, melon]|         1|   1|         1|      1|
|    Bob|  45|   CA|    2019-03-20|250.75|     [orange, grape]|         2|   2|         2|      1|
|   Ivan|  60|   CA|    2017-06-25| 400.0|            [orange]|         3|   3|         3|      2|
|  Alice|  30|   NY|    2020-01-15| 100.5|     [apple, banana]|         1|   1|         1|      1|
|  Heidi|  42|   NY|    2019-08-01| 180.3|[banana, kiwi, ap...|         2|   2|         2|      1|
|  David|  50|   NY|    2018-11-10| 300.0|            [banana]|         3|   3|         3|      2|
|Charlie| 

In [None]:
# Lead: Accesses a value from a subsequent row.
# Lag: Accesses a value from a previous row.
df_window = df_window.withColumn("next_enrollment_date", lead(col("EnrollmentDate"), 1).over(Window.partitionBy("State").orderBy("EnrollmentDate"))) \
                     .withColumn("prev_enrollment_amount", lag(col("Amount"), 1).over(Window.partitionBy("State").orderBy("EnrollmentDate")))

print("DataFrame with Window Functions:")
df_window.select("Name", "Age", "State", "EnrollmentDate", "Amount",
                 "row_number", "rank", "dense_rank", "ntile_2",
                 "next_enrollment_date", "prev_enrollment_amount").orderBy("State", "Age").show()

DataFrame with Window Functions:
+-------+----+-----+--------------+------+----------+----+----------+-------+--------------------+----------------------+
|   Name| Age|State|EnrollmentDate|Amount|row_number|rank|dense_rank|ntile_2|next_enrollment_date|prev_enrollment_amount|
+-------+----+-----+--------------+------+----------+----+----------+-------+--------------------+----------------------+
|  Frank|  35| NULL|    2020-05-05|  NULL|         1|   1|         1|      1|                NULL|                  NULL|
|    Eve|NULL|   CA|    2022-02-28| 120.0|         1|   1|         1|      1|                NULL|                250.75|
|    Bob|  45|   CA|    2019-03-20|250.75|         2|   2|         2|      1|          2022-02-28|                 400.0|
|   Ivan|  60|   CA|    2017-06-25| 400.0|         3|   3|         3|      2|          2019-03-20|                  NULL|
|  Alice|  30|   NY|    2020-01-15| 100.5|         1|   1|         1|      1|                NULL|               

In [None]:
format_df = df.select(
    col("Name"),
    col("Amount"),
    format_number(col("Amount"), 2).alias("Amount_Formatted"), # Format to 2 decimal places
    format_string("Name: %s, Age: %d", col("Name"), col("Age")).alias("Formatted_String")
)
format_df.show()

+-------+------+----------------+--------------------+
|   Name|Amount|Amount_Formatted|    Formatted_String|
+-------+------+----------------+--------------------+
|  Alice| 100.5|          100.50|Name: Alice, Age: 30|
|    Bob|250.75|          250.75|  Name: Bob, Age: 45|
|Charlie|  75.2|           75.20|Name: Charlie, Ag...|
|  David| 300.0|          300.00|Name: David, Age: 50|
|    Eve| 120.0|          120.00|Name: Eve, Age: null|
|  Frank|  NULL|            NULL|Name: Frank, Age: 35|
|  Grace|  90.1|           90.10|Name: Grace, Age: 28|
|  Heidi| 180.3|          180.30|Name: Heidi, Age: 42|
|   Ivan| 400.0|          400.00| Name: Ivan, Age: 60|
|   Judy| 150.0|          150.00| Name: Judy, Age: 33|
+-------+------+----------------+--------------------+



In [None]:
# User Defined Functions (UDF) ---

# Define a Python function
def get_age_category(age):
    if age is None:
        return "Unknown"
    elif age < 18:
        return "Minor"
    elif 18 <= age < 60:
        return "Adult"
    else:
        return "Senior"

# Register the Python function as a UDF using the decorator syntax (@udf)
# This is syntactic sugar for udf(get_age_category, StringType())

@udf(StringType())
def get_age_category_udf(age):
    return get_age_category(age)

# Apply the UDF to the DataFrame
df_with_udf = df.withColumn("AgeCategory", get_age_category_udf(col("Age")))
print("DataFrame with UDF 'AgeCategory':")
df_with_udf.show()


DataFrame with UDF 'AgeCategory':
+-------+----+-----+--------------+------+--------------------+-----------+
|   Name| Age|State|EnrollmentDate|Amount|               Items|AgeCategory|
+-------+----+-----+--------------+------+--------------------+-----------+
|  Alice|  30|   NY|    2020-01-15| 100.5|     [apple, banana]|      Adult|
|    Bob|  45|   CA|    2019-03-20|250.75|     [orange, grape]|      Adult|
|Charlie|  25|   TX|    2021-07-01|  75.2|       [apple, kiwi]|      Adult|
|  David|  50|   NY|    2018-11-10| 300.0|            [banana]|      Adult|
|    Eve|NULL|   CA|    2022-02-28| 120.0|      [grape, melon]|    Unknown|
|  Frank|  35| NULL|    2020-05-05|  NULL|     [apple, orange]|      Adult|
|  Grace|  28|   TX|    2021-09-12|  90.1|                  []|      Adult|
|  Heidi|  42|   NY|    2019-08-01| 180.3|[banana, kiwi, ap...|      Adult|
|   Ivan|  60|   CA|    2017-06-25| 400.0|            [orange]|     Senior|
|   Judy|  33|   TX|    2020-12-01| 150.0|            

In [None]:
# Another UDF example using the explicit udf() function
def calculate_discount(amount, is_senior):
    if amount is None:
        return None
    if is_senior == "Senior":
        return amount * 0.90 # 10% discount for seniors
    return amount

# Register the UDF with appropriate return type
calculate_discount_udf = udf(calculate_discount, DoubleType())

df_with_udf_discount = df_with_udf.withColumn(
    "DiscountedAmount",
    calculate_discount_udf(col("Amount"), col("AgeCategory"))
)
print("DataFrame with UDF 'DiscountedAmount':")
df_with_udf_discount.show()

DataFrame with UDF 'DiscountedAmount':
+-------+----+-----+--------------+------+--------------------+-----------+----------------+
|   Name| Age|State|EnrollmentDate|Amount|               Items|AgeCategory|DiscountedAmount|
+-------+----+-----+--------------+------+--------------------+-----------+----------------+
|  Alice|  30|   NY|    2020-01-15| 100.5|     [apple, banana]|      Adult|           100.5|
|    Bob|  45|   CA|    2019-03-20|250.75|     [orange, grape]|      Adult|          250.75|
|Charlie|  25|   TX|    2021-07-01|  75.2|       [apple, kiwi]|      Adult|            75.2|
|  David|  50|   NY|    2018-11-10| 300.0|            [banana]|      Adult|           300.0|
|    Eve|NULL|   CA|    2022-02-28| 120.0|      [grape, melon]|    Unknown|           120.0|
|  Frank|  35| NULL|    2020-05-05|  NULL|     [apple, orange]|      Adult|            NULL|
|  Grace|  28|   TX|    2021-09-12|  90.1|                  []|      Adult|            90.1|
|  Heidi|  42|   NY|    2019-08

In [None]:
# Add some configurations (example: setting shuffle partitions)
# Spark.sql.shuffle.partitions: Controls the number of partitions used in shuffle operations.
# A higher number can reduce partition size but increase overhead.
spark_builder.config("spark.sql.shuffle.partitions", "8") # Increased for demonstration
spark_builder.config("spark.driver.memory", "2g")

In [None]:
spark.conf.get("spark.sql.shuffle.partitions")
#To set configurations
#spark_builder.conf.set("spark.sql.shuffle.partitions", "8")

'200'

In [None]:
spark.conf.get("spark.driver.memory")

Py4JJavaError: An error occurred while calling o635.get.
: org.apache.spark.SparkNoSuchElementException: [SQL_CONF_NOT_FOUND] The SQL config "spark.driver.memory" cannot be found. Please verify that the config exists.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.sqlConfigNotFoundError(QueryExecutionErrors.scala:1984)
	at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:5274)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.internal.SQLConf.getConfString(SQLConf.scala:5274)
	at org.apache.spark.sql.RuntimeConfig.get(RuntimeConfig.scala:81)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
spark.sparkContext.getConf().get("spark.driver.memory")

In [None]:
# Create a sample DataFrame
data = [
    ("Alice", 25, "NY", 50000.0, True),
    ("Bob", 30, "CA", 60000.5, False),
    ("Charlie", 35, "TX", 70000.0, True),
    ("David", 28, "NY", 55000.0, False)
]
columns = ["Name", "Age", "State", "Salary", "IsActive"]
sample_df = spark.createDataFrame(data, columns)

print("--- Original DataFrame to be saved ---")
sample_df.show()
sample_df.printSchema()

--- Original DataFrame to be saved ---
+-------+---+-----+-------+--------+
|   Name|Age|State| Salary|IsActive|
+-------+---+-----+-------+--------+
|  Alice| 25|   NY|50000.0|    true|
|    Bob| 30|   CA|60000.5|   false|
|Charlie| 35|   TX|70000.0|    true|
|  David| 28|   NY|55000.0|   false|
+-------+---+-----+-------+--------+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- State: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- IsActive: boolean (nullable = true)



In [None]:
# Define paths for saving
parquet_path = "temp_data/sample.parquet"
csv_path = "temp_data/sample.csv"
json_path = "temp_data/sample.json"
orc_path = "temp_data/sample.orc"

# 3. Save the DataFrame in different formats (for loading demonstrations)
# Note: coalesce(1) is used to create a single file for simplicity,
# but it's not recommended for large-scale production writes.
sample_df.write.mode("overwrite").format("parquet").save(parquet_path)
sample_df.write.mode("overwrite").format("csv").option("header", "true").save(csv_path)
sample_df.write.mode("overwrite").format("json").save(json_path)
sample_df.write.mode("overwrite").format("orc").save(orc_path)

In [None]:
#Loading Parquet
print("\n--- Loading Parquet Data ---")
df_parquet = spark.read.load( path=parquet_path, format="parquet" )
df_parquet.show()
df_parquet.printSchema()


--- Loading Parquet Data ---
+-------+---+-----+-------+--------+
|   Name|Age|State| Salary|IsActive|
+-------+---+-----+-------+--------+
|Charlie| 35|   TX|70000.0|    true|
|  David| 28|   NY|55000.0|   false|
|  Alice| 25|   NY|50000.0|    true|
|    Bob| 30|   CA|60000.5|   false|
+-------+---+-----+-------+--------+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- State: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- IsActive: boolean (nullable = true)



In [None]:
print("\n--- Loading CSV Data ---")
df_csv = spark.read.load(
    path=csv_path,
    format="csv",
    header="true",       # Specify that the first row is a header
    inferSchema="true"   # Infer data types automatically
    # Or, to manually specify schema for better control:
    # schema=your_manual_schema
)
df_csv.show()
df_csv.printSchema()


--- Loading CSV Data ---
+-------+---+-----+-------+--------+
|   Name|Age|State| Salary|IsActive|
+-------+---+-----+-------+--------+
|Charlie| 35|   TX|70000.0|    true|
|  David| 28|   NY|55000.0|   false|
|  Alice| 25|   NY|50000.0|    true|
|    Bob| 30|   CA|60000.5|   false|
+-------+---+-----+-------+--------+

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- IsActive: boolean (nullable = true)



In [None]:
print("\n--- Loading JSON Data ---")
df_json = spark.read.load(
    path=json_path,
    format="json",
    inferSchema="true" # inferSchema can be useful for JSON
)
df_json.show()
df_json.printSchema()


--- Loading JSON Data ---
+---+--------+-------+-------+-----+
|Age|IsActive|   Name| Salary|State|
+---+--------+-------+-------+-----+
| 35|    true|Charlie|70000.0|   TX|
| 28|   false|  David|55000.0|   NY|
| 25|    true|  Alice|50000.0|   NY|
| 30|   false|    Bob|60000.5|   CA|
+---+--------+-------+-------+-----+

root
 |-- Age: long (nullable = true)
 |-- IsActive: boolean (nullable = true)
 |-- Name: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- State: string (nullable = true)



In [None]:
print("\n--- Loading ORC Data ---")
df_orc = spark.read.load(
    path=orc_path,
    format="orc"
)
df_orc.show()
df_orc.printSchema()


--- Loading ORC Data ---
+-------+---+-----+-------+--------+
|   Name|Age|State| Salary|IsActive|
+-------+---+-----+-------+--------+
|Charlie| 35|   TX|70000.0|    true|
|  David| 28|   NY|55000.0|   false|
|  Alice| 25|   NY|50000.0|    true|
|    Bob| 30|   CA|60000.5|   false|
+-------+---+-----+-------+--------+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- State: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- IsActive: boolean (nullable = true)



In [None]:
print("\n--- Loading CSV Data using options() ---")
csv_options = {
    "header": "true",
    "inferSchema": "true",
    "delimiter": "," # explicitly define delimiter if needed
}
df_csv_options = spark.read.load(
    path=csv_path,
    format="csv",
    **csv_options # Unpack the dictionary
)
df_csv_options.show()
df_csv_options.printSchema()


--- Loading CSV Data using options() ---
+-------+---+-----+-------+--------+
|   Name|Age|State| Salary|IsActive|
+-------+---+-----+-------+--------+
|Charlie| 35|   TX|70000.0|    true|
|  David| 28|   NY|55000.0|   false|
|  Alice| 25|   NY|50000.0|    true|
|    Bob| 30|   CA|60000.5|   false|
+-------+---+-----+-------+--------+

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- IsActive: boolean (nullable = true)



In [None]:
#Database Functions

#To check the current Database
spark.catalog.currentDatabase
#To set the current Database
spark.catalog.setCurrentDatabase('test2')
#To set the list of Databases
spark.catalog.listDatabases[0]

In [None]:
#Table Functions

#To check the list of  columns
spark.catalog.listcolumns
#To check the list of tables in current database
spark.catalog.listTables
#To check the list of cache table
spark.catalog.cacheTable
#To check the list of un cache table
spark.catalog.uncacheTable
#To check the list of iscached
spark.catalog.iscache
#To check the list of clear cache
spark.catalog.clearCache
#To check the list of recoverPartitions
spark.catalog.recoverPartitions
#To check the list of refreshTable
spark.catalog.refreshTable
#To check the list of refreshpath
spark.catalog.refreshpath

In [None]:
# View Functions
# drop temporaryview
# drop globaltemporaryview


In [None]:
# Broadcasting Pros:
  # 1. Eliminates Shuffle: The primary benefit is avoiding the network I/O and disk writes associated with shuffling the larger table.
      # This is a huge performance gain.
  # 2. Faster for Small-Large Joins: Ideal when one table is small enough to fit comfortably in executor memory.

# Broadcasting Cons:
  # 1. Memory Intensive: If the "small" table is too large, broadcasting it can cause OutOfMemoryError (OOM) on the driver or the executors.
  # 2. Driver Bottleneck: The driver might struggle to collect and serialize a very large table for broadcasting.
  # 3. joined_df = large_df.join(broadcast(small_df), "key", "inner")

In [None]:
# Bucketing / Partitioning / Salting

In [None]:
# repartition() vs coalesce()