In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col,mean
from pyspark.sql.types import (
    StructType, StructField, IntegerType, LongType,
    StringType, FloatType, TimestampType
)

# Spark session for local, single-machine analysis.
# All tunables live in one dict so they are easy to scan and adjust.
# NOTE(review): with master "local[*]" the executor runs inside the driver
# JVM, so "spark.executor.memory" most likely has no effect — confirm.
# NOTE(review): "spark.driver.memory" only takes effect if no JVM is running
# yet when this cell executes — confirm for the way this notebook is launched.
spark_settings = {
    "spark.driver.memory": "6g",
    "spark.executor.memory": "6g",
    "spark.sql.shuffle.partitions": "8",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.adaptive.enabled": "true",
}

session_builder = SparkSession.builder.appName("JapanTradeStats").master("local[*]")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

# Keep notebook output readable: only ERROR-level messages from Spark.
spark.sparkContext.setLogLevel("ERROR")


In [None]:
# Load the raw event CSV with an explicit schema (no schema-inference pass).
# The input location can be overridden with the DATA_CSV_PATH environment
# variable; the default is the original author's local file, so existing
# runs behave exactly as before.
import os

DATA_CSV_PATH = os.environ.get(
    "DATA_CSV_PATH", "file:///home/eng-mohammed/Desktop/Data.csv"
)

schema = StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("category_id", StringType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("user_id", LongType(), True),
    StructField("user_session", StringType(), True)
])

# Load CSV with schema.
# NOTE(review): literal "NULL" tokens in the string columns are kept as text
# here (no nullValue option); consider nullValue="NULL" if the file uses it.
df = spark.read.csv(
    DATA_CSV_PATH,
    header=True,
    schema=schema
)
data = df
data.cache()  # lazy: materialised by the first action that scans `data`
data.printSchema()

In [None]:
#---------------------------------------------------------------part2--------------------------------------
# Understand the data: row count plus per-column summary statistics
# (count / mean / stddev / min / max for numeric and string columns).
print("Total data is : ", data.count())
print("describe data : ")
# describe() returns a DataFrame; printing it only shows the column names and
# types, so the statistics must be rendered with show().
data.describe().show()



In [None]:
# Null count for every column, computed in a single pass.
# count() skips nulls, and when(...) without otherwise() yields a non-null
# value only for rows where the column IS null, so the count equals the
# number of nulls.
null_count_exprs = []
for column_name in data.columns:
    is_missing = F.col(column_name).isNull()
    null_count_exprs.append(F.count(F.when(is_missing, column_name)).alias(column_name))

null_counts = data.select(*null_count_exprs)
null_counts.show()

In [None]:
# Clean the raw events:
# 1. turn literal "NULL" text tokens into real nulls;
# 2. drop duplicate rows and rows without a product;
# 3. impute missing prices.
#
# Only the string columns can carry a literal "NULL" token. product_id
# (IntegerType) and price (FloatType) are parsed by the read schema, so an
# unparsable token there is already null (default PERMISSIVE mode). Comparing
# those numeric columns with the string "NULL" was a no-op, and it may fail
# with a cast error when spark.sql.ansi.enabled is on.
df = data
for text_col in ["brand", "category_code"]:
    df = df.withColumn(
        text_col, when(col(text_col) == "NULL", None).otherwise(col(text_col))
    )

# Drop exact duplicate rows
df = df.dropDuplicates()

# Filter invalid product_id
df = df.filter(col("product_id").isNotNull())

# Fill missing prices with the mean of the remaining (non-null) prices.
# mean() ignores nulls; if every price is null it is None and prices stay null.
# NOTE(review): imputed prices are indistinguishable afterwards, so any later
# price sum (e.g. revenue) includes them — confirm this is intended.
mean_price = df.select(mean("price")).first()[0]

df_clean = df.withColumn(
    "price",
    F.coalesce(col("price"), F.lit(mean_price))
)

df_clean.show(10)

In [None]:
# Derive calendar features from event_time and the top-level category
# (the part of category_code before the first ".").
# split() on a null category_code yields null, so rows without a category get
# a null category_main without an explicit null check.
event_ts = F.col("event_time")
top_level_category = F.split(F.col("category_code"), "\\.").getItem(0)

df_fe = (
    df_clean
    .withColumn("event_date", F.to_date(event_ts))
    .withColumn("event_month", F.month(event_ts))
    .withColumn("event_hour", F.hour(event_ts))
    .withColumn("category_main", top_level_category)
)
df_fe.show(5)

In [None]:
# Number of events per event_type (e.g. "view", "purchase").
events_dist = (
    df_fe
    .groupBy(F.col("event_type"))
    .count()
)
events_dist.show()

In [None]:
# Products with the most "view" events, together with their top-level category.
view_events = df_fe.filter(F.col("event_type") == "view")
view_counts = view_events.groupBy("product_id", "category_main").count()
top_viewed_products = view_counts.orderBy(F.col("count").desc())

top_viewed_products.show(10)


                                                                                

In [None]:
# ---- Revenue by brand (purchase events only) ----
# Note: this cell re-binds df_fe (cast + dedup), and later cells read the
# re-bound frame.

# 1. Make price an explicit double. The column is already numeric (declared
#    FloatType in the read schema), so the former comma-stripping
#    regexp_replace only round-tripped each value through a string; a plain
#    cast is equivalent and states the intent.
df_fe = df_fe.withColumn("price", F.col("price").cast("double"))

# 2. Drop repeated events. event_type is part of the key. Without it, a view
#    and a purchase of the same product in the same session at the same
#    timestamp collapse to one arbitrarily chosen row, which can silently
#    drop the purchase from the revenue below.
df_fe = df_fe.dropDuplicates(["user_session", "product_id", "event_type", "event_time"])

# 3. Keep only purchases that have a brand and a positive price.
df_purchase = df_fe.filter(
    (F.col("event_type") == "purchase")
    & F.col("brand").isNotNull()
    & F.col("price").isNotNull()
    & (F.col("price") > 0)
)

# 4. Total revenue per brand, highest first (rounded to 2 decimals).
brand_revenue = (
    df_purchase
    .groupBy("brand")
    .agg(F.round(F.sum("price"), 2).alias("total_revenue"))
    .orderBy(F.desc("total_revenue"))
)

# 5. Top 10 brands by revenue, then the result schema.
brand_revenue.show(10, truncate=False)
brand_revenue.printSchema()


In [None]:

# Total event volume per hour of day (all event types together), hour ascending.
# NOTE(review): rows whose event_time is null form a null event_hour group,
# which sorts first and would push hour 23 past show(24) — confirm none exist.
events_by_hour = df_fe.groupBy("event_hour")
hourly_traffic = events_by_hour.count().orderBy("event_hour")

hourly_traffic.show(24)


In [None]:
# Purchase count and revenue (rounded to 2 decimals) per hour of day.
# NOTE(review): unlike the brand-revenue cell, this does not require a brand
# or a positive price — confirm the two revenue figures are meant to differ.
purchase_events = df_fe.filter(F.col("event_type") == "purchase")

purchase_count_col = F.count("*").alias("num_purchases")
revenue_col = F.round(F.sum("price"), 2).alias("total_revenue")

hourly_revenue = (
    purchase_events
    .groupBy("event_hour")
    .agg(purchase_count_col, revenue_col)
    .orderBy("event_hour")
)

hourly_revenue.show(24)
