# Big Data Project - Simulated Transactions - 21/22

<strong>Andrea Rettaroli</strong> - 0000977930


### Simulated Transactions - Dataset Analysis

The goal of this notebook is to analyze the [Kaggle simulated-transactions dataset](https://www.kaggle.com/datasets/conorsully1/simulated-transactions) and extract useful information from it.

The dataset contains ~22GB of data that represents random transactions.
[Notebook used to generate data](https://github.com/conorosully/medium-articles/blob/master/src/transaction_data_generator.ipynb).

Transactions are generated for 75,000 customers and are classified into 12 expenditure types:

- Groceries
- Clothing
- Housing
- Education
- Health
- Motor/Travel
- Entertainment
- Gambling
- Savings
- Bills and Utilities
- Tax
- Fines

Each transaction is represented by 10 features/columns:

- <strong> CUST_ID</strong>: unique ID for every customer
- <strong> START_DATE</strong>: the month the customer started making transactions
- <strong> END_DATE</strong>: the month the customer stopped making transactions
- <strong> TRANS_ID</strong>: unique ID for every transaction
- <strong> DATE</strong>: the date of the transaction
- <strong> YEAR</strong>: the year of the transaction
- <strong> MONTH</strong>: the month of the transaction
- <strong> DAY</strong>: the day of the transaction
- <strong> EXP_TYPE</strong>: the expenditure type (listed above)
- <strong> AMOUNT</strong>: the amount of the transaction in dollars $

# Cluster configuration

Let's start by enabling the connection to the Spark UI and setting the proper cluster configuration, as seen in the labs:

- 3 executors with 3 cores each (leave 1 for daemons; and there's also the AMP)
- 8G of memory per executor (recommended 11G, but it exceeds YARN's default maximum allowed in this EMR cluster)

In [1]:
sc.applicationId
"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1675870113676_0002,spark,idle,Link,Link,✔


SparkSession available as 'spark'.


res1: String = application_1675870113676_0002
res2: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1675870113676_0002/


In [2]:
%%configure -f
{"executorMemory":"8G", "numExecutors":3, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1675870113676_0003,spark,idle,Link,Link,✔


SparkSession available as 'spark'.



# Dataset Preprocessing 

After analyzing the dataset, I decided to:
- Remove the **DATE** column, because it merely aggregates the YEAR, MONTH and DAY columns, which are more useful for the following jobs.
- Create some smaller datasets to test the correctness of the functions.

This section needs to be executed only the first time.

In [3]:
// Name of bucket in s3
val bucketname = "unibo-bd2122-arettaroli"
// Paths of datasets
// S3 path of simulated transactions dataset
val s3_path_dataset = "s3a://"+bucketname+"/exam-dataset/transactions.csv"
// S3 path of cleaned dataset without column "date"
val s3_path_dataset_cleaned = "s3a://"+bucketname+"/exam-dataset/transactions-cleaned.csv"
// S3 path of dataset without column "date" with 30% of data for optimization
val s3_path_dataset_small = "s3a://"+bucketname+"/exam-dataset/transactions-small.csv"
// S3 path of dataset without column "date" with 10 rows (to test the correct functioning of the jobs)
val s3_path_dataset_smallest = "s3a://"+bucketname+"/exam-dataset/transactions-smallest.csv"

bucketname: String = unibo-bd2122-arettaroli
s3_path_dataset: String = s3a://unibo-bd2122-arettaroli/exam-dataset/transactions.csv
s3_path_dataset_cleaned: String = s3a://unibo-bd2122-arettaroli/exam-dataset/transactions-cleaned.csv
s3_path_dataset_small: String = s3a://unibo-bd2122-arettaroli/exam-dataset/transactions-small.csv
s3_path_dataset_smallest: String = s3a://unibo-bd2122-arettaroli/exam-dataset/transactions-smallest.csv


In [4]:
// Read the simulated transactions dataset, drop the DATE column (_c4), then save the result as another CSV.
spark.
  read.
  csv(s3_path_dataset).
  drop("_c4").
  write.
  csv(s3_path_dataset_cleaned)

In [5]:
// Read the cleaned CSV and keep about 30% of the rows for optimization tests
// Number of transactions: 261969719 => 30% is 78,590,915.7, rounded to 78500000
spark.
  read.
  csv(s3_path_dataset_cleaned).
  limit(78500000). 
  write.
  csv(s3_path_dataset_small)

In [6]:
// Save a small set of data to check the results of the following jobs
spark.
  read.
  csv(s3_path_dataset_cleaned).
  limit(10).
  write.
  csv(s3_path_dataset_smallest)

# Dataset preparation


To read each transaction, it is useful to define a **TransactionsParser** object that parses the transactions from the CSV file.

The values in each CSV line are separated by ',' (comma), so the fields of a line can be retrieved by splitting on commas.


In [7]:
import java.util.Calendar
import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner

object TransactionsParser {
  // Fields are separated by ','; the lookahead skips commas that fall inside double quotes
  val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
    
  type CustomerId = String
  type StartDate = String
  type EndDate = String
  type TransactionId = String 
  // type TransactionDate = String
  type Year = Int
  type Month = Int
  type Day = Int
  type ExpenditureType = String
  type Amount = Double
  
  def parseTransactionLine(line: String): Option[(CustomerId, StartDate, EndDate, TransactionId, Year, Month, Day, ExpenditureType, Amount)] = {
    try {
      val input = line.split(commaRegex) // Splitting on comma
      if (input(0) == "CUST_ID") { // Discard the header line
        None
      } else {
        Some((input(0).trim, input(1).trim, input(2).trim, input(3).trim,
              input(4).trim.toInt, input(5).trim.toInt, input(6).trim.toInt, input(7).trim, input(8).trim.toDouble))
      }
    } catch {
      case _: Exception => None // Malformed lines are discarded as well
    }
  }
}

import java.util.Calendar
import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner
defined object TransactionsParser
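
The splitting and filtering behaviour of the parser can be tried without Spark. The following is a minimal sketch, not the notebook's parser: `ParserSketch`, the reduced `Record` tuple and the sample lines are hypothetical, and the header text is assumed from the column list at the top of this notebook. It reuses the same quote-aware regex and shows that `flatMap` keeps only the lines that parse.

```scala
import scala.util.Try

object ParserSketch {
  // Same separator as in TransactionsParser: a comma only counts as a
  // separator when it is followed by an even number of double quotes.
  val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

  // Hypothetical reduced record: (customer id, year, expenditure type, amount)
  type Record = (String, Int, String, Double)

  def parse(line: String): Option[Record] = {
    val f = line.split(commaRegex).map(_.trim)
    if (f.headOption.contains("CUST_ID")) None // header row
    else Try((f(0), f(4).toInt, f(7), f(8).toDouble)).toOption // malformed rows become None
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "CUST_ID,START_DATE,END_DATE,TRANS_ID,YEAR,MONTH,DAY,EXP_TYPE,AMOUNT", // assumed header
      "CPVZ2MIAO3,2010-07-01,2017-01-01,T18P9KZ3O350W73,2012,4,5,Entertainment,21.48",
      "broken,line"
    )
    // flatMap drops every None, as the RDD version does
    lines.flatMap(parse).foreach(println)
  }
}
```

Only the second sample line survives: the header is rejected explicitly and the short line fails inside `Try`.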


Without the DATE column, the dataset is about 18.5 GB (167 partitions of 111.3 MB). The data is persisted in serialized form in memory and on disk, so that the disk is used when memory is full.

Regarding partitions, by default Spark creates 665 partitions for this dataset, which is a lot. Each task should take at least 100-200 ms, and for the same total execution time it is better to use fewer partitions in this case, so the RDD is coalesced to 300 partitions.
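
The trade-off can be made concrete with some rough arithmetic. This is only a sketch under stated assumptions: `PartitionSketch` is a hypothetical helper, the dataset size is taken as 18.5 GB with 1 GB = 1024 MB, and the 9 task slots come from the 3 executors with 3 cores each configured earlier. It compares the default 665 partitions with the 300 passed to `coalesce` in the cell below.

```scala
object PartitionSketch {
  val datasetMb: Double = 18.5 * 1024 // ~18.5 GB, assuming 1 GB = 1024 MB
  val taskSlots: Int = 3 * 3          // 3 executors x 3 cores each

  // Average amount of input data handled by a single task
  def mbPerPartition(partitions: Int): Double = datasetMb / partitions

  // Scheduling rounds needed when only `taskSlots` tasks can run at once
  def waves(partitions: Int): Int = math.ceil(partitions.toDouble / taskSlots).toInt

  def main(args: Array[String]): Unit =
    for (p <- Seq(665, 300))
      println(f"$p%d partitions: ${mbPerPartition(p)}%.1f MB each, ${waves(p)}%d waves")
}
```

Fewer partitions mean larger tasks and fewer scheduling rounds, at the cost of coarser load balancing.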

In [12]:
import org.apache.spark.storage.StorageLevel._

// Each row is flatMapped with parseTransactionLine method

//val rddTransactionsOriginal = sc.textFile(s3_path_dataset).flatMap(TransactionsParser.parseTransactionLine)
//val rddTransactions = sc.textFile(s3_path_dataset_small).flatMap(TransactionsParser.parseTransactionLine).coalesce(60)
val rddTransactions = sc.textFile(s3_path_dataset_cleaned).flatMap(TransactionsParser.parseTransactionLine).coalesce(300)
// Persist on memory and disk with serialization
val diskMemoryRdd = rddTransactions.persist(MEMORY_AND_DISK_SER) 

// Same for the smallest (10 rows)
val rddTransactions_smallest = sc.textFile(s3_path_dataset_smallest).flatMap(TransactionsParser.parseTransactionLine)
// Persist on memory and disk with serialization
val diskMemoryRdd_smallest = rddTransactions_smallest.persist(MEMORY_AND_DISK_SER) 

import org.apache.spark.storage.StorageLevel._
rddTransactions: org.apache.spark.rdd.RDD[(TransactionsParser.CustomerId, TransactionsParser.StartDate, TransactionsParser.EndDate, TransactionsParser.TransactionId, TransactionsParser.Year, TransactionsParser.Month, TransactionsParser.Day, TransactionsParser.ExpenditureType, TransactionsParser.Amount)] = CoalescedRDD[73] at coalesce at <console>:49
diskMemoryRdd: rddTransactions.type = CoalescedRDD[73] at coalesce at <console>:49
rddTransactions_smallest: org.apache.spark.rdd.RDD[(TransactionsParser.CustomerId, TransactionsParser.StartDate, TransactionsParser.EndDate, TransactionsParser.TransactionId, TransactionsParser.Year, TransactionsParser.Month, TransactionsParser.Day, TransactionsParser.ExpenditureType, TransactionsParser.Amount)] = MapPartitionsRDD[76] at flatMap at <console>:46
diskMemoryRdd_smallest: rddTransactions_smallest.type = MapPartitionsRDD[76] at flatMap at <console>:46


In [13]:
//data preview
diskMemoryRdd_smallest.
    collect().
    foreach(transaction => print(transaction + "\n"))

(CPVZ2MIAO3,2010-07-01,2017-01-01,T18P9KZ3O350W73,2012,4,5,Entertainment,21.48)
(CPVZ2MIAO3,2010-07-01,2017-01-01,T2N065QH3L11AZS,2015,5,24,Entertainment,17.25)
(CPVZ2MIAO3,2010-07-01,2017-01-01,TRNQXHP2B5T069M,2014,3,5,Education,412.9)
(CPVZ2MIAO3,2010-07-01,2017-01-01,T9PQMGLRKOAI2SJ,2012,9,13,Entertainment,19.18)
(CPVZ2MIAO3,2010-07-01,2017-01-01,TS5UMAAB2WWPNXW,2016,5,13,Entertainment,18.66)
(CPVZ2MIAO3,2010-07-01,2017-01-01,T48ZCBVO5UW71NC,2013,12,9,Groceries,63.61)
(CPVZ2MIAO3,2010-07-01,2017-01-01,T8K9FUYYV2OY9WB,2015,12,4,Entertainment,45.66)
(CPVZ2MIAO3,2010-07-01,2017-01-01,T2Y5VJ9D579QGTB,2012,8,5,Entertainment,49.35)
(CPVZ2MIAO3,2010-07-01,2017-01-01,T13QP1402CVF9PW,2012,9,22,Groceries,56.19)
(CPVZ2MIAO3,2010-07-01,2017-01-01,TY7N8BTA0ASGXO9,2011,7,18,Groceries,64.94)


**DEBUG: use to see the status of the RDD partitions**: the cell below shows the storage status of the persisted RDDs.

In [24]:
sc.getRDDStorageInfo.foreach(x=> print(x + "\n")) 

RDD "MapPartitionsRDD" (69) StorageLevel: StorageLevel(disk, memory, 1 replicas); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 0.0 B; DiskSize: 1662.0 B
RDD "CoalescedRDD" (73) StorageLevel: StorageLevel(disk, memory, 1 replicas); CachedPartitions: 300; TotalPartitions: 300; MemorySize: 0.0 B; DiskSize: 71.3 GiB
RDD "MapPartitionsRDD" (78) StorageLevel: StorageLevel(disk, memory, 1 replicas); CachedPartitions: 300; TotalPartitions: 300; MemorySize: 0.0 B; DiskSize: 1499.0 MiB
RDD "MapPartitionsRDD" (80) StorageLevel: StorageLevel(disk, memory, 1 replicas); CachedPartitions: 300; TotalPartitions: 300; MemorySize: 0.0 B; DiskSize: 4.3 GiB
RDD "MapPartitionsRDD" (76) StorageLevel: StorageLevel(disk, memory, 1 replicas); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 0.0 B; DiskSize: 1662.0 B
RDD "MapPartitionsRDD" (77) StorageLevel: StorageLevel(disk, memory, 1 replicas); CachedPartitions: 300; TotalPartitions: 300; MemorySize: 0.0 B; DiskSize: 3.9 GiB
RDD "MapPartitionsRDD"

**Use only to clean RDDs in memory**: the cell below unpersists all the persisted RDDs.

In [None]:
sc.getPersistentRDDs.foreach(_._2.unpersist())

# Dataset exploration

In this section, various exploration queries are run on the dataset, together with more complex jobs that extract useful information from the data.

To drastically improve performance, the data used by several jobs is cached in memory and on disk.

In [14]:
// Caching of customer, amount, year and expenditure type for the following jobs to improve performance
val cachedCustomer = rddTransactions.map(x=>x._1).persist(MEMORY_AND_DISK_SER)
val cachedAmount = rddTransactions.map(x=>x._9).persist(MEMORY_AND_DISK_SER)
val cachedYear = rddTransactions.map(x=>x._5).persist(MEMORY_AND_DISK_SER)
val cachedExpenditureType = rddTransactions.map(x=>x._8).persist(MEMORY_AND_DISK_SER)

cachedCustomer: org.apache.spark.rdd.RDD[TransactionsParser.CustomerId] = MapPartitionsRDD[77] at map at <console>:41
cachedYear: org.apache.spark.rdd.RDD[TransactionsParser.Year] = MapPartitionsRDD[78] at map at <console>:40
cachedExpenditureType: org.apache.spark.rdd.RDD[TransactionsParser.ExpenditureType] = MapPartitionsRDD[79] at map at <console>:40
cachedAmount: org.apache.spark.rdd.RDD[TransactionsParser.Amount] = MapPartitionsRDD[80] at map at <console>:40


To get an idea of the size of the dataset entities:
- How many transactions?
- How many distinct customers?
- How many distinct expenditure types?
- How many distinct years?
- What are the minimum and maximum amounts?
- What range of years is covered?

In [19]:
"Number of transactions: " + (diskMemoryRdd.count() -1) // Each row is a transaction => 261,969,719 - 1 without header line
"Number of distinct customers: " + cachedCustomer.distinct().count() // 75000
"Number of distinct expenditure type: " + cachedExpenditureType.distinct().count() // 12 
"Number of distinct years: " + cachedYear.distinct().count() // 11
"Range of amount: " + cachedAmount.min() + " $ to " + cachedAmount.max() +" $" // 0.12 to 6519.61
"From year: " + cachedYear.min() + " to " + cachedYear.max() // 2010 to 2020

res72: String = Number of transactions: 261969718
res73: String = Number of distinct customers: 75000
res74: String = Number of distinct expenditure type: 12
res75: String = Number of distinct years: 11
res76: String = Range of amount: 0.12 $ to 6519.61 $
res77: String = From year: 2010 to 2020


Learn more about this dataset by answering more complex questions, such as:
- Which years are listed in the dataset, in ascending order?
- What is the average amount?
- What is the average amount for each year?
- What is the average amount for each type of expenditure?
- What is the maximum amount for each type of expenditure?

In [25]:
// Which years are listed in the dataset, in ascending order?
cachedYear.
    distinct().
    collect().
    sorted.
    foreach(x=> print(x + "\n"))

2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020


In [26]:
//What is the average amount?
"Amount on average: " + cachedAmount.mean()

res90: String = Amount on average: 81.56297388367226


In [27]:
// What is the average amount for each year?
diskMemoryRdd.
    map(x => (x._5, x._9)). // take YEAR and AMOUNT
    aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)). // within a partition, sum the AMOUNTs in the first value and count them in the second; then merge the partitions
    map({case(k,v)=>(k,v._1/v._2)}). // compute the average for each YEAR (key)
    sortByKey(). // order by YEAR
    collect().
    foreach(result => print(result + "\n"))

(2010,58.13171661388979)
(2011,59.743144310610944)
(2012,62.61107660724445)
(2013,65.92462474542246)
(2014,69.75024925871854)
(2015,73.98090515674585)
(2016,78.64610882393866)
(2017,83.6344258468586)
(2018,89.35581974762604)
(2019,96.0554856040244)
(2020,103.33282605610557)
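
The two functions passed to `aggregateByKey` above are easier to follow on a small in-memory example. The following is a plain-Scala sketch of the same idea, not Spark code: `AverageSketch` and `averageByKey` are hypothetical names, each inner `Seq` stands in for one partition, and the amounts are made up. The counter is kept as a `Long` here, whereas the cell above uses a `Double` for it.

```scala
object AverageSketch {
  type Acc = (Double, Long) // (running sum of AMOUNT, number of values seen)

  // seqOp: fold one value into the accumulator of its key, inside a partition
  def seqOp(acc: Acc, amount: Double): Acc = (acc._1 + amount, acc._2 + 1)

  // combOp: merge the accumulators that two partitions built for the same key
  def combOp(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

  // Local stand-in for aggregateByKey followed by the division sum / count
  def averageByKey(partitions: Seq[Seq[(Int, Double)]]): Map[Int, Double] = {
    val perPartition: Seq[Map[Int, Acc]] = partitions.map { part =>
      part.foldLeft(Map.empty[Int, Acc]) { case (m, (year, amount)) =>
        m.updated(year, seqOp(m.getOrElse(year, (0.0, 0L)), amount))
      }
    }
    val merged = perPartition.foldLeft(Map.empty[Int, Acc]) { (total, m) =>
      m.foldLeft(total) { case (t, (year, acc)) =>
        t.updated(year, t.get(year).map(combOp(_, acc)).getOrElse(acc))
      }
    }
    merged.map { case (year, (sum, count)) => year -> sum / count }
  }

  def main(args: Array[String]): Unit = {
    // Made-up amounts, split over two "partitions"
    val partitions = Seq(
      Seq((2012, 10.0), (2013, 40.0)),
      Seq((2012, 30.0), (2013, 20.0), (2012, 20.0))
    )
    averageByKey(partitions).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Carrying the pair (sum, count) instead of a partial average is what makes the merge step correct: averages of partitions cannot simply be averaged again when the partitions hold different numbers of values.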


In [28]:
// Caching the pair EXPENDITURE_TYPE - AMOUNT for following jobs to improve performance 
val cachedExpenditureTypeAmount = rddTransactions.map(x => (x._8, x._9)).persist(MEMORY_AND_DISK_SER)

cachedExpenditureTypeAmount: org.apache.spark.rdd.RDD[(TransactionsParser.ExpenditureType, TransactionsParser.Amount)] = MapPartitionsRDD[116] at map at <console>:41


In [29]:
// What is the average amount for each type of expenditure?
cachedExpenditureTypeAmount.
    aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)). // within a partition, sum the AMOUNTs in the first value and count them in the second; then merge the partitions
    map({case(k,v)=>(k,v._1/v._2)}). // compute the average for each EXPENDITURE_TYPE (key)
    sortBy(_._2, false). // order by average AMOUNT, descending
    collect().
    foreach(result => print(result + "\n"))

// This also shows which expenditure type is the most expensive on average

(Housing,1558.7478390875365)
(Tax,412.9514105976915)
(Education,281.19211042051637)
(Savings,223.37088318474537)
(Bills and Utilities,208.28440022266741)
(Clothing,179.36878004462235)
(Health,159.31446095332157)
(Fines,159.20079085908074)
(Motor/Travel,133.3806493405144)
(Gambling,105.02011185360799)
(Groceries,80.30824109361185)
(Entertainment,24.031486539717363)


In [30]:
//What is the maximum amount for each type of expenditure?
cachedExpenditureTypeAmount.
    reduceByKey((x,y)=>{if(x<y) y else x}). //take the maximum AMOUNT for each EXPENDITURE_TYPE
    sortBy(_._2, false). //reorder by AMOUNT descending 
    collect().
    foreach(result => print(result + "\n"))

(Housing,6519.61)
(Motor/Travel,6334.35)
(Clothing,4319.49)
(Education,2787.69)
(Savings,1996.85)
(Groceries,1952.39)
(Health,1816.22)
(Bills and Utilities,1489.54)
(Tax,1333.29)
(Entertainment,1002.84)
(Fines,988.5)
(Gambling,870.31)
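
Unlike the averages, the maximum needs no separate accumulator: `reduceByKey` applies a single binary function both inside and across partitions. The sketch below is a plain-Scala stand-in, not Spark code; `MaxSketch` and `reduceByKeyLocal` are hypothetical names, and the pairs are taken from the 10-row preview shown earlier.

```scala
object MaxSketch {
  // Local stand-in for reduceByKey: combine all values of a key with one binary function
  def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  def main(args: Array[String]): Unit = {
    // (EXP_TYPE, AMOUNT) pairs from the data preview
    val pairs = Seq(
      ("Entertainment", 21.48), ("Education", 412.9), ("Groceries", 63.61),
      ("Entertainment", 49.35), ("Groceries", 56.19), ("Groceries", 64.94)
    )
    // math.max is equivalent to the if (x < y) y else x used in the cell above
    reduceByKeyLocal(pairs)((a, b) => math.max(a, b)).
      toSeq.
      sortBy(-_._2). // order by AMOUNT, descending
      foreach(println)
  }
}
```

This works because taking the maximum is associative and commutative, so the order in which values or partial results are combined does not affect the outcome.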
