# Project Big Data (2024/2025)

In [1]:
import org.apache.spark

Intitializing Scala interpreter ...

Spark Web UI available at http://127.0.0.1:4040
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1736590217756)
SparkSession available as 'spark'


import org.apache.spark


In [None]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

In [2]:
val spark = org.apache.spark.sql.SparkSession.builder.appName("BigDataExam").getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@14be94b4


## Parser for IMDb datasets

In [3]:
import java.util.Calendar
import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner

object IMDbParser{

    def parseAkasLine(line: String): Option[
        (String, Int, String, String, String, Array[String], Array[String], Boolean)] = {
        val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
        val input = line.split(commaRegex).map(_.trim.replaceAll("^\"|\"$", ""))
        try {
            Some(
            input(0), // titleId
            input(1).toInt, // ordering
            input(2), // title
            if (input(3) == "\\N") "" else input(3), // region
            if (input(4) == "\\N") "" else input(4), // language
            if (input(5) == "\\N") Array.empty[String] else input(5).split(",").map(_.trim), // types as Array[String]
            if (input(6) == "\\N") Array.empty[String] else input(6).split(",").map(_.trim), // attributes as Array[String]
            input(7) == "1" // isOriginalTitle
            )
        } catch {
            case e: Exception => None
        }
    }
    
    def parseCrewLine(line: String): Option[(String, Array[String], Array[String])] = {
        val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
        val input = line.split(commaRegex).map(_.trim.replaceAll("^\"|\"$", ""))
        try {
            Some(
                input(0), // tconst
                if (input(1) == "\\N") Array.empty[String] else input(1).split(",").map(_.trim), // directors as Array[String]
                if (input(2) == "\\N") Array.empty[String] else input(2).split(",").map(_.trim) // writers as Array[String]
            )
        } catch {
        case e: Exception => None
        }
    }
    
    def parseEpisodeLine(line: String): Option[(String, String, Int, Int)] = {
        val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
        val input = line.split(commaRegex).map(_.trim.replaceAll("^\"|\"$", ""))
        try {
            Some(
                input(0), // tconst
                input(1), // parentTconst
                if (input(2) == "\\N") 0 else input(2).toInt, // seasonNumber
                if (input(3) == "\\N") 0 else input(3).toInt // episodeNumber
            )
        } catch {
            case e: Exception => None
        }
    }
    
    def parsePrincipalsLine(line: String): Option[(String, Int, String, String, String, String)] = {
        val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
        val input = line.split(commaRegex).map(_.trim.replaceAll("^\"|\"$", ""))
        try {
            Some(
                input(0), // tconst
                input(1).toInt, // ordering
                input(2), // nconst
                input(3), // category
                if (input(4) == "\\N") "" else input(4), // job
                if (input(5) == "\\N") "" else input(5) // characters
            )
        } catch {
            case e: Exception => None
        }
    }
    
    def parseRatingsLine(line: String): Option[(String, Double, Int)] = {
        val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
        val input = line.split(commaRegex).map(_.trim.replaceAll("^\"|\"$", ""))
        try {
            Some(
                input(0), // tconst
                input(1).toDouble, // averageRating
                input(2).toInt // numVotes
            )
        } catch {
            case e: Exception => None
        }
    }
    
    def parseNameBasicsLine(line: String): Option[(String, String, Int, Int, Array[String], Array[String])] = {
        val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
        val input = line.split(commaRegex).map(_.trim.replaceAll("^\"|\"$", ""))
        try {
            Some(
                input(0), // nconst
                input(1), // primaryName
                input(2).toInt, // birthYear
                if (input(3) == "\\N") 0 else input(3).toInt, // deathYear
                input(4).split(",").map(_.trim), // primaryProfession as Array[String]
                input(5).split(",").map(_.trim) // knownForTitles as Array[String]
            )
        } catch {
            case e: Exception => None
        }
    }
    
    def parseTitleBasicsLine(line: String): Option[
        (String, String, String, String, Boolean, Int, Int, Int, Array[String])] = {
        val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
        val input = line.split(commaRegex).map(_.trim.replaceAll("^\"|\"$", ""))
        try {
            Some(
                input(0), // tconst
                input(1), // titleType
                input(2), // primaryTitle
                input(3), // originalTitle
                input(4) == "1", // isAdult
                input(5).toInt, // startYear
                if (input(6) == "\\N") 0 else input(6).toInt, // endYear
                if (input(7) == "\\N") 0 else input(7).toInt, // runtimeMinutes
                input(8).split(",").map(_.trim) // genres as Array[String]
            )
        } catch {
            case e: Exception => None
        }
    }
}

import java.util.Calendar
import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner
defined object IMDbParser


In [4]:
val rddTitleBasics = sc.textFile("../../../../datasets/project/csv/title.basics_half.csv").flatMap(IMDbParser.parseTitleBasicsLine)
val rddPrincipals = sc.textFile("../../../../datasets/project/csv/title.principals_half.csv").flatMap(IMDbParser.parsePrincipalsLine)
val rddRatings = sc.textFile("../../../../datasets/project/csv/title.ratings_half.csv").flatMap(IMDbParser.parseRatingsLine)
val rddNameBasics = sc.textFile("../../../../datasets/project/csv/name.basics.csv").flatMap(IMDbParser.parseNameBasicsLine)

rddTitleBasics: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Int, Int, Int, Array[String])] = MapPartitionsRDD[2] at flatMap at <console>:29
rddPrincipals: org.apache.spark.rdd.RDD[(String, Int, String, String, String, String)] = MapPartitionsRDD[5] at flatMap at <console>:30
rddRatings: org.apache.spark.rdd.RDD[(String, Double, Int)] = MapPartitionsRDD[8] at flatMap at <console>:31
rddNameBasics: org.apache.spark.rdd.RDD[(String, String, Int, Int, Array[String], Array[String])] = MapPartitionsRDD[11] at flatMap at <console>:32


## Valutazione dei file del dataset che vengono usati

### dimenzione file originali

![dimezione file originali](resorc/graf4.png)

### dimenzione file ridotti per l'elaborazione locale

![dimezione file ridotti per l'elaborazione locale](resorc/graf5.png)

## Job: Valutazione Attori per Genere

### **Metriche Considerate**
1. **Valutazione del film**:
   - Vengono considerate solo le valutazioni di film con più di 500 valutazioni.
2. **Genere**:
   - Genere cinematografico dei film.
3. **Nome Attore**:
   - Nome dell'attore che ha partecipato ai film.
4. **Film Partecipati**:
   - Elenco dei film a cui l'attore ha partecipato.

### **Obiettivo del Job**
Valutare ogni attore per ogni genere, assegnando una classificazione basata sui seguenti criteri:

#### **Criteri di Valutazione**
1. **Inclassificabile**:
   - Se l'attore non ha partecipato a nessun film di quel genere.
2. **Insufficiente**:
   - Se l'attore ha partecipato a meno di 2 film di quel genere **oppure** ha una valutazione media inferiore a 6.
3. **Sufficiente**:
   - Se l'attore ha partecipato ad almeno 2 film di quel genere **oppure** ha una valutazione media superiore a 6.

---

## Job0

vesione senza alcuna ottimizzazione tranne per la groupByKey e la seguente valutazione eseguita nella prima occazione possibile evitando di eseguire ulteriori join prima della valutazione.


In [5]:
val actorsAndActresses = rddNameBasics.filter { case (_, _, _, _, primaryProfession, _) =>
    primaryProfession.contains("actor") || primaryProfession.contains("actress")
    }.filter(_._4 == 0).map { case (nconst, primaryName, _, _, _, _) => (nconst, primaryName) }

val moviesRating = rddPrincipals.map { case (tconst, _, nconst, category, _, _) => (tconst, nconst) }
    .join(rddRatings.filter(_._3 > 500).map { case (tconst, rating, _) => (tconst, rating) })

val actorGenreRatings = rddTitleBasics.filter(_._2 == "movie").flatMap {
    case (tconst, _, _, _, _, _, _, _, genres) => genres.map(genre => (tconst, genre))}
    .join(moviesRating)
    .map { case (tconst, (genre, (nconst, rating))) => ((nconst, genre), rating) }
    .groupByKey()
    .mapValues(ratings => (ratings.sum / ratings.size, ratings.size))

val job = actorsAndActresses.cartesian(rddTitleBasics.filter(_._2 == "movie").flatMap(_._9).filter(x => x!= "\\N").distinct())
    .leftOuterJoin(actorGenreRatings)
    .map { case ((nconst, primaryName), (genre, maybeRatings)) =>
        val sufficiency = maybeRatings match {
            case Some((avgRating, count)) => if (avgRating > 6 && count >= 2) "sufficiente" else "insufficiente"
            case None => "inclassificabile"
        }
    (nconst, primaryName, genre, sufficiency)
    }
    
spark.time {
    job.collect()
}

Time taken: 225180 ms


actorsAndActresses: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[14] at map at <console>:35
moviesRating: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[20] at join at <console>:38
actorGenreRatings: org.apache.spark.rdd.RDD[((String, String), (Double, Int))] = MapPartitionsRDD[28] at mapValues at <console>:45
job: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[39] at map at <console>:49
res0: Array[(String, String, String, String)] = Array((nm0360025,Jay Hanks,War,inclassificabile), (nm0360025,Jay Hanks,Fantasy,inclassificabile), (nm0360025,Jay Hanks,Western,inclassificabile), (nm0360025,Jay Hanks,Musical,inclassificabile), (nm0360025,Jay Hanks,Family,inclassificabile), (nm0360025,Jay Hanks,Horror,inclassifica...


## Job1

versione con ottimizzazione tramite l'utilizzo di broadcast variables per evitare di eseguire join riducendo cosi gli shuffle che devono essere (in questo caso vi è un utilizzo eccessivo delle broadcast variables).

è stato anche provato a utilizzare il metodo aggregateByKey per calcolare la media e il conteggio delle valutazioni per ogni attore e genere e a seguire gia una valutazione sul per assognare la valutazione agli attori.

In [6]:
val rddMovie = rddTitleBasics.filter(_._2 == "movie").flatMap {
    case (tconst, _, _, _, _, _, _, _, genres) => genres.map(genre => (tconst, genre))}

val actorsAndActresses = rddNameBasics.filter { case (_, _, _, _, primaryProfession, _) =>
    primaryProfession.contains("actor") || primaryProfession.contains("actress")
    }.filter(_._4 == 0).map { case (nconst, primaryName, _, _, _, _) => (nconst, primaryName) }

val broadcastRating = sc.broadcast(rddRatings.filter(_._3 > 500).map { case (tconst, rating, _) => (tconst, rating)}.collectAsMap())

val moviesRating = sc.broadcast(rddPrincipals.filter { case (_, _, _, primaryProfession, _, _) =>
    primaryProfession.contains("actor") || primaryProfession.contains("actress")
    }.map { case (tconst, _, nconst, category, _, _) => (tconst, nconst) }
    .flatMap { case (tconst, nconst) =>
        broadcastRating.value.get(tconst) match {
            case Some(rating) => Some(tconst, (nconst, rating))
            case None => None
        }
    }.collectAsMap())

val actorGenreRatings = sc.broadcast(rddMovie.flatMap { case (tconst, genre) =>
    moviesRating.value.get(tconst) match {
        case Some((nconst, rating)) => Some((nconst, genre), rating)
        case None => None
    }
    }.aggregateByKey((0.0, 0))(
        (acc, rating) => (acc._1 + rating, acc._2 + 1),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map{
        case ((nconst, genre), (sum, count)) =>
        val sufficiency = if (sum / count > 6 && count >= 2) "sufficiente" else "insufficiente"
        ((nconst, genre), sufficiency)
    }.collectAsMap())

val broadcastGenres = sc.broadcast(rddTitleBasics.filter(_._2 == "movie").flatMap(_._9).filter(x => x!= "\\N").distinct().collect())

val job1 = actorsAndActresses.flatMap { case (nconst, primaryName) =>
    broadcastGenres.value.map(genre => ((nconst, genre), primaryName))
    }.map{ case ((nconst, genre), primaryName) =>
    actorGenreRatings.value.get((nconst, genre)) match {
        case Some(sufficiency) => (nconst, primaryName, genre, sufficiency)
        case None => (nconst, primaryName, genre, "inclassificabile")
        }
    }

spark.time {
    job1.collect()
}

Time taken: 60535 ms


rddMovie: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[41] at flatMap at <console>:37
actorsAndActresses: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[44] at map at <console>:42
broadcastRating: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,Double]] = Broadcast(13)
moviesRating: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,(String, Double)]] = Broadcast(15)
actorGenreRatings: org.apache.spark.broadcast.Broadcast[scala.collection.Map[(String, String),String]] = Broadcast(18)
broadcastGenres: org.apache.spark.broadcast.Broadcast[Array[String]] = Broadcast(21)
job1: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[60] at map at <console>:74
res1: Array[(String, String, String, Strin...


In [7]:
import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner


## Job2

questa versione non utiliazza le broadcast variables ma utilizza un HashPartitioner.

ci si è soffermati sul rimuovere l'operazione di cartesian la quale risulta essere la più costosa in termini di tempo e di risorse.

In [8]:
val p = new HashPartitioner(5)

val distinctGenres = rddTitleBasics.filter(_._2 == "movie").flatMap(_._9).filter(x => x!= "\\N").distinct().collect()

val actorsAndActresses = rddNameBasics.filter { case (_, _, _, _, primaryProfession, _) =>
    primaryProfession.contains("actor") || primaryProfession.contains("actress")
    }.filter(_._4 == 0).map { case (nconst, primaryName, _, _, _, _) => (nconst, primaryName) }

val moviesRating = rddPrincipals.filter { case (_, _, _, primaryProfession, _, _) =>
    primaryProfession.contains("actor") || primaryProfession.contains("actress")
    }.map { case (tconst, _, nconst, category, _, _) => (tconst, nconst) }
    .join(rddRatings.filter(_._3 > 500).map { case (tconst, rating, _) => (tconst, rating) }.partitionBy(p))

val actorGenreRatings = rddTitleBasics.filter(_._2 == "movie").flatMap {
    case (tconst, _, _, _, _, _, _, _, genres) => genres.map(genre => (tconst, genre))
    }.join(moviesRating.partitionBy(p))
    .map { case (tconst, (genre, (nconst, rating))) => ((nconst, genre), rating) }
    .groupByKey()
    .mapValues(ratings => (ratings.sum / ratings.size, ratings.size))

val job2 = actorsAndActresses.flatMap { case (nconst, primaryName) =>
    distinctGenres.map(genre => (nconst, primaryName, genre))
    }.map { case (nconst, primaryName, genre) =>
        ((nconst, genre), (primaryName, genre))
    }.leftOuterJoin(actorGenreRatings)
    .map { case ((nconst, genre), ((primaryName, _), maybeRatings)) =>
        val sufficiency = maybeRatings match {
        case Some((avgRating, count)) =>
        if (avgRating > 6 && count >= 2) "sufficiente" else "insufficiente"
        case None => "inclassificabile"
    }
    (nconst, primaryName, genre, sufficiency)
    }
            
spark.time {
    job2.collect()
}

Time taken: 86585 ms


p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@5
distinctGenres: Array[String] = Array(War, Fantasy, Western, Musical, Family, Horror, Crime, Animation, Sport, Adult, History, Thriller, Adventure, Talk-Show, Action, Music, News, Biography, Sci-Fi, Comedy, Documentary, Mystery, Reality-TV, Romance, Drama, Film-Noir)
actorsAndActresses: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[69] at map at <console>:44
moviesRating: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[77] at join at <console>:49
actorGenreRatings: org.apache.spark.rdd.RDD[((String, String), (Double, Int))] = MapPartitionsRDD[85] at mapValues at <console>:56
job2: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[91] at map at...


## Job3

questa versione è sostanzialmente uguale alla precedente ma si è utilizzato un broadcast variable per sostituire le join che implicavano rdd di grandi dimenzioni.

In [9]:
val distinctGenres = sc.broadcast(rddTitleBasics.filter(_._2 == "movie").flatMap(_._9).filter(x => x != "\\N").distinct().collect())

val actorGenrePairs = rddNameBasics.filter { case (_, _, _, _, primaryProfession, _) =>
    primaryProfession.contains("actor") || primaryProfession.contains("actress")
    }.filter(_._4 == 0).map { case (nconst, primaryName, _, _, _, _) => (nconst, primaryName) }.partitionBy(p).flatMap { case (nconst, primaryName) =>
    distinctGenres.value.map(genre => (nconst, primaryName, genre))
    }

val ratingBroadcast = sc.broadcast(rddRatings.filter(_._3 > 500).map { case (tconst, rating, _) => (tconst, rating) }.collectAsMap())

val moviesRating = rddPrincipals.filter { case (_, _, _, primaryProfession, _, _) =>
    primaryProfession.contains("actor") || primaryProfession.contains("actress")
    }.flatMap { case (tconst, _, nconst, category, _, _) => ratingBroadcast.value.get(tconst).map(rating => (tconst, (nconst, rating))) }

val actorGenreRatings = rddTitleBasics.filter(_._2 == "movie").flatMap {
    case (tconst, _, _, _, _, _, _, _, genres) => genres.map(genre => (tconst, genre))
    }.join(moviesRating.partitionBy(p))
    .map { case (tconst, (genre, (nconst, rating))) => ((nconst, genre), rating) }
    .groupByKey()
    .mapValues(ratings => (ratings.sum / ratings.size, ratings.size))

val job3 = actorGenrePairs.map { case (nconst, primaryName, genre) =>
        ((nconst, genre), (primaryName, genre))
    }.leftOuterJoin(actorGenreRatings)
    .map { case ((nconst, genre), ((primaryName, _), maybeRatings)) =>
        val sufficiency = maybeRatings match {
        case Some((avgRating, count)) =>
        if (avgRating > 6 && count >= 2) "sufficiente" else "insufficiente"
        case None => "inclassificabile"
    }
    (nconst, primaryName, genre, sufficiency)
    }
            
spark.time {
    job3.collect()
}

Time taken: 107267 ms


distinctGenres: org.apache.spark.broadcast.Broadcast[Array[String]] = Broadcast(33)
actorGenrePairs: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[102] at flatMap at <console>:44
ratingBroadcast: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,Double]] = Broadcast(35)
moviesRating: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[106] at flatMap at <console>:52
actorGenreRatings: org.apache.spark.rdd.RDD[((String, String), (Double, Int))] = MapPartitionsRDD[115] at mapValues at <console>:59
job3: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[120] at map at <console>:64
res3: Array[(String, String, String, String)] = Array((nm0769925,Djamchid 'Jim' Soheili,Musical,inclassificabile), (nm4...


## Valutazione dei job in esecuzione sulla macchina locale

![Grafico dei processi](resorc/graf1.png)

### **Confronto delle Prestazioni**
Rispetto a **Job0**, sono stati ottenuti discreti miglioramenti in termini di velocità di esecuzione:

1. **Job1**:
   - Risulta **3,72 volte più veloce** di Job0.

2. **Job2**:
   - Risulta **2,60 volte più veloce** di Job0.

3. **Job3**:
   - Risulta **2,10 volte più veloce** di Job0.
      

è risultato inaspettato che il Job1 fosse poi efficente del Job2 e Job3, questo probabilmente è dovuto alla riduzione del dataset che è stata eseguita per poter eseguire i Jobs sul dispositivo locale.


## Valutazione dei job in esecuzione sul cluster

In [None]:
aws emr create-cluster \
    --name "Big Data Cluster" \
    --release-label "emr-7.3.0" \
    --applications Name=Hadoop Name=Spark \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=5,InstanceType=m4.large \
    --service-role EMR_DefaultRole \
    --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName=<my_key_pair_name> \
    --region "us-east-1"

il cluster che è stato utilizzato, è stata scelta questa configuarazione perche con configurazioni poù piccole era quasi impossibile eseguire il Job0, in quanto il cluster impiegavano troppo tempo.

---

### **Confronto delle Prestazioni**

![Grafico dei processi](resorc/graf2.png)

il Job0 risulta cosi inefficiente rispetto agli altri Jobs, questo è dovuto al fatto che il Job0 esegue un cartesian tra due rdd di grandi dimensioni, questo comporta un grande numero di shuffle e quindi un tempo di esecuzione molto elevato.
La sola operazione di cartesian impiega 2,2 h e ha come input 24,9 GB, è il motivo per il quale nelle successive versioni si è cercato di evitare di eseguire questa operazione.

### grafico delle prestazioni senza Job0

![Grafico dei processi](resorc/graf3.png)

concentrandoci solamente sui Job1, Job2 e Job3 si può notare come la situazione si sia invertita rispetto all'esecuzione locale.

### valutazione Job1

nel Job1 risultano esserci stati 5 job ognuno dei quale formato da un solo stage, tranne nel caso della presenza delle operazioni di distinct e aggregateByKey che hanno formato 2 stage, dovuto al fatto che queste operazioni richiedono uno shuffle.

Job1 ha uno speed-up di 16,23 rispetto al Job0, questo è dovuto al fatto che si è evitato di eseguire il cartesian tra due rdd di grandi dimensioni, sostituendolo con una broadcast variable.

### valutazione Job2

nel Job2 risultano esserci stati 2 job, uno per calcolare i vari genere, formato da 2 stage dovuti alla operazione di distinct, e l'altro che contiene tutte le restanti operazioni, formato da 8 stage. Questa differenza rispetto a job1 è dovuta al fatto che non vi è l'utilizzo di broadcast variables.

Job2 ha uno speed-up di 20,09 rispetto al Job0, questo è dovuto quasi interamente al fatto che si è evitato di eseguire il cartesian tra due rdd di grandi dimensioni, sostituendolo con una join.

Job2 risulta a vere uno speed-up di 1,29 rispetto al Job1, questo è dovuto al fatto che non utilizzando broadcast variables si è riuscito a parallelizzare meglio il calcolo.

### valutazione Job3

nel job3 risultano esserci stati 3 job, uno per calcolare i vari genere salvati in una broadcast variable, formato da 2 stage dovuti alla operazione di distinct, un altro per valvare in un'altra broadcast variable i le valutazioni dei film e l'altro che contiene tutte le restanti operazioni, formato da 6 stage.

Job3 ha uno speed-up di 21,56 rispetto al Job0, dovuto come nei casi precedenti quasi interamente all'aver evitato l'utilizzo della operazione di cartesian.

Job3 risulta essere leggermente più efficente del Job2, dovuto all'utilizzo delle broadcast variables dove risultava maggiormente necessario