In [1]:
import json
from datetime import datetime

from pyspark.sql.functions import col, expr, timestamp_seconds, to_timestamp
from pyspark.sql.types import (BooleanType, ByteType, LongType, ShortType, StringType, StructField,
                               StructType, TimestampType)

# from databricks.connect import DatabricksSession

In [2]:
# Load secrets and pipeline configuration from local JSON files.
# `with` closes each file handle; the previous json.load(open(...)) form left them open.
with open('../credentials/credentials.json', encoding="utf-8") as credentials_file:
    credentials = json.load(credentials_file)
with open('../configs/flows_config.json', encoding="utf-8") as configs_file:
    configs = json.load(configs_file)

# Azure Blob Storage coordinates used to build the wasbs:// paths in later cells.
# The access key is only referenced by the commented-out SparkSession config.
storage_account_name = credentials["AZURE_BLOB_PROJECT_NAME"]
storage_account_access_key = credentials["AZURE_BLOB_STORAGE_KEY"]
blob_container = credentials["AZURE_BLOB_CONTAINER"]

In [3]:
# `spark` is expected to already exist in the notebook's global scope
# (presumably injected by the Databricks runtime — confirm for other environments).
# The two alternatives below are kept for running outside such a runtime.

# Option 1: Databricks Connect (also re-enable the DatabricksSession import in the first cell).
# spark = DatabricksSession.builder.remote(
#     host = credentials["SPARK_MASTER_HOSTNAME"],
#     token = credentials["DATABRICKS_ACCESS_TOKEN"],
#     cluster_id = credentials["DATABRICKS_CLUSTER_ID"]
# ).getOrCreate()

# Option 2: plain SparkSession with the WASB filesystem configured
# (also needs `from pyspark.sql import SparkSession`, which is not imported above).
# spark = SparkSession.builder \
#     .appName("Main Spark App")\
#     .config("spark.master", SPARK_MASTER_CONNECTION_STRING) \
#     .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")\
#     .config(f'fs.azure.account.key.{storage_account_name}.blob.core.windows.net',storage_account_access_key)\
#     .getOrCreate()

# Display the session. This line used to be indented, which made the whole cell
# fail with "IndentationError: unexpected indent".
spark

## Accounts DataFrame

### Read, Parse, and Clean Data Types

In [4]:
# Raw account rows. Only the header option is set (no inferSchema), so Spark
# reads every CSV column as a string and the casts below assign real types.
accounts_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/accounts/*.csv"
accounts_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(accounts_path)
)

# NOTE(review): if revisionDate is stored as epoch milliseconds, to_timestamp will
# produce nulls (the match cell divides its epoch columns by 1000) — confirm.
accounts_df = accounts_df.withColumn("revisionDate", to_timestamp(col("revisionDate")))

# Small integer counters/ids downcast to 16-bit, applied in the original order.
for short_column in ("profileIconId", "summonerLevel", "leaguePoints", "wins", "losses"):
    accounts_df = accounts_df.withColumn(short_column, col(short_column).cast(ShortType()))

accounts_df.schema



SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INVALID_ARGUMENT
	details = "BAD_REQUEST: Spark Connect is enabled only on Unity Catalog enabled Shared and Single User Clusters."
	debug_error_string = "UNKNOWN:Error received from peer ipv4:52.187.145.107:443 {created_time:"2023-04-26T08:29:26.5990023+00:00", grpc_status:3, grpc_message:"BAD_REQUEST: Spark Connect is enabled only on Unity Catalog enabled Shared and Single User Clusters."}"
>

### Partition and Save the Accounts Table

In [None]:
# Recreate the accounts table, physically partitioned by leagueId so that
# queries filtering on a league can skip the other leagues' files.
spark.sql("DROP TABLE IF EXISTS accounts")
(
    accounts_df.write
    .partitionBy("leagueId")
    .mode("overwrite")
    .saveAsTable("accounts")
)


### Performance Test
Uncomment the code below to compare query times on the partitioned `accounts` table against an unpartitioned copy (`accounts_not_clustered`).

In [None]:

# spark.sql("DROP TABLE IF EXISTS accounts_not_clustered")

# accounts_df.write.mode("overwrite").saveAsTable("accounts_not_clustered")

# start = datetime.now()
# test = spark.sql("SELECT COUNT(*) FROM accounts GROUP BY leagueId").show(1)
# print(f"Took {datetime.now() - start} to complete with clustered dataframe")

# start = datetime.now()
# test = spark.sql("SELECT COUNT(*) FROM accounts_not_clustered GROUP BY leagueId").show(1)

# print(f"Took {datetime.now() - start } to complete with non paritioned dataframe")

# print("Partitions can be at least 25 percent faster.  Similar steps can be reproduced in the following data frame.")


## Champion Mastery DataFrame

In [None]:
# Champion mastery per player. No inferSchema, so every CSV column arrives as a string.
champion_mastery_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/champion_mastery/*.csv"
champion_mastery_df = spark.read.format("csv").option("header", "true").load(champion_mastery_path)

# Parenthesised chain instead of backslash continuations: the previous version
# ended with a dangling `\` that only worked because the next line was blank.
# NOTE(review): if lastPlayTime is epoch milliseconds, to_timestamp yields nulls — confirm.
champion_mastery_df = (
    champion_mastery_df
    .withColumn("lastPlayTime", to_timestamp(col("lastPlayTime")))
    .withColumn("championId", col("championId").cast(ShortType()))
    .withColumn("championLevel", col("championLevel").cast(ByteType()))
    .withColumn("tokensEarned", col("tokensEarned").cast(ByteType()))
)

champion_mastery_df.printSchema()
spark.sql("DROP TABLE IF EXISTS champion_mastery")
# Partitioned by champion so per-champion filters read only that champion's files.
champion_mastery_df.write.partitionBy("championId").mode("overwrite").saveAsTable("champion_mastery")


## Leagues DataFrame

In [None]:
# League entries are spread over three nested folder levels, hence the */*/*/ glob.
leagues_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/leagues/*/*/*/*.csv"
leagues_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(leagues_path)
)

# Quick look at which tier/division combinations are present, then the schema.
leagues_df.drop_duplicates(["tier", "division"]).show()
leagues_df.printSchema()

spark.sql("DROP TABLE IF EXISTS leagues")
leagues_df.write.partitionBy("tier").mode("overwrite").saveAsTable("leagues")
            

## Data Dragon DataFrames

In [None]:
# Champion static data (Data Dragon) for the configured patch.
# The patch is read into a variable first: nesting double quotes inside a
# double-quoted f-string (f"...{configs["LEAGUE_PATCH"]}...") is a SyntaxError
# on Python versions before 3.12.
league_patch = configs["LEAGUE_PATCH"]
champion_infos_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/data_dragon/{league_patch}/champion_infos.parquet"
champion_infos_df = spark.read.format("parquet").load(champion_infos_path)

# Stat columns downcast to ShortType, applied in the same order as before. A loop
# replaces the backslash-continued chain, whose last line ended in a dangling `\`.
# NOTE(review): ShortType drops any fractional part — confirm none of these
# columns holds non-integral values in the Data Dragon export.
champion_short_columns = [
    "key",
    "attack_stat",
    "defense_stat",
    "magic_stat",
    "difficulty_stat",
    "hpperlevel",
    "base_mp",
    "base_armor",
    "base_spellblock",
    "base_attackrange",
    "base_crit",
    "critperlevel",
    "base_attackdamage",
]
for short_column in champion_short_columns:
    champion_infos_df = champion_infos_df.withColumn(short_column, col(short_column).cast(ShortType()))

champion_infos_df.printSchema()
spark.sql("DROP TABLE IF EXISTS champion_info")
champion_infos_df.write.partitionBy("primary_class").mode("overwrite").saveAsTable("champion_info")

In [None]:
# Item static data (Data Dragon) for the configured patch.
# The patch is read into a variable first: nesting double quotes inside a
# double-quoted f-string is a SyntaxError on Python versions before 3.12.
league_patch = configs["LEAGUE_PATCH"]
item_infos_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/data_dragon/{league_patch}/item_infos.parquet"
item_infos_df = spark.read.format("parquet").load(item_infos_path)

# Gold values, ids and stack/depth counters downcast to 16-bit, in the original order.
item_short_columns = ["id", "baseGold", "sellGold", "totalGold", "maximumStacks", "depth", "specialRecipe"]
for short_column in item_short_columns:
    item_infos_df = item_infos_df.withColumn(short_column, col(short_column).cast(ShortType()))

item_infos_df.printSchema()

spark.sql("DROP TABLE IF EXISTS item_info")
item_infos_df.write.mode("overwrite").saveAsTable("item_info")

In [None]:
# Map static data (Data Dragon) for the configured patch, written without type casts.
# The patch is read into a variable first: nesting double quotes inside a
# double-quoted f-string is a SyntaxError on Python versions before 3.12.
league_patch = configs["LEAGUE_PATCH"]
map_infos_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/data_dragon/{league_patch}/map_infos.parquet"

# Keep the DataFrame itself: the previous chained form assigned the return value
# of saveAsTable (None) to map_infos_df.
map_infos_df = spark.read.format("parquet").load(map_infos_path)

spark.sql("DROP TABLE IF EXISTS map_info")
map_infos_df.write.mode("overwrite").saveAsTable("map_info")


In [None]:
# Rune static data (Data Dragon) for the configured patch.
# The patch is read into a variable first: nesting double quotes inside a
# double-quoted f-string is a SyntaxError on Python versions before 3.12.
league_patch = configs["LEAGUE_PATCH"]
rune_infos_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/data_dragon/{league_patch}/rune_infos.parquet"
rune_infos_df = spark.read.format("parquet").load(rune_infos_path)

# Parenthesised chain; the previous version ended with a dangling `\`.
rune_infos_df = (
    rune_infos_df
    .withColumn("id", col("id").cast(ShortType()))
    .withColumn("rune_level", col("rune_level").cast(ShortType()))
)

spark.sql("DROP TABLE IF EXISTS rune_info")
rune_infos_df.write.mode("overwrite").saveAsTable("rune_info")

In [None]:
# Summoner spell static data (Data Dragon) for the configured patch.
# The patch is read into a variable first: nesting double quotes inside a
# double-quoted f-string is a SyntaxError on Python versions before 3.12.
league_patch = configs["LEAGUE_PATCH"]
summoner_spells_info_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/data_dragon/{league_patch}/summoner_spells_info.parquet"
summoner_spells_info_df = spark.read.format("parquet").load(summoner_spells_info_path)

# Downcast to ShortType in the original order. A loop replaces the
# backslash-continued chain, whose last line ended in a dangling `\`.
# NOTE(review): confirm each of these columns holds integral scalars in the parquet
# file; values that are not castable become null (or fail for non-scalar types).
summoner_spell_short_columns = [
    "key",
    "maxrank",
    "cooldown",
    "cooldownBurn",
    "cost",
    "minimumSummonerLevel",
    "rangeBurn",
]
for short_column in summoner_spell_short_columns:
    summoner_spells_info_df = summoner_spells_info_df.withColumn(short_column, col(short_column).cast(ShortType()))

spark.sql("DROP TABLE IF EXISTS summoner_spells_info")
summoner_spells_info_df.write.mode("overwrite").saveAsTable("summoner_spells_info")

## General Matches DataFrame

In [None]:
# Match-level metadata. No inferSchema, so every CSV column arrives as a string.
general_match_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/match/general/*/*/*/*.csv"
general_match_df = spark.read.format("csv").option("header", "true").load(general_match_path)

# gameCreation / gameStartTimestamp / gameEndTimestamp are divided by 1000, i.e. treated
# as epoch milliseconds. Spark documents to_timestamp as parsing a timestamp *string*,
# so feeding it a number is not a reliable epoch conversion; timestamp_seconds is the
# documented conversion from (fractional) epoch seconds to a timestamp.
general_match_df = (
    general_match_df
    .withColumn("gameCreation", timestamp_seconds(col("gameCreation") / 1000))
    .withColumn("gameStartTimestamp", timestamp_seconds(col("gameStartTimestamp") / 1000))
    .withColumn("gameEndTimestamp", timestamp_seconds(col("gameEndTimestamp") / 1000))
    .withColumn("queueId", col("queueId").cast(ShortType()))
    .withColumn("mapId", col("mapId").cast(ShortType()))
    .withColumn("dataVersion", col("dataVersion").cast(ByteType()))
    # gameDuration is treated as a count of seconds and turned into an interval.
    .withColumn("gameDuration", expr("interval '1 second' * gameDuration"))
)

spark.sql("DROP TABLE IF EXISTS general_match")
general_match_df.write.partitionBy("gameMode").mode("overwrite").saveAsTable("general_match")


In [None]:
# Per-player match rows. No inferSchema, so every CSV column arrives as a string.
players_match_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/match/players/*/*/*/*.csv"
players_match_df = spark.read.format("csv").option("header", "true").load(players_match_path)

# To Fix, challenges df not removed in ETL process: match_entries.py:getMatchPlayersInfo
# Drop every "challenges" column up front, then inspect the remaining dtypes.
# The previous single loop iterated the ORIGINAL dtypes, so a bigint "challenges"
# column was dropped and then re-referenced by col(column), which cannot resolve.
challenge_columns = [column for column in players_match_df.columns if "challenges" in column]
if challenge_columns:
    players_match_df = players_match_df.drop(*challenge_columns)

# NOTE(review): because the CSV is read without inferSchema, no column is "bigint"
# here, so this downcast currently never fires — confirm whether that is intended.
for column, col_type in players_match_df.dtypes:
    if col_type == "bigint":
        players_match_df = players_match_df.withColumn(column, col(column).cast(ShortType()))

spark.sql("DROP TABLE IF EXISTS players_match")
players_match_df.write.partitionBy("championId").mode("overwrite").saveAsTable("players_match")

In [None]:
# NOTE(review): this setting only affects Parquet reads (and is already Spark's
# default); the data read below is CSV.
spark.conf.set("spark.sql.parquet.enableVectorizedReader","true")
players_challenges_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/match/players_challenges/*/*/*/*.csv"
players_challenges_match_df = spark.read.format("csv").option("header","true").load(players_challenges_path)

# Challenge COLUMNS (despite the name) that must keep fractional precision
# instead of being downcast to ShortType.
double_rows = [
    "damagePerMinute",
    "damageTakenOnTeamPercentage",
    "effectiveHealAndShielding",
    "gameLength",
    "goldPerMinute",
    "kda",
    "killParticipation",  # previously "killParticipation'" (stray quote), so it never matched
    "shortestTimeToAceFromFirstTakedown",
    "teamDamagePercentage",
]

# Downcast every other bigint/double column to ShortType.
# Bug fix: the previous test was `col_type not in double_rows` — a type name checked
# against column names, which is always True — so the columns above were downcast too.
# NOTE(review): without inferSchema every CSV column is a string, so this loop
# currently changes nothing — confirm whether inferSchema was intended.
for column, col_type in players_challenges_match_df.dtypes:
    if col_type in ("bigint", "double") and column not in double_rows:
        players_challenges_match_df = players_challenges_match_df.withColumn(column, col(column).cast(ShortType()))

players_challenges_match_df = players_challenges_match_df.withColumn('alliedJungleMonsterKills', col('alliedJungleMonsterKills').cast(LongType()))

#Table has no partitions
spark.sql("DROP TABLE IF EXISTS player_challenges_match")
players_challenges_match_df.write.mode("overwrite").saveAsTable("player_challenges_match")



In [None]:
# Team-level match stats.
teams_match_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/resources/match/teams/*/*/*/*.csv"
teams_match_df = spark.read.format("csv").option("header", "true").load(teams_match_path)

# Every integral or floating column is downcast to ShortType.
numeric_column_names = [name for name, dtype in teams_match_df.dtypes if dtype in ("bigint", "double")]
for name in numeric_column_names:
    teams_match_df = teams_match_df.withColumn(name, col(name).cast(ShortType()))

spark.sql("DROP TABLE IF EXISTS teams_match")
# Written as a single unpartitioned table.
teams_match_df.write.mode("overwrite").saveAsTable("teams_match")


## End of ETL pipeline
The cells below are ad-hoc exploration queries against the tables created above.

In [None]:
# Spot-check players_match, sorted by championId descending.
spot_check_df = spark.sql("SELECT matchId, championId FROM players_match ORDER BY championId DESC")
spot_check_df.show()

In [None]:
%sql
-- Number of distinct matches in which the given champion was banned.
-- NOTE(review): championId is compared to a champion *name* here, while another query
-- in this notebook filters champion_item_picks with a numeric championId (235) —
-- confirm the column's type in dbt_petecastle.champion_picks_bans.


     SELECT COUNT(DISTINCT(matchId)) FROM `dbt_petecastle`.`champion_picks_bans`
    WHERE type = "ban" AND championId = "Aatrox"



In [None]:
# Item picks for championId 235, ordered by match (query text kept verbatim).
champion_235_items_sql = """
    WITH temp_table AS (
        SELECT * FROM dbt_petecastle.champion_item_picks
        WHERE championId = 235 
        -- GROUP BY matchId
    )
    SELECT *
    FROM temp_table
    ORDER BY matchId
    """
champion_235_items_df = spark.sql(champion_235_items_sql)
champion_235_items_df.show()

# -- 3262  (NOTE(review): presumably an observed count — meaning not recorded)


In [None]:
# Primary rune style ids per player row; the var1-var3 columns stay commented out
# in the query text, which is kept verbatim.
primary_style_sql = """
        SELECT 
            primaryStyle_0_id,
            -- primaryStyle_0_var1,
            -- primaryStyle_0_var2,
            -- primaryStyle_0_var3,
            primaryStyle_1_id,
            -- primaryStyle_1_var1,
            -- primaryStyle_1_var2,
            -- primaryStyle_1_var3,
            primaryStyle_2_id,
            -- primaryStyle_2_var1,
            -- primaryStyle_2_var2,
            -- primaryStyle_2_var3,
            primaryStyle_3_id
            -- primaryStyle_3_var1,
            -- primaryStyle_3_var2,
            -- primaryStyle_3_var3
        FROM players_match
    
    """
primary_style_df = spark.sql(primary_style_sql)
primary_style_df.show()