# Testing and Debugging on Sampled dataset
This notebook is used to test and debug the code on a sample of the whole dataset for faster execution, before production code is run on the whole dataset.

In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner

import java.io.{File, PrintWriter}
import java.util.Calendar

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner
import java.io.{File, PrintWriter}
import java.util.Calendar


In [2]:
// Build a session named "EcommerceDataAnalysis", or reuse the one that already exists.
val spark = {
    val sessionBuilder = SparkSession.builder().appName("EcommerceDataAnalysis")
    sessionBuilder.getOrCreate()
}

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5349c998


In [None]:
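// val sc = spark.sparkContext  // `sc` is used below without being defined in this notebook; presumably the kernel provides it (TODO confirm)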

In [None]:
/**
 * Prints how long a job took and writes the same line to a file.
 *
 * The duration is the difference between the current time and `startTime`.
 * The output file is created or overwritten; it receives exactly one line.
 *
 * @param startTime  start of the job, as returned by `System.currentTimeMillis()`
 * @param jobName    name of the job, used in the log line
 * @param outputPath path of the file that receives the log line
 * @throws java.io.FileNotFoundException if `outputPath` cannot be opened for writing
 */
def logJobExecution(startTime: Long, jobName: String, outputPath: String): Unit = {
    val endTime = System.currentTimeMillis()
    val duration = endTime - startTime
    val dateTime = Calendar.getInstance().getTime()
    val message = s"[$dateTime] Job '$jobName' completed in $duration ms."
    println(s"\n$message")

    val writer = new PrintWriter(new File(outputPath))
    try {
        writer.write(s"$message\n")
        // PrintWriter never throws on I/O failure; checkError() flushes and reports it.
        if (writer.checkError()) {
            println(s"[WARN] Could not write execution log to $outputPath")
        }
    } finally {
        writer.close()
    }
}

In [None]:
/**
 * Parses one comma-separated dataset row into a 13-field tuple.
 *
 * Field order: event_time, event_type, product_id (Long), category_id (Long),
 * category_code, brand, price (Double), user_id (Long), user_session_id,
 * session_duration (Int), visit_count (Int), age (Int), gender.
 *
 * The three id positions are typed `Any` because the failure sentinel stores `null`
 * there while a successful parse stores a `Long`.
 *
 * @param row a comma-separated line of the dataset
 * @return the parsed tuple; if a numeric field cannot be parsed or the row has fewer
 *         than 13 columns, an error line is printed and the sentinel
 *         `(null, null, null, null, null, null, 0.0, null, null, 0, 0, 0, null)` is returned
 */
def parseDataset(row: String): (String, String, Any, Any, String, String, Double, Any, String, Int, Int, Int, String) = {
    // limit = -1 keeps trailing empty fields, so a row whose last column is empty is parsed
    // with an empty value instead of being reported as a parse error.
    val columns = row.split(",", -1)
    try {
        val event_time = columns(0)
        val event_type = columns(1)
        val product_id = columns(2).toLong
        val category_id = columns(3).toLong
        val category_code = columns(4)
        val brand = columns(5)
        val price = columns(6).toDouble
        val user_id = columns(7).toLong
        val user_session_id = columns(8)
        val session_duration = columns(9).toInt
        val visit_count = columns(10).toInt
        val age = columns(11).toInt
        val gender = columns(12)

        (event_time, event_type, product_id, category_id, category_code, brand, price, user_id, user_session_id, session_duration, visit_count, age, gender)
    } catch {
        // The only failures the body can raise: a non-numeric field or a missing column.
        case _: NumberFormatException | _: ArrayIndexOutOfBoundsException =>
            println(s"Error parsing row: $row")
            (null, null, null, null, null, null, 0.0, null, null, 0, 0, 0, null)
    }
}

When parsing the two dataset files, we remove the header and any rows with missing values.

In [5]:
// Shape of the tuples produced by parseDataset. The id positions (3, 4, 8) are `Any`
// because they hold a Long on success and null in the failure sentinel.
type ParsedRecord = (String, String, Any, Any, String, String, Double, Any, String, Int, Int, Int, String)

// True when a parsed record has no missing value:
//  - text fields must be neither null (failure sentinel) nor empty,
//  - id fields must not be null (comparing them with "" can never be false),
//  - price, session_duration, visit_count and age must be non-zero, as before.
def isCompleteRecord(rec: ParsedRecord): Boolean = {
    val textFields = Seq(rec._1, rec._2, rec._5, rec._6, rec._9, rec._13)
    val idFields = Seq(rec._3, rec._4, rec._8)
    textFields.forall(field => field != null && field.nonEmpty) &&
    idFields.forall(_ != null) &&
    rec._7 != 0.0 && rec._10 != 0 && rec._11 != 0 && rec._12 != 0
}

// October sample: drop the header line, parse every row, keep complete records only.
val rddOctRaw = sc.textFile("../../../datasets/enriched_2019-Oct-sample.csv")
val headerOct = rddOctRaw.first()
val rddOctAll = rddOctRaw.filter(_ != headerOct).map(x => parseDataset(x))
val rddOct = rddOctAll.filter(rec => isCompleteRecord(rec))

// November sample: same steps.
val rddNovRaw = sc.textFile("../../../datasets/enriched_2019-Nov-sample.csv")
val headerNov = rddNovRaw.first()
val rddNovAll = rddNovRaw.filter(_ != headerNov).map(x => parseDataset(x))
val rddNov = rddNovAll.filter(rec => isCompleteRecord(rec))

rddOctRaw: org.apache.spark.rdd.RDD[String] = ../../../datasets/enriched_2019-Oct-sample.csv MapPartitionsRDD[1] at textFile at <console>:31
headerOct: String = event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,session_duration,visit_count,age,gender
rddOctAll: org.apache.spark.rdd.RDD[(String, String, Any, Any, String, String, Double, Any, String, Int, Int, Int, String)] = MapPartitionsRDD[3] at map at <console>:33
rddOct: org.apache.spark.rdd.RDD[(String, String, Any, Any, String, String, Double, Any, String, Int, Int, Int, String)] = MapPartitionsRDD[4] at filter at <console>:34
rddNovRaw: org.apache.spark.rdd.RDD[String] = ../../../datasets/enriched_2019-Nov-sample.csv MapPartitionsRDD[6] at textFile at <console>:39
headerNov: String = ...


In [6]:
// Show the first five cleaned October records, one per line.
rddOct.take(5).foreach(record => println(record))

(2019-10-28 16:42:04 UTC,view,3700869,2053013565983425517,appliances.environment.vacuum,samsung,89.78,513466365,9c2f486c-b182-47b4-9881-4e7ff7c719ca,258,1,26,Female)
(2019-10-27 11:56:21 UTC,view,28716595,2053013565782098913,apparel.shoes,baden,53.8,560216021,3cdc94bf-2329-445d-a8ae-f09080880474,122,2,29,Female)
(2019-10-26 03:59:54 UTC,view,13200145,2053013557192163841,furniture.bedroom.bed,brw,217.25,535492001,d206fdc5-cb78-4999-9279-d46e116f3049,137,1,19,Female)
(2019-10-14 04:54:57 UTC,view,1005105,2053013555631882655,electronics.smartphone,apple,1428.31,513265089,62067164-2a1a-4446-b176-b9c49893effc,215,2,25,Male)
(2019-10-30 03:27:24 UTC,view,1004428,2053013555631882655,electronics.smartphone,huawei,785.09,565610507,40b5d006-4a67-43ba-88ac-610c86b38d1f,270,1,35,Female)


## Non-Optimized version

In [10]:
// Wall-clock start of this run; handed to logJobExecution once all results are printed.
val jobStartTime = System.currentTimeMillis()
println(s"[${Calendar.getInstance().getTime()}] Starting job: Non-optimized version")

// --- Top Products by Total Revenue generated ---
// Each month is processed on its own: keep "purchase" events, key them by product id and
// add up the prices. The category code and brand kept for a product are those carried by
// the left operand of each pairwise merge.

val totalRevenueOct = rddOct
    .filter(_._2 == "purchase")
    .map { rec => (rec._3, (rec._7, rec._5, rec._6)) }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2, left._3) }

// Flattened to (revenue, productId, categoryCode, brand), highest revenue first.
val topRevenueOct = totalRevenueOct
    .map { case (productId, totals) => (totals._1, productId, totals._2, totals._3) }
    .sortBy(_._1, ascending = false)

// Same rows, lowest revenue first.
val leastRevenueOct = totalRevenueOct
    .map { case (productId, totals) => (totals._1, productId, totals._2, totals._3) }
    .sortBy(_._1, ascending = true)

val totalRevenueNov = rddNov
    .filter(_._2 == "purchase")
    .map { rec => (rec._3, (rec._7, rec._5, rec._6)) }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2, left._3) }

val topRevenueNov = totalRevenueNov
    .map { case (productId, totals) => (totals._1, productId, totals._2, totals._3) }
    .sortBy(_._1, ascending = false)

val leastRevenueNov = totalRevenueNov
    .map { case (productId, totals) => (totals._1, productId, totals._2, totals._3) }
    .sortBy(_._1, ascending = true)

 // --- Sorting Products by Conversion Rates ---
 // The data is filtered for "purchase" or "view" events. A map operation is applied to count the purchases and views for each product. Conversion rates are calculated by dividing the number of purchases by views.

// Per product: (purchases / views, categoryCode, brand) for October. A product with no
// view events gets a rate of 0.0.
val conversionRateOct = rddOct
    .filter { rec => rec._2 == "view" || rec._2 == "purchase" }
    .map { rec =>
        // (purchase flag, view flag) for this single event
        val flags = rec._2 match {
            case "purchase" => (1, 0)
            case "view"     => (0, 1)
            case _          => (0, 0)
        }
        (rec._3, (flags._1, flags._2, rec._5, rec._6))
    }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2 + right._2, left._3, left._4) }
    .mapValues { counts =>
        val rate =
            if (counts._2 > 0) counts._1.toDouble / counts._2
            else 0.0
        (rate, counts._3, counts._4)
    }

// (conversionRate, productId, categoryCode, brand), best rate first.
val topConversionRateOct = conversionRateOct
    .map { case (productId, stats) => (stats._1, productId, stats._2, stats._3) }
    .sortBy(_._1, ascending = false)

// Same rows, worst rate first.
val bottomConversionRateOct = conversionRateOct
    .map { case (productId, stats) => (stats._1, productId, stats._2, stats._3) }
    .sortBy(_._1, ascending = true)

// November: identical pipeline on the November records.
val conversionRateNov = rddNov
    .filter { rec => rec._2 == "view" || rec._2 == "purchase" }
    .map { rec =>
        val flags = rec._2 match {
            case "purchase" => (1, 0)
            case "view"     => (0, 1)
            case _          => (0, 0)
        }
        (rec._3, (flags._1, flags._2, rec._5, rec._6))
    }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2 + right._2, left._3, left._4) }
    .mapValues { counts =>
        val rate =
            if (counts._2 > 0) counts._1.toDouble / counts._2
            else 0.0
        (rate, counts._3, counts._4)
    }

val topConversionRateNov = conversionRateNov
    .map { case (productId, stats) => (stats._1, productId, stats._2, stats._3) }
    .sortBy(_._1, ascending = false)

val bottomConversionRateNov = conversionRateNov
    .map { case (productId, stats) => (stats._1, productId, stats._2, stats._3) }
    .sortBy(_._1, ascending = true)

// --- Average Visit Count per User ---
// For each user, average the visit_count field over all of that user's records in the month.
// visit_count is tuple position 11; position 10 is session_duration, which was averaged
// here before by mistake.
// groupByKey is kept as-is: this cell is presumably the deliberately unoptimized baseline
// that the optimized cell is timed against.

val avgVisitCountOct = rddOct
                        .map(rec => (rec._8, rec._11))
                        .groupByKey()
                        .mapValues(visitCounts => visitCounts.sum.toDouble / visitCounts.size)

val avgVisitCountNov = rddNov
                        .map(rec => (rec._8, rec._11))
                        .groupByKey()
                        .mapValues(visitCounts => visitCounts.sum.toDouble / visitCounts.size)

// --- Average Visit Count across all Users ---
// Per month: sum visit_count (tuple position 11) over "view" events and divide by the number
// of distinct users in that month's records.
// Two fixes over the previous version: it read position 10 (session_duration), and it took
// the user count from combinedRDD, which is defined only in the optimized cell and covers
// both months.

val totalVisitCountOct = rddOct.filter(rec => rec._2 == "view")
                                .map(rec => (rec._8, rec._11))

val totalVisitCountNov = rddNov.filter(rec => rec._2 == "view")
                                .map(rec => (rec._8, rec._11))

// Distinct users per month. Named differently from totalUsersOct / totalUsersNov, which
// the retention section of this cell defines later.
val distinctUsersOct = rddOct.map(rec => rec._8)
                                .distinct()
                                .count()

val distinctUsersNov = rddNov.map(rec => rec._8)
                                .distinct()
                                .count()

val totalAvgVisitCountOct = if (distinctUsersOct > 0) totalVisitCountOct.map { case (_, visits) => visits }.sum() / distinctUsersOct else 0.0

val totalAvgVisitCountNov = if (distinctUsersNov > 0) totalVisitCountNov.map { case (_, visits) => visits }.sum() / distinctUsersNov else 0.0

// --- Conversion Rate by Age Group ---
// Every record is assigned an age bracket and contributes (purchase flag, view flag).
// The rate per bracket is purchases / views, or 0.0 when the bracket has no views.
// Ages below 18 fall into "Unknown".

val ageGroupsOct = rddOct
    .map { rec =>
        val age = rec._12
        val bracket =
            if (age < 18) "Unknown"
            else if (age <= 24) "18-24"
            else if (age <= 34) "25-34"
            else if (age <= 44) "35-44"
            else if (age <= 54) "45-54"
            else if (age <= 64) "55-64"
            else "65+"
        val isPurchase = if (rec._2 == "purchase") 1 else 0
        val isView = if (rec._2 == "view") 1 else 0
        (bracket, (isPurchase, isView))
    }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2 + right._2) }
    .mapValues { counts =>
        if (counts._2 > 0) counts._1.toDouble / counts._2 else 0.0
    }

val ageGroupsNov = rddNov
    .map { rec =>
        val age = rec._12
        val bracket =
            if (age < 18) "Unknown"
            else if (age <= 24) "18-24"
            else if (age <= 34) "25-34"
            else if (age <= 44) "35-44"
            else if (age <= 54) "45-54"
            else if (age <= 64) "55-64"
            else "65+"
        val isPurchase = if (rec._2 == "purchase") 1 else 0
        val isView = if (rec._2 == "view") 1 else 0
        (bracket, (isPurchase, isView))
    }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2 + right._2) }
    .mapValues { counts =>
        if (counts._2 > 0) counts._1.toDouble / counts._2 else 0.0
    }

// --- Conversion Rate by Gender ---
// Same counting scheme, keyed by the gender field instead of the age bracket.

val genderGroupsOct = rddOct
    .map { rec =>
        val isPurchase = if (rec._2 == "purchase") 1 else 0
        val isView = if (rec._2 == "view") 1 else 0
        (rec._13, (isPurchase, isView))
    }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2 + right._2) }
    .mapValues { counts =>
        if (counts._2 > 0) counts._1.toDouble / counts._2 else 0.0
    }

val genderGroupsNov = rddNov
    .map { rec =>
        val isPurchase = if (rec._2 == "purchase") 1 else 0
        val isView = if (rec._2 == "view") 1 else 0
        (rec._13, (isPurchase, isView))
    }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2 + right._2) }
    .mapValues { counts =>
        if (counts._2 > 0) counts._1.toDouble / counts._2 else 0.0
    }

// --- User Retention Rate ---
// Per month: users with more than one "view" event, divided by the number of distinct
// users in that month's records (0.0 when there are no users).

// Number of view events per user in October.
val userViewsOct = rddOct
    .filter(_._2 == "view")
    .map { rec => (rec._8, 1) }
    .reduceByKey((x, y) => x + y)

val totalUsersOct = rddOct.map(_._8).distinct().count()

val returningUsersOct = userViewsOct.filter(_._2 > 1).count()

val retentionRateOct =
    if (totalUsersOct > 0) returningUsersOct.toDouble / totalUsersOct
    else 0.0

// Number of view events per user in November.
val userViewsNov = rddNov
    .filter(_._2 == "view")
    .map { rec => (rec._8, 1) }
    .reduceByKey((x, y) => x + y)

val totalUsersNov = rddNov.map(_._8).distinct().count()

val returningUsersNov = userViewsNov.filter(_._2 > 1).count()

val retentionRateNov =
    if (totalUsersNov > 0) returningUsersNov.toDouble / totalUsersNov
    else 0.0

// --- Printing Results ---
// Driver-side printing helpers. `rows` is a by-name parameter so that the heading is printed
// before the expression producing the rows (a Spark action) is evaluated.

// Prints a heading, then one line per (metric, productId, categoryCode, brand) row.
def printProductRows[K](heading: String, metricName: String)(rows: => Array[(Double, K, String, String)]): Unit = {
    println(heading)
    rows.foreach { row =>
        println(s"Product ID: ${row._2}, ${metricName}: ${row._1}, Category Code: ${row._3}, Brand: ${row._4}")
    }
}

// Prints a heading, then one "<keyName>: k, <valueName>: v" line per (k, v) row.
def printKeyedValues[K](heading: String, keyName: String, valueName: String)(rows: => Array[(K, Double)]): Unit = {
    println(heading)
    rows.foreach { row =>
        println(s"${keyName}: ${row._1}, ${valueName}: ${row._2}")
    }
}

printProductRows("\n [LOG] Top 5 total revenue products (October):", "Revenue")(topRevenueOct.take(5))
printProductRows("\n[LOG] Top 5 total revenue products (November):", "Revenue")(topRevenueNov.take(5))
printProductRows("\n [LOG] Bottom 5 total revenue products (October):", "Revenue")(leastRevenueOct.take(5))
printProductRows("\n[LOG] Bottom 5 total revenue products (November):", "Revenue")(leastRevenueNov.take(5))

printProductRows("\n[LOG] Top 5 products by conversion rates (October):", "Conversion Rate")(topConversionRateOct.take(5))
printProductRows("\n[LOG] Top 5 products by conversion rates (November):", "Conversion Rate")(topConversionRateNov.take(5))
printProductRows("\n[LOG] Bottom 5 products by conversion rates (October):", "Conversion Rate")(bottomConversionRateOct.take(5))
printProductRows("\n[LOG] Bottom 5 products by conversion rates (November):", "Conversion Rate")(bottomConversionRateNov.take(5))

printKeyedValues("\n[LOG] Average Visit Count per User (October):", "User ID", "Avg Visit Count")(avgVisitCountOct.take(5))
printKeyedValues("\n[LOG] Average Visit Count per User (November):", "User ID", "Avg Visit Count")(avgVisitCountNov.take(5))

println(s"\n[LOG] Average Visit Count Across All Users (October): $totalAvgVisitCountOct")

println(s"\n[LOG] Average Visit Count Across All Users (November): $totalAvgVisitCountNov")

printKeyedValues("\n[LOG] Conversion Rate by Age Group (October):", "Age Group", "Conversion Rate")(ageGroupsOct.collect())
printKeyedValues("\n[LOG] Conversion Rate by Age Group (November):", "Age Group", "Conversion Rate")(ageGroupsNov.collect())

printKeyedValues("\n[LOG] Conversion Rate by Gender (October):", "Gender", "Conversion Rate")(genderGroupsOct.collect())
printKeyedValues("\n[LOG] Conversion Rate by Gender (November):", "Gender", "Conversion Rate")(genderGroupsNov.collect())

println(s"\n[LOG] User Retention Rate (October): $retentionRateOct")

println(s"\n[LOG] User Retention Rate (November): $retentionRateNov")

// --- Logging execution time
logJobExecution(jobStartTime, "Non-optimized Version", "../../outputs/execution_time_non_optimized_version_local.txt")

[Mon Jan 20 16:35:47 CET 2025] Starting job: Non-optimized version

 [LOG] Top 5 total revenue products (October):
Product ID: 1005115, Revenue: 618101.6799999996, Category Code: electronics.smartphone, Brand: apple
Product ID: 1005105, Revenue: 523865.86000000016, Category Code: electronics.smartphone, Brand: apple
Product ID: 1004249, Revenue: 314269.08, Category Code: electronics.smartphone, Brand: apple
Product ID: 1005135, Revenue: 280159.98000000004, Category Code: electronics.smartphone, Brand: apple
Product ID: 1004767, Revenue: 260926.47000000003, Category Code: electronics.smartphone, Brand: samsung

[LOG] Top 5 total revenue products (November):
Product ID: 1005115, Revenue: 832369.2100000004, Category Code: electronics.smartphone, Brand: apple
Product ID: 1005105, Revenue: 465145.2199999999, Category Code: electronics.smartphone, Brand: apple
Product ID: 1004249, Revenue: 293418.78, Category Code: electronics.smartphone, Brand: apple
Product ID: 1005135, Revenue:

jobStartTime: Long = 1737387347162
totalRevenueOct: org.apache.spark.rdd.RDD[(Any, (Double, String, String))] = ShuffledRDD[215] at reduceByKey at <console>:44
topRevenueOct: org.apache.spark.rdd.RDD[(Double, Any, String, String)] = MapPartitionsRDD[221] at sortBy at <console>:48
leastRevenueOct: org.apache.spark.rdd.RDD[(Double, Any, String, String)] = MapPartitionsRDD[227] at sortBy at <console>:52
totalRevenueNov: org.apache.spark.rdd.RDD[(Any, (Double, String, String))] = ShuffledRDD[230] at reduceByKey at <console>:56
topRevenueNov: org.apache.spark.rdd.RDD[(Double, Any, String, String)] = MapPartitionsRDD[236] at sortBy at <console>:60
leastRevenueNov: org.apache.spark.rdd.RDD[(Double, Any, String, String)] = MapPartitionsRDD[242] at sortBy at <console>:64
conversionRateOct...


## Optimized version

For the optimized version, the two datasets are combined through the `union` function, so to distinguish between the two datasets, we add a column `month` with values `October` and `November` respectively.

In [6]:
// Append a 14th field naming the month each record came from, so the two datasets can
// still be told apart after they are unioned.
val rddOctWithMonth = rddOct.map {
    case (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, sessionId, sessionDuration, visitCount, age, gender) =>
        (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, sessionId, sessionDuration, visitCount, age, gender, "October")
}
val rddNovWithMonth = rddNov.map {
    case (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, sessionId, sessionDuration, visitCount, age, gender) =>
        (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, sessionId, sessionDuration, visitCount, age, gender, "November")
}

rddOctWithMonth: org.apache.spark.rdd.RDD[(String, String, Any, Any, String, String, Double, Any, String, Int, Int, Int, String, String)] = MapPartitionsRDD[10] at map at <console>:31
rddNovWithMonth: org.apache.spark.rdd.RDD[(String, String, Any, Any, String, String, Double, Any, String, Int, Int, Int, String, String)] = MapPartitionsRDD[11] at map at <console>:32


In [17]:
// Show the first five October records with the month tag appended.
rddOctWithMonth.take(5).foreach(record => println(record))

(2019-10-28 16:42:04 UTC,view,3700869,2053013565983425517,appliances.environment.vacuum,samsung,89.78,513466365,9c2f486c-b182-47b4-9881-4e7ff7c719ca,258,1,26,Female,October)
(2019-10-27 11:56:21 UTC,view,28716595,2053013565782098913,apparel.shoes,baden,53.8,560216021,3cdc94bf-2329-445d-a8ae-f09080880474,122,2,29,Female,October)
(2019-10-26 03:59:54 UTC,view,13200145,2053013557192163841,furniture.bedroom.bed,brw,217.25,535492001,d206fdc5-cb78-4999-9279-d46e116f3049,137,1,19,Female,October)
(2019-10-14 04:54:57 UTC,view,1005105,2053013555631882655,electronics.smartphone,apple,1428.31,513265089,62067164-2a1a-4446-b176-b9c49893effc,215,2,25,Male,October)
(2019-10-30 03:27:24 UTC,view,1004428,2053013555631882655,electronics.smartphone,huawei,785.09,565610507,40b5d006-4a67-43ba-88ac-610c86b38d1f,270,1,35,Female,October)


In [11]:
// Wall-clock start of this run; handed to logJobExecution once all results are printed.
val jobStartTime = System.currentTimeMillis()
println(s"[${Calendar.getInstance().getTime()}] Starting job: Optimized version")

// Both months in a single RDD, marked for caching because every analysis below reads it.
val combinedRDD = rddOctWithMonth
    .union(rddNovWithMonth)
    .cache()

// --- Top Products by Total Revenue generated ---
// Purchases are keyed by product id only, so the summed revenue spans both months.
// NOTE(review): the month kept in the result is the one carried by the left operand of each
// merge, not a month the total is restricted to - confirm this is intended.

val totalRevenueCombined = combinedRDD
    .filter(_._2 == "purchase")
    .map { rec => (rec._3, (rec._7, rec._5, rec._6, rec._14)) }
    .reduceByKey { (left, right) => (left._1 + right._1, left._2, left._3, left._4) }

// (revenue, productId, categoryCode, brand, month), highest revenue first.
val sortedByRevenue = totalRevenueCombined
    .map { case (productId, totals) => (totals._1, productId, totals._2, totals._3, totals._4) }
    .sortBy(_._1, ascending = false)

// --- Conversion rates (Products) ---
// Per product id over both months: purchases / views, or 0.0 when there are no views.
// The category code, brand and month kept are those of the left operand of each merge.
val conversionRates = combinedRDD
    .filter { rec => rec._2 == "view" || rec._2 == "purchase" }
    .map { rec =>
        // (purchase flag, view flag) for this single event
        val flags = rec._2 match {
            case "purchase" => (1, 0)
            case "view"     => (0, 1)
            case _          => (0, 0)
        }
        (rec._3, (flags._1, flags._2, rec._5, rec._6, rec._14))
    }
    .reduceByKey { (left, right) =>
        (left._1 + right._1, left._2 + right._2, left._3, left._4, left._5)
    }
    .mapValues { counts =>
        val rate =
            if (counts._2 > 0) counts._1.toDouble / counts._2
            else 0.0
        (rate, counts._3, counts._4, counts._5)
    }

// (conversionRate, productId, categoryCode, brand, month), best rate first.
val sortedConversionRates = conversionRates
    .map { case (productId, stats) => (stats._1, productId, stats._2, stats._3, stats._4) }
    .sortBy(_._1, ascending = false)

// userVisitCounts feeds both the per-user average and the average across all users.
// For every user it holds (sum of visit_count, number of "view" records) over both months.
// visit_count is tuple position 11; position 10 is session_duration, which was aggregated
// here before by mistake.
val userVisitCounts = combinedRDD.filter(rec => rec._2 == "view")
                                    .map(rec => (rec._8, rec._11))
                                    .combineByKey(
                                        (visitCount: Int) => (visitCount, 1),
                                        (acc: (Int, Int), visitCount: Int) => (acc._1 + visitCount, acc._2 + 1),
                                        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
                                    )

// --- Average Visit Count per User ---
val avgVisitCount = userVisitCounts
                        .mapValues { case (sum, count) => sum.toDouble / count }

// --- Average Visit Count Across All Users ---
// Mean of the per-user averages. The second tuple element is the number of view records of
// that user, not a number of users.
val totalAvgVisitCount = userVisitCounts
                            .mapValues { case (visitSum, recordCount) => visitSum.toDouble / recordCount }
                            .map { case (_, avg) => avg }
                            .mean()

// --- Conversion Rate by Age Group  ---
// Each record is assigned an age bracket and contributes (purchase flag, view flag);
// the rate per bracket is purchases / views, or 0.0 when the bracket has no views.
val ageGroups = combinedRDD
    .map { rec =>
        // Brackets are tested from youngest to oldest, so each upper bound is enough.
        val bracket = rec._12 match {
            case years if years < 18 => "Unknown"
            case years if years < 25 => "18-24"
            case years if years < 35 => "25-34"
            case years if years < 45 => "35-44"
            case years if years < 55 => "45-54"
            case years if years < 65 => "55-64"
            case _                   => "65+"
        }
        val isPurchase = if (rec._2 == "purchase") 1 else 0
        val isView = if (rec._2 == "view") 1 else 0
        (bracket, (isPurchase, isView))
    }
    .reduceByKey((left, right) => (left._1 + right._1, left._2 + right._2))
    .mapValues { counts =>
        if (counts._2 > 0) counts._1.toDouble / counts._2 else 0.0
    }

// --- Conversion Rate by Gender ---
// Same counting scheme, keyed by the gender field.
val genderGroups = combinedRDD
    .map { rec =>
        val isPurchase = if (rec._2 == "purchase") 1 else 0
        val isView = if (rec._2 == "view") 1 else 0
        (rec._13, (isPurchase, isView))
    }
    .reduceByKey((left, right) => (left._1 + right._1, left._2 + right._2))
    .mapValues { counts =>
        if (counts._2 > 0) counts._1.toDouble / counts._2 else 0.0
    }

// --- User Retention Rate ---
// Users with more than one "view" event across both months, divided by the number of
// distinct users in the combined data (0.0 when there are no users).

// Number of view events per user.
val userViews = combinedRDD
    .filter(_._2 == "view")
    .map { rec => (rec._8, 1) }
    .reduceByKey((x, y) => x + y)

val totalUsers = combinedRDD.map(_._8).distinct().count()

val returningUsers = userViews.filter(_._2 > 1).count()

val retentionRate =
    if (totalUsers > 0) returningUsers.toDouble / totalUsers
    else 0.0

// --- Printing Results ---
// sortedByRevenue and sortedConversionRates are already ordered by descending metric, so
// the "top 5" listings take from them directly instead of sorting (and shuffling) again.
// Only the "bottom 5" listings need a re-sort in ascending order.
println("\n[LOG] Top 5 Total Revenue Products (Combined):")
sortedByRevenue.take(5).foreach { case (revenue, productId, categoryCode, brand, month) =>
println(s"Product ID: $productId, Revenue: $revenue, Category Code: $categoryCode, Brand: $brand, Month: $month")
}

println("\n[LOG] Least 5 Total Revenue Products (Combined):")
sortedByRevenue.sortBy(r => r._1, ascending = true).take(5).foreach { case (revenue, productId, categoryCode, brand, month) =>
println(s"Product ID: $productId, Revenue: $revenue, Category Code: $categoryCode, Brand: $brand, Month: $month")
}

println("\n[LOG] Top 5 products by conversion rates (Combined):")
sortedConversionRates.take(5).foreach { case (conversionRate, productId, categoryCode, brand, month) =>
println(s"Product ID: $productId, Conversion Rate: $conversionRate, Category Code: $categoryCode, Brand: $brand, Month: $month")
}

println("\n[LOG] Bottom 5 products by conversion rates (Combined):")
sortedConversionRates.sortBy(r => r._1, ascending = true).take(5).foreach { case (conversionRate, productId, categoryCode, brand, month) =>
println(s"Product ID: $productId, Conversion Rate: $conversionRate, Category Code: $categoryCode, Brand: $brand, Month: $month")
}

println("\n[LOG] Average Visit Count per User (Combined):")
// The pattern variable is named userAvg so it does not shadow the avgVisitCount RDD.
avgVisitCount.take(5).foreach { case (userId, userAvg) =>
println(s"User ID: $userId, Avg Visit Count: $userAvg")
}

println(s"\n[LOG] Average Visit Count Across All Users (Optimized): $totalAvgVisitCount")

println("\n[LOG] Conversion Rate by Age Group (Combined):")
ageGroups.collect().foreach { case (ageGroup, conversionRate) =>
println(s"Age Group: $ageGroup, Conversion Rate: $conversionRate")
}

println("\n[LOG] Conversion Rate by Gender (Combined):")
genderGroups.collect().foreach { case (gender, conversionRate) =>
println(s"Gender: $gender, Conversion Rate: $conversionRate")
}

println(s"\n[LOG] User Retention Rate: $retentionRate")

// --- Logging execution time
logJobExecution(jobStartTime, "Optimized Version", "../../outputs/execution_time_optimized_version_local.txt")

[Mon Jan 20 16:39:32 CET 2025] Starting job: Optimized version

[LOG] Top 5 Total Revenue Products (Combined):
Product ID: 1005115, Revenue: 1450470.89, Category Code: electronics.smartphone, Brand: apple, Month: October
Product ID: 1005105, Revenue: 989011.0800000002, Category Code: electronics.smartphone, Brand: apple, Month: October
Product ID: 1004249, Revenue: 607687.86, Category Code: electronics.smartphone, Brand: apple, Month: October
Product ID: 1005135, Revenue: 564573.6900000001, Category Code: electronics.smartphone, Brand: apple, Month: October
Product ID: 1004767, Revenue: 483508.5600000001, Category Code: electronics.smartphone, Brand: samsung, Month: October

[LOG] Least 5 Total Revenue Products (Combined):
Product ID: 4804536, Revenue: 0.88, Category Code: electronics.audio.headphone, Brand: ritmix, Month: October
Product ID: 4804539, Revenue: 0.88, Category Code: electronics.audio.headphone, Brand: ritmix, Month: November
Product ID: 4803175, Revenue: 1.0, C

jobStartTime: Long = 1737387572370
combinedRDD: org.apache.spark.rdd.RDD[(String, String, Any, Any, String, String, Double, Any, String, Int, Int, Int, String, String)] = UnionRDD[321] at union at <console>:50
totalRevenueCombined: org.apache.spark.rdd.RDD[(Any, (Double, String, String, String))] = ShuffledRDD[324] at reduceByKey at <console>:56
sortedByRevenue: org.apache.spark.rdd.RDD[(Double, Any, String, String, String)] = MapPartitionsRDD[330] at sortBy at <console>:60
conversionRates: org.apache.spark.rdd.RDD[(Any, (Double, String, String, String))] = MapPartitionsRDD[334] at mapValues at <console>:70
sortedConversionRates: org.apache.spark.rdd.RDD[(Double, Any, String, String, String)] = MapPartitionsRDD[340] at sortBy at <console>:77
userVisitCounts: org.apache.spark.rdd.R...
