In [1]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [15]:
val bucketname = "unibo-bd2324-dcohen"

val path_apps = "s3a://"+bucketname+"/datasets/project/App_ID_Info.csv"
val path_players = "s3a://"+bucketname+"/datasets/project/Player_Summaries.csv"
val path_friends = "s3a://"+bucketname+"/datasets/project/Friends.csv"
val path_games_full = "s3a://"+bucketname+"/datasets/project/Games.csv"
val path_games = "s3a://"+bucketname+"/datasets/project/Games20Percent.csv"
val path_games_daily = "s3a://"+bucketname+"/datasets/project/Games_Daily.csv"
val path_games_info = "s3a://"+bucketname+"/datasets/project/Games_Info.csv"
val path_achievement = "s3a://"+bucketname+"/datasets/project/Achievement_Percentages.csv"
val path_output_correlation = "s3a://"+bucketname+"/datasets/project/results/correlation"
val path_output_most_achieved_app = "s3a://"+bucketname+"/datasets/project/results/most_achieved_app"
val path_output_avg_playtime_release_year = "s3a://"+bucketname+"/datasets/project/results/avg_playtime_release_year"

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bd2324-dcohen
path_apps: String = s3a://unibo-bd2324-dcohen/datasets/project/App_ID_Info.csv
path_players: String = s3a://unibo-bd2324-dcohen/datasets/project/Player_Summaries.csv
path_friends: String = s3a://unibo-bd2324-dcohen/datasets/project/Friends.csv
path_games_full: String = s3a://unibo-bd2324-dcohen/datasets/project/Games.csv
path_games: String = s3a://unibo-bd2324-dcohen/datasets/project/Games20Percent.csv
path_games_daily: String = s3a://unibo-bd2324-dcohen/datasets/project/Games_Daily.csv
path_games_info: String = s3a://unibo-bd2324-dcohen/datasets/project/Games_Info.csv
path_achievement: String = s3a://unibo-bd2324-dcohen/datasets/project/Achievement_Percentages.csv
path_output_correlation: String = s3a://unibo-bd2324-dcohen/datasets/project/results/correlation
path_output_most_achieved_app: String = s3a://unibo-bd2324-dcohen/datasets/project/results/most_achieved_app
path_output_avg_playtime_release_year: String = s3a://unibo-bd2324-dcohen/datas

In [9]:
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.sql.SaveMode
import scala.util.{Try, Success, Failure}

case class App(appid: Long, title: String, typ: String, price: Double, releaseYear: Int, rating: Int, requiredAge: Int, isMultiplayer: Int)
case class PlayerSummary(steamid: Long, personaname: String, lastlogoffYear: Int, realname: Option[String], primaryclanid: Long, timecreatedYear: Int, loccountrycode: Option[String], locstatecode: Option[String])
case class GameInfo(appid: Long, developer: String, publisher: String, genre: String)
case class Game(steamid: Long, appid: Long, playtimeTwoWeek: Int, playtimeForever: Int)
case class Friend(steamidA: Long, steamidB: Long, friendSinceYear: Int)
case class Achievement(appid: Long, playername: String, percentage: Float)

case class AppValues(title: String, typ: String, price: Double, releaseYear: Int, rating: Int, requiredAge: Int, isMultiplayer: Int)
case class PlayerSummaryValues(personaname: String, lastlogoffYear: Int, realname: Option[String], primaryclanid: Long, timecreatedYear: Int, loccountrycode: Option[String], locstatecode: Option[String])
case class GameInfoValues(developer: String, publisher: String, genre: String)
case class GameValues(playtimeTwoWeek: Int, playtimeForever: Int)
case class GameValuesWithAppKey(appid: Long, playtimeTwoWeek: Int, playtimeForever: Int)
case class GameValuesWithSteamKey(steamid: Long, playtimeTwoWeek: Int, playtimeForever: Int)
case class FriendValues(friendSinceYear: Int)
case class AchievementValuesWithPlayerNameKey(playername: String, percentage: Float)
case class AchievementValuesWithAppKey(appid: Long, percentage: Float)

object SteamGamesParser {

  val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
  val pipeRegex = "\\|(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
  val quotes = "\""

    /** Convert from date string "yyyy-MM-dd HH:mm:ss" to year (Int) */
    def yearFromTimestamp(timestamp: String): Int = {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val date = dateFormat.parse(timestamp)
      val cal = Calendar.getInstance()
      cal.setTime(date)
      cal.get(Calendar.YEAR)
    }
    


    def cleanField(field: String): String = field.replace(quotes, "").trim

    def safelyConvert[A](str: String)(conversion: String => A): Option[A] = {
      Try(conversion(str)) match {
        case Success(value) => Some(value)
        case Failure(e) => 
          println(s"Error converting value '$str': ${e.getMessage}")
          None
      }
    }

    /** PARSING **/
    
    def parseAppLine(line: String): Option[App] = {
      val fields = line.split(",").map(cleanField)

      fields match {
        case Array(
                appid, 
                title, 
                typ, 
                price, 
                releaseDate, 
                rating, 
                requiredAge, 
                isMultiplayer) =>
          for {
            appidLong        <- safelyConvert(appid)(_.toLong)
            priceDouble      <- safelyConvert(price)(_.toDouble)
            releaseYear      <- safelyConvert(releaseDate)(yearFromTimestamp)
            ratingInt        <- safelyConvert(rating)(_.toInt)
            requiredAgeInt   <- safelyConvert(requiredAge)(_.toInt)
            isMultiplayerInt <- safelyConvert(isMultiplayer)(_.toInt)
          } yield App(
            appidLong, 
            title, 
            typ, 
            priceDouble, 
            releaseYear, 
            ratingInt, 
            requiredAgeInt, 
            isMultiplayerInt
        )
        case _ => None
      }
    }
    
    def parsePlayerSummaryLine(line: String): Option[PlayerSummary] = {
      val fields = line.split(",").map(cleanField)
      fields match {
        case Array(
                steamid, 
                personaname, 
                profileurl, 
                avatar, 
                avatarmedium, 
                avatarfull, 
                personastate, 
                communityvisibilitystate, 
                profilestate, 
                lastlogoff, 
                commentpermission, 
                realname, 
                primaryclanid, 
                timecreated, 
                gameid, 
                gameserverip, 
                gameextrainfo, 
                cityid, 
                loccountrycode, 
                locstatecode, 
                loccityid,
                dateretrieved) =>
          for {
            steamidLong        <- safelyConvert(steamid)(_.toLong)
            primaryclanidLong  <- safelyConvert(primaryclanid)(_.toLong)
            lastlogoffYear     <- safelyConvert(lastlogoff)(yearFromTimestamp)
            timecreatedYear    <- safelyConvert(timecreated)(yearFromTimestamp)
            loccityidInt       <- safelyConvert(loccityid)(_.toInt)
          } yield PlayerSummary(
            steamidLong, 
            personaname,
            lastlogoffYear, 
            Option(realname).orElse(Some("Unknown")), 
            primaryclanidLong, 
            timecreatedYear, 
            Option(loccountrycode).orElse(Some("Unknown")), 
            Option(locstatecode).orElse(Some("Unknown"))
          )
        case _ => None
      }
    }
    
    def parseGameInfoLine(line: String): Option[GameInfo] = {
      val fields = line.split(",").map(cleanField)

      fields match {
        case Array(
                appid, 
                developer, 
                publisher, 
                genre) =>
          for {
            appidLong  <- safelyConvert(appid)(_.toLong)
          } yield GameInfo(
            appidLong, 
            developer, 
            publisher, 
            genre
        )
        case _ => None
      }
    }

    def parseGameLine(line: String): Option[Game] = {
      val fields = line.split(",").map(cleanField)

      fields match {
        case Array(
                steamid, 
                appid, 
                playtimeTwoWeek,
                playtimeForever,
                dateretrieved) =>
          for {
            steamidLong  <- safelyConvert(steamid)(_.toLong)
            appidLong  <- safelyConvert(appid)(_.toLong)
            playtimeTwoWeekInt  <- safelyConvert(playtimeTwoWeek)(_.toInt)
            playtimeForeverInt  <- safelyConvert(playtimeForever)(_.toInt)
          } yield Game(
            steamidLong, 
            appidLong, 
            playtimeTwoWeekInt, 
            playtimeForeverInt
        )
        case _ => None
      }
    }
    
    def parseFriendLine(line: String): Option[Friend] = {
      val fields = line.split(",",-1).map(cleanField) 
        fields match {
          case Array(
              steamid_a,
              steamid_b,
              relationship,
              friendSince,
              dataretrived,
              lccTag) =>
            for {
              steamidALong     <- safelyConvert(steamid_a)(_.toLong)
              steamidBLong     <- safelyConvert(steamid_b)(_.toLong)
              friendSinceYear  <- safelyConvert(friendSince)(yearFromTimestamp)
            } yield Friend(
              steamidALong,
              steamidBLong,
              friendSinceYear
          )
          case _ => None
        }
    }
    
    def parseAchievementLine(line: String): Option[Achievement] = {
      val fields = line.split(",").map(cleanField) 
        fields match {
          case Array(
              appid,
              name,
              percentage,
              _*) =>
            for {
              appidLong           <- safelyConvert(appid)(_.toLong)
              percentageFloat     <- safelyConvert(percentage)(_.toFloat)
            } yield Achievement(
              appidLong,
              name,
              percentageFloat
          )
          case _ => None
        }
    }
}


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.sql.SaveMode
import scala.util.{Try, Success, Failure}
defined class App
defined class PlayerSummary
defined class GameInfo
defined class Game
defined class Friend
defined class Achievement
defined class AppValues
defined class PlayerSummaryValues
defined class GameInfoValues
defined class GameValues
defined class GameValuesWithAppKey
defined class GameValuesWithSteamKey
defined class FriendValues
defined class AchievementValuesWithPlayerNameKey
defined class AchievementValuesWithAppKey
defined object SteamGamesParser


In [4]:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(12)

val apps = sc.textFile(path_apps).flatMap(SteamGamesParser.parseAppLine).map { app => 
  (app.appid, AppValues(app.title, app.typ, app.price, app.releaseYear, app.rating, app.requiredAge, app.isMultiplayer))
}.cache()

val players = sc.textFile(path_players).flatMap(SteamGamesParser.parsePlayerSummaryLine).map { player => 
  (player.steamid, PlayerSummaryValues(player.personaname, player.lastlogoffYear, player.realname, player.primaryclanid, player.timecreatedYear, player.loccountrycode, player.locstatecode))
}.partitionBy(partitioner).cache()

val gameInfo = sc.textFile(path_games_info).flatMap(SteamGamesParser.parseGameInfoLine).map { gameInfo => 
  (gameInfo.appid, GameInfoValues(gameInfo.developer, gameInfo.publisher, gameInfo.genre))
}.partitionBy(partitioner).cache()

val gameSessions = sc.textFile(path_games_full).flatMap(SteamGamesParser.parseGameLine)

val gameSessionsWithSteamKey = gameSessions.map { game => 
  ((game.steamid), GameValuesWithAppKey(game.appid, game.playtimeTwoWeek, game.playtimeForever))
}.partitionBy(partitioner).persist(StorageLevel.MEMORY_ONLY)

val gameSessionsWithAppKey = gameSessions.map { game => 
  ((game.appid), GameValuesWithSteamKey(game.steamid, game.playtimeTwoWeek, game.playtimeForever))
}.partitionBy(partitioner).persist(StorageLevel.MEMORY_ONLY)

val friends = sc.textFile(path_friends).flatMap(SteamGamesParser.parseFriendLine).map { friend => 
  ((friend.steamidA, friend.steamidB), FriendValues(friend.friendSinceYear))
}.partitionBy(partitioner).persist(StorageLevel.MEMORY_ONLY)

val achievementsWithPlayerName = sc.textFile(path_achievement).flatMap(SteamGamesParser.parseAchievementLine).map {  achievement =>
    ((achievement.playername), AchievementValuesWithAppKey(achievement.appid, achievement.percentage))
}.partitionBy(partitioner).cache()

val achievementsWithAppKey = sc.textFile(path_achievement).flatMap(SteamGamesParser.parseAchievementLine).map {  achievement =>
    ((achievement.appid), AchievementValuesWithPlayerNameKey(achievement.playername, achievement.percentage))
}.partitionBy(partitioner).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
partitioner: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@c
apps: org.apache.spark.rdd.RDD[(Long, AppValues)] = MapPartitionsRDD[3] at map at <console>:33
players: org.apache.spark.rdd.RDD[(Long, PlayerSummaryValues)] = ShuffledRDD[8] at partitionBy at <console>:36
gameInfo: org.apache.spark.rdd.RDD[(Long, GameInfoValues)] = ShuffledRDD[13] at partitionBy at <console>:36
gameSessions: org.apache.spark.rdd.RDD[Game] = MapPartitionsRDD[16] at flatMap at <console>:31
gameSessionsWithSteamKey: org.apache.spark.rdd.RDD[(Long, GameValuesWithAppKey)] = ShuffledRDD[18] at partitionBy at <console>:34
gameSessionsWithAppKey: org.apache.spark.rdd.RDD[(Long, GameValuesWithSteamKey)] = ShuffledRDD[20] at partitionBy at <console>:34
friends: org.apache.spark.rdd.RDD[((Long, Long), FriendValues)] = ShuffledRDD[25] at partitionBy at <console>:36
achievementsWithPlayerName: org.apache.spark.rdd.R

In [5]:
/** Total playtime for each game
    1. gameSession_mappedVal(appid, playtimeforever)
    2. gameSession_mappedVal_reduced(appid, sum(playtimeforever))
    3. Cached!!! used in (1) and (5)
**/
val totalPlaytimeForGame = gameSessionsWithAppKey.
    mapValues(_.playtimeForever).
    reduceByKey(_ + _).
    partitionBy(partitioner).
    cache()

/** Count the number of friends for each player
    1. friend_mapped(steamidA,1), friend(steamidB,1)
    2. friend_mapped_reduced(steamidA, sum), friend(steamidB, sum)
    3. Cached!!! Used in (3),(4) and (6)
**/
val friendCounts = friends.
    flatMap { case ((steamidA, steamidB), _) => Seq((steamidA, 1), (steamidB, 1)) }.
    reduceByKey(_ + _).
    partitionBy(partitioner).
    cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

totalPlaytimeForGame: org.apache.spark.rdd.RDD[(Long, Int)] = MapPartitionsRDD[37] at reduceByKey at <console>:36
friendCounts: org.apache.spark.rdd.RDD[(Long, Int)] = ShuffledRDD[39] at reduceByKey at <console>:37


In [6]:
val appsMap = apps.
    mapValues(v => (v.title, v.rating.toDouble, v.isMultiplayer, v.price.toDouble, v.releaseYear)).
    collectAsMap().toMap

val broadcastedApps = sc.broadcast(appsMap)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

appsMap: scala.collection.immutable.Map[Long,(String, Double, Int, Double, Int)] = Map(42890 -> (Sword of the Stars: Complete Collection,75.0,1,9.99,2010), 221020 -> (Towns,-1.0,0,14.99,2012), 428140 -> (TowerClimb - Extended Original Soundtrack,-1.0,0,8.99,2016), 402840 -> (Crusaders of the Lost Idols,-1.0,0,0.0,2015), 204308 -> (Awesomenauts - Hotrod Derpl Skin,-1.0,1,2.49,2012), 42900 -> (Hearts of Iron III: Semper Fi,65.0,1,4.99,2010), 446870 -> (Glitchrunners,-1.0,0,12.99,2016), 421166 -> (MX vs. ATV Supercross Encore - 2 Stroke 4 Pack,-1.0,1,1.99,2016), 90410 -> (Blue Toad Murder Files?: The Mysteries of Little Riddle Demo,-1.0,1,0.0,2010), 259620 -> (3079 -- Block Action RPG,-1.0,1,4.99,2013), 325330 -> (Muvee Reveal 11,-1.0,0,79.99,2015), 349680 -> (Supercharged Robot VULKAISER,...
broadcastedApps: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Long,(String, Double, Int, Double, Int)]] = Broadcast(8)


In [None]:
// Total number of games sessions 
println(s"Total number of games sessions: ${gameSessions.count()}")

// Total number of games
// (Possibile candidata broadcast)
println(s"Total number of apps: ${apps.count()}")

// Total number of players
// (Possibile candidata broadcast)
println(s"Total number of players: ${players.count()}")

// Total number of friends connections
println(s"Total number of friends connections: ${friends.count()}")

// Total number of game informations
// (Possibile candidata broadcast, solo se le informazioni sono molto leggere)
println(s"Total number of game informations: ${gameInfo.count()}")

// Total number of achievements
println(s"Total number of achievements: ${achievementsWithAppKey.count()}")


In [47]:
/** (6) - Correlation between number of friends and number multiplayer applications **/


/** Count the number of multiplayer sessions for each player 
    1. gameSessionWithAppKey(appid, steamid)
    1. join_broadcast(appid, (steamid ,...multiplayer == 1))
    2. join_broadcast_mapped(steamid, 1)
    3. join_broadcast_mapped_reduced(steamid, sum(multiplayerOccurences))
**/
val multiplayerSessions = gameSessionsWithAppKey.
    filter { case (appid, _) => 
        val appDetails = broadcastedApps.value.getOrElse(appid, null)
        appDetails != null && appDetails._3 == 1 }.
    mapValues(_.steamid).
    map { case (_, steamid) => (steamid, 1) }.
    reduceByKey(_ + _)
    

/** Unify with the number of friends
    1. join(steamid, ((sum(multiplayerOccurences), sum(friendsOccurrences))))
**/
val joinedData = multiplayerSessions.
    partitionBy(partitioner).
    join(friendCounts).
    mapValues { case (countMultiplayers, friends) => (friends.toDouble , countMultiplayers.toDouble) }

val finalDF = joinedData.
    coalesce(1).
    toDF("appid","metrics").
    select($"appid", 
           $"metrics._1".alias("friendCount"), 
           $"metrics._2".alias("numMultiplayerSessions"))

// Save results
finalDF.write.format("csv").mode(SaveMode.Overwrite).save(path_output_correlation)

// Calculate the correlation between number of friends and number multiplayer applications
val correlation3 = finalDF.stat.corr("friendCount", "numMultiplayerSessions")

// print results
println(s"Correlation between number of friends and multiplayer apps is $correlation3")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

multiplayerSessions: org.apache.spark.rdd.RDD[(Long, Int)] = ShuffledRDD[215] at reduceByKey at <console>:49
joinedData: org.apache.spark.rdd.RDD[(Long, (Double, Double))] = MapPartitionsRDD[219] at mapValues at <console>:44
finalDF: org.apache.spark.sql.DataFrame = [appid: bigint, friendCount: double ... 1 more field]
correlation3: Double = 0.07480698619060326
Correlation between number of friends and multiplayer apps is 0.07480698619060326


In [48]:
/** (10) - Where players try to complete more achievements? **/

/** Calculate the average percentage of completion for each app
  1. achievements_mapped(appid, (percentage, 1))
  2. achievements_reduced(appid, (sum(percentage), count))
**/
val totalAndCountAchievement = achievementsWithAppKey.
  mapValues(v => (v.percentage,1)).
  reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

/** Average achievements
  1. achievements__reduced_mappedval(appid, average_percentage = sum / count)
**/
val averageAchievement = totalAndCountAchievement.
    mapValues { case (total, count) => total / count }

/** Most achieved apps
  1. join(appid, (average_percentage, appValues))
  2. join_mapped(average_percentage, title)
  3. Sort by average_percentage
**/
val mostAchievedAppsAverage = averageAchievement.
  map { case (appid, avg) =>
     val appDetails = broadcastedApps.value.getOrElse(appid, null)
     val title = if (appDetails != null) appDetails._1 else "Unknown"  
     (appid, (avg, title)) }.
  sortByKey(false)

val finalDF = mostAchievedAppsAverage.
    coalesce(1).
    toDF("appid","metrics").
    select($"appid", 
           $"metrics._1".alias("average_percentage"), 
           $"metrics._2".alias("title"))

// Save results
finalDF.write.format("csv").mode(SaveMode.Overwrite).save(path_output_most_achieved_app)

// Print results
// mostAchievedAppsAverage.
//     coalesce(1).
//     collect().
//     foreach {
//   case (appid, (averagePercentage, title)) => println(s"* '$title' ($appid), average achieved ${averagePercentage}% by players")
// }


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

totalAndCountAchievement: org.apache.spark.rdd.RDD[(Long, (Float, Int))] = MapPartitionsRDD[231] at reduceByKey at <console>:41
averageAchievement: org.apache.spark.rdd.RDD[(Long, Float)] = MapPartitionsRDD[232] at mapValues at <console>:38
mostAchievedAppsAverage: org.apache.spark.rdd.RDD[(Long, (Float, String))] = ShuffledRDD[236] at sortByKey at <console>:45
finalDF: org.apache.spark.sql.DataFrame = [appid: bigint, average_percentage: float ... 1 more field]


In [49]:
/** (12) - Average of playtime based on relased date **/

/** Average playtime by release year
    1. join(appid, (gameValues, appValues))
    2. join_mapped(releaseYear, playtimeForever)
    3. join_mapped_reduced(releaseYear, (sum(playtimeForever), count(this)))
    4. join_mapped_reduced_mapped(releaseYear, sum(playtimeForever) / count(this))
**/
val avgPlaytimeByReleaseYear = gameSessionsWithAppKey.
  map {case (appid, gameInfo) =>
     val appDetails = broadcastedApps.value.getOrElse(appid, null)
     val releaseYear: Int = if (appDetails != null) appDetails._5 else 0  
     (releaseYear, gameInfo.playtimeForever) }.
  aggregateByKey((0.0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ).
    mapValues(v => v._1 / v._2)

val finalDF = avgPlaytimeByReleaseYear.
    coalesce(1).
    toDF()

// Save results
finalDF.write.format("csv").mode(SaveMode.Overwrite).save(path_output_avg_playtime_release_year)

// print results
// avgPlaytimeByReleaseYear.
//     coalesce(1).
//     collect().
//     foreach { 
//     case (releaseYear, avg) => println(s"* On relased date $releaseYear the average playtime was $avg minutes")
// }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

avgPlaytimeByReleaseYear: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[243] at mapValues at <console>:51
finalDF: org.apache.spark.sql.DataFrame = [_1: int, _2: double]


In [None]:
/** (1) - Game most popular **/

/** Added gamename to the total playtime
    1. broadcast_join((appid, title), time)
**/
val totalPlaytimeWithNames = totalPlaytimeForGame.
    map { case (appid, time) =>
         val appDetails = broadcastedApps.value.getOrElse(appid, null)
         val title = if (appDetails != null) appDetails._1 else "Unknown"
        (time, (appid, title))
}

/** Most played game
    1. broadcast_join_reduced((appid, gamename), max(time))
**/
val mostPlayedGame = totalPlaytimeWithNames.
    reduceByKey((a, b) => if (a._2 > b._2) a else b).
    sortByKey(false)

// print results
mostPlayedGame.coalesce(1).collect().foreach { 
  case (time, (appid, title)) => 
    println(s"* '$title' ($appid) is the most played with $time minutes of total playtime.")
}

In [None]:
/** (2) - Number of achievements for every app sorted  **/

/** Number of achievements for each game
    1.achievement_mapped(appid, 1)
    2.achievement_mapped_reduced(appid, sum)
**/
val numberOfAchievementsByGame = achievementsWithAppKey.
    mapValues(_ => 1).
    reduceByKey(_ + _).
    sortByKey()

/** Added gamename to the total playtime
    1. broadcast_join((appid, sum), time)
**/
val joinedData = totalPlaytimeForGame.
    map { case (appid, sum) =>
         val appDetails = broadcastedApps.value.getOrElse(appid, null)
         val title = if (appDetails != null) appDetails._1 else "Unknown"
        (appid, (sum, title))
}

// print results
joinedData.coalesce(1).collect().foreach { 
  case (appid, (count, title)) => println(s"* $title ($appid) has $count achievements.")
}

In [None]:
/** (3) - Average friend of each country **/

/** Get player with country
    1. player_mappedVal(steamid, loccountrycode)
**/
val PlayerWithCountry = players.
    filter { case (_, playerValues) => playerValues.loccountrycode.isDefined }.
    mapValues(_.loccountrycode.get)

/** Add countrycode for each friends
    1. join(steamid, (countryCode, loccountrycode))
    2. join_mapped(countryCode, count) --> {permit aggregatebyKey}
**/
val friendCountsByCountry = friendCounts.
//     partitionBy(partitioner).
    join(PlayerWithCountry).
    map { case (steamid, (count, countryCode)) => (countryCode, count) }

/** Sum and count for each country
    1. join_mapped_reduce(countrycode, (sum, num))
**/
val sumAndCountByCountry = friendCountsByCountry.
    aggregateByKey((0, 0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )

/** Find the average
    1. join_mapped_reduce_mappedVal(countrycode, avg)
**/
val avgByCountry = sumAndCountByCountry.
    mapValues { case (sum, count) => if (count > 0) sum.toDouble / count else 0.0 }.
    sortBy(_.avg)

// print results
avgByCountry.coalesce(1).collect().foreach { 
  case (country, mean) => println(s"* Country $country has average friends of $mean.")
}

In [None]:
/** (4) - Correlation between total play time and friends **/

/** Calculate the total play time for each player
    1. gameSessions_mappedVal(steamid, playtimeForever)
    2. gameSessions_mapped_reduced(steamid, sum(playtimeForever))
**/
val totalPlayTimeByPlayer = gameSessionsWithSteamKey.
    mapValues(_.playtimeForever).
    reduceByKey(_ + _)

/** Unify the data
    1. join(steamid, (sum(playtimeforever), sum(friends_occurrences))) (TODO - used in the other correlations, maybe caching?)
**/
val joinedData = totalPlayTimeByPlayer.
//     partitionBy(partitioner).
    join(friendCounts).
    mapValues { case (playTime, friendCount) => (playTime.toDouble , friendCount.toDouble) }

val joinDF = joinedData.
    toDF("steamid","metrics").
    select($"steamid", 
           $"metrics._1".alias("totalPlayTime"), 
           $"metrics._2".alias("friendCount"))

// Calculate the correlation between the sum of total play time and the number of occured friends
val correlation1 = joinDF.stat.corr("totalPlayTime", "friendCount")

// print results
println(s"Correlation between total play time and number of friends is $correlation1")

In [None]:
/** (5) - Correlation between app ranking and total play time **/

/** Calculate the total play time for each player
    1. gameSessions_mapped(appid, playtimeForever)
    2. gameSessions_mapped_reduced(appid, sum(playtimeForever))
**/
val totalPlayTimeByGame = gameSessionsWithAppKey.
    mapValues(_.playtimeForever).
    reduceByKey(_ + _)

/** Unify the data
    1. join_broadcast(appid, (sum(playtimeforever), ranking))
**/
val joinedData = totalPlaytimeForGame.
    map { case (appid, time) =>
         val appDetails = broadcastedApps.value.getOrElse(appid, null)
         val rating = if (appDetails != null) appDetails._2 else 0.0 
        (appid, (rating, time)) }.
    mapValues { case (playTime, rating) => (playTime.toDouble , rating.toDouble) }

val joinDF = joinedData.
    toDF("appid","metrics").
    select($"appid", 
           $"metrics._1".alias("totalPlayTime"), 
           $"metrics._2".alias("ranking"))

// Calculate the correlation between the sum of total play time and the rating
val correlation2 = joinDF.stat.corr("totalPlayTime", "ranking")

// print results
println(s"Correlation between total play time and game ratings is $correlation2")

In [None]:
/** (7) - Find all the unplayed apps bought by players **/

/** Obtain only the unplayed game
    1. gameSessions_filtered(appid,... playtimeForever == 0)
**/
val unplayedGames = gameSessionsWithAppKey.
  filter {case (_, gameValues) => gameValues.playtimeForever == 0}

/** Count the unplayed apps for each player
    1. gameSessions_filtered_mapped(appid,1)
    2. gameSessions_filtered_mapped_reduced(appid,countGameSessions)
**/
val countUnplayedGamesByPlayer = unplayedGames.
    mapValues(_ => 1).
    reduceByKey(_ + _)

/** Unify the data alternative --> (join(apps.mapValues(_.title)))
    1. join_broadcast(appid, (countGameSessions, title))
**/
val joinedData = countUnplayedGamesByPlayer.
    map { case (appid, unplayed) =>
         val appDetails = broadcastedApps.value.getOrElse(appid, null)
         val title = if (appDetails != null) appDetails._1 else "Unknown"
        (appid, (unplayed, title)) }

// print results
joinedData.coalesce(1).collect().foreach { 
  case (steamid, (unplayed, title)) => println(s"* '$title' ($steamid) is unplayed by $unplayed players")
}

In [None]:
/** (8) - Apps more expensive by genre **/

/** Genre associated with apps informations
    1. apps_mapVal(appid, (title, price))
    2. join_broadcast(genre, (appid, title, price))
**/
val joinGame = gameInfo.
    map { case (appid, gameVal) =>
         val appDetails = broadcastedApps.value.getOrElse(appid, null)
         val title = if (appDetails != null) appDetails._1 else "Unknown"
         val price = if (appDetails != null) appDetails._4 else 0.0
         (gameVal.genre, (appid, title, price))
    }

/** Find the most expensive app by genre
    1. join_mapped_reduced(genre, max((appid, title, price)))
**/
val mostExpensiveGameByGenre = joinGame.
    reduceByKey { (game1, game2) => if (game1._3 > game2._3) game1 else game2 }

// print results 
mostExpensiveGameByGenre.coalesce(1).collect().foreach { 
    case (genre, (appid, title, price)) => println(s"* '$title' ($appid) in genre $genre costs: $price ")
}

In [None]:
/** (9) - Top 10 apps with played in last two weeks **/

/** Sum the game time for each apps in the last two weeks
    1. gameSession_mapped(appid, playtimeTwoWeek) --> TODO: altro caso da evitare
    2. gameSession_mapped_reduced(appid, sum(playtimeTwoWeek))
**/
val totalPlaytimeTwoWeekByGame = gameSessionsWithAppKey.
    mapValues(_.playtimeTwoWeek).
    reduceByKey(_ + _)

/** Unify with the information of app title
    1. join(appid, ((sum(playtimeTwoWeek), appValues))
    2. join_mapped((sum(playtimeTwoWeek), title)
    3. join_mapped_sorted(((sum(playtimeTwoWeek), title))
**/
val mostPlayedGames = totalPlaytimeTwoWeekByGame.
    map {
        case (appid, totalTwoWeek) =>
         val appDetails = broadcastedApps.value.getOrElse(appid, null)
         val title = if (appDetails != null) appDetails._1 else "Unknown"
         (totalTwoWeek, (appid, title)) }.
    sortByKey(false)
    
// print results
mostPlayedGames.take(10).foreach { 
    case (totalPlaytime, (appid, title)) => println(s"* '$title' ($appid) was played $totalPlaytime minutes in the last two weeks")
}

In [None]:
/** (11) - Number of apps possessed by players but never played **/

/** Total game session that is never played
  1. gameSessions_filtered((steamid, appid), ... playtimeForever == 0)
  2. gameSessions_filtered_mapped(steamid, 1)
  3. gameSessions_filtered_mapped(steamid, count(gamesSessions))
**/
val totalAppsNeverPlayed = gameSessionsWithSteamKey.
  filter { case (_, gameValues) => gameValues.playtimeForever == 0 && gameValues.playtimeTwoWeek == 0 }

/** Unplayed games by players
  1. join(steamid, (count(gamesSessions), playerValues))
  * Possibile broadcast (costosa)
**/
val unplayedGamesByPlayer = totalAppsNeverPlayed.
//   partitionBy(partitioner).
  join(players).
  map { case (steamid, (count, playerValues)) => (steamid, (count, playerValues.personaname)) }

// Print results
unplayedGamesByPlayer.coalesce(1).collect().foreach { 
  case (steamid, (count, personaname)) => println(s"Player: $personaname ($steamid), never played $count apps bought")
}


In [None]:
/** (13) - Average of playtime based on contry **/

/** Unify gameSessions and players
    1. gameSessions_mapped(steamid, gameValues)
    2. join(steamid, (gameValues, playerValues))
**/
val joinedData = gameSessionsWithSteamKey.
  join(players)

/** Average playtime by country
    1. join_mapped(loccountrycode, playtimeForever)
    3. join_mapped_reduced(loccountrycode, (sum(playtimeForever), count(this)))
    4. join_mapped_reduced_mapped(loccountrycode, sum(playtimeForever) / count(this))
**/
val avgPlayTimeByCountry = joinedData.
  map { case (steamid, (gameValues, playerValues)) => (playerValues.loccountrycode.getOrElse("Unknown"), gameValues.playtimeForever)}.
  aggregateByKey((0.0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ).
    mapValues(v => v._1 / v._2)

// print results
avgPlayTimeByCountry.coalesce(1).collect().foreach { 
  case (country, avgPlayTime) => println(s"Average playtime in $country is $avgPlayTime minutes.")
}

In [None]:
/** (14) - Churn Rate (abandoning the game) **/

/** Filter out active users from gameSessions
    1. gameSessions_filtered((steamid, appid), ...playtimeForever > 0 && gameValues.playtimeTwoWeek == 0)
**/
val inActiveUsers = gameSessionsWithAppKey.
  filter {case (_, gameValues) => gameValues.playtimeForever > 0 && gameValues.playtimeTwoWeek == 0}

/** Count the number of inactive users per game
    1. gameSessions_filtered_mapped(appid, 1)
    2. gameSessions_filtered_mapped_reduced(appid, sum(this))
**/
val inActiveUsersPerGame = inActiveUsers.
  mapValues(_ =>1).
  reduceByKey(_ + _)

/** Count the total number of users per game
    1. gameSessions_mapped(appid,1)
    2. gameSessions_mapped_reduced(appid, sum(this))
**/
val totalUsersPerGame = gameSessionsWithAppKey.
  mapValues(_ =>1).
  reduceByKey(_ + _)

/** Calculate the churn rate per game
    1. join(appid, (sum(this), sum(this)))
**/
val churnRatePerGame = inActiveUsersPerGame.
//   partitionBy(partitioner).
  join(totalUsersPerGame).
  mapValues {case ((inActiveCount, totalCount)) => ((inActiveCount.toDouble / totalCount) * 100)}

// print results
churnRatePerGame.coalesce(1).collect().foreach { 
  case (appid, churnRate) => println(s"The churn rate for game $appid is $churnRate%.")
}