## We start by creating the spark session and adding the Scala Kernel

### Imports 

Some necessary and useful imports

In [None]:
import $file.common
import org.apache.spark._
import org.apache.spark.sql.{functions => func, _}
import org.apache.spark.sql.types._, func._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType, _}
import org.slf4j.LoggerFactory
import org.apache.log4j.{Level, Logger}
import spark.implicits._
Logger.getRootLogger().setLevel(Level.ERROR)

### Global variables

We define global variables that will be passed as arguments in the SBT project; for this case study we assume their values from the beginning.

In [None]:
// Number of recent matches to analyse.
var NMATCHES = 10
// Account whose matches are analysed; kept global so it can later be passed in as an argument.
val ACCOUNT_ID = "639740"
// Endpoint that lists the recent matches of ACCOUNT_ID.
val URL_PLAYER = "https://api.opendota.com/api/players/" + ACCOUNT_ID + "/recentMatches"

# Extract

We call the API endpoint with Scala IO methods. It is the simplest way to access the data, as no API keys are needed, and we directly convert the data to a DataFrame to work with it.

In [None]:
/** Downloads the body of `url` and parses it as JSON into a DataFrame.
  *
  * The whole response is read into memory as one string, stripped of a
  * trailing line break, and handed to Spark's JSON reader as a single record.
  *
  * @param url address to fetch
  * @return the DataFrame Spark builds from the response body
  */
def GetUrlContentJson(url: String): DataFrame = {
  val source = scala.io.Source.fromURL(url)
  // Release the underlying stream whether or not reading succeeds.
  val body = try source.mkString finally source.close()
  val jsonResponseOneLine = body.stripLineEnd
  val jsonRdd = spark.sparkContext.parallelize(jsonResponseOneLine :: Nil)
  spark.read.json(jsonRdd)
}

# Transform

Different functions are developed here to calculate the KPIs as asked

This function returns a list of strings with all the match IDs of a player, so that we can later build a DataFrame by joining the matches.

In [None]:
/** Collects the `match_id` values of the first `nMatches` rows of `userDF`.
  *
  * @param userDF   DataFrame that has a `match_id` column
  * @param nMatches maximum number of rows to read
  * @return the match IDs rendered as strings
  */
def getPlayerMatches(userDF: DataFrame ,nMatches: Int): List[String] = {
  val idColumn = userDF.limit(nMatches).select("match_id")
  idColumn.rdd
    .map(row => row.get(0).toString)
    .collect()
    .toList
}

#### The following functions compute a player's KDA: the average, the maximum and the minimum.

In [None]:
 /** Average KDA, rounded to two decimals, over the first `nMatches` rows of `userDF`.
   *
   * KDA is computed per row as (kills + assists) / max(deaths, 1).
   *
   * @param userDF   DataFrame with `kills`, `assists` and `deaths` columns
   * @param nMatches maximum number of rows taken into account
   * @return the rounded mean KDA
   */
 def avgKdaComputation(userDF: DataFrame, nMatches: Int): Float = {
    // Clamp the divisor to 1: dividing by zero deaths does not produce a number
    // in Spark SQL, which would silently drop deathless games from the mean.
    val kda = ($"kills" + $"assists") / greatest($"deaths", lit(1))
    userDF
      .limit(nMatches)
      .select(kda as "KDA")
      .agg(mean("KDA"))
      .withColumn("avg(KDA)", round($"avg(KDA)", 2))
      .first()
      .getDouble(0)
      .toFloat
  }

In [None]:
/** Highest KDA, rounded to two decimals, among the first `nMatches` rows of `userDF`.
  *
  * KDA is computed per row as (kills + assists) / max(deaths, 1).
  *
  * @param userDF   DataFrame with `kills`, `assists` and `deaths` columns
  * @param nMatches maximum number of rows taken into account
  * @return the rounded maximum KDA
  */
def maxKdaComputation(userDF: DataFrame, nMatches: Int): Float = {
    // Clamp the divisor to 1: dividing by zero deaths does not produce a number
    // in Spark SQL, so deathless games (the best ones) could never be the maximum.
    val kda = ($"kills" + $"assists") / greatest($"deaths", lit(1))
    userDF
      .limit(nMatches)
      .select(kda as "KDA")
      .orderBy($"KDA".desc)
      .withColumn("KDA", round($"KDA", 2))
      .first()
      .getDouble(0)
      .toFloat
  }

In [None]:
/** Lowest KDA, rounded to two decimals, among the first `nMatches` rows of `userDF`.
  *
  * KDA is computed per row as (kills + assists) / max(deaths, 1).
  *
  * @param userDF   DataFrame with `kills`, `assists` and `deaths` columns
  * @param nMatches maximum number of rows taken into account
  * @return the rounded minimum KDA
  */
def minKdaComputation(userDF: DataFrame, nMatches: Int): Float = {
    // Clamp the divisor to 1: dividing by zero deaths yields a null KDA, which
    // sorts first in ascending order and makes getDouble fail on the selected row.
    val kda = ($"kills" + $"assists") / greatest($"deaths", lit(1))
    userDF
      .limit(nMatches)
      .select(kda as "KDA")
      .orderBy($"KDA".asc)
      .withColumn("KDA", round($"KDA", 2))
      .first()
      .getDouble(0)
      .toFloat
  }

#### This function computes the KP for one specific match taking into consideration what team the player is playing in.

In [None]:
/** Kill participation (in percent, two decimals) of the player in one match.
  *
  * `matchDF` and `userDF` are joined on `match_id`. Rows with
  * `player_slot <= 127` are measured against `radiant_score`, all others
  * against `dire_score`: KP = (kills + assists) / team score * 100.
  * When the team score is not positive (or missing) the KP is reported as 0
  * instead of failing on a null result.
  *
  * @param matchDF match details providing `match_id`, `radiant_score`, `dire_score`
  * @param userDF  player rows providing `match_id`, `kills`, `assists`, `player_slot`
  * @return the KP of the first joined row
  */
def KPComputationTeam(matchDF: DataFrame, userDF: DataFrame): Float = {
  // Score of the side the player was on.
  val teamScore = when(col("player_slot") <= 127, $"radiant_score").otherwise($"dire_score")
  // Guard the division: a team without kills gives a 0 % participation.
  val killParticipation =
    when(teamScore > 0, round((($"kills" + $"assists") / teamScore) * 100, 2))
      .otherwise(lit(0.0))

  matchDF.join(userDF, "match_id")
    .select(killParticipation as "KP")
    .first()
    .getDouble(0)
    .toFloat
}

#### This recursive function takes the results from the previous one, and creates a list of all the KPs from a player, to be evaluated later.

In [None]:
/** Recursively builds the list of KPs, one per match ID, in the order of `matchesList`.
  *
  * For every ID the match details are downloaded with `GetUrlContentJson` and
  * passed to `KPComputationTeam` together with `userDF`.
  *
  * @param matchesList match IDs still to be processed
  * @param userDF      player rows handed to `KPComputationTeam`
  * @return the KP of each match; empty when `matchesList` is empty
  */
def KPTotalComputation(matchesList: List[String], userDF: DataFrame): List[Float] =
  if (matchesList.isEmpty) Nil
  else {
    val currentId = matchesList.head
    val matchDetails = GetUrlContentJson("https://api.opendota.com/api/matches/" + currentId)
    val currentKP = KPComputationTeam(matchDetails, userDF)
    currentKP :: KPTotalComputation(matchesList.tail, userDF)
  }

#### The following functions receive the list created above and compute the average, maximum and minimum of its KPs.

In [None]:
/** Arithmetic mean of `KPList`, rounded half-up to two decimals.
  *
  * An empty list is not supported: the division yields NaN, which cannot be rounded.
  *
  * @param KPList KP values to average
  * @return the rounded mean
  */
def KPAvg(KPList: List[Float]): Float = {
  val rawAverage: Float = KPList.sum / KPList.size
  val rounded = BigDecimal.decimal(rawAverage).setScale(2, BigDecimal.RoundingMode.HALF_UP)
  rounded.toFloat
}

  /** Largest value in `KPList`; the list must not be empty. */
  def KPMax(KPList: List[Float]): Float = KPList.max

  /** Smallest value in `KPList`; the list must not be empty. */
  def KPMin(KPList: List[Float]): Float = KPList.min

#### Finally we create the final DataFrame with the data that is relevant for this assignment. The structure and schema are custom-made so that it can be serialized properly.

In [None]:
/** Wraps the KPI values in a one-row DataFrame with an explicit, commented schema.
  *
  * NOTE: the six metric arguments are written to the row positionally, while the
  * schema lists its columns as max, min, avg. The value passed as `KDAavg`
  * therefore lands in `max_kda`, `KDAmax` in `min_kda` and `KDAmin` in `avg_kda`
  * (the KP trio behaves the same way). Positional callers have to pass the
  * metrics in max, min, avg order for the columns to be labelled correctly.
  *
  * @param game        value of the `game` column
  * @param player_name value of the `player_name` column
  * @param total_games value of the `total_games` column
  * @return a DataFrame holding exactly one row
  */
def createKPIdataframe(game: String, player_name: String,total_games: Int, KDAavg : Float,
                         KDAmax : Float, KDAmin : Float, KPavg : Float, KPmax : Float, KPmin : Float) : DataFrame  = {

  // Every metric column is a nullable float that carries a descriptive comment.
  def metricField(name: String, comment: String): StructField =
    StructField(name, FloatType, nullable = true).withComment(comment)

  val kpiSchema = StructType(Seq(
    StructField("game", StringType, nullable = true)
      .withComment("The title of the game the matches are related to"),
    StructField("player_name", StringType, nullable = true)
      .withComment("The player’s ingame name / Summoner’s name"),
    StructField("total_games", IntegerType, nullable = true),
    metricField("max_kda", "Maximum KDA across n games"),
    metricField("min_kda", "Minimum KDA across n games"),
    metricField("avg_kda", "Average KDA across n games"),
    metricField("max_kp", "Maximum KP across n games"),
    metricField("min_kp", "Minimum KP across n games"),
    metricField("avg_kp", "Average KP across n games")
  ))

  // Values are placed in argument order; see the note above about column labels.
  val kpiRow = Row(game, player_name, total_games, KDAavg, KDAmax, KDAmin, KPavg, KPmax, KPmin)

  spark.createDataFrame(spark.sparkContext.parallelize(Seq(kpiRow)), kpiSchema)
}

# Load

#### Finally we save our data as a JSON file at the given URL, so that it could be pushed to a cloud service such as S3. In this case it will be saved locally.

In [None]:
/** Writes `df` as JSON to `url`, overwriting any existing output there.
  *
  * The DataFrame is coalesced to a single partition before it is written.
  *
  * @param df  data to persist
  * @param url destination path passed to the JSON writer
  */
def saveDFtoJSON(df : DataFrame, url : String) : Unit = {
  val singlePartition = df.coalesce(1)
  singlePartition.write
    .mode(SaveMode.Overwrite)
    .json(url)
}

In [None]:
// Extract: download the player's recent matches and keep at most NMATCHES match IDs.
val dotaPlayerDF = GetUrlContentJson(URL_PLAYER)
val playerMatches = getPlayerMatches(dotaPlayerDF, NMATCHES)

// Transform the data and compute the KPIs.
// One KP per match ID; every ID triggers its own request for the match details.
val totalKP = KPTotalComputation(playerMatches, dotaPlayerDF)

val maxKDA = maxKdaComputation(dotaPlayerDF, NMATCHES)
val minKDA = minKdaComputation(dotaPlayerDF, NMATCHES)
val avgKDA = avgKdaComputation(dotaPlayerDF, NMATCHES)

val maxKP = KPMax(totalKP)
val minKP = KPMin(totalKP)
val avgKP = KPAvg(totalKP)

// createKPIdataframe stores its six metric arguments positionally in the columns
// max_kda, min_kda, avg_kda, max_kp, min_kp, avg_kp, so they are passed in that order.
// total_games is the number of matches actually analysed, which is lower than
// NMATCHES when fewer matches were returned.
val finalDF = createKPIdataframe("Dota", "YrikGood", playerMatches.size,
  maxKDA, minKDA, avgKDA, maxKP, minKP, avgKP)

finalDF.show()