# Simulated transactions analysis - Main Job

In [8]:
import org.apache.spark

import org.apache.spark


In [2]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner

import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner


In [3]:
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit


## Case classes 

After the imports, the `Transaction` and `CustomerSpending` case classes are defined to keep the code readable.

In [6]:
// case class definition for data clarity

case class Transaction(
  custId: String,
  startDate: String,
  endDate: String,
  transId: String,
  date: LocalDate,
  year: Int,
  month: Int,
  day: Int,
  expType: String,
  amount: Double
)

case class CustomerSpending(totalSpend: Double, spendingClass: String)

defined class Transaction
defined class CustomerSpending


In [22]:
// rdd mapping with created custom case classes 

val rdd = sc.textFile("sample_data_big_data.csv")
val header = rdd.first()
val dataRdd = rdd.filter(row => row != header)


val transactionsRdd = dataRdd.map { line =>
    val fields = line.split(",")
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    Transaction(
        custId   = fields(0),
        startDate = fields(1),
        endDate   = fields(2),
        transId   = fields(3),
        date      = LocalDate.parse(fields(4), formatter),
        year      = fields(5).toInt,
        month     = fields(6).toInt,
        day       = fields(7).toInt,
        expType   = fields(8),
        amount    = fields(9).toDouble
    )
}

rdd: org.apache.spark.rdd.RDD[String] = sample_data_big_data.csv MapPartitionsRDD[31] at textFile at <console>:39
header: String = CUST_ID,START_DATE,END_DATE,TRANS_ID,DATE,YEAR,MONTH,DAY,EXP_TYPE,AMOUNT
dataRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at filter at <console>:41
transactionsRdd: org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[33] at map at <console>:44


## Classified spendings

First, each customer's total spending is computed in order to assign a spending class.
The classified totals are then joined back to the transactions to obtain a more complete dataset.
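
The classification rule can be isolated as a small pure function, which makes its boundaries easy to check. This is a minimal sketch, assuming the same thresholds as the classification cell (below 1000 is "Poor", below 10000 is "Middle", otherwise "Rich"). The name `classifySpend` is hypothetical and is not part of the job.

```scala
// Hypothetical helper: maps a customer's total spend to a spending class.
// Thresholds mirror the ones used in the classification cell.
def classifySpend(totalSpend: Double): String =
  if (totalSpend < 1000.0) "Poor"
  else if (totalSpend < 10000.0) "Middle"
  else "Rich"

// The comparisons are strict, so a value equal to a threshold
// falls into the higher class.
val examples = Seq(999.99, 1000.0, 9999.99, 10000.0).map(v => (v, classifySpend(v)))
examples.foreach { case (v, c) => println(s"$v -> $c") }
```

Keeping the rule in one function means the thresholds are stated in a single place if they ever need to change.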

In [29]:
// total spendings per customer
val customerSpendingsRdd = transactionsRdd
  .map(t => (t.custId, t.amount))
  .reduceByKey(_ + _)

// customer spendings classification
val classifiedSpendingsRdd = customerSpendingsRdd.map { case (custId, totalSpend) =>
  val spendingClass = if (totalSpend < 1000.0) "Poor"
                      else if (totalSpend < 10000.0) "Middle"
                      else "Rich"
  (custId, CustomerSpending(totalSpend, spendingClass))
}

// inner join transactions with classified spendings
val transactionsJoinedRdd = transactionsRdd
  .map(t => (t.custId, t))
  .join(classifiedSpendingsRdd)  // (custId, (Transaction, CustomerSpending))
  .map { case (custId, (transaction, custSpending)) =>
    (transaction.custId, transaction.transId, custSpending.spendingClass, transaction.year, transaction.expType, transaction.amount)
  }

customerSpendingsRdd: org.apache.spark.rdd.RDD[(String, Double)] = ShuffledRDD[51] at reduceByKey at <console>:39
classifiedSpendingsRdd: org.apache.spark.rdd.RDD[(String, CustomerSpending)] = MapPartitionsRDD[52] at map at <console>:42
transactionsJoinedRdd: org.apache.spark.rdd.RDD[(String, String, String, Int, String, Double)] = MapPartitionsRDD[57] at map at <console>:53


### Spending Analysis by Category and Year

This job aims to aggregate customer transactions based on **spending class** (`spendingClass`) and **year** (`year`), computing key metrics for each combination.

#### **Main Steps**:
1. **Transaction Aggregation**  
   - Transactions are grouped by (`spendingClass`, `year`).  
   - The total spending amount (`totalAmount`) is computed.  
   - The unique set of customers who made purchases (`custSet`) is collected.  

2. **Metric Calculation**  
   - The total number of customers (`customerCount`) for each (`spendingClass`, `year`).  
   - The average spending per customer (`avgSpend = totalAmount / customerCount`).

In [30]:
// criteria: (spending class, year)

// Mapping: ((spendingClass, year), (amount, Set(customerId)))
val aggregatedByCategoryYear = transactionsJoinedRdd
  .map { case (custId, transId, spendingClass, year, expType, amount) =>
    ((spendingClass, year), (amount, Set(custId)))
  }
  .reduceByKey { case ((amount1, custSet1), (amount2, custSet2)) =>
    (amount1 + amount2, custSet1 ++ custSet2)
  }

val metricsByCategoryYear = aggregatedByCategoryYear.map { case ((spendingClass, year), (totalAmount, custSet)) =>
  val customerCount = custSet.size
  val avgSpend = totalAmount / customerCount
  ((spendingClass, year), (totalAmount, customerCount, avgSpend))
}

aggregatedByCategoryYear: org.apache.spark.rdd.RDD[((String, Int), (Double, scala.collection.immutable.Set[String]))] = ShuffledRDD[59] at reduceByKey at <console>:40
metricsByCategoryYear: org.apache.spark.rdd.RDD[((String, Int), (Double, Int, Double))] = MapPartitionsRDD[60] at map at <console>:44
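
To make the aggregation concrete, the same logic can be tried on a small local collection. This is only a sketch: the rows are made up, and `groupBy` on a local `Seq` stands in for the keyed reduction, so it is not Spark code. It shows that a customer with several transactions in the same class and year is counted once.

```scala
// Local-collection sketch of the (spendingClass, year) aggregation.
// Rows: (custId, spendingClass, year, amount); values are made up for illustration.
val rows = Seq(
  ("C1", "Poor", 2010, 10.0),
  ("C1", "Poor", 2010, 30.0),
  ("C2", "Poor", 2010, 20.0),
  ("C3", "Rich", 2010, 500.0)
)

// Same result shape as the reduceByKey step: summed amount and set of customers per key.
val aggregated: Map[(String, Int), (Double, Set[String])] =
  rows
    .groupBy { case (_, spendingClass, year, _) => (spendingClass, year) }
    .map { case (key, group) =>
      val totalAmount = group.map(_._4).sum
      val custSet = group.map(_._1).toSet
      (key, (totalAmount, custSet))
    }

// Metrics: total amount, distinct customers, average spend per customer.
val metrics: Map[(String, Int), (Double, Int, Double)] =
  aggregated.map { case (key, (totalAmount, custSet)) =>
    (key, (totalAmount, custSet.size, totalAmount / custSet.size))
  }

metrics.toSeq.sortBy(_._1).foreach(println)
```

Here `C1` appears twice under `("Poor", 2010)` but contributes a single entry to the customer set, so the average is taken over two customers rather than three transactions.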


## Spending trends

The metrics computed above are reused to derive spending trends, expressed as a growth rate and a retention rate.

#### **Main Steps**:  
1. **Grouping Data by Spending Class**:  
   - We first map the dataset to `(spendingClass, (year, totalAmount, customerCount, avgSpend))` to enable time-based trend analysis.  
   - Then, we use `groupByKey()` to aggregate all years for each `spendingClass`.  

2. **Sorting by Year**:  
   - Inside each spending class group, we sort data by `year` to track changes over time.  

3. **Calculating Growth Rate and Retention Rate**:  
   - We use a **sliding window of size 2** to compare consecutive years.  
   - **Growth Rate** is computed as the percentage change in `avgSpend` from the previous year.  
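   - **Retention Rate** is computed as the current year's customer count as a percentage of the previous year's count, so values above `100%` mean the customer base grew. It compares counts only and does not check whether the same customers returned.  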
   - If there is no previous year, the values default to `0.0%` growth and `100%` retention.  

4. **Flattening the Output**:  
   - We return a structured dataset `(spendingClass, year, totalAmount, customerCount, avgSpend, growthRate, retentionRate)`, which can be further analyzed or visualized.  
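
Written out, the two rates computed in this notebook take the following form. The symbols are introduced here for illustration only: $a_y$ is the average spend and $c_y$ the customer count of a spending class in year $y$, and $y-1$ denotes the previous year available for that class after sorting.

```latex
\[
\text{growthRate}_y = \frac{a_y - a_{y-1}}{a_{y-1}} \times 100,
\qquad
\text{retentionRate}_y = \frac{c_y}{c_{y-1}} \times 100
\]
```

As a check against the sample output for the "Poor" class: with $a_{2010} \approx 46.286$, $a_{2011} \approx 48.771$, $c_{2010} = 639$ and $c_{2011} = 1709$, the growth rate is $(48.771 - 46.286) / 46.286 \times 100 \approx 5.37$ and the retention rate is $1709 / 639 \times 100 \approx 267.45$, matching the reported values.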


In [31]:
val spendingTrends = metricsByCategoryYear
  .map { case ((spendingClass, year), (totalAmount, customerCount, avgSpend)) =>
    (spendingClass, (year, totalAmount, customerCount, avgSpend))
  }
  .groupByKey() 
  .flatMap { case (spendingClass, data) =>
    val sortedData = data.toList.sortBy(_._1) 

    // The first year has no predecessor: default to 0.0 growth and 100.0 retention.
    val firstYear = sortedData.headOption.map { case (year, totalAmount, customerCount, avgSpend) =>
      ((spendingClass, year), (totalAmount, customerCount, avgSpend, 0.0, 100.0))
    }

    // sliding(2) yields only consecutive pairs, so it covers every year after the first.
    val laterYears = sortedData.sliding(2).collect {
      case List((_, _, prevCust, prevAvg), (year, totalAmount, customerCount, avgSpend)) =>
        val growthRate = (avgSpend - prevAvg) / prevAvg * 100
        val retentionRate = customerCount.toDouble / prevCust * 100
        ((spendingClass, year), (totalAmount, customerCount, avgSpend, growthRate, retentionRate))
    }

    firstYear.iterator ++ laterYears
  }


spendingTrends: org.apache.spark.rdd.RDD[((String, Int), (Double, Int, Double, Double, Double))] = MapPartitionsRDD[63] at flatMap at <console>:37


In [18]:
spendingTrends.take(10)

res0: Array[((String, Int), (Double, Int, Double, Double, Double))] = Array(((Poor,2010),(29576.680000000004,639,46.28588419405322,0.0,100.0)), ((Poor,2011),(83349.50000000004,1709,48.77091866588651,5.368881928267389,267.4491392801252)), ((Poor,2012),(134143.23999999996,2652,50.581915535444935,3.713272005321379,155.17846693973084)), ((Poor,2013),(201802.95000000007,3538,57.03870830977956,12.765022253477271,133.40874811463047)), ((Poor,2014),(234921.54000000027,4055,57.9337953144267,1.5692624029735929,114.6127755794234)), ((Poor,2015),(272362.26999999984,4639,58.71141840913987,1.34226161171167,114.4019728729963)), ((Poor,2016),(342843.3500000002,5263,65.14219076572302,10.953188546339124,113.45117482215994)), ((Poor,2017),(370305.6299999997,5553,66.6856888168557,2.369429141067291,105.5101...


# Job optimization

## Caching transactionsRdd

In [26]:
val transactionsRdd = dataRdd.map { line =>
    val fields = line.split(",")
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    Transaction(
        custId   = fields(0),
        startDate = fields(1),
        endDate   = fields(2),
        transId   = fields(3),
        date      = LocalDate.parse(fields(4), formatter),
        year      = fields(5).toInt,
        month     = fields(6).toInt,
        day       = fields(7).toInt,
        expType   = fields(8),
        amount    = fields(9).toDouble
    )
}.cache()

transactionsRdd: org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[48] at map at <console>:35


## Using broadcast for segmentation

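Since the classification relies only on two fixed thresholds, they are shared with the executors through a broadcast variable. For a map this small the gain is modest; broadcasting pays off more as the shared lookup data grows.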

In [28]:
val spendingThresholds = sc.broadcast(Map(
  "Poor" -> 1000.0,
  "Middle" -> 10000.0
))

val classifiedSpendingsRdd = customerSpendingsRdd.map { case (custId, totalSpend) =>
  val spendingClass = if (totalSpend < spendingThresholds.value("Poor")) "Poor"
                      else if (totalSpend < spendingThresholds.value("Middle")) "Middle"
                      else "Rich"
  (custId, spendingClass)
}

spendingThresholds: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,Double]] = Broadcast(10)
classifiedSpendingsRdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[49] at map at <console>:38


## Removing useless fields

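This optimization is simple but effective: only four transaction fields (`transId`, `year`, `expType`, `amount`) are carried into the join instead of the full `Transaction`, which reduces the amount of data shuffled.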

In [33]:
val transactionsJoinedRdd = transactionsRdd
  .map(t => (t.custId, (t.transId, t.year, t.expType, t.amount)))
  .join(classifiedSpendingsRdd)
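  .map { case (custId, ((transId, year, expType, amount), spendingClass)) =>
    // Emits 1 per transaction, so the count summed downstream is a transaction count,
    // not the number of distinct customers computed in the main job.
    ((spendingClass, year), (amount, 1))
  }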

transactionsJoinedRdd: org.apache.spark.rdd.RDD[((CustomerSpending, Int), (Double, Int))] = MapPartitionsRDD[68] at map at <console>:36


## Caching rdd for reuse

Only `metricsByCategoryYear` is cached, to balance reuse against memory usage; for the same reason, `transactionsJoinedRdd` is left uncached.

In [35]:
val metricsByCategoryYear = transactionsJoinedRdd
  .reduceByKey { case ((amount1, count1), (amount2, count2)) =>
    (amount1 + amount2, count1 + count2)
  }
  .map { case ((spendingClass, year), (totalAmount, customerCount)) =>
    val avgSpend = totalAmount / customerCount
    ((spendingClass, year), (totalAmount, customerCount, avgSpend))
  }
  .cache()

metricsByCategoryYear: org.apache.spark.rdd.RDD[((CustomerSpending, Int), (Double, Int, Double))] = MapPartitionsRDD[70] at map at <console>:36


## GroupByKey

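As stated in class, `groupByKey` is simple to understand but can perform poorly, because it shuffles every value without combining on the map side. For this reason the optimized version uses `reduceByKey` instead. Note that concatenating lists still moves every record across the shuffle, so the gain is limited here, where there is only one row per spending class and year.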

In [38]:
val spendingTrends = metricsByCategoryYear
  .map { case ((spendingClass, year), (totalAmount, customerCount, avgSpend)) =>
    (spendingClass, List((year, totalAmount, customerCount, avgSpend)))
  }
  .reduceByKey(_ ++ _)
  .mapValues { data =>
    val sortedData = data.sortBy { case (year, _, _, _) => year }
    
    var prevAvgSpend = 0.0
    var prevCustCount = 0
    sortedData.map { case (year, totalAmount, customerCount, avgSpend) =>
      val growthRate = if (prevAvgSpend > 0) ((avgSpend - prevAvgSpend) / prevAvgSpend) * 100 else 0.0
      val retentionRate = if (prevCustCount > 0) (customerCount.toDouble / prevCustCount) * 100 else 100.0

      prevAvgSpend = avgSpend
      prevCustCount = customerCount
      (year, totalAmount, customerCount, avgSpend, growthRate, retentionRate)
    }
  }
  .flatMap { case (spendingClass, trends) =>
    trends.map { case (year, totalAmount, customerCount, avgSpend, growthRate, retentionRate) =>
      ((spendingClass, year), (totalAmount, customerCount, avgSpend, growthRate, retentionRate))
    }
  }


spendingTrends: org.apache.spark.rdd.RDD[((CustomerSpending, Int), (Double, Int, Double, Double, Double))] = MapPartitionsRDD[81] at flatMap at <console>:51
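
The per-class trend computation can also be written without the mutable `prevAvgSpend` and `prevCustCount` variables, by pairing each year with its predecessor. The following is a minimal sketch on a local list, not the job's code: the helper name `withTrends`, the `YearMetrics` alias and the input values are hypothetical, while the formulas and defaults follow the cell above.

```scala
// One yearly row for a spending class: (year, totalAmount, customerCount, avgSpend).
type YearMetrics = (Int, Double, Int, Double)

// Hypothetical helper: appends growthRate and retentionRate to each row
// without using mutable state.
def withTrends(data: List[YearMetrics]): List[(Int, Double, Int, Double, Double, Double)] = {
  val sorted = data.sortBy(_._1)
  // Pair every row with its predecessor; the first row gets None.
  val previous: List[Option[YearMetrics]] = None :: sorted.map(Some(_))
  sorted.zip(previous).map {
    case ((year, total, count, avg), Some((_, _, prevCount, prevAvg))) =>
      val growthRate = if (prevAvg > 0) (avg - prevAvg) / prevAvg * 100 else 0.0
      val retentionRate = if (prevCount > 0) count.toDouble / prevCount * 100 else 100.0
      (year, total, count, avg, growthRate, retentionRate)
    case ((year, total, count, avg), None) =>
      // No previous year: same defaults as the notebook (0.0 growth, 100.0 retention).
      (year, total, count, avg, 0.0, 100.0)
  }
}

// Made-up input, deliberately out of order to show the sort.
val trends = withTrends(List((2011, 300.0, 3, 100.0), (2010, 100.0, 2, 50.0)))
trends.foreach(println)
```

Such a helper could be passed to `mapValues` in place of the inline block, and being a pure function it can be checked on small lists without a Spark context.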
