In [0]:
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Small in-memory sample of orders. Rows 5 and 6 are deliberately invalid
# (non-positive amounts) so the data-quality checks below have something to catch.
data = [
    (1, "Alice", "Electronics", 1200),
    (2, "Bob", "Clothing", 800),
    (3, "Charlie", "Electronics", 1500),
    (4, "Alice", "Clothing", 700),
    (5, "Bob", "Electronics", 0),      # invalid amount
    (6, "Alice", "Electronics", -200)  # invalid amount
]

# Declare the schema explicitly rather than letting Spark infer it from the
# Python values. These are the same types/nullability inference produced
# (long/string, nullable), but they are now pinned, visible, and rows are
# verified against them when the DataFrame is created.
schema = StructType([
    StructField("order_id", LongType(), True),
    StructField("customer", StringType(), True),
    StructField("category", StringType(), True),
    StructField("amount", LongType(), True),
])
columns = schema.fieldNames()

df = spark.createDataFrame(data, schema)
df.show()


+--------+--------+-----------+------+
|order_id|customer|   category|amount|
+--------+--------+-----------+------+
|       1|   Alice|Electronics|  1200|
|       2|     Bob|   Clothing|   800|
|       3| Charlie|Electronics|  1500|
|       4|   Alice|   Clothing|   700|
|       5|     Bob|Electronics|     0|
|       6|   Alice|Electronics|  -200|
+--------+--------+-----------+------+



In [0]:
# Print the column names and types Spark holds for the DataFrame,
# then report how many rows were loaded.
df.printSchema()
row_count = df.count()
print("Total rows:", row_count)


root
 |-- order_id: long (nullable = true)
 |-- customer: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amount: long (nullable = true)

Total rows: 6


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Null check: count nulls in every column with a single Spark job instead of
# one filter+count job per column. `when(cond, 1)` yields null whenever the
# cell is NOT null, and `count` skips nulls, so each aliased result holds the
# number of null cells in that column.
null_counts = (
    df.select([F.count(F.when(col(c).isNull(), 1)).alias(c) for c in df.columns])
    .first()
    .asDict()
)
print("Null count per column:")
for c in df.columns:
    print(c, ":", null_counts[c])

# Invalid amount check: a row is invalid when amount is null or not strictly
# positive. This is the exact complement of an `amount > 0` filter (which also
# drops nulls), so invalid rows + rows kept by such a filter equal the total.
is_invalid_amount = col("amount").isNull() | (col("amount") <= 0)
invalid_amount = df.filter(is_invalid_amount).count()
print("Invalid amount rows:", invalid_amount)


Null count per column:
order_id : 0
customer : 0
category : 0
amount : 0
Invalid amount rows: 2


In [0]:
# Keep only rows whose amount is strictly positive. Null amounts are dropped
# as well, because `null > 0` evaluates to null, which the predicate treats as
# not matching.
positive_amount = col("amount") > 0
clean_df = df.where(positive_amount)
clean_df.show()


+--------+--------+-----------+------+
|order_id|customer|   category|amount|
+--------+--------+-----------+------+
|       1|   Alice|Electronics|  1200|
|       2|     Bob|   Clothing|   800|
|       3| Charlie|Electronics|  1500|
|       4|   Alice|   Clothing|   700|
+--------+--------+-----------+------+



In [0]:
# Import the functions module under an alias instead of `from ... import sum`,
# which would shadow Python's built-in sum() for every later cell in the notebook.
from pyspark.sql import functions as F

# Revenue and order count per category, computed on the validated rows only.
result_df = (
    clean_df
    .groupBy("category")
    .agg(
        F.sum("amount").alias("total_revenue"),
        F.count("order_id").alias("total_orders"),  # counts non-null order_ids
    )
)

result_df.show()


+-----------+-------------+------------+
|   category|total_revenue|total_orders|
+-----------+-------------+------------+
|Electronics|         2700|           2|
|   Clothing|         1500|           2|
+-----------+-------------+------------+



In [0]:
# Expose the validated rows to Spark SQL under the view name "sales"
# (replacing any existing view with that name) so %sql cells can query them.
clean_df.createOrReplaceTempView(name="sales")


In [0]:
%sql
-- SQL cross-check of revenue per category over the `sales` view.
-- GROUP BY alone does not guarantee a row order; ORDER BY makes the output
-- deterministic (highest revenue first, category name as the tiebreak).
SELECT category, SUM(amount) AS revenue
FROM sales
GROUP BY category
ORDER BY revenue DESC, category;


category,revenue
Electronics,2700
Clothing,1500


In [0]:
# Pick the highest-revenue category. `category` is a secondary sort key so
# that ties on revenue resolve deterministically instead of arbitrarily.
top_category = (
    result_df
    .orderBy(col("total_revenue").desc(), col("category"))
    .first()
)

print("📊 Business Insight")
# `first()` returns None when result_df is empty (e.g. every row failed
# validation); guard it instead of failing on `None["category"]`.
if top_category is None:
    print("No valid orders: cannot determine a top revenue category.")
else:
    print("Top revenue category:", top_category["category"])
    print("Revenue:", top_category["total_revenue"])


📊 Business Insight
Top revenue category: Electronics
Revenue: 2700
