# Simulated transactions analysis - Main Job

In [11]:
import org.apache.spark



In [12]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner



In [13]:
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit



## Case classes 

After the imports, the `Transaction` and `CustomerSpending` case classes are defined to keep the code more readable.

In [14]:
// case class definition for data clarity

case class Transaction(
  custId: String,
  startDate: String,
  endDate: String,
  transId: String,
  date: LocalDate,
  year: Int,
  month: Int,
  day: Int,
  expType: String,
  amount: Double
)

case class CustomerSpending(totalSpend: Double, spendingClass: String)

defined class Transaction
defined class CustomerSpending


In [7]:
// rdd mapping with created custom case classes 

val rdd = sc.textFile("sample_data_big_data.csv")
val header = rdd.first()
val dataRdd = rdd.filter(row => row != header)


val transactionsRdd = dataRdd.map { line =>
    val fields = line.split(",")
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    Transaction(
        custId   = fields(0),
        startDate = fields(1),
        endDate   = fields(2),
        transId   = fields(3),
        date      = LocalDate.parse(fields(4), formatter),
        year      = fields(5).toInt,
        month     = fields(6).toInt,
        day       = fields(7).toInt,
        expType   = fields(8),
        amount    = fields(9).toDouble
    )
}

rdd: org.apache.spark.rdd.RDD[String] = sample_data_big_data.csv MapPartitionsRDD[1] at textFile at <console>:34
header: String = CUST_ID,START_DATE,END_DATE,TRANS_ID,DATE,YEAR,MONTH,DAY,EXP_TYPE,AMOUNT
dataRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:36
transactionsRdd: org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[3] at map at <console>:39
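The parsing step can also be written as a standalone function. The sketch below is only an illustration, assuming the same ten-column layout as the CSV header above; `parseTransaction` is a hypothetical helper, not part of the notebook. Since `yyyy-MM-dd` is the ISO-8601 format that `LocalDate.parse` accepts by default, no `DateTimeFormatter` has to be built for every line, and wrapping the conversion in `Try` skips malformed rows instead of failing the whole job.

```scala
import java.time.LocalDate
import scala.util.Try

case class Transaction(
  custId: String, startDate: String, endDate: String, transId: String,
  date: LocalDate, year: Int, month: Int, day: Int, expType: String, amount: Double
)

// Hypothetical helper: parses one CSV line, returning None for malformed rows
// (too few columns, unparsable date or numbers) instead of throwing.
def parseTransaction(line: String): Option[Transaction] = {
  val fields = line.split(",")
  if (fields.length < 10) None
  else Try(Transaction(
    custId    = fields(0),
    startDate = fields(1),
    endDate   = fields(2),
    transId   = fields(3),
    date      = LocalDate.parse(fields(4)), // ISO yyyy-MM-dd is the default format
    year      = fields(5).toInt,
    month     = fields(6).toInt,
    day       = fields(7).toInt,
    expType   = fields(8),
    amount    = fields(9).toDouble
  )).toOption
}
```

On the RDD it could be used as `dataRdd.flatMap(line => parseTransaction(line).toList)`; malformed rows are then dropped silently rather than reported.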


## Classified spendings

First things first, each customer's total spending has to be computed in order to assign a spending class.
Then, transactions are joined with the classified spendings to obtain a more complete dataset.

In [8]:
// total spendings per customer
val customerSpendingsRdd = transactionsRdd
    .map(t => (t.custId, t.amount))
    .reduceByKey(_ + _)

// customer spendings classification
val classifiedSpendingsRdd = customerSpendingsRdd.map { case (custId, totalSpend) =>
    val spendingClass = if (totalSpend < 1000.0) "Poor"
                      else if (totalSpend < 10000.0) "Middle"
                      else "Rich"
    (custId, CustomerSpending(totalSpend, spendingClass))
}

// inner join transactions with classified spendings
val transactionsJoinedRdd = transactionsRdd
    .map(t => (t.custId, t))
    .join(classifiedSpendingsRdd)  // (custId, (Transaction, CustomerSpending))
    .map { case (custId, (transaction, custSpending)) =>
        (transaction.custId, transaction.transId, custSpending.spendingClass, transaction.year, transaction.expType, transaction.amount)
    }

customerSpendingsRdd: org.apache.spark.rdd.RDD[(String, Double)] = ShuffledRDD[5] at reduceByKey at <console>:35
classifiedSpendingsRdd: org.apache.spark.rdd.RDD[(String, CustomerSpending)] = MapPartitionsRDD[6] at map at <console>:38
transactionsJoinedRdd: org.apache.spark.rdd.RDD[(String, String, String, Int, String, Double)] = MapPartitionsRDD[11] at map at <console>:49
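To make the classification rule and its boundaries explicit, here is a local sketch that mirrors `map` + `reduceByKey(_ + _)` and the threshold logic with plain Scala collections instead of an RDD. The customer IDs and amounts are made up for illustration, and `classify` is a hypothetical helper using the same thresholds as the cell above.

```scala
// Made-up (custId, amount) pairs standing in for transactionsRdd.map(t => (t.custId, t.amount))
val txns = Seq(("C1", 400.0), ("C1", 300.0), ("C2", 5000.0), ("C3", 9000.0), ("C3", 2000.0))

// Local equivalent of reduceByKey(_ + _): total spend per customer
val totals: Map[String, Double] =
  txns.groupBy(_._1).map { case (custId, rows) => (custId, rows.map(_._2).sum) }

// Hypothetical helper with the notebook's thresholds: < 1000 Poor, < 10000 Middle, else Rich
def classify(totalSpend: Double): String =
  if (totalSpend < 1000.0) "Poor"
  else if (totalSpend < 10000.0) "Middle"
  else "Rich"

val classes: Map[String, String] = totals.map { case (custId, total) => (custId, classify(total)) }
```

Because the comparisons are strict, a total of exactly 1000.0 falls into `Middle` and exactly 10000.0 into `Rich`.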


### Spending Analysis by Category and Year

This job aims to aggregate customer transactions based on **spending class** (`spendingClass`) and **year** (`year`), computing key metrics for each combination.

#### **Main Steps**:
1. **Transaction Aggregation**  
   - Transactions are grouped by (`spendingClass`, `year`).  
   - The total spending amount (`totalAmount`) is calculated.  
   - The unique set of customers who made purchases (`custSet`) is collected.  

2. **Metric Calculation**  
   - The number of distinct customers (`customerCount`) for each (`spendingClass`, `year`).  
   - The average spending per customer (`avgSpend = totalAmount / customerCount`).

In [9]:
// criteria: (spending class, year)

// Mapping: ((spendingClass, year), (amount, Set(customerId)))
val aggregatedByCategoryYear = transactionsJoinedRdd
    .map { case (custId, transId, spendingClass, year, expType, amount) =>
        ((spendingClass, year), (amount, Set(custId)))
    }
    .reduceByKey { case ((amount1, custSet1), (amount2, custSet2)) =>
        (amount1 + amount2, custSet1 ++ custSet2)
    }

val metricsByCategoryYear = aggregatedByCategoryYear.map { case ((spendingClass, year), (totalAmount, custSet)) =>
  val customerCount = custSet.size
  val avgSpend = totalAmount / customerCount
  ((spendingClass, year), (totalAmount, customerCount, avgSpend))
}

aggregatedByCategoryYear: org.apache.spark.rdd.RDD[((String, Int), (Double, scala.collection.immutable.Set[String]))] = ShuffledRDD[13] at reduceByKey at <console>:37
metricsByCategoryYear: org.apache.spark.rdd.RDD[((String, Int), (Double, Int, Double))] = MapPartitionsRDD[14] at map at <console>:41
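The reduce step above merges, for each `(spendingClass, year)`, both the running total and the set of customer IDs, so `customerCount` counts distinct customers rather than transactions. A local sketch with plain Scala collections and made-up rows shows the effect: two transactions by the same customer add to the amount but only once to the count.

```scala
// (custId, spendingClass, year, amount): made-up local stand-in for transactionsJoinedRdd
val rows = Seq(
  ("C1", "Poor", 2010, 100.0),
  ("C1", "Poor", 2010, 50.0),
  ("C2", "Poor", 2010, 30.0),
  ("C2", "Poor", 2011, 70.0)
)

// Same combine function as the reduceByKey above: add amounts, union customer sets
val aggregated: Map[(String, Int), (Double, Set[String])] =
  rows
    .map { case (custId, cls, year, amount) => ((cls, year), (amount, Set(custId))) }
    .groupBy(_._1)
    .map { case (key, values) =>
      (key, values.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 ++ b._2)))
    }

// customerCount = number of distinct customers; avgSpend = total / customerCount
val metrics: Map[(String, Int), (Double, Int, Double)] =
  aggregated.map { case (key, (total, custs)) => (key, (total, custs.size, total / custs.size)) }
```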


## Spending trends

I chose to reuse the previously calculated metrics to compute spending trends, based on growth rate and retention rate.

#### **Main Steps**:  
1. **Grouping Data by Spending Class**:  
   - The dataset is mapped to `(spendingClass, (year, totalAmount, customerCount, avgSpend))` to facilitate trend analysis.  
   - Then, `groupByKey()` collects all years for each `spendingClass`.
2. **Sorting by Year**:  
   - Inside each spending class group, data is sorted by `year` to track changes over time.  
3. **Calculating Growth Rate and Retention Rate**:  
   - Consecutive years are compared to derive the trend indicators.  
   - **Growth Rate** is computed as the percentage change in `avgSpend` from the previous year.  
   - **Retention Rate** is computed as the ratio between the current and the previous year's `customerCount`, expressed as a percentage. Since it compares customer counts rather than checking which customers come back, it can exceed 100% (e.g. 390% for Middle in 2011).  
   - If there is no previous year, the values default to `0.0%` growth and `100%` retention.  
4. **Flattening the Output**:  
   - `flatMap` emits one record per `(spendingClass, year)` with values `(totalAmount, customerCount, avgSpend, growthRate, retentionRate)`, which can be further analyzed or visualized.  
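The per-class computation in steps 2 and 3 can also be expressed without mutable variables, by pairing each year with the previous one. The sketch below is an illustrative alternative on a plain `Seq`; the helper name `trends` and the sample numbers are hypothetical. Inside the Spark job it could be applied to each group's values in `flatMap`. It keeps the notebook's conventions: 0.0% growth and 100% retention for the first year, and retention as a ratio of customer counts.

```scala
// (year, totalAmount, customerCount, avgSpend) for one spending class
type YearMetrics = (Int, Double, Int, Double)

// Hypothetical helper: returns (year, growthRate %, retentionRate %) in chronological order.
def trends(data: Seq[YearMetrics]): Seq[(Int, Double, Double)] = {
  val sorted = data.sortBy(_._1)
  // Previous year's metrics for each entry (None for the first year)
  val previous: Seq[Option[YearMetrics]] = None +: sorted.dropRight(1).map(Some(_))

  sorted.zip(previous).map { case ((year, _, count, avg), prev) =>
    val growthRate = prev match {
      case Some((_, _, _, prevAvg)) if prevAvg > 0 => (avg - prevAvg) / prevAvg * 100
      case _                                      => 0.0
    }
    // Ratio of customer counts, as in the notebook (not a true retention measure)
    val retentionRate = prev match {
      case Some((_, _, prevCount, _)) if prevCount > 0 => count.toDouble / prevCount * 100
      case _                                          => 100.0
    }
    (year, growthRate, retentionRate)
  }
}
```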


In [10]:
val spendingTrends = metricsByCategoryYear
    .map { case ((spendingClass, year), (totalAmount, customerCount, avgSpend)) =>
        (spendingClass, (year, totalAmount, customerCount, avgSpend))
    }
    .groupByKey() 
    .flatMap { case (spendingClass, data) =>
        // order each class's yearly metrics chronologically
        val sortedData = data.toList.sortBy(_._1)
    
        var prevAvgSpend = 0.0
        var prevCustCount = 0
        
        // map returns one output record per year, so no separate accumulator list is needed
        sortedData.map { case (year, totalAmount, customerCount, avgSpend) =>
          val growthRate = if (prevAvgSpend > 0) ((avgSpend - prevAvgSpend) / prevAvgSpend) * 100 else 0.0
          val retentionRate = if (prevCustCount > 0) (customerCount.toDouble / prevCustCount) * 100 else 100.0
          
          prevAvgSpend = avgSpend
          prevCustCount = customerCount
    
          ((spendingClass, year), (totalAmount, customerCount, avgSpend, growthRate, retentionRate))
        }
      }

spendingTrends: org.apache.spark.rdd.RDD[((String, Int), (Double, Int, Double, Double, Double))] = MapPartitionsRDD[17] at flatMap at <console>:35


In [9]:
spendingTrends.collect().toSeq
    .sortBy { case ((spendingClass, year), _) => (spendingClass, year) }
    .foreach(println)


((Middle,2010),(3079.2499999999995,10,307.92499999999995,0.0,100.0))
((Middle,2011),(32507.629999999997,39,833.5289743589743,170.69220568611655,390.0))
((Middle,2012),(56130.0,62,905.3225806451613,8.613210637506622,158.97435897435898))
((Middle,2013),(66541.70999999999,81,821.5025925925925,-9.258576980686389,130.64516129032256))
((Middle,2014),(84076.24999999999,105,800.7261904761904,-2.529073225543152,129.62962962962962))
((Middle,2015),(102454.96000000002,116,883.2324137931037,10.303924649679185,110.47619047619048))
((Middle,2016),(114106.84000000001,138,826.8611594202899,-6.38238061607402,118.96551724137932))
((Middle,2017),(146244.90000000002,161,908.3534161490685,9.855615516625857,116.66666666666667))
((Middle,2018),(161478.65,156,1035.1195512820511,13.955596233721792,96.8944099378882))
((Middle,2019),(182600.3,161,1134.163354037267,9.56834431661008,103.20512820512822))
((Middle,2020),(221567.11999999997,165,1342.83103030303,18.39837934482465,102.48447204968944))
((Poor,2010),(295

# Job optimization

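## Caching transactionsRdd

`transactionsRdd` is reused by several downstream computations (the per-customer totals and the join with the classified spendings), so it is cached: without `cache()`, each job would re-read and re-parse the CSV file. Caching is lazy, so the data is materialized by the first action that uses it.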

In [15]:
// rdd mapping with created custom case classes 

val rdd = sc.textFile("sample_data_big_data.csv")
val header = rdd.first()
val dataRdd = rdd.filter(row => row != header)

val transactionsRdd = dataRdd.map { line =>
    val fields = line.split(",")
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    Transaction(
        custId   = fields(0),
        startDate = fields(1),
        endDate   = fields(2),
        transId   = fields(3),
        date      = LocalDate.parse(fields(4), formatter),
        year      = fields(5).toInt,
        month     = fields(6).toInt,
        day       = fields(7).toInt,
        expType   = fields(8),
        amount    = fields(9).toDouble
    )
}.cache()

rdd: org.apache.spark.rdd.RDD[String] = sample_data_big_data.csv MapPartitionsRDD[19] at textFile at <console>:44
header: String = CUST_ID,START_DATE,END_DATE,TRANS_ID,DATE,YEAR,MONTH,DAY,EXP_TYPE,AMOUNT
dataRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at filter at <console>:46
transactionsRdd: org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[21] at map at <console>:48


## Using broadcast for segmentation

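Since the segmentation relies only on two simple thresholds, they are shared with the executors through a broadcast variable. With a map this small, the gain over capturing the constants directly in the closure is likely modest; broadcasting pays off mainly for larger read-only lookup data.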

In [16]:
// total spendings per customer
val customerSpendingsRdd = transactionsRdd
    .map(t => (t.custId, t.amount))
    .reduceByKey(_ + _)

val spendingThresholds = sc.broadcast(Map(
    "Poor" -> 1000.0,
    "Middle" -> 10000.0
))

val classifiedSpendingsRdd = customerSpendingsRdd.map { case (custId, totalSpend) =>
  val spendingClass = if (totalSpend < spendingThresholds.value("Poor")) "Poor"
                      else if (totalSpend < spendingThresholds.value("Middle")) "Middle"
                      else "Rich"
  (custId, spendingClass)
}

customerSpendingsRdd: org.apache.spark.rdd.RDD[(String, Double)] = ShuffledRDD[23] at reduceByKey at <console>:42
spendingThresholds: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,Double]] = Broadcast(4)
classifiedSpendingsRdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[24] at map at <console>:49


## Using broadcast for joining

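In this case, I chose to broadcast `classifiedSpendingsRdd`, because it has about 75,000 records (one per unique customer ID), while `transactionsRdd` has more than 200 million. Collecting the small side to the driver with `collectAsMap()` and broadcasting it turns the join into a per-record hash lookup on the executors, so the large RDD does not need to be shuffled. In addition, only the fields needed downstream (`spendingClass`, `year`, `amount`) are projected, to reduce computation time.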
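One semantic difference from `RDD.join` is worth keeping in mind: with `getOrElse(..., "Unknown")`, customers missing from the broadcast map are kept under an `Unknown` class, whereas an inner join drops them. In this job every customer in `transactionsRdd` also appears in `classifiedSpendingsRdd` (which is derived from it), so both give the same groups. The local sketch below, with made-up data and plain Scala collections, contrasts the two behaviours.

```scala
// Small side, as collectAsMap() would return it on the driver: custId -> spendingClass
val classes: Map[String, String] = Map("C1" -> "Poor", "C2" -> "Rich")

// Large side: (custId, year, amount); "C9" has no classification
val txns = Seq(("C1", 2010, 10.0), ("C2", 2010, 20.0), ("C9", 2010, 5.0))

// Lookup-based join (like the broadcast version): unmatched rows kept as "Unknown"
val lookupJoined = txns.map { case (custId, year, amount) =>
  ((classes.getOrElse(custId, "Unknown"), year), amount)
}

// Inner-join semantics (like RDD.join): unmatched rows are dropped
val innerJoined = txns.flatMap { case (custId, year, amount) =>
  classes.get(custId).map(cls => ((cls, year), amount))
}
```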

In [17]:
val classifiedSpendingsMap = classifiedSpendingsRdd.collectAsMap()
val broadcastspendings = sc.broadcast(classifiedSpendingsMap)

val transactionsJoinedRdd = transactionsRdd.map { t =>
    val spendingClass = broadcastspendings.value.getOrElse(t.custId, "Unknown")
    ((spendingClass, t.year), (t.amount, 1))
}


classifiedSpendingsMap: scala.collection.Map[String,String] = Map(C6HWPBA456 -> Poor, CSAME0IGFA -> Poor, CVKB0FW9XD -> Poor, CB6ZIL1FOE -> Poor, CAKHPDBEYK -> Poor, CEV9KHT8PH -> Poor, CEXLHX94GW -> Poor, C7RERY5SLC -> Poor, C7ZKY7VORL -> Poor, C2SYBFN10E -> Poor, C8H2G6WL39 -> Poor, CW9OO8ACPX -> Poor, CIYCXH14HL -> Poor, CU68AVKSGS -> Poor, CH6WHATYFG -> Poor, CWXB07DHWH -> Poor, C9OK7OCTOO -> Poor, CPCDVVV6GT -> Poor, CIYOINA6LX -> Poor, CLMO3I9RBB -> Poor, CZR1FMJIO6 -> Poor, CS54URQ7ZE -> Poor, CLR1OUDAH0 -> Poor, CUIXFN8TD9 -> Poor, C1WQZF7SMV -> Poor, CZ5IXCDEOA -> Poor, C5YT7KLA4R -> Poor, C8AO6TI076 -> Poor, C7V213LH7A -> Poor, CO2JDKV6TY -> Poor, C9QT9NZM7V -> Poor, CNR371OZGW -> Poor, CDBJSATHXU -> Poor, CTJ9E9HHUC -> Poor, CHQGM5Q96H -> Poor, C96J0FWPQ7 -> Poor, C508JHG3EL ...


## Removing unused fields (no broadcast alternative)

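This optimization is simple but effective: before the join, each transaction is projected to a tuple of four fields (`transId`, `year`, `expType`, `amount`) instead of carrying the whole `Transaction` object, which reduces the amount of data shuffled. Since only `year` and `amount` are used after the join, the projection could be narrowed further.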

In [18]:
val transactionsJoinedRdd = transactionsRdd
    .map(t => (t.custId, (t.transId, t.year, t.expType, t.amount)))
    .join(classifiedSpendingsRdd)
    .map { case (custId, ((transId, year, expType, amount), spendingClass)) =>
        ((spendingClass, year), (amount, 1))
    }

transactionsJoinedRdd: org.apache.spark.rdd.RDD[((String, Int), (Double, Int))] = MapPartitionsRDD[30] at map at <console>:41


In [19]:
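// NOTE: unlike the first version, which counted distinct customers (custSet.size),
// the second element here adds 1 per record, i.e. it counts transactions.
// customerCount is therefore a transaction count and avgSpend an average per transaction.
val metricsByCategoryYear = transactionsJoinedRdd
  .reduceByKey { case ((amount1, count1), (amount2, count2)) =>
    (amount1 + amount2, count1 + count2)
  }
  .map { case ((spendingClass, year), (totalAmount, customerCount)) =>
    val avgSpend = totalAmount / customerCount
    ((spendingClass, year), (totalAmount, customerCount, avgSpend))
  }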

metricsByCategoryYear: org.apache.spark.rdd.RDD[((String, Int), (Double, Int, Double))] = MapPartitionsRDD[32] at map at <console>:41
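The optimized aggregation adds `1` per record, so it counts transactions rather than the distinct customers counted by `custSet.size` in the first job (which is why, for example, Middle 2010 shows 11 instead of 10 in the output below). If the distinct-customer semantics should be kept without shipping a `Set` of IDs per key, one option is a two-stage aggregation: first reduce by `(spendingClass, year, custId)` so there is exactly one record per customer and year, then reduce by `(spendingClass, year)` while counting those records. On RDDs this would mean two `reduceByKey` calls (two shuffles, both combined map-side). The sketch below shows only the logic, on plain Scala collections with made-up rows; it is an alternative, not what the notebook runs.

```scala
// Made-up (custId, spendingClass, year, amount) rows; C1 has two transactions in 2010
val rows = Seq(
  ("C1", "Poor", 2010, 100.0),
  ("C1", "Poor", 2010, 50.0),
  ("C2", "Poor", 2010, 30.0)
)

// Stage 1 (first reduceByKey, key = (class, year, custId)): one total per customer and year
val perCustomer: Map[(String, Int, String), Double] =
  rows
    .groupBy { case (custId, cls, year, _) => (cls, year, custId) }
    .map { case (key, group) => (key, group.map(_._4).sum) }

// Stage 2 (second reduceByKey, key = (class, year)): sum totals and count customers
val metrics: Map[(String, Int), (Double, Int, Double)] =
  perCustomer.toSeq
    .groupBy { case ((cls, year, _), _) => (cls, year) }
    .map { case (key, group) =>
      val total = group.map(_._2).sum
      val customers = group.size // distinct customers, not transactions
      (key, (total, customers, total / customers))
    }
```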


## GroupByKey (optimization 1)

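As discussed in class, even though `groupByKey` is simple to understand, it can perform poorly because it shuffles every value without any map-side combining. That's why `aggregateByKey` was preferred in the optimized job. Note, however, that here the accumulator is the full list of yearly records, so roughly the same data is shuffled as with `groupByKey`; with only one record per class and year, the practical difference is likely negligible.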

In [20]:
val spendingTrends = metricsByCategoryYear
    .map { case ((spendingClass, year), (totalAmount, customerCount, avgSpend)) =>
        (spendingClass, (year, totalAmount, customerCount, avgSpend))
    }
    .aggregateByKey(List.empty[(Int, Double, Int, Double)])(
        // seqOp: prepend each record to the partition-local list (O(1) on List)
        (acc, value) => value :: acc,
        // combOp: concatenate lists coming from different partitions
        (acc1, acc2) => acc1 ++ acc2
    )
    .flatMap { case (spendingClass, data) =>
        // sort here: combOp only runs when a key spans several partitions,
        // so sorting inside it would not guarantee chronological order
        val sortedData = data.sortBy(_._1)
        var prevAvgSpend = 0.0
        var prevCustCount = 0

        sortedData.map { case (year, totalAmount, customerCount, avgSpend) =>
          val growthRate = if (prevAvgSpend > 0) ((avgSpend - prevAvgSpend) / prevAvgSpend) * 100 else 0.0
          val retentionRate = if (prevCustCount > 0) (customerCount.toDouble / prevCustCount) * 100 else 100.0

          prevAvgSpend = avgSpend
          prevCustCount = customerCount

          ((spendingClass, year), (totalAmount, customerCount, avgSpend, growthRate, retentionRate))
        }
    }


spendingTrends: org.apache.spark.rdd.RDD[((String, Int), (Double, Int, Double, Double, Double))] = MapPartitionsRDD[35] at flatMap at <console>:46
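Sorting inside the `combOp` of `aggregateByKey` does not guarantee chronological order: Spark applies `seqOp` within each partition and calls `combOp` only to merge accumulators coming from different partitions, so a key whose records all sit in one partition never passes through `combOp`. The sketch below is a simplified, hypothetical model of that behaviour for a single key (`aggregateLocally` is not a Spark API), showing why sorting after the aggregation is the safer choice.

```scala
// Simplified model of aggregateByKey for ONE key (not a Spark API):
// seqOp folds the values of each partition, combOp merges the partial results.
// With a single non-empty partition, combOp is never invoked.
def aggregateLocally[V, U](partitions: Seq[Seq[V]], zero: U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): U =
  partitions
    .filter(_.nonEmpty)
    .map(_.foldLeft(zero)(seqOp))
    .reduceOption(combOp)
    .getOrElse(zero)

val appendYear = (acc: List[Int], year: Int) => acc :+ year
val mergeAndSort = (a: List[Int], b: List[Int]) => (a ++ b).sorted

// All years in one partition: the sorting combOp never runs
val onePartition = aggregateLocally(Seq(Seq(2012, 2010, 2011)), List.empty[Int])(appendYear, mergeAndSort)

// Sorting after the aggregation works regardless of partitioning
val sortedAfter = onePartition.sorted
```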


In [21]:
spendingTrends.collect().toSeq
    .sortBy { case ((spendingClass, year), _) => (spendingClass, year) }
    .foreach(println)


((Middle,2010),(3079.2499999999995,11,279.93181818181813,0.0,100.0))
((Middle,2011),(32507.629999999997,42,773.9911904761905,176.49275295081907,381.8181818181818))
((Middle,2012),(56130.0,69,813.4782608695652,5.1017467484456445,164.28571428571428))
((Middle,2013),(66541.70999999999,93,715.502258064516,-12.044083722694454,134.7826086956522))
((Middle,2014),(84076.24999999999,121,694.8450413223139,-2.887093158599011,130.10752688172042))
((Middle,2015),(102454.96000000002,133,770.3380451127821,10.864725125878769,109.91735537190081))
((Middle,2016),(114106.84000000001,162,704.3632098765432,-8.56440048038647,121.80451127819549))
((Middle,2017),(146244.90000000002,190,769.7100000000002,9.27742806653836,117.28395061728396))
((Middle,2018),(161478.65,192,841.0346354166667,9.266429618514312,101.05263157894737))
((Middle,2019),(182600.3,196,931.6341836734694,10.77239205635303,102.08333333333333))
((Middle,2020),(221567.11999999997,186,1191.221075268817,27.863607427089736,94.89795918367348))
((Po