In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import explode, col
import pymongo
import json
import uuid
import pandas as pd
from pyspark.sql.types import Row
import json

In [2]:
# start pyspark
spark = SparkSession.builder.appName("BigData").master("local[*]").getOrCreate()
sqlContext = SparkSession(spark)
spark.sparkContext.setLogLevel("ERROR")

Input from MongoDB (KillianCluster)

In [5]:
# read input 
input_client = pymongo.MongoClient(
    "mongodb+srv://killian0812:KfEXux78H4e5F5FG@killiancluster.1sfdevm.mongodb.net/?retryWrites=true&w=majority"
)
input_db = input_client["LOL_data"]  # select database
input = input_db["test"]  # select collection

data = list(input.find({}))
for document in data:
    document["_id"] = str(document["_id"])

In [6]:
json_data = json.dumps(data, indent=4)
with open("result.json", 'w') as file:  # Write to result.json 
    file.write(json_data)
df = spark.read.option("multiline", "true").json('result.json')   # get dataframe

In [7]:
# count fields
def count_fields(schema):
    count = 0
    for field in schema.fields:
        count += 1
        if isinstance(field.dataType, StructType):
            count += count_fields(field.dataType)
        elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
            count += count_fields(field.dataType.elementType)
    return count

# Get the total count of fields
total_fields = count_fields(df.schema)
print(f"Total fields including nested fields: {total_fields}")

Total fields including nested fields: 337


In [8]:
# choose classic matches
classic_df= df.select(
    explode("info.participants").alias("participant"),
    "info.gameId",
    "info.gameMode",
    "info.gameVersion").where("gameMode = 'CLASSIC'")

# remove duplicate
classic_df = classic_df.dropDuplicates()

In [56]:
classic_df.show(10)

+--------------------+---------+--------+--------------+
|         participant|   gameId|gameMode|   gameVersion|
+--------------------+---------+--------+--------------+
|{0, 3, 10, 0, 0, ...|466060136| CLASSIC|14.11.589.9418|
|{0, 1, 0, 0, 0, 0...|466015778| CLASSIC|14.11.589.9418|
|{0, 5, 19, 0, 0, ...|466037478| CLASSIC|14.11.589.9418|
|{0, 0, 20, 0, 0, ...|465374152| CLASSIC|14.11.589.9418|
|{0, 0, 0, 0, 0, 3...|465321501| CLASSIC|14.11.587.6777|
|{0, 1, 3, 0, 0, 2...|466176524| CLASSIC|14.11.589.9418|
|{0, 5, 3, 0, 0, 1...|465559270| CLASSIC|14.11.587.6777|
|{0, 0, 15, 0, 0, ...|462711941| CLASSIC|14.11.587.6777|
|{0, 0, 9, 0, 0, 1...|465971982| CLASSIC|14.11.589.9418|
|{0, 2, 1, 0, 0, 0...|463461978| CLASSIC|14.11.589.9418|
+--------------------+---------+--------+--------------+
only showing top 10 rows



DF of CLASSIC Matches

In [9]:
match_df = classic_df.select(
    "gameId",
    "participant.riotIdGameName",
    "participant.championId",
    "participant.championName",
    "participant.teamId",
    "participant.teamPosition",
    "participant.win"
)
match_df = match_df.withColumn("win", col("win").cast("integer"))

In [10]:
match_df.show(20)

+---------+----------------+----------+------------+------+------------+---+
|   gameId|  riotIdGameName|championId|championName|teamId|teamPosition|win|
+---------+----------------+----------+------------+------+------------+---+
|466060136|            híuu|        81|      Ezreal|   100|      BOTTOM|  1|
|466015778|           bedom|       268|        Azir|   100|      MIDDLE|  0|
|466037478|          Orsted|        22|        Ashe|   200|     UTILITY|  1|
|465374152|         New Day|         3|       Galio|   100|     UTILITY|  1|
|465321501|       Shinigami|        58|    Renekton|   100|         TOP|  1|
|466176524|        Y Tá Meo|       221|        Zeri|   200|      BOTTOM|  1|
|465559270|           Acien|        18|    Tristana|   100|      MIDDLE|  1|
|462711941|       Bạch liễu|       267|        Nami|   100|     UTILITY|  1|
|465971982|         1652002|       238|         Zed|   200|      MIDDLE|  1|
|463461978|           Mingg|        23|  Tryndamere|   100|      MIDDLE|  0|

Get matchup winrates

In [11]:
match_df.createOrReplaceTempView("match")
join_query = """
SELECT m1.gameId, m1.championName as champion1, m2.championName as champion2,m1.teamPosition, m2.teamPosition,m1.teamId as team1, m2.teamId as team2, m1.win
FROM match m1, match m2
WHERE m1.gameId = m2.gameId 
    AND m1.teamPosition = m2.teamPosition
    AND m1.teamId != m2.teamId
"""
matchup_each_game_df = spark.sql(join_query)

In [12]:
matchup_each_game_df.show(10)

+---------+---------+---------+------------+------------+-----+-----+---+
|   gameId|champion1|champion2|teamPosition|teamPosition|team1|team2|win|
+---------+---------+---------+------------+------------+-----+-----+---+
|466186195|    Garen| Tristana|      MIDDLE|      MIDDLE|  100|  200|  1|
|468885046|   Thresh|  Skarner|     UTILITY|     UTILITY|  100|  200|  1|
|469048347|    Sylas|   LeeSin|      JUNGLE|      JUNGLE|  200|  100|  1|
|468885046|  Skarner|   Thresh|     UTILITY|     UTILITY|  200|  100|  0|
|469714161|     Jinx|  Caitlyn|      BOTTOM|      BOTTOM|  100|  200|  1|
|466151695|      Vex|  Taliyah|      MIDDLE|      MIDDLE|  200|  100|  0|
|466239170|    Brand|    Shaco|     UTILITY|     UTILITY|  200|  100|  0|
|464947645|    Nasus|     Sett|         TOP|         TOP|  200|  100|  0|
|469709192| Vladimir|   Aatrox|         TOP|         TOP|  200|  100|  0|
|466238365|   Khazix|  Hecarim|      JUNGLE|      JUNGLE|  100|  200|  1|
+---------+---------+---------+-------

In [14]:
from pyspark.sql.functions import col, when, count, sum

In [15]:
matchup_df = (
    matchup_each_game_df.groupBy("champion1", "champion2")
    .agg(sum("win").alias("total_wins"), count("gameId").alias("total_games"))
    .withColumn("champion1_win_rate", col("total_wins") / col("total_games"))
    .orderBy("champion1", "champion2")
)

In [16]:
matchup_df.show()

+---------+------------+----------+-----------+------------------+
|champion1|   champion2|total_wins|total_games|champion1_win_rate|
+---------+------------+----------+-----------+------------------+
|   Aatrox|      Aatrox|         3|          6|               0.5|
|   Aatrox|       Akali|         5|          6|0.8333333333333334|
|   Aatrox|       Annie|         1|          1|               1.0|
|   Aatrox| AurelionSol|         1|          1|               1.0|
|   Aatrox|       Brand|         0|          1|               0.0|
|   Aatrox|     Camille|         7|         11|0.6363636363636364|
|   Aatrox|  Cassiopeia|         0|          1|               0.0|
|   Aatrox|      Darius|         0|          5|               0.0|
|   Aatrox|       Fiora|         2|          5|               0.4|
|   Aatrox|   Gangplank|         1|          1|               1.0|
|   Aatrox|       Garen|         0|          1|               0.0|
|   Aatrox|        Gnar|         2|          4|               

In [21]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
filter = 5
matchups_over_filter = matchup_df.filter(F.col('total_games') > filter)
windowSpec = Window.partitionBy('champion1').orderBy('champion1_win_rate')
ranked_matchups = matchups_over_filter.withColumn("rank", F.row_number().over(windowSpec))
worst_matchups = ranked_matchups.filter(F.col("rank") <= 5)

In [22]:
windowSpec = Window.partitionBy('champion1').orderBy(col('champion1_win_rate').desc())
ranked_matchups = matchups_over_filter.withColumn("rank", F.row_number().over(windowSpec))
best_matchups = ranked_matchups.filter(F.col("rank") <= 5)

In [24]:
csv_path = "best_matchups_output"
best_matchups.coalesce(1).write.csv(csv_path)

csv_path = "worst_matchups_output"
worst_matchups.coalesce(1).write.csv(csv_path)

In [25]:
output_client = pymongo.MongoClient(
    "mongodb+srv://ngcuong0812:FZSTDudoxnwIh38A@atlassearch.dryw8rf.mongodb.net/?retryWrites=true&w=majority&appName=AtlasSearch"
)

output_db = output_client["LOL_data"]  # select database

best_matchups_collection = output_db["best_matchups"]  # select collection

best_matchups_collection.delete_many({}) # clear

best_matchups_list = best_matchups.collect()
best_matchups_dicts = [row.asDict() for row in best_matchups_list]
best_matchups_collection.insert_many(best_matchups_dicts)

InsertManyResult([ObjectId('666ecb90affd0313c8bb819c'), ObjectId('666ecb90affd0313c8bb819d'), ObjectId('666ecb90affd0313c8bb819e'), ObjectId('666ecb90affd0313c8bb819f'), ObjectId('666ecb90affd0313c8bb81a0'), ObjectId('666ecb90affd0313c8bb81a1'), ObjectId('666ecb90affd0313c8bb81a2'), ObjectId('666ecb90affd0313c8bb81a3'), ObjectId('666ecb90affd0313c8bb81a4'), ObjectId('666ecb90affd0313c8bb81a5'), ObjectId('666ecb90affd0313c8bb81a6'), ObjectId('666ecb90affd0313c8bb81a7'), ObjectId('666ecb90affd0313c8bb81a8'), ObjectId('666ecb90affd0313c8bb81a9'), ObjectId('666ecb90affd0313c8bb81aa'), ObjectId('666ecb90affd0313c8bb81ab'), ObjectId('666ecb90affd0313c8bb81ac'), ObjectId('666ecb90affd0313c8bb81ad'), ObjectId('666ecb90affd0313c8bb81ae'), ObjectId('666ecb90affd0313c8bb81af'), ObjectId('666ecb90affd0313c8bb81b0'), ObjectId('666ecb90affd0313c8bb81b1'), ObjectId('666ecb90affd0313c8bb81b2'), ObjectId('666ecb90affd0313c8bb81b3'), ObjectId('666ecb90affd0313c8bb81b4'), ObjectId('666ecb90affd0313c8bb81

In [26]:
worst_matchups_collection = output_db["worst_matchups"]  # select collection

worst_matchups_collection.delete_many({}) # clear

worst_matchups_list = worst_matchups.collect()
worst_matchups_dicts = [row.asDict() for row in worst_matchups_list]
worst_matchups_collection.insert_many(worst_matchups_dicts)

InsertManyResult([ObjectId('666ecbb6affd0313c8bb8291'), ObjectId('666ecbb6affd0313c8bb8292'), ObjectId('666ecbb6affd0313c8bb8293'), ObjectId('666ecbb6affd0313c8bb8294'), ObjectId('666ecbb6affd0313c8bb8295'), ObjectId('666ecbb6affd0313c8bb8296'), ObjectId('666ecbb6affd0313c8bb8297'), ObjectId('666ecbb6affd0313c8bb8298'), ObjectId('666ecbb6affd0313c8bb8299'), ObjectId('666ecbb6affd0313c8bb829a'), ObjectId('666ecbb6affd0313c8bb829b'), ObjectId('666ecbb6affd0313c8bb829c'), ObjectId('666ecbb6affd0313c8bb829d'), ObjectId('666ecbb6affd0313c8bb829e'), ObjectId('666ecbb6affd0313c8bb829f'), ObjectId('666ecbb6affd0313c8bb82a0'), ObjectId('666ecbb6affd0313c8bb82a1'), ObjectId('666ecbb6affd0313c8bb82a2'), ObjectId('666ecbb6affd0313c8bb82a3'), ObjectId('666ecbb6affd0313c8bb82a4'), ObjectId('666ecbb6affd0313c8bb82a5'), ObjectId('666ecbb6affd0313c8bb82a6'), ObjectId('666ecbb6affd0313c8bb82a7'), ObjectId('666ecbb6affd0313c8bb82a8'), ObjectId('666ecbb6affd0313c8bb82a9'), ObjectId('666ecbb6affd0313c8bb82

Get overall winrates

In [61]:
from pyspark.sql.functions import col, count, sum

win_rate_df = (
    match_df.groupBy("championName")
    .agg(sum("win").alias("totalWins"), count("win").alias("totalGames"))
    .withColumn("winRate", col("totalWins") / col("totalGames"))
    .orderBy("championName")
)

win_rate_df.count()

167

In [66]:
win_rate_df.coalesce(1).write.csv("winrate_output") # write to local folder
win_rate_df.show(20)

+------------+---------+----------+-------------------+
|championName|totalWins|totalGames|            winRate|
+------------+---------+----------+-------------------+
|      Aatrox|      101|       200|              0.505|
|        Ahri|       43|        86|                0.5|
|       Akali|       50|       109|0.45871559633027525|
|      Akshan|       28|        46| 0.6086956521739131|
|     Alistar|       40|        87|0.45977011494252873|
|       Amumu|        3|         9| 0.3333333333333333|
|      Anivia|        6|        11| 0.5454545454545454|
|       Annie|       16|        30| 0.5333333333333333|
|    Aphelios|       31|        65|0.47692307692307695|
|        Ashe|       44|        89| 0.4943820224719101|
| AurelionSol|       26|        52|                0.5|
|        Azir|        6|        18| 0.3333333333333333|
|        Bard|        9|        17| 0.5294117647058824|
|     Belveth|        9|        22| 0.4090909090909091|
|  Blitzcrank|       30|        62| 0.4838709677

Push winrates to MongoDB

In [85]:
winrate_collection = output_db["winrate"]  # select collection

winrate_collection.delete_many({}) # clear

win_rate_list = win_rate_df.collect()
win_rate_dicts = [row.asDict() for row in win_rate_list]
winrate_collection.insert_many(win_rate_dicts)

InsertManyResult([ObjectId('666eb85bf27cb3469932df27'), ObjectId('666eb85bf27cb3469932df28'), ObjectId('666eb85bf27cb3469932df29'), ObjectId('666eb85bf27cb3469932df2a'), ObjectId('666eb85bf27cb3469932df2b'), ObjectId('666eb85bf27cb3469932df2c'), ObjectId('666eb85bf27cb3469932df2d'), ObjectId('666eb85bf27cb3469932df2e'), ObjectId('666eb85bf27cb3469932df2f'), ObjectId('666eb85bf27cb3469932df30'), ObjectId('666eb85bf27cb3469932df31'), ObjectId('666eb85bf27cb3469932df32'), ObjectId('666eb85bf27cb3469932df33'), ObjectId('666eb85bf27cb3469932df34'), ObjectId('666eb85bf27cb3469932df35'), ObjectId('666eb85bf27cb3469932df36'), ObjectId('666eb85bf27cb3469932df37'), ObjectId('666eb85bf27cb3469932df38'), ObjectId('666eb85bf27cb3469932df39'), ObjectId('666eb85bf27cb3469932df3a'), ObjectId('666eb85bf27cb3469932df3b'), ObjectId('666eb85bf27cb3469932df3c'), ObjectId('666eb85bf27cb3469932df3d'), ObjectId('666eb85bf27cb3469932df3e'), ObjectId('666eb85bf27cb3469932df3f'), ObjectId('666eb85bf27cb3469932df

Get winrates by position

In [86]:
# calculate win rate of champion in each teamposition
from pyspark.sql.functions import col, avg
winrate_pos_df = match_df.groupBy('championName', 'teamPosition').agg(
    avg(col('win')).alias('winRate')
).orderBy('championName', 'teamPosition')

In [72]:
winrate_pos_df.coalesce(1).write.csv("winrate_pos_output")
winrate_pos_df.show()

+------------+------------+-------------------+
|championName|teamPosition|            winRate|
+------------+------------+-------------------+
|      Aatrox|      JUNGLE| 0.6666666666666666|
|      Aatrox|      MIDDLE|               0.75|
|      Aatrox|         TOP|0.49740932642487046|
|        Ahri|      MIDDLE|                0.5|
|       Akali|      BOTTOM|                0.0|
|       Akali|      MIDDLE| 0.5454545454545454|
|       Akali|         TOP|            0.40625|
|      Akshan|      MIDDLE| 0.6428571428571429|
|      Akshan|         TOP|               0.25|
|     Alistar|     UTILITY|0.45977011494252873|
|       Amumu|      JUNGLE|                0.5|
|       Amumu|     UTILITY|                0.2|
|      Anivia|      MIDDLE| 0.6666666666666666|
|      Anivia|     UTILITY|                0.0|
|       Annie|      MIDDLE| 0.5714285714285714|
|       Annie|         TOP|                0.0|
|       Annie|     UTILITY|                0.0|
|    Aphelios|      BOTTOM|0.47692307692

In [87]:
winrate_pos_collection = output_db["winrate_pos"]  # select collection

winrate_pos_collection.delete_many({}) # clear

win_rate_pos_list = winrate_pos_df.collect()
win_rate_pos_dicts = [row.asDict() for row in win_rate_pos_list]
winrate_pos_collection.insert_many(win_rate_pos_dicts)

InsertManyResult([ObjectId('666eb8ecf27cb3469932dfce'), ObjectId('666eb8ecf27cb3469932dfcf'), ObjectId('666eb8ecf27cb3469932dfd0'), ObjectId('666eb8ecf27cb3469932dfd1'), ObjectId('666eb8ecf27cb3469932dfd2'), ObjectId('666eb8ecf27cb3469932dfd3'), ObjectId('666eb8ecf27cb3469932dfd4'), ObjectId('666eb8ecf27cb3469932dfd5'), ObjectId('666eb8ecf27cb3469932dfd6'), ObjectId('666eb8ecf27cb3469932dfd7'), ObjectId('666eb8ecf27cb3469932dfd8'), ObjectId('666eb8ecf27cb3469932dfd9'), ObjectId('666eb8ecf27cb3469932dfda'), ObjectId('666eb8ecf27cb3469932dfdb'), ObjectId('666eb8ecf27cb3469932dfdc'), ObjectId('666eb8ecf27cb3469932dfdd'), ObjectId('666eb8ecf27cb3469932dfde'), ObjectId('666eb8ecf27cb3469932dfdf'), ObjectId('666eb8ecf27cb3469932dfe0'), ObjectId('666eb8ecf27cb3469932dfe1'), ObjectId('666eb8ecf27cb3469932dfe2'), ObjectId('666eb8ecf27cb3469932dfe3'), ObjectId('666eb8ecf27cb3469932dfe4'), ObjectId('666eb8ecf27cb3469932dfe5'), ObjectId('666eb8ecf27cb3469932dfe6'), ObjectId('666eb8ecf27cb3469932df

Push to HDFS

In [73]:
import os

# Create folder
mkdir_command = f"hadoop fs -mkdir -p /output"
os.system(mkdir_command)

# Đẩy lên HDFS
hadoop_command = f"hadoop fs -put -f winrate_pos_output /output"
os.system(hadoop_command)

hadoop_command = f"hadoop fs -put -f winrate_output /output"
os.system(hadoop_command)

1

CLEAN DATA

In [74]:
columns_to_keep = ["_id", 'metadata',
                    "info.gameMode",
                    'info.gameId',
                    'info.gameType',
                    ]


In [75]:
main_to_keep = ["_id", 'metadata',
                    "gameMode",
                    'gameId',
                    'gameType',
                    ]

In [76]:
columns_to_keep_per_participant = [
    'assists',
    'challenges.deathsByEnemyChamps',
    'challenges.damagePerMinute',
    'challenges.kda',
    'challenges.legendaryItemUsed',
    'challenges.takedowns',
    'champExperience',
    'champLevel',
    'championId',
    'championName',
    'deaths',
    'goldEarned',
    'goldSpent',
    'individualPosition',
    'item0',
    'item1',
    'item2',
    'item3',
    'item4',
    'item5',
    'item6',
    'kills',
    'lane',
    'participantId',
    'teamId',
    'totalDamageDealtToChampions',
    'totalDamageTaken',
    'totalHeal',
    'totalMinionsKilled',
    ]

In [77]:
columns_keep = [
    'assists',
    'deathsByEnemyChamps',
    'damagePerMinute',
    'kda',
    'legendaryItemUsed',
    'takedowns',
    'champExperience',
    'champLevel',
    'championId',
    'championName',
    'deaths',
    'goldEarned',
    'goldSpent',
    'individualPosition',
    'item0',
    'item1',
    'item2',
    'item3',
    'item4',
    'item5',
    'item6',
    'kills',
    'lane',
    'participantId',
    'teamId',
    'totalDamageDealtToChampions',
    'totalDamageTaken',
    'totalHeal',
    'totalMinionsKilled',
]

In [78]:
from pyspark.sql.functions import lit
from pathlib import Path
from pyspark.sql.functions import col, lit, create_map, array, struct
import json
def initialize_empty_dataframes(n):
    return [pd.DataFrame() for _ in range(n)]

In [79]:
#data processing in each file json
total_result = []
file_paths = [str(path) for path in Path("D:/BigData/Data/MatchDetail/").glob("*.json")]
for file_path in file_paths:
    
    
    df = spark.read.option("multiline", "true").json(file_path)

    #check whether gamemode is classic
    if (df.select('info.gameMode').first()[0] != 'CLASSIC'):
        continue

    #shorten dataframe, choose appropriate field
    df_selected = df.select(
    *[col(col_name) for col_name in columns_to_keep],
    *[col(f'info.participants.{col_name}').alias(f'{col_na}')
      for (col_name, col_na) in zip(columns_to_keep_per_participant, columns_keep)]
    )
    df_selected = df_selected.select('*').where("gameMode = 'CLASSIC'")
    df_selected = df_selected.dropDuplicates()

    #convert to json 
    df_pd = df_selected.toPandas()
    df_dic = df_pd.to_dict()
    df_dict = df_pd.iloc[0].to_dict()
    df_com = initialize_empty_dataframes(len(main_to_keep))
    for j in range(len(main_to_keep)):
        df_com[j] = df_dict.pop(main_to_keep[j])
    df_result = pd.DataFrame(df_dict)
    list_of_dicts = df_result.to_dict('records')
    result = {main_to_keep[i] : df_com[i] for i in range(len(main_to_keep))}
    result['participant'] = list_of_dicts

    #total result is a file contain all clean data
    total_result.append(result)

    #save to json file (can change name later)
    json_result_data = json.dumps(result, indent=4)
    for i in range(len(file_paths)):
        with open(f"D:/BigData/Data/result_{isinstance}", 'w') as file:
            file.write(json_result_data)
    
        # Đẩy file json lên HDFS
        hadoop_command = f"hadoop fs -put -D:/BigData/Data/result_{i}.json /transformed/"
        os.system(hadoop_command)
    


Py4JError: An error occurred while calling o461.pandasStructHandlingMode. Trace:
py4j.Py4JException: Method pandasStructHandlingMode([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)



In [None]:
#save file total_result to json
json_result_data = json.dumps(total_result, indent=4)
with open("D:/BigData/Data/total_result.json", 'w') as file:
    file.write(json_result_data)

In [None]:
#push saved json file above to mongodb
import pymongo
import json
import uuid
client = pymongo.MongoClient('mongodb+srv://ngcuong0812:FZSTDudoxnwIh38A@atlassearch.dryw8rf.mongodb.net/?retryWrites=true&w=majority&appName=AtlasSearch')

db = client['LOL_data'] #select database 
collection = db['match_detail'] #select collection

file_paths = [str(path) for path in Path("D:/BigData/Data/").glob("*.json")]
#rad each of file
for file_path in file_paths:
    #read file json
    with open(file_path) as f:
        file_data = json.load(f)

    new_id = str(uuid.uuid4())
    file_data["_id"] = new_id

    #push it to collection
    collection.insert_one(file_data)
client.close()

In [35]:
spark.stop()