In [None]:
import $file.sparksession
import sparksession._
import spark.implicits._
import org.apache.spark._
import org.apache.spark.sql.{functions => func, _}
import org.apache.spark.sql.types._
import scala.util.Random
import akka.stream.scaladsl.{Flow, Source}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Akka runtime shared by the Akka Streams cells of this notebook.
// Implicit values carry explicit types so implicit resolution never depends on type inference.
implicit val as: ActorSystem = ActorSystem()
implicit val am: ActorMaterializer = ActorMaterializer()

// Random generator used below to build the synthetic "description" column.
val random: Random = new Random()

// Build the synthetic La Liga dataset from the raw JSON files.
// Each source match is wrapped in a struct, repeated 1000 times (array_repeat) and exploded
// back into rows, then written as parquet to data/la-liga.parquet.
spark.read.json("spanish-la-liga/data/*.json")
    .select(
        func.explode(
            func.array_repeat(
                func.struct(
                    $"HomeTeam".alias("home"),
                    $"AwayTeam".alias("away"),
                    $"FTHG".alias("homeGoals"),
                    $"FTAG".alias("awayGoals"),
                    $"HTHG".alias("homeGoalsHT"),
                    $"HTAG".alias("awayGoalsHT"),
                    func.current_date().alias("date"),
                    // mkString materialises the 1000 random characters; toString on the lazy
                    // sequence returned by alphanumeric.take only yields its debug representation.
                    // The text is generated once on the driver and attached to every row as a literal.
                    func.lit(random.alphanumeric.take(1000).mkString).alias("description")
                ),
                1000
            )
        ).alias("value")
    )
    .selectExpr("value.*")
    .write.mode(SaveMode.Overwrite).parquet("data/la-liga.parquet")

// Region -> teams lookup, flattened to one (team, region) row per team and stored as parquet
// in data/dictionary.parquet.
Seq(
    "Andalucía" -> Seq("Betis", "Malaga", "Xerez", "Almeria", "Granada"),
    "C. Valenciana" -> Seq("Villarreal", "Elche"),
    "Catalunya" -> Seq("Espanyol", "Barcelona"),
    "Madrid" -> Seq("Ath Madrid", "Real Madrid", "Vallecano", "Leganes"),
    "Euskadi" -> Seq("Ath Bilbao", "Eibar"),
    "Galiza" -> Seq("Celta", "La Coruna"),
    "Cantabria" -> Seq("Santander"),
    "Aragón" -> Seq("Zaragoza"),
    "Canarias" -> Seq("Tenerife")
)
    .flatMap { case (region, teams) => teams.map(team => team -> region) }
    .toDF("team", "region")
    .write.mode(SaveMode.Overwrite).parquet("data/dictionary.parquet")

# SPARK UI http://localhost:4040/jobs/

In [None]:
// Load the generated matches and print the schema that was resolved while creating the reader.
val ligaDF = spark.read.parquet("data/la-liga.parquet") // Eager execution (schema read)
ligaDF.printSchema

In [None]:
// Print the first rows of the matches dataset.
ligaDF.show()

## Declarative

In [None]:
// Total goals scored at home by each team, expressed declaratively with the DataFrame API.
val homeGoalsPerTeam = {
    val teamKey = $"home".alias("team")
    val goalsScored = func.sum($"homeGoals").alias("goals")
    ligaDF.groupBy(teamKey).agg(goalsScored)
}

In [None]:
// Print the extended plan (explain called with `true`) for the aggregation.
homeGoalsPerTeam.explain(true)

In [None]:
// Execute the aggregation by writing its result to aggregation.parquet (overwriting any previous output).
homeGoalsPerTeam.write.mode(SaveMode.Overwrite).parquet("aggregation.parquet")

In [None]:
// Read the stored aggregation back and print its first rows.
spark.read.parquet("aggregation.parquet").show

# Optimizations

## Pushdown filters

In [None]:
// Print the schema again as a reference for the columns used in the next cell.
ligaDF.printSchema

In [None]:
// Goals per team over both home and away appearances, ignoring matches where Betis plays away.
// Every match is turned into two (team, goals) rows before the aggregation.
val goalsPerTeam = {
    val homeSide = func.struct($"home".alias("team"), $"homeGoals".alias("goals"))
    val awaySide = func.struct($"away".alias("team"), $"awayGoals".alias("goals"))

    ligaDF
        .where($"away" =!= "Betis")
        .select(func.explode(func.array(homeSide, awaySide)).alias("team"))
        .selectExpr("team.*")
        .groupBy($"team")
        .agg(func.sum($"goals").alias("goals"))
}

// Print the extended plan for the query defined above.
goalsPerTeam.explain(true)

In [None]:
// Execute the DataFrame aggregation by writing it to parquet.
// `run` is not defined in the visible cells; presumably it comes from the imported `sparksession` script.
run(
    goalsPerTeam
        .write.mode(SaveMode.Overwrite).parquet("data/goals-per-team-df.parquet")
)

In [None]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Necessary to declare case classes
import java.util.Date

// Typed view of a match row, limited to the columns listed here.
case class Match(
    home: String,
    away: String,
    homeGoals: Option[Long],
    awayGoals: Option[Long],
    homeGoalsHT: Option[Long],
    awayGoalsHT: Option[Long],
    description: String
)

// Goals per team (home plus away) computed with the typed Dataset API.
// Each match yields one (team, goals) pair per side; a missing goal count is counted as 0.
ligaDF
    .as[Match]
    .flatMap(game => Seq(game.home -> game.homeGoals, game.away -> game.awayGoals))
    .groupByKey(entry => entry._1)
    .mapValues(entry => entry._2.getOrElse(0L))
    .reduceGroups((left, right) => left + right)
    .toDF("team", "goals")
    .write.mode(SaveMode.Overwrite).parquet("data/goals-per-team-df2.parquet")

In [None]:
// Read the DataFrame-based result back and print its first rows.
spark.read.parquet("data/goals-per-team-df.parquet").show

## PushDown from inner stage

In [None]:
// Print the schema of the team/region dictionary used by the join examples below.
spark.read.parquet("data/dictionary.parquet").printSchema

In [None]:
// Remember the current broadcast-join threshold and disable automatic broadcast joins (-1)
// for the duration of this example.
val threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val teamsMetadata =  spark.read.parquet("data/dictionary.parquet")

try {
    run(
        ligaDF
            .join(teamsMetadata, $"home" === $"team")
            // Drop the Betis home matches: the output is the "no-betis" dataset and must agree
            // with the Dataset-based version of this example, which filters with `!= "Betis"`.
            .where($"home" =!= "Betis")
            .write.mode(SaveMode.Overwrite).parquet("data/teams-no-betis-df.parquet")
    )
} finally {
    // Restore the threshold even when the job fails, so a failed run does not leave
    // broadcast joins disabled for the rest of the session.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", threshold)
}

In [None]:
// Read the DataFrame join result back and print its first rows.
spark.read.parquet("data/teams-no-betis-df.parquet").show

In [None]:
// Remember the current broadcast-join threshold and disable automatic broadcast joins (-1)
// for the duration of this example.
val threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Necessary to declare case classes

// Typed view of a match row, limited to the columns listed here.
case class Match(
    home: String,
    away: String,
    homeGoals: Option[Long],
    awayGoals: Option[Long],
    homeGoalsHT: Option[Long],
    awayGoalsHT: Option[Long],
    description: String
)

// Typed view of a row of the team/region dictionary.
case class TeamDictionary(
    team: String,
    region: String
)

val teamsMetadata =  spark.read.parquet("data/dictionary.parquet")

try {
    run(
        ligaDF
            .as[Match]
            .joinWith(teamsMetadata.as[TeamDictionary], $"home" === $"team")
            // The filter is written as a Scala lambda over the joined pair rather than as a
            // Column expression; compare the resulting plan with the DataFrame version.
            .filter(_._1.home != "Betis")
            .selectExpr("_1.*", "_2.*")
            .write.mode(SaveMode.Overwrite).parquet("data/teams-no-betis-ds.parquet")
    )
} finally {
    // Restore the threshold even when the job fails, so a failed run does not leave
    // broadcast joins disabled for the rest of the session.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", threshold)
}

In [None]:
// Read the Dataset join result back and print its first rows.
spark.read.parquet("data/teams-no-betis-ds.parquet").show

## Caching

In [None]:
// Stand-in for an expensive computation: blocks for 20 ms, then returns ten times the input.
def someHardCalculation(v: Long): Long = {
    Thread.sleep(20) // simulates a computationally expensive operation
    v * 10
}

// This transformation takes some time: someHardCalculation is applied to each of the 1000 values
val computedData = spark.range(0, 1000).map(someHardCalculation(_))

// Dataset.cache() persists with the default storage level, which Spark documents as
// MEMORY_AND_DISK for Datasets (MEMORY_ONLY is the RDD default, not the Dataset one).
val cachedDS = computedData.cache() // equivalent to computedData.persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
// Print the physical plan of the cached dataset.
cachedDS.explain()

In [None]:
// Two consecutive actions on the same cached dataset: a sum of all values, then a row count.
run(cachedDS.reduce(_ + _))

run(cachedDS.count())

# Not so good
## Caching

In [None]:
// Unpersist through the original reference while cachedDS is still used by the cells below.
// NOTE(review): Dataset.cache() is documented to return the same Dataset instance, so this
// presumably drops the cache behind cachedDS as well — compare the plan printed in the next cell.
computedData.unpersist()

In [None]:
// Print the physical plan of cachedDS again, now that computedData has been unpersisted.
cachedDS.explain()

In [None]:
// Repeat the same two actions (sum and count) after the unpersist call.
run(cachedDS.reduce(_ + _))

run(cachedDS.count())

## Hidden side effects

In [None]:
// Only a reader is built here; no action is called on the result.
// NOTE(review): given the section title, presumably creating the reader already touches the
// file system (e.g. path checks) — confirm against the Spark version in use.
spark.read.text("data.json")

In [None]:
// Same call against a second path; again no action is invoked on the result.
// NOTE(review): presumably this path behaves differently from data.json (e.g. it is missing) — confirm.
spark.read.text("data2.json")

## Encoder generation is not checked at compile time

In [None]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Necessary to declare case classes
import java.util.Date

// Variant of Match that declares the `date` column as java.util.Date.
// NOTE(review): given the section title, this presumably compiles but fails once the encoder
// for Match is derived at runtime — confirm against the Spark version in use.
case class Match(
    home: String,
    away: String,
    homeGoals: Option[Long],
    awayGoals: Option[Long],
    homeGoalsHT: Option[Long],
    awayGoalsHT: Option[Long],
    date: Date
)

// Returns ligaDF viewed as a typed Dataset[Match].
def readMatches(): Dataset[Match] = {
    ligaDF.as[Match]
}

In [None]:
// Invoke the typed reader defined in the previous cell.
readMatches()

## Some more optimizations could be added to the Dataset API

In [None]:
// DataFrame version: select only the `away` column from the stored matches.
spark.read.parquet("data/la-liga.parquet")
    .select($"away")

In [None]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Necessary to declare case classes

// Typed view of a match row without the `date` and `description` columns.
case class Match(
    home: String,
    away: String,
    homeGoals: Option[Long],
    awayGoals: Option[Long],
    homeGoalsHT: Option[Long],
    awayGoalsHT: Option[Long]
)

// Dataset version: extract the `away` field through a Scala lambda instead of a column selection.
ligaDF
    .as[Match]
    .map(_.away)

## Fine-grained parallelism

When the cost of a function depends on the value of the row it is applied to, the work is not so easy to parallelize evenly on Spark.

### Spark naive implementation

In [None]:
// Stand-in for an expensive computation: blocks for 10 ms, then returns ten times the input.
def someHardComputation(v: Long): Long = {
    val simulatedCostMillis = 10L
    Thread.sleep(simulatedCostMillis)
    10L * v
}

// Naive Spark version: the value 0 is replaced by 10000, the data is split into 4 partitions,
// and each row sums someHardComputation over 0..value on its own before the final reduce.
run(
    spark
        .range(0L, 100L)
        .map(i => if (i == 0) 10000L else i.toLong)
        .repartition(4)
        .map(upper => 0L.to(upper.toLong).map(someHardComputation(_)).sum)
        .reduce(_ + _)
)



In [None]:
// NOTE(review): no visible cell writes composition.parquet; confirm this file is produced elsewhere.
spark.read.parquet("composition.parquet").show

### Akka Stream naive implementation

In [None]:
// Workload sizes for the Akka examples: 10000 in place of 0, followed by 1 to 100.
val serie = 0L.to(100L).map {
    case 0L => 10000L
    case n  => n
}

In [None]:
// Naive Akka Streams version: each element is processed in a single map stage that sums
// someHardCalculation over 0..v; the (v, sum) pairs are collected by prepending to a list.
runAsync(
    Source(serie)
        .map(v => (v, 0L.to(v).map(someHardCalculation).sum))
        .runFold(List.empty[(Long, Long)])((collected, pair) => pair :: collected)
)

### Akka Stream optimized implementation

In [None]:
// Asynchronous variant: runs the 20 ms stand-in computation inside a Future and
// completes with ten times the input.
def someHardCalculation(v: Long): Future[Long] =
    Future {
        Thread.sleep(20) // stands in for a computationally expensive operation
        10 * v
    }

// Start with the bad program

// Builds a stream that applies someHardCalculation to every value in 0..v with up to
// 4 calculations in flight (mapAsync(4)), sums the results and emits one (v, sum) pair.
def calculate(v: Long) = {
    val inputs = Source.fromIterator(() => 0L.to(v).iterator)
    val total = inputs.mapAsync(4)(someHardCalculation).reduce(_ + _)
    total.map(res => v -> res)
}

// Run `calculate` for each element of `serie`, one sub-stream after another (flatMapConcat),
// and collect the (value, sum) pairs by prepending them to a list.
runAsync(
    Source(serie)
        .flatMapConcat(calculate)
        .runFold(List.empty[(Long, Long)])((l, t) => t :: l)
)

### Spark optimized implementation

In [None]:
// Synchronous stand-in for an expensive computation: blocks for 20 ms,
// then returns ten times the input.
def someHardCalculation(v: Long): Long = {
    val simulatedCostMillis = 20L
    Thread.sleep(simulatedCostMillis) // simulates a computationally expensive operation
    10 * v
}

// Optimized Spark version: the work of each value is first expanded into one row per
// (value, step) pair, so every individual computation becomes its own row, and the partial
// results are then summed per value and reduced to a single total.
run(
    spark
        .range(0L, 100L)
        .map(
            i => if(i == 0) 10000L else i.toLong
        )
        // One (i, j) row for every j in 0..i
        .flatMap(
            i =>
                0L.to(i).map(j => (i, j))
        )
        // NOTE(review): this calls someHardComputation (10 ms sleep) although this cell defines
        // someHardCalculation (20 ms sleep) just above — confirm which one is intended.
        .map{
            case (i, j) => i -> someHardComputation(j)
        }
        .groupBy($"_1".alias("id"))
        .agg(func.sum($"_2").alias("value"))
        .select($"value")
        .as[Long]
        .reduce(_ + _)
)

In [None]:
// NOTE(review): no visible cell writes composition.parquet; confirm this file is produced elsewhere.
spark.read.parquet("composition.parquet").show