In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=b04a8b6ddbbd0b7a4452a22550760afc9fb7c71ebe8d6ab41e14225ddff535aa
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructType, StructField, DoubleType, LongType, MapType, BooleanType
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
import pyspark.sql.functions as F

In [4]:

def jsonToDataFrame(json_input, schema=None):
    reader = spark.read
    if schema:
        reader.schema(schema)
    return reader.json(sc.parallelize([json_input]))
def flattencolum(rcolum):
    stack = [((), rcolum)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return rcolum.select(columns)
def transformtype(df, column):
    df_select = df.select(col(column))
    str_ = df_select.take(1)[0].asDict()[column]
    df_select = jsonToDataFrame(json.dumps(eval(str_)))
    schema = df_select.schema
    
    eval_column = udf(lambda x : eval(x), ArrayType(schema))

    df = df.withColumn(column, eval_column(col(column)))
    
    return df, schema
def itemstransform(x):
    try:
        value = items_dict[int(x)] 
    except:
        value = "Name Not Found"
    return value


def championstransform(x):
    try:
        value = champions_dict[int(x)] 
    except:
        value = "Name Not Found"
    return value

In [5]:
record_s = StructType(
    [
        StructField('colum0', IntegerType(), True),
        StructField('matchCreation', DoubleType(), True),
        StructField('matchDuration', DoubleType(), True),
        StructField('matchId', DoubleType(), True),
        StructField('matchMode', StringType(), True),
        StructField('matchType', StringType(), True),
        StructField('matchVersion', StringType(), True),
        StructField('mapId', DoubleType(), True),
        StructField('participantIdentities', StringType(), True),
        StructField('participants',  StringType(), True),
        StructField('platformId', StringType(), True),
        StructField('queueId', DoubleType(), True),
        StructField('seasonId', DoubleType(), True)
    ]
)

items_s = StructType(
    [
        StructField('colum0', IntegerType(), True),
        StructField('item_id', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('upper_item', StringType(), True),
        StructField('explain', StringType(), True),
        StructField('buy_price', IntegerType(), True),
        StructField('sell_price', IntegerType(), True),
        StructField('tag', StringType(), True)
    ]
)

champions_s = StructType(
    [
        StructField('colum0', IntegerType(), True),
        StructField('version', StringType(), True),
        StructField('id', StringType(), True),
        StructField('key', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('title', StringType(), True),
        StructField('blurb', StringType(), True),
        StructField('tags', StringType(), True),
        StructField('partype', StringType(), True),
        StructField('info.attack', IntegerType(), True),
        StructField('info.defense', IntegerType(), True),
        StructField('info.magic', IntegerType(), True),
        StructField('info.difficulty', IntegerType(), True)
    ]
)

In [15]:
spark = SparkSession.builder.appName('lol').\
        master("local").\
        getOrCreate()
sqlContext = SQLContext(spark)
sc = spark.sparkContext

In [16]:
record = spark.read.csv("/content/match_data4.csv",
                    header='true',
                    schema=record_s)

items = spark.read.csv("/content/riot_item.csv",
                    header='true',
                    schema=items_s)

champions = spark.read.csv("/content/riot_champion.csv",
                    header='true',
                    schema=champions_s)



In [17]:
record, participants = transformtype(record, "participants")

record, participantidentities = transformtype(record, "participantIdentities")



combine = udf(lambda x, y: list(zip(x, y)),ArrayType(StructType([StructField("ids", participants),
                                    StructField("info", participantidentities)]))
             )
record.printSchema()
record = record.withColumn("participants_info", combine("participants", "participantIdentities"))

columns_to_drop = ['participants', 'participantIdentities']
record = record.drop(*columns_to_drop)



record = record.withColumn("participants_info", explode("participants_info"))


record=flattencolum(record)
record.printSchema()
items_dict = items.select("item_id", "name").distinct().collect()
items_dict = {v["item_id"]:v["name"] for v in items_dict}


nitems = udf(lambda x : itemstransform(x), StringType())

record = record.withColumn("item0", nitems(col("participants_info_ids_stats_item0")))
record = record.withColumn("item1", nitems(col("participants_info_ids_stats_item1")))
record = record.withColumn("item2", nitems(col("participants_info_ids_stats_item2")))
record = record.withColumn("item3", nitems(col("participants_info_ids_stats_item3")))
record = record.withColumn("item4", nitems(col("participants_info_ids_stats_item4")))
record = record.withColumn("item5", nitems(col("participants_info_ids_stats_item5")))
record = record.withColumn("item6", nitems(col("participants_info_ids_stats_item6")))

champions_dict = champions.select("key", "name").distinct().collect()
champions_dict = {v["key"]:v["name"] for v in champions_dict}
new_cols_champions = udf(lambda x : championstransform(x), StringType())

root
 |-- colum0: integer (nullable = true)
 |-- matchCreation: double (nullable = true)
 |-- matchDuration: double (nullable = true)
 |-- matchId: double (nullable = true)
 |-- matchMode: string (nullable = true)
 |-- matchType: string (nullable = true)
 |-- matchVersion: string (nullable = true)
 |-- mapId: double (nullable = true)
 |-- participantIdentities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- participantId: long (nullable = true)
 |    |    |-- player: struct (nullable = true)
 |    |    |    |-- accountId: string (nullable = true)
 |    |    |    |-- currentAccountId: string (nullable = true)
 |    |    |    |-- currentPlatformId: string (nullable = true)
 |    |    |    |-- matchHistoryUri: string (nullable = true)
 |    |    |    |-- platformId: string (nullable = true)
 |    |    |    |-- profileIcon: long (nullable = true)
 |    |    |    |-- summonerId: string (nullable = true)
 |    |    |    |-- summonerName: string (nullab

In [25]:
champions_dict = champions.select("key", "name").distinct().collect()
champions_dict = {v["key"]:v["name"] for v in champions_dict}
new_cols_champions = udf(lambda x : championstransform(x), StringType())



record = record.withColumn("name_champion", new_cols_champions(col("participants_info_ids_championId")))
record.createOrReplaceTempView("record")



In [21]:
# SQL querrie to extract the victory stats for each  player with an especific champion"
summonerchampWR = sqlContext.sql("""
    SELECT victorys.id as user, victorys.name_champion as name_champion, victorys.won_matches,                                                 matches.total_matches, victorys.won_matches/matches.total_matches as win_rate 
    FROM 
    (SELECT record.participants_info_info_player_accountId as id, record.name_champion as name_champion, COUNT(DISTINCT(record.matchId))       as     won_matches 
    FROM record 
    WHERE record.participants_info_ids_stats_win == true 
    GROUP BY record.participants_info_info_player_accountId, record.name_champion) as victorys 
    LEFT JOIN (SELECT record.participants_info_info_player_accountId as id, record.name_champion as name_champion,                             COUNT(DISTINCT(record.matchId)) as total_matches 
    FROM record 
    GROUP BY record.participants_info_info_player_accountId, record.name_champion) as matches
    ON victorys.id=matches.id AND victorys.name_champion = matches.name_champion
    ORDER BY victorys.id DESC """) 
summonerchampWR.createOrReplaceTempView("summonerchampWR");

champwinrate = sqlContext.sql("""
    SELECT victorys.name_champion as name_champion, victorys.won_matches, matches.total_matches,                                               victorys.won_matches/matches.total_matches as win_rate                               
    FROM 
    (SELECT record.name_champion as name_champion, COUNT(DISTINCT(record.matchId)) as won_matches 
    FROM record 
    WHERE record.participants_info_ids_stats_win == true 
    GROUP BY record.name_champion) as victorys 
    LEFT JOIN (SELECT record.name_champion as name_champion, COUNT(DISTINCT(record.matchId)) as total_matches 
    FROM record 
    GROUP BY record.name_champion) as matches 
    ON victorys.name_champion = matches.name_champion
    ORDER BY matches.total_matches DESC """) 
champwinrate.createOrReplaceTempView("champions");


championFI = sqlContext.sql("""
	SELECT FI.championName, FI.FI_name as first_item, COUNT(FI.FI_name) as total_matches 
    FROM
    (SELECT record.name_champion as championName, record.item0  as FI_name 
    FROM record 
	WHERE record.participants_info_info_player_accountId 
	IN ( 
	SELECT summonerchampWR.user
	FROM summonerchampWR 
    WHERE summonerchampWR.win_rate > 0.5
	)) as FI
    GROUP BY FI.championName, FI.FI_name 
	ORDER BY total_matches DESC """)
championFI = championFI.dropDuplicates((['championName'])).sort((['championName']))
championFI.createOrReplaceTempView("championFI"); 


In [22]:
summonerchampWR.show()

+--------------------+-------------+-----------+-------------+------------------+
|                user|name_champion|won_matches|total_matches|          win_rate|
+--------------------+-------------+-----------+-------------+------------------+
|zpdpHmvHki76wDo0T...|       Kai'Sa|          1|            1|               1.0|
|zpdpHmvHki76wDo0T...| Miss Fortune|          1|            1|               1.0|
|zpdpHmvHki76wDo0T...|         Ashe|          1|            1|               1.0|
|zpdpHmvHki76wDo0T...|      Kalista|          1|            1|               1.0|
|zkYtjdZq7DuYLDNAt...|      Lee Sin|          1|            1|               1.0|
|zkYtjdZq7DuYLDNAt...|        Shaco|          2|            3|0.6666666666666666|
|zkYtjdZq7DuYLDNAt...|      Vel'Koz|          1|            1|               1.0|
|zkYtjdZq7DuYLDNAt...|       Maokai|          1|            1|               1.0|
|zkYtjdZq7DuYLDNAt...|     Vladimir|          1|            1|               1.0|
|zkYtjdZq7DuYLDN

In [23]:
champions.show()

+------+-------+-----------+---+------------+--------------------+--------------------+--------------------+----------+-----------+------------+----------+---------------+
|colum0|version|         id|key|        name|               title|               blurb|                tags|   partype|info.attack|info.defense|info.magic|info.difficulty|
+------+-------+-----------+---+------------+--------------------+--------------------+--------------------+----------+-----------+------------+----------+---------------+
|     0| 10.6.1|     Aatrox|266|      Aatrox|    the Darkin Blade|Once honored defe...| ['Fighter', 'Tank']|Blood Well|          8|           4|         3|              4|
|     0| 10.6.1|       Ahri|103|        Ahri| the Nine-Tailed Fox|Innately connecte...|['Mage', 'Assassin']|      Mana|          3|           4|         8|              5|
|     0| 10.6.1|      Akali| 84|       Akali|  the Rogue Assassin|Abandoning the Ki...|        ['Assassin']|    Energy|          5|         

In [24]:
championFI.show()

+------------+--------------------+-------------+
|championName|          first_item|total_matches|
+------------+--------------------+-------------+
|      Aatrox|      Doran's Shield|           37|
|        Ahri|     Hextech GLP-800|           14|
|       Akali|    Hextech Gunblade|           35|
|     Alistar|Bulwark of the Mo...|           21|
|       Amumu|   Refillable Potion|            1|
|      Anivia|      Name Not Found|            1|
|       Annie|          Stormrazor|            1|
|    Aphelios|      Essence Reaver|           30|
|        Ashe|Blade of the Ruin...|           28|
|Aurelion Sol|     Hextech GLP-800|            2|
|        Azir|      Nashor's Tooth|           10|
|        Bard|          Redemption|           30|
|  Blitzcrank|Pauldrons of Whit...|           19|
|       Brand|        Luden's Echo|            4|
|       Braum|Pauldrons of Whit...|           11|
|     Caitlyn|       Infinity Edge|           18|
|     Camille|      Ravenous Hydra|           15|
