<a href="https://colab.research.google.com/github/Rachelllle/Spark-Core/blob/main/Session05.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark import SparkContext

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session that every later cell shares.
spark = (
    SparkSession.builder
    .appName("DataFrame")
    .getOrCreate()
)

# Toy transaction records, one tuple per row: (id, name, country, amount).
data = [
    (1, "Alice",   "FR", 100),
    (2, "Bob",     "FR", 200),
    (3, "Charlie", "UK", 150),
    (4, "David",   "FR",  50),
    (5, "Eve",     "UK", 300),
]

# Only column names are given here; Spark infers each column's type from the tuples.
df = spark.createDataFrame(data, schema=["id", "name", "country", "amount"])
df.show()

+---+-------+-------+------+
| id|   name|country|amount|
+---+-------+-------+------+
|  1|  Alice|     FR|   100|
|  2|    Bob|     FR|   200|
|  3|Charlie|     UK|   150|
|  4|  David|     FR|    50|
|  5|    Eve|     UK|   300|
+---+-------+-------+------+



In [3]:
# Show the column names, types and nullability Spark inferred for `df`
# (no explicit schema was passed when it was created).
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- amount: long (nullable = true)



In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Spell out every column's type and nullability up front instead of relying on
# inference (which produced nullable `long` columns for id/amount above).
schema = (
    StructType()
    .add("id",      IntegerType(), nullable=False)
    .add("name",    StringType(),  nullable=False)
    .add("country", StringType(),  nullable=False)
    .add("amount",  IntegerType(), nullable=False)
)

# Same tuples as `df`, but now typed and validated against the explicit schema.
df_typed = spark.createDataFrame(data, schema=schema)
df_typed.printSchema()

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- country: string (nullable = false)
 |-- amount: integer (nullable = false)



In [14]:
# Names and amounts of transactions above 100, largest amount first.
(
    df.select("name", "amount")
      .filter(df["amount"] > 100)
      .orderBy(df["amount"].desc())
      .show()
)

+-------+------+
|   name|amount|
+-------+------+
|    Eve|   300|
|    Bob|   200|
|Charlie|   150|
+-------+------+



In [6]:
from pyspark.sql.functions import sum

# Total amount per country. The aggregation shuffles rows by country (see the
# Exchange hashpartitioning step in the plan below), and the order in which the
# groups come back is not guaranteed, so sort explicitly to keep the displayed
# table reproducible across re-runs.
df.groupBy("country").agg(sum("amount").alias("total")).orderBy("country").show()

+-------+-----+
|country|total|
+-------+-----+
|     FR|  350|
|     UK|  450|
+-------+-----+



In [7]:
# Print the physical plan for the per-country sum; explain() only describes the
# plan and does not display any result rows.
(
    df.groupBy("country")
      .agg(sum("amount"))
      .explain()
)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[country#2], functions=[sum(amount#3L)])
   +- Exchange hashpartitioning(country#2, 200), ENSURE_REQUIREMENTS, [plan_id=87]
      +- HashAggregate(keys=[country#2], functions=[partial_sum(amount#3L)])
         +- Project [country#2, amount#3L]
            +- Scan ExistingRDD[id#0L,name#1,country#2,amount#3L]




In [9]:
from pyspark.sql.functions import sum, col  # note: this `sum` shadows Python's builtin sum for the rest of the notebook

# Where does the filter run relative to the aggregation?
# NOTE: the two queries below are NOT equivalent, so compare the *plans*, not the results:
#   A) keeps only rows with amount > 100, then sums per country   (SQL WHERE)
#   B) sums every row per country, then keeps totals > 100         (SQL HAVING)

# A) Filter FIRST, then groupBy: the Filter sits below the Exchange in the plan,
#    so only the matching rows are shuffled and aggregated.
(
    df.filter(col("amount") > 100)
      .groupBy("country")
      .agg(sum("amount").alias("total"))
      .explain()
)

# B) groupBy FIRST, then filter: every row goes through the shuffle and the
#    Filter is applied on top of the final HashAggregate. After the aggregation
#    the row-level `amount` column no longer exists, so the only thing left to
#    filter on is the aggregated `total`.
df_grouped = (
    df.groupBy("country")
      .agg(sum("amount").alias("total"))
)
df_grouped.filter(col("total") > 100).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[country#2], functions=[sum(amount#3L)])
   +- Exchange hashpartitioning(country#2, 200), ENSURE_REQUIREMENTS, [plan_id=129]
      +- HashAggregate(keys=[country#2], functions=[partial_sum(amount#3L)])
         +- Project [country#2, amount#3L]
            +- Filter (isnotnull(amount#3L) AND (amount#3L > 100))
               +- Scan ExistingRDD[id#0L,name#1,country#2,amount#3L]


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(total#74L) AND (total#74L > 100))
   +- HashAggregate(keys=[country#2], functions=[sum(amount#3L)])
      +- Exchange hashpartitioning(country#2, 200), ENSURE_REQUIREMENTS, [plan_id=150]
         +- HashAggregate(keys=[country#2], functions=[partial_sum(amount#3L)])
            +- Project [country#2, amount#3L]
               +- Scan ExistingRDD[id#0L,name#1,country#2,amount#3L]




In [10]:
# Expose `df` to Spark SQL under the name "transactions" (replacing any earlier view).
df.createOrReplaceTempView("transactions")

# The same per-country aggregation, written in plain SQL and sorted by total.
result = spark.sql(
    """
    SELECT country, SUM(amount) as total
    FROM transactions
    GROUP BY country
    ORDER BY total DESC
"""
)
result.show()

+-------+-----+
|country|total|
+-------+-----+
|     UK|  450|
|     FR|  350|
+-------+-----+



In [13]:
# Three ways to compute the same per-country total.

# 1. DataFrame API
totals_api = df.groupBy("country").agg(sum("amount"))

# 2. SQL on the "transactions" temp view registered in the previous cell
totals_sql = spark.sql("SELECT country, SUM(amount) FROM transactions GROUP BY country")

# 3. RDD (old way, much more verbose): hand-build (country, amount) pairs
#    and reduce them by key.
totals_rdd = (
    df.rdd
      .map(lambda row: (row["country"], row["amount"]))
      .reduceByKey(lambda a, b: a + b)
)

# Check that the three really agree. Sort first: row order after a shuffle is
# not guaranteed, so compare the results as sorted lists of (country, total).
assert (
    sorted(tuple(r) for r in totals_api.collect())
    == sorted(tuple(r) for r in totals_sql.collect())
    == sorted(totals_rdd.collect())
), "DataFrame API, SQL and RDD totals disagree"

totals_sql

DataFrame[country: string, sum(amount): bigint]