In [28]:
# Importing necessary libraries to extract the data properly from Postgres using PySpark
import pandas as pd
import numpy as np
from API_Key import username, password
import requests
import time
import psycopg2 as pg2
import json
from pandas.io.json import json_normalize
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as psf

In [2]:
# Create (or reuse) the Spark session used by every later cell.
# "spark.jars" adds the PostgreSQL JDBC driver jar to the session so the
# JDBC read from Postgres can find "org.postgresql.Driver".
# NOTE: the jar path below is absolute and specific to one machine.
spark = (
    SparkSession.builder
    .appName("TFT Analysis")
    .config("spark.jars", "/Users/hiowatah/downloads/postgresql-42.2.18.jar")
    .getOrCreate()
)

In [3]:
# Load the "matches" table from the local "TFT" PostgreSQL database over JDBC.
# `username` and `password` are imported from the local API_Key module.
df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/TFT")
    .option("dbtable", "matches")
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [4]:
# Inspect the schema of the table as loaded over JDBC; at this point
# `traits` and `units` are still plain string columns.
df.schema

StructType(List(StructField(match,StringType,true),StructField(game_datetime,LongType,true),StructField(game_length,DoubleType,true),StructField(game_version,StringType,true),StructField(gold_left,IntegerType,true),StructField(last_round,IntegerType,true),StructField(level,IntegerType,true),StructField(placement,IntegerType,true),StructField(puuid,StringType,true),StructField(time_eliminated,DoubleType,true),StructField(total_damage,IntegerType,true),StructField(traits,StringType,true),StructField(units,StringType,true)))

In [12]:
# Peek at one untruncated record to see the raw JSON text stored in
# the `units` and `traits` columns.
df.select("units", "traits").show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 units  | [{"items": [], "name": "Katarina", "tier": 2}, {"items": [], "name": "Swain", "tier": 2}, {"items": [], "name": "Mordekaiser", "tier": 3}, {"items": [], "name": "Draven", "tier": 2}, {"items": [], "name": "Poppy", "tier": 3}, {"items": [], "name": "Darius", "tier": 3}, {"items": [], "name": "Garen", "tier": 3}]    

# To analyze only the games played on the latest version, I need to filter the rows by the game_version column

In [14]:
# Keep only the rows whose game_version string contains "Version 11".
# `where` returns a new DataFrame and leaves `df` untouched, so the result is
# bound to its own name; without the assignment the filter is discarded.
# NOTE: cells that should analyze only this version must use `df_v11`
# instead of `df`.
df_v11 = df.where("game_version LIKE '%Version 11%'")
df_v11

DataFrame[match: string, game_datetime: bigint, game_length: double, game_version: string, gold_left: int, last_round: int, level: int, placement: int, puuid: string, time_eliminated: double, total_damage: int, traits: string, units: string]

In [29]:
def jsonColParser(df, *cols, sanitize=True):
    """Parse JSON-encoded string columns of a Spark DataFrame into typed columns.

    For each named column the JSON schema is inferred from the column's own
    values with ``spark.read.json`` and the strings are then converted with
    ``from_json``. The input DataFrame is not modified; a new one is returned.

    Relies on the module-level ``spark`` session and the ``psf`` alias for
    ``pyspark.sql.functions``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame holding the JSON text columns.
    *cols : str
        Names of the columns to parse. Each is replaced in place (same name).
    sanitize : bool, default True
        When True, every value is wrapped as ``{"data": <value>}`` before
        parsing and the ``data`` field is extracted afterwards, so a value
        whose top level is a JSON array (as in ``units`` / ``traits``) comes
        back as a single array-typed column.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with each column in ``cols`` replaced by its parsed form.
    """
    res = df
    for name in cols:
        if sanitize:
            # concat() yields NULL when the source value is NULL, so missing
            # values stay NULL rather than becoming a malformed JSON string.
            res = res.withColumn(
                name,
                psf.concat(psf.lit('{"data": '), psf.col(name), psf.lit('}'))
            )

        # Infer the schema from this column only, skipping NULLs:
        #  * selecting the single column avoids shipping every other column
        #    of each row through Python just to read one field;
        #  * a NULL would reach spark.read.json as Python None, be turned into
        #    the text "None", and add a `_corrupt_record` field to the schema.
        json_strings = (
            res.select(name)
            .where(psf.col(name).isNotNull())
            .rdd
            .map(lambda row: row[0])
        )
        schema = spark.read.json(json_strings).schema
        res = res.withColumn(name, psf.from_json(psf.col(name), schema))

        if sanitize:
            # Unwrap the helper object added above.
            res = res.withColumn(name, psf.col(name)["data"])

    return res

In [30]:
# Parse the JSON text in the "units" and "traits" columns into typed columns
# (default sanitize=True).
test = jsonColParser(df, "units", "traits")

In [32]:
# Check the schema after parsing: `traits` and `units` are now arrays of
# structs instead of strings.
test.schema

StructType(List(StructField(match,StringType,true),StructField(game_datetime,LongType,true),StructField(game_length,DoubleType,true),StructField(game_version,StringType,true),StructField(gold_left,IntegerType,true),StructField(last_round,IntegerType,true),StructField(level,IntegerType,true),StructField(placement,IntegerType,true),StructField(puuid,StringType,true),StructField(time_eliminated,DoubleType,true),StructField(total_damage,IntegerType,true),StructField(traits,ArrayType(StructType(List(StructField(name,StringType,true),StructField(num_units,LongType,true),StructField(style,LongType,true),StructField(tier_current,LongType,true),StructField(tier_total,LongType,true))),true),true),StructField(units,ArrayType(StructType(List(StructField(character_id,StringType,true),StructField(chosen,StringType,true),StructField(items,ArrayType(LongType,true),true),StructField(name,StringType,true),StructField(rarity,LongType,true),StructField(tier,LongType,true))),true),true)))

In [35]:
# Show the same single record after parsing, for comparison with the raw
# JSON text displayed earlier.
test.select("units", "traits").show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 units  | [[,, [], Katarina,, 2], [,, [], Swain,, 2], [,, [], Mordekaiser,, 3], [,, [], Draven,, 2], [,, [], Poppy,, 3], [,, [], Darius,, 3], [,, [], Garen,, 3]]                                            
 traits | [[Assassin, 1,, 0, 3], [Blademaster, 1,, 0, 3], [Demon, 1,, 0, 3], [Imperial, 4,, 2, 2], [Knight, 6,, 3, 3], [Noble, 1,, 0, 2], [Phantom, 1,, 0, 1], [Shapeshifter, 1,, 0, 2], [Yordle, 1,, 0, 3]] 
only showing top 1 row

