# Optimización de Consultas

creamos la sesión de Spark

In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` 

import org.apache.spark.sql.{NotebookSparkSession, SparkSession}

// Notebook-aware Spark session that runs locally on every available core.
val spark: SparkSession = NotebookSparkSession.builder()
  .master("local[*]")
  .appName("Queries Optimization")
  .getOrCreate()


In [None]:
import $ivy.`org.plotly-scala::plotly-almond:0.8.1`

import plotly._
import plotly.element._
import plotly.layout._
import plotly.Almond._

In [None]:
import $ivy.`ch.cern.sparkmeasure:spark-measure_2.12:0.17`

Logging

In [None]:
import org.slf4j.LoggerFactory
import org.apache.log4j.{Level, Logger}
// Raise the root log4j logger threshold to ERROR so lower-level messages are not printed.
Logger.getRootLogger().setLevel(Level.ERROR)

imports

In [None]:
import spark.implicits._
import spark.sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.{functions => func, _}
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark._
import org.apache.spark.sql.types._, func._

# Los Datos

El dataset ha sido obtenido de:
https://www.ecdc.europa.eu/en/publications-data/download-todays-data-geographic-distribution-covid-19-cases-worldwide

En él se observan los casos diarios de Covid-19 por país hasta el 14-12-2020.

En la segunda parte se utilizan los datos de las medidas aplicadas a cada país por fecha de inicio y fin:

https://www.ecdc.europa.eu/en/publications-data/download-data-response-measures-covid-19

Para la última consulta, que calcula las infecciones por km², se utilizan los datos de población por país:

https://www.kaggle.com/tanuprabhu/population-by-country-2020

## Creo una clase para trabajar con infecciones 

In [None]:
// Register this notebook cell's wrapper as an outer scope for Spark's encoders.
// NOTE(review): presumably required because the case class is defined inside the REPL — confirm.
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

/**
 * One daily report line for a single country.
 *
 * @param day       day of month of the report
 * @param month     month of the report
 * @param year      year of the report
 * @param nCases    cases reported that day
 * @param nDeaths   deaths reported that day
 * @param country   country name as it appears in the source file
 * @param continent continent name as it appears in the source file
 */
case class Infection(
  day: Int,
  month: Int,
  year: Int,
  nCases: Int,
  nDeaths: Int,
  country: String,
  continent: String
) extends Serializable

Y un método para medir tiempos de ejecución

In [None]:
/**
 * Evaluates `code` exactly once, prints the elapsed wall-clock time in
 * milliseconds as "Took <ms>", and returns the value produced by `code`.
 */
def run[A](code: => A): A = {
    val startedAt = System.currentTimeMillis()
    val result = code
    val elapsedMs = System.currentTimeMillis() - startedAt
    println(s"Took $elapsedMs")
    result
}

In [None]:
/**
 * Evaluates `code` exactly once and reports how long it took.
 *
 * The elapsed time is measured a single time, so the value printed as
 * "Took <ms>" is always the same value that is returned.
 *
 * @param code expression to time; its result is discarded
 * @return elapsed wall-clock time in milliseconds, narrowed to Int
 */
def runWithOutput[A](code: => A): Int = {
    val start = System.currentTimeMillis()
    val _ = code // force evaluation; only the duration is of interest
    val elapsedMs = System.currentTimeMillis() - start
    println(s"Took $elapsedMs")
    elapsedMs.toInt
}

### No he descubierto cómo importar showHTML(), así que lo defino aquí

In [None]:
// Credit to Aivean
// Adds a `showHTML` method to DataFrame values that renders rows as an HTML table.
implicit class RichDF(val ds:DataFrame) {
    /**
     * Renders the first `limit` rows of the wrapped DataFrame as an HTML table
     * and hands the markup to `publish.html`.
     *
     * @param limit    maximum number of rows fetched with `take`
     * @param truncate maximum cell length in characters; values that are not
     *                 positive disable truncation
     */
    def showHTML(limit:Int = 20, truncate: Int = 20) = {
        import xml.Utility.escape
        // Bring at most `limit` rows back as a local array.
        val data = ds.take(limit)
        val header = ds.schema.fieldNames.toSeq        
        // Convert every cell to its display string, then shorten it if needed.
        val rows: Seq[Seq[String]] = data.map { row =>
          row.toSeq.map { cell =>
            val str = cell match {
              case null => "null"
              // Byte arrays are shown as space-separated two-digit hex values.
              case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
              case array: Array[_] => array.mkString("[", ", ", "]")
              case seq: Seq[_] => seq.mkString("[", ", ", "]")
              case _ => cell.toString
            }
            if (truncate > 0 && str.length > truncate) {
              // do not show ellipses for strings shorter than 4 characters.
              if (truncate < 4) str.substring(0, truncate)
              else str.substring(0, truncate - 3) + "..."
            } else {
              str
            }
          }: Seq[String]
        }
// Header names and cell texts are XML-escaped before being interpolated into the markup.
publish.html(s""" <table>
                <tr>
                 ${header.map(h => s"<th>${escape(h)}</th>").mkString}
                </tr>
                ${rows.map { row =>
                  s"<tr>${row.map{c => s"<td>${escape(c)}</td>" }.mkString}</tr>"
                }.mkString}
            </table>
        """)        
    }
}

## Calcularemos la media usando solo Scala sin que sea distribuido
(Work in progress)

In [None]:
  /**
   * Parses comma-separated lines into `Infection` values using plain Scala
   * collections. Fields are read by position: 1-5 as integers (day, month,
   * year, cases, deaths), 6 as country and 10 as continent.
   */
  def infectionsScala(lines : List[String]) : List[Infection] =
    for (line <- lines) yield {
      val fields = line.split(",")
      Infection(
        day = fields(1).toInt,
        month = fields(2).toInt,
        year = fields(3).toInt,
        nCases = fields(4).toInt,
        nDeaths = fields(5).toInt,
        country = fields(6),
        continent = fields(10)
      )
    }

In [None]:
// Read every line of the CSV into memory and always close the underlying file.
// The result is a List[String] (the type `infectionsScala` takes) instead of a
// one-shot Iterator tied to a Source that was never closed.
val content: List[String] = {
  val source = scala.io.Source.fromFile("data.csv")
  try source.getLines().toList
  finally source.close()
}

In [None]:
// infectionsScala(content)

In [None]:
/*  def infectionArrayGrowthAverage(infections : List[Infection]) : Map[String, Int]= {

    val countriesAndCases : Map[String, List[(String, Int)]] = 
      infections.map(x => (x.country,x.nCases))
      .groupBy(_._1)
      
    countriesAndCases.mapValues(x => (x.sum / x.size))
  }
*/

In [None]:
/* val bufferedSource = scala.io.Source.fromFile("data.csv")
  for (line <- bufferedSource.getLines) {
    val cols = line.split(",").map(_.trim)

      // do whatever you want with the columns here
      
    println(s"${cols(0)}|${cols(1)}|${cols(2)}|${cols(3)}|${cols(4)}|${cols(5)}|${cols(6)}")
  }
  bufferedSource.close
*/

# Empiezo trabajando con RDDs

In [None]:
// Load the raw CSV as an RDD of text lines (one String per line).
val infectionData = spark.sparkContext.textFile("data.csv")

Creo una funcion para trabajar con un RDD de infecciones

In [None]:
/**
 * Turns an RDD of comma-separated lines into an RDD of `Infection`.
 * Columns are taken by position: 1-5 as integers (day, month, year, cases,
 * deaths), 6 as country and 10 as continent.
 */
def infections(lines : RDD[String]) : RDD[Infection] =
    lines
      .map(_.split(","))
      .map { cols =>
        Infection(
          day = cols(1).toInt,
          month = cols(2).toInt,
          year = cols(3).toInt,
          nCases = cols(4).toInt,
          nDeaths = cols(5).toInt,
          country = cols(6),
          continent = cols(10)
        )
      }

Calculo la media de infecciones diarias por país trabajando con pair RDD

In [None]:
  /**
   * Average number of daily cases per country, sorted ascending by average.
   *
   * A running (sum, count) pair is combined per key with `reduceByKey`, so
   * partial results are merged before the shuffle instead of materialising
   * every daily value of a country as `groupByKey` did. The sum is kept in a
   * `Long` to avoid overflowing `Int`.
   *
   * @param infections daily reports
   * @return (country, integer average of nCases); the division truncates
   */
  def infectionGrowthAverage(infections : RDD[Infection]) : RDD[(String, Int)]= {

    val sumAndCount : RDD[(String, (Long, Long))] =
      infections
        .map(inf => (inf.country, (inf.nCases.toLong, 1L)))
        .reduceByKey { case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB) }

    sumAndCount
      .mapValues { case (sum, count) => (sum / count).toInt }
      .sortBy(_._2)
  }

Muestro el resultado y el tiempo de ejecución

In [None]:
// Parse the raw lines into Infection records, then define the per-country average on top of them.
val infectionRDD = infections(infectionData)
val infectionAvgRDD = infectionGrowthAverage(infectionRDD)

In [None]:
// Run the RDD query inside spark.time.
// NOTE(review): spark.time returns the value of its block, so timeRDD presumably holds the
// collected rows rather than a duration — confirm.
val timeRDD = spark.time(infectionAvgRDD.collect())

In [None]:
// Run the same RDD query under sparkMeasure's StageMetrics.
ch.cern.sparkmeasure.StageMetrics(spark).runAndMeasure(infectionAvgRDD.collect())

# Hago los mismos calculos con un DataFrame

Convierto el RDD obtenido previamente en un DataFrame para inferir la clase infección

In [None]:
// Build a DataFrame from the RDD of Infection case-class instances.
val infectionDF = spark.createDataFrame(infectionRDD)

Utilizo los métodos de la clase DF que incluye uno optimizado para calcular la media.

Ejecuto y compruebo cómo el tiempo de ejecución es significativamente menor que con RDD

In [None]:
// Average daily cases per country, largest average first.
val infAvgOrDf = infectionDF
  .groupBy("country")
  .avg("nCases")
  .orderBy(col("avg(nCases)").desc)

In [None]:
// Render the first rows of the averaged DataFrame as an HTML table.
infAvgOrDf.showHTML()

In [None]:
// Run a row count of the averaged DataFrame inside spark.time.
spark.time(infAvgOrDf.count())

In [None]:
// Run the DataFrame query inside spark.time.
// NOTE(review): spark.time returns the value of its block, so timeDF presumably holds the
// collected rows rather than a duration — confirm.
val timeDF = spark.time(infAvgOrDf.collect)

In [None]:
// Run the DataFrame query under sparkMeasure's StageMetrics.
ch.cern.sparkmeasure.StageMetrics(spark).runAndMeasure(infAvgOrDf.collect)

Otra opción es crear el DataFrame directamente importando los datos pero deja de ser un DF de infecciones

In [None]:
// Read the COVID CSV straight into a DataFrame: first line is the header,
// comma-delimited, column types inferred by Spark.
val dfCovid = spark.read
  .options(Map(
    "header" -> "true",
    "charset" -> "UTF8",
    "delimiter" -> ",",
    "inferSchema" -> "true"
  ))
  .csv("covidworldwide.csv")

In [None]:
// Inspect the schema of the DataFrame read with inferSchema enabled.
dfCovid.schema

In [None]:
// Print the plan Spark will use to read this DataFrame.
dfCovid.explain

In [None]:
// Time the per-country average including its execution. `show` is the action
// that triggers the job, so it must sit inside `run`; timing only the lazy
// DataFrame definition measures nothing. The no-op `.toDF` was dropped.
run(
  dfCovid
    .groupBy("countriesAndTerritories")
    .agg(mean("cases"))
    .orderBy("avg(cases)")
    .show(2000)
)

puedo definir el esquema manualmente para crear el DataFrame

In [None]:
// Schema declared by hand; the same information could be obtained by loading
// the CSV and looking at the schema Spark infers by default.
val schema = StructType(Seq(
  StructField("dateRep", StringType, nullable = true),
  StructField("day", IntegerType, nullable = true),
  StructField("month", IntegerType, nullable = true),
  StructField("year", IntegerType, nullable = true),
  StructField("cases", IntegerType, nullable = true),
  StructField("deaths", IntegerType, nullable = true),
  StructField("countriesAndTerritories", StringType, nullable = true),
  StructField("geoId", StringType, nullable = true),
  StructField("countryterritoryCode", StringType, nullable = true),
  StructField("popData2018", IntegerType, nullable = true),
  StructField("continentExp", StringType, nullable = true)
))

In [None]:
// Load data.csv with the hand-written schema, treating the first line as a header.
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("data.csv")

In [None]:
// Print the schema applied to the DataFrame.
df.printSchema()

In [None]:
// Display the first rows of the DataFrame.
df.show()

# Y con un DataSet

In [None]:
// Read the CSV without schema inference and view it as a Dataset of 12-element String tuples,
// one tuple element per column.
val infectionDS = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv("covidworldwide.csv")
.as[(String,String,String,String,String,String,String,String,String,String,String,String)]

In [None]:
// Average of the `cases` column per country on the Dataset, ordered ascending by that average.
val avgDS = infectionDS
  .groupBy($"countriesAndTerritories")
  .agg(avg($"cases").as[Double])
  .orderBy("avg(cases)")

In [None]:
// Run a row count of the Dataset query inside spark.time.
spark.time(avgDS.count)

In [None]:
// Run the Dataset query inside spark.time.
// NOTE(review): spark.time returns the value of its block, so timeDS presumably holds the
// collected rows rather than a duration — confirm.
val timeDS = spark.time(avgDS.collect)

In [None]:
// Elapsed milliseconds for collecting the Dataset query, as measured by runWithOutput.
val timeDataSet = runWithOutput(avgDS.collect)

### Otras opciones menos eficientes
(Work in progress)

In [None]:
/* infectionDS.groupByKey(p => p._7) //hace shuffling de los datos
        .mapValues(p => p._5.toDouble)
        .mapGroups((k,vs) => (???))
*/

In [None]:
/* infectionDS.groupByKey(p => p._7) //mas eficiente
        .mapValues(p => p._5.toDouble)
        .reduceGroups((acc,str)=> ???)
*/

In [None]:
/* val infGrowAvg = new Aggregator[]{ //utilizando un Aggregator
    
}.toColumn 
*/

### Intento trabajar con DataSet de infecciones Dataset[Infection] pero falla 
(Work in progress)

Esto dará error, pues no se puede crear un Dataset sin un Encoder para su tipo de datos y Spark no encuentra uno para la clase Infection

In [None]:
// First attempt: build a Dataset from the RDD of Infection, relying on an implicit encoder.
// NOTE(review): the surrounding notes say this cell fails in the notebook.
val infectionDS = spark.createDataset(infectionRDD)

In [None]:
// Second attempt: same call followed by an explicit .as[Infection].
// NOTE(review): the surrounding notes say this cell fails in the notebook.
val infectionDS = spark.createDataset(infectionRDD).as[Infection]

In [None]:
// Third attempt: same call with the expected Dataset[Infection] type written out.
// NOTE(review): the surrounding notes say this cell fails in the notebook.
val ds : Dataset[Infection] = spark.createDataset(infectionRDD).as[Infection]

Necesito importar los Encoders y explicitar el tipo de datos

In [None]:
import org.apache.spark.sql.Encoders
// Build an encoder for the Infection case class explicitly; the value is not stored in a variable.
Encoders.product[Infection]

In [None]:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.Encoders

// Create the Dataset passing the Infection encoder explicitly instead of relying on implicit resolution.
val dataset = spark.createDataset(infectionRDD)(Encoders.product[Infection])

(A partir de aquí intento crear un DataSet de infecciones pero siempre obtengo el mismo error)

In [None]:
// Retry of the implicit-encoder variant.
// NOTE(review): the surrounding notes say this keeps producing the same error.
val infectionDS = spark.createDataset(infectionRDD).as[Infection]

In [None]:
  /**
   * Converts the 12-column String tuple Dataset into a Dataset of `Infection`.
   *
   * Tuple positions are 1-based while the CSV-array indices used by the RDD
   * parser are 0-based, so array index 1 (day) is `_2`, index 6 (country) is
   * `_7` and index 10 (continent) is `_11`. The previous version read
   * `_1`..`_6` and `_10`, which shifted every field by one column and tried
   * to parse the first column as the day.
   */
  def infections(lines : Dataset[(String,String,String,String,String,String,String,String,String,String,String,String)]) 
                       : Dataset[Infection] = 
    lines.map(line => {
      Infection(
        day = line._2.toInt,
        month = line._3.toInt,
        year = line._4.toInt,
        nCases = line._5.toInt,
        nDeaths = line._6.toInt,
        country = line._7,
        continent = line._11
      )
    })

In [None]:
// Apply the Dataset-based parser to infectionDS and view the result as Dataset[Infection].
infections(infectionDS).as[Infection]

# Utilizo una segunda tabla y cruzo datos con RDD, DS y DF

## Creo una consulta para calcular la media de infecciones por Km2

### Utilizando RDDs

In [None]:
// Load the population CSV as an RDD of text lines (one String per line).
val populationData = spark.sparkContext.textFile("population_by_country_2020.csv")

In [None]:
// Register this notebook cell's wrapper as an outer scope for Spark's encoders.
// NOTE(review): presumably required because the case class is defined inside the REPL — confirm.
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

/**
 * Population figures for one country, as parsed from the population CSV.
 *
 * @param country    country name
 * @param population value of CSV column 1
 * @param density    value of CSV column 4
 * @param land_area  value of CSV column 5
 */
case class Population(
  country: String,
  population: Int,
  density: Int,
  land_area: Int
) extends Serializable

Limpio la primera linea del CSV y creo un RDD de población

In [None]:
// First line of the population file, kept so it can be excluded from the data rows.
val header = populationData.first()

/**
 * Parses the population lines into `Population` records, dropping every line
 * equal to `header`. Columns are read by position: 0 country, 1 population,
 * 4 density, 5 land area; the numeric ones are parsed with `toInt`.
 */
def population(lines : RDD[String]) : RDD[Population] =
    lines
      .filter(_ != header)
      .map(_.split(","))
      .map(cols => Population(
        country = cols(0),
        population = cols(1).toInt,
        density = cols(4).toInt,
        land_area = cols(5).toInt
      ))

Compruebo que se visualizan correctamente los datos

In [None]:
// Parse the population file and render the first rows as an HTML table to check the result.
val populationRDD = population(populationData)
populationRDD.toDF.showHTML()

### Un join computacionalmente pesado desde el principio ya que cruza todos los datos sin quedarnos con los que nos interesen

Spark no me deja hacer un Join de RDD que no sean pair RDD así que tenemos que construirlo

In [None]:
// Deliberate failing example: neither RDD is a key/value RDD.
// NOTE(review): per the surrounding notes this join is rejected; kept to illustrate the error.
populationRDD.join(infectionRDD)

Construyo Pair RDDs conservando todos los datos

In [None]:
// Key both RDDs by country name while keeping the whole record as the value.
val populationByCountry = populationRDD.keyBy(_.country)

val infectionByCountry = infectionRDD.keyBy(_.country)

Hago el Join y agrupo por paises

In [None]:
// Join every infection record with its country's population record, then group the pairs by country.
val megaRDD = infectionByCountry.join(populationByCountry).groupByKey()

Finalmente calculo la media

In [None]:
// Per country: daily cases divided by land area, then the mean of those daily ratios.
megaRDD.mapValues { pairs =>
  val ratios = pairs.map { case (infection, pop) =>
    infection.nCases.toFloat / pop.land_area.toFloat
  }
  ratios.sum / ratios.size
}.collect()

Lo hago todo en una única operación para calcular el tiempo de ejecución

In [None]:
// Whole heavy pipeline (join, group, ratio, mean) in one expression so it can be measured.
ch.cern.sparkmeasure.StageMetrics(spark).runAndMeasure(
  infectionByCountry
    .join(populationByCountry)
    .groupByKey()
    .mapValues(pairs => pairs.map { case (infection, pop) =>
      infection.nCases.toFloat / pop.land_area.toFloat
    })
    .mapValues(ratios => ratios.sum / ratios.size)
    .collect()
)

¿Hay alguna diferencia cruzando los datos en orden inverso? Parece que no

In [None]:
// Same query with the join operands swapped, to compare execution metrics.
// After the swap each joined value is (Population, Infection), so cases come
// from the second element and land area from the first. The previous version
// kept the tuple positions and therefore computed land_area / nCases, which is
// a different query from the one it is compared against.
ch.cern.sparkmeasure.StageMetrics(spark).runAndMeasure(
    populationByCountry.join(infectionByCountry)
    .groupByKey()
    .mapValues(
    x => x.map(
        line => line._2.nCases.toFloat / line._1.land_area.toFloat)
    ).mapValues(
        x => x.sum / x.size
    ).collect()
)

#### Para optimizar un poco esta consulta:

Despejo solo los datos que me interesan para trabajar con Pair RDDs y optimizar la consulta

In [None]:
// Keep only what the query needs from the population data: (country, land area).
val countriesAndLandArea = populationRDD.map(pop => pop.country -> pop.land_area)

In [None]:
// Keep only (country, daily cases) and gather each country's daily values together.
val countriesAndCases = infectionRDD
  .map(infection => infection.country -> infection.nCases)
  .groupByKey()

Ejecuto un join y trabajo para calcular primero la media de infecciones por Km2 diaria, 
para luego calcular la media total

In [None]:
// Pair each country's grouped daily cases with its land area.
val average = countriesAndCases.join(countriesAndLandArea)

In [None]:
// Per country: divide each daily case count by the land area, then average the daily ratios.
average.mapValues { case (dailyCases, landArea) =>
  val perKm2 = dailyCases.map(cases => cases.toFloat / landArea.toFloat)
  perKm2.sum / perKm2.size
}.collect()

Lo hago todo en una única operación para calcular el tiempo de ejecución

In [None]:
// Optimised RDD query as a single lazy definition: (country, mean daily cases per unit of land area).
val querie1 = countriesAndCases
  .join(countriesAndLandArea)
  .mapValues { case (dailyCases, landArea) =>
    dailyCases.map(cases => cases.toFloat / landArea.toFloat)
  }
  .mapValues(ratios => ratios.sum / ratios.size)

In [None]:
// Measure the optimised RDD query end to end, including the collect.
ch.cern.sparkmeasure.StageMetrics(spark).runAndMeasure(
  countriesAndCases
    .join(countriesAndLandArea)
    .mapValues { case (dailyCases, landArea) =>
      dailyCases.map(cases => cases.toFloat / landArea.toFloat)
    }
    .mapValues(ratios => ratios.sum / ratios.size)
    .collect()
)

#### Pruebas : producto cartesiano y demás (work in progress)

In [None]:
// Experiment: take the first 10 pairs of the cartesian product of both RDDs.
populationRDD.cartesian(infectionRDD).take(10)

In [None]:
// Pair each country's average daily cases with its land area.
val joined = infectionAvgRDD.join(countriesAndLandArea)

In [None]:
// Collect both join inputs and the join result; only the last expression's value is the cell result.
infectionAvgRDD.collect()
countriesAndLandArea.collect()
joined.collect()

In [None]:
// Print (country, average cases / land area). Both operands are Int, so one
// side is converted to Float before dividing; previously the integer
// division truncated the quotient and only then widened it to Float.
joined
  .map { case (country, (avgCases, landArea)) => (country, avgCases.toFloat / landArea) }
  .foreach(println)

### Consulta con DataSet

In [None]:
// Re-read the COVID CSV with schema inference and view it as a 12-element tuple Dataset
// whose 5th and 6th elements are Double and the rest String.
val infectionDS = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.option("inferSchema", "true")
.csv("covidworldwide.csv")
.as[(String,String,String,String,Double,Double,String,String,String,String,String,String)]

In [None]:
// Read the population CSV with schema inference and view it as an 11-element tuple Dataset.
val dsPopulation = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.option("inferSchema", "true")
.csv("population_by_country_2020.csv")
.as[(String,Float,String,Float,Float,Float,Double,String,String,String,String)]

In [None]:
// Dataset version of the infections-per-km2 query: join on country name, compute the daily
// ratio of cases to land area, average it per country and sort ascending by that average.
val querie2 = infectionDS
  .join(dsPopulation, $"Country (or dependency)" === $"countriesAndTerritories")
  .select(
    $"Country (or dependency)".as("Country"),
    $"dateRep".as("date"),
    $"cases",
    $"Land Area (Km\u00b2)",
    ($"cases" / $"Land Area (Km\u00b2)").as("infection Per Km\u00b2")
  )
  .groupBy("Country")
  .avg("infection Per Km\u00b2")
  .orderBy(asc("avg(infection Per Km²)"))

In [None]:
// Measure the Dataset query end to end (same pipeline as querie2, plus collect) with sparkMeasure.
ch.cern.sparkmeasure.StageMetrics(spark).runAndMeasure(
infectionDS.join(dsPopulation, $"Country (or dependency)" === $"countriesAndTerritories")
        .select($"Country (or dependency)" as "Country",
                $"dateRep" as "date",
                $"cases",
                $"Land Area (Km\u00b2)",
                $"cases" / $"Land Area (Km\u00b2)" as "infection Per Km\u00b2")
        .groupBy("Country")
        .avg("infection Per Km\u00b2")
        .orderBy(asc("avg(infection Per Km²)"))
        .collect())

## intentando dataset mejor tipado...

In [None]:
// Register this notebook cell's wrapper as an outer scope for Spark's encoders.
// NOTE(review): presumably required because the case class is defined inside the REPL — confirm.
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

/**
 * Typed view of one row of the COVID data with twelve fields.
 * NOTE(review): the field order appears to mirror the CSV column order — confirm.
 */
case class InfectionRow(
  date: String,
  day: String,
  month: String,
  year: String,
  nCases: Double,
  nDeaths: Double,
  country: String,
  geoID: String,
  territoryCode: String,
  population: Int,
  continent: String,
  cumulative_14_days: Float
)

In [None]:
/**
 * Maps every row of `infectionDF` to an `InfectionRow`, reading each field by column name.
 *
 * NOTE(review): apart from "dateRep", "day", "month" and "year", the column names used here
 * ("nCases", "nDeaths", "Country", "territoryCode", "population", "continent",
 * "cumulative_14_days") do not appear in any other query of this notebook, which use
 * "cases", "countriesAndTerritories", "continentExp" — verify them against the input.
 */
def infectionDSTyped(infectionDF: DataFrame): Dataset[InfectionRow] =
    infectionDF.map(r => InfectionRow(
      r.getAs[String]("dateRep"),
      r.getAs[String]("day"),
      r.getAs[String]("month"),
      r.getAs[String]("year"),
      r.getAs[Double]("nCases"),
      r.getAs[Double]("nDeaths"),
      r.getAs[String]("Country"),
      r.getAs[String]("geoID"),
      r.getAs[String]("territoryCode"),
      r.getAs[Int]("population"),
      r.getAs[String]("continent"),
      r.getAs[Float]("cumulative_14_days")
    ))

In [None]:
// Try the typed conversion on infectionDS and display the result.
// NOTE(review): infectionDS is a tuple Dataset while the function takes a DataFrame — likely a type mismatch.
infectionDSTyped(infectionDS).show

In [None]:
// Join both Datasets on a shared "Country" column and render the result.
// NOTE(review): other cells join on "Country (or dependency)" === "countriesAndTerritories";
// verify that both sides really expose a "Country" column.
infectionDS.join(dsPopulation, "Country" ).showHTML()

In [None]:
// Work-in-progress attempt at a typed version of querie2.
// NOTE(review): groupByKey() is called without a key function, avg/desc receive lambdas, and the
// last tuple element divides column 16 by itself — this looks unfinished and unlikely to compile.
val querie2 = 
infectionDS.join(dsPopulation, "Country")
        .map(r => 
             (r.getString(0),r.getTimestamp(1),r.getInt(5),r.getInt(16),r.getInt(16)/r.getInt(16)))
        .groupByKey()
        .agg(round(avg(_._5),10).as[Float])
        .orderBy(desc(_._5))

In [None]:
// Second work-in-progress attempt: group the mapped tuples by their first element (country).
// NOTE(review): mapValues(mean()) passes no column or function to mean — looks unfinished.
infectionDS.join(dsPopulation, "Country")
        .map(r => 
             (r.getString(0),r.getString(1),r.getInt(5),r.getInt(16),r.getInt(16)/r.getInt(16)))
        .groupByKey(_._1)
        .mapValues(mean())

### Consulta con DataFrame

In [None]:
// Read the COVID CSV into a DataFrame: header row, comma-delimited, inferred column types.
val dfCovid = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.option("inferSchema", "true")
.csv("covidworldwide.csv")

In [None]:
// Read the response-measures CSV into a DataFrame (header row, inferred column types),
// then display its first rows and its schema.
val dfMeasures = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.option("inferSchema", "true")
.csv("response_graphs_data_2021-04-15.csv")
dfMeasures.show
dfMeasures.schema

In [None]:
// Read the population CSV into a DataFrame and shorten two column names so later
// queries can refer to them as "Country" and "Population"; then display rows and schema.
val dfPopulation = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.option("inferSchema", "true")
.csv("population_by_country_2020.csv")
.withColumnRenamed("Country (or dependency)","Country")
.withColumnRenamed("Population (2020)","Population")
dfPopulation.showHTML()
dfPopulation.schema

Modifico los datos de entrada para que el formato fecha se adecue al TimeStamp de Spark

In [None]:
// Replace the "/" separators of dateRep with "-" in a new "new-date" column and drop
// the original column. The DataFrame itself is bound to dfCovidClean and shown in a
// separate statement; previously the value bound was the Unit returned by `show()`,
// so the cleaned data could not be reused.
val dfCovidClean = dfCovid
  .withColumn("new-date", translate($"dateRep", "/", "-"))
  .drop("dateRep")

dfCovidClean.show()

Hago una consulta de prueba para obtener la media solo de los casos en España

In [None]:
// Dates and daily cases for Spain only.
val spainCovid = dfCovid
  .where("countriesAndTerritories == 'Spain'")
  .select("dateRep", "cases")
  .toDF

In [None]:
// Time the average of Spain's daily cases. `show` is the action that runs the
// job, so it is placed inside `run`; timing only the lazy `agg` definition
// would not include any execution.
run(spainCovid.agg(avg("cases")).show())

Cruzo los datos con un Join y hago algunas consultas sencillas

In [None]:
// Join the COVID data with the response measures on country name.
val megaDF = dfCovid.join(dfMeasures, $"Country" === $"countriesAndTerritories")

In [None]:
// Cases, deaths, date and response measure for Spain, taken from the joined DataFrame.
megaDF.select("cases","deaths","dateRep","Response_measure")
    .where("countriesAndTerritories == 'Spain'").show

In [None]:
// Time the same join + projection + filter for Spain, including the collect that executes it.
run(dfCovid.join(dfMeasures, $"Country" === $"countriesAndTerritories")
        .select("cases","deaths","dateRep","Response_measure")
        .where("countriesAndTerritories == 'Spain'").collect())

### Finalmente ejecuto la consulta de nuestro caso de uso, infecciones por Km2

In [None]:
// DataFrame version of the infections-per-km2 query, collected into a local array of rows:
// join on country name, compute the daily ratio of cases to land area, average it per
// country and sort descending by that average.
val querie3 = dfCovid
  .join(dfPopulation, $"country" === $"countriesAndTerritories")
  .select(
    $"country",
    $"dateRep".as("date"),
    $"cases",
    $"Land Area (Km\u00b2)",
    ($"cases" / $"Land Area (Km\u00b2)").as("infection Per Km\u00b2")
  )
  .groupBy("country")
  .avg("infection Per Km\u00b2")
  .orderBy(desc("avg(infection Per Km²)"))
  .collect

In [None]:
// Measure the DataFrame query end to end (same pipeline as querie3) with sparkMeasure.
ch.cern.sparkmeasure.StageMetrics(spark).runAndMeasure(
dfCovid.join(dfPopulation, $"country" === $"countriesAndTerritories")
        .select($"country",
                $"dateRep" as "date",
                $"cases",
                $"Land Area (Km\u00b2)",
                $"cases" / $"Land Area (Km\u00b2)" as "infection Per Km\u00b2")
        .groupBy("country")
        .avg("infection Per Km\u00b2")
        .orderBy(desc("avg(infection Per Km²)"))
        .collect()
    )

### Casos por número de habitante

In [None]:
// Average of daily cases divided by population, per country, sorted descending and
// collected into a local array of rows.
val infectionsPerPopulation = dfCovid.join(dfPopulation, $"country" === $"countriesAndTerritories")
        .select($"country",
                $"dateRep" as "date",
                $"cases",
                $"Population",
                $"cases" / $"Population" as "infection Per Population")
        .groupBy("country")
        .avg("infection Per Population")
        .orderBy(desc("avg(infection Per Population)"))
        .collect

# Notas y observaciones personales interesantes 

In [None]:
dfCovid.select("dateRep","countriesAndTerritories","cases").show // using the DataFrame methods

In [None]:
dfCovid.createOrReplaceTempView("covid") // SQL queries can be run against a DataFrame through a temp view

spark.sql("SELECT * FROM covid WHERE countriesAndTerritories == 'Spain'").show

In [None]:
// Filtering a DataFrame: keep rows for Asia or Europe and sort them by continent.
dfCovid.filter($"continentExp" === "Asia" || $"continentExp" === "Europe").sort($"continentExp".asc).show()

# Visualización de datos con plotly

In [None]:
// Plotly smoke test with hard-coded sample data.
val (x, y) = (Seq("Banana", "Apple", "Grapefruit"), Seq(10, 8, 5))

Bar(x, y).plot()

In [None]:
// Bar chart of the average daily cases per country (first column: label, second: value).
val (x,y) = infAvgOrDf.collect
  .toList
  .map(row => (row(0).toString, row(1).toString.toDouble))
  .unzip
Bar(x, y).plot()

In [None]:
// Bar chart built from the rows already collected in querie3 (first column: label, second: value).
val (x,y) = querie3.map(r=>(r(0).toString, r(1).toString.toFloat)).toList.unzip
Bar(x, y).plot()

In [None]:
// Bar chart of average daily cases per inhabitant by country. The value column is
// converted to Double, like the other charts in this notebook, so the bars are
// plotted on a numeric axis instead of being treated as text labels.
val (x,y) = infectionsPerPopulation.map(r=>(r(0).toString, r(1).toString.toDouble)).toList.unzip
Bar(x, y).plot()

# Visualización de eficiencia

Para la consulta de la media de infecciones diarias:

In [None]:
// Compare the elapsed time of the average-daily-cases query across the three APIs.
// infAvgOrDf is the DataFrame query and avgDS the Dataset query; the two labels
// were previously swapped.
val (x, y) = Seq(
  "RDD" -> runWithOutput(infectionAvgRDD.collect),
  "DataFrame" -> runWithOutput(infAvgOrDf.collect),
  "DataSet" -> runWithOutput(avgDS.collect)
).unzip

Bar(x, y).plot()

Para la consulta de infecciones por km²:

In [None]:
// Compare the elapsed time of the infections-per-km2 query across the three APIs.
// querie3 is already a collected array of rows, so calling collect on it cannot
// re-run the Spark job; the DataFrame pipeline is rebuilt here so that its
// execution is what gets timed.
val (x, y) = Seq(
  "RDD" -> runWithOutput(querie1.collect),
  "DataSet" -> runWithOutput(querie2.collect),
  "DataFrame" -> runWithOutput(
    dfCovid.join(dfPopulation, $"country" === $"countriesAndTerritories")
      .select($"country",
              $"dateRep" as "date",
              $"cases",
              $"Land Area (Km\u00b2)",
              $"cases" / $"Land Area (Km\u00b2)" as "infection Per Km\u00b2")
      .groupBy("country")
      .avg("infection Per Km\u00b2")
      .orderBy(desc("avg(infection Per Km²)"))
      .collect()
  )
).unzip

Bar(x, y).plot()