## Importing Libraries

In [None]:
import os

from delta import *
from delta.tables import *

from dotenv import load_dotenv

import pyspark
from pyspark.sql.types import *
from pyspark.sql import functions as F

## Loading Environment Variables

In [None]:
# Let python-dotenv populate the process environment before reading from it.
load_dotenv()

# API key taken from the environment; None when API_KEY is not set.
api_key = os.environ.get("API_KEY")

## Create Spark Session

In [None]:
# Session builder configured with the Delta Lake SQL extension and catalog.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("esports_tournaments_silver")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Create (or reuse) the Spark session and only log errors.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

## Load Recent Tournaments Data

In [None]:
# Read the bronze tournaments parquet data, then inspect its schema and first rows.
df = spark.read.format('parquet').load('../../data/bronze/esports_tournaments.parquet')
df.printSchema()
df.show()

### Clean Data

In [None]:
# Number of rows that contain at least one null value.
na_rows = df.count() - df.dropna().count()
print(na_rows)

In [None]:
# Number of rows that are exact duplicates of another row.
duplicate_rows = df.count() - df.drop_duplicates().count()
print(duplicate_rows)

In [None]:
# Display the summary statistics computed by DataFrame.describe().
summary_stats = df.describe()
summary_stats.show()

In [None]:
# Rows for GameId 785 whose StartDate reads '0202-05-07' (looks like a typo for
# the year 2022) are rewritten to '2022-05-07'; every other row is left as is.
is_bad_start_date = (F.col('StartDate') == '0202-05-07') & (F.col('GameId') == 785)
df = df.withColumn(
    'StartDate',
    F.when(is_bad_start_date, '2022-05-07').otherwise(F.col('StartDate')),
)

In [None]:
# Drop rows containing nulls first, then remove exact duplicate rows.
df = df.dropna().drop_duplicates()

### Set Data Types

In [None]:
# Target Spark type for each tournaments column.
tournament_column_types = {
    'EndDate': DateType(),
    'GameId': IntegerType(),
    'Location': StringType(),
    'StartDate': DateType(),
    'Teamplay': IntegerType(),
    'TotalUSDPrize': FloatType(),
    'TournamentId': IntegerType(),
    'TournamentName': StringType(),
}

# Cast the columns one at a time, replacing each with its typed version.
df_prep = df
for column_name, target_type in tournament_column_types.items():
    df_prep = df_prep.withColumn(column_name, df[column_name].cast(target_type))


### Save Data

In [None]:
# Persist the type-cast frame (df_prep) rather than the uncast df, so the
# column types set in the previous step actually reach the silver table.
local_path = "../../data/silver/esports_tournaments"
(
    df_prep.coalesce(1)  # single partition -> a single data file
    .write.format("delta")
    # Let the overwrite replace the stored schema too: a table written earlier
    # from the uncast frame may have different column types.
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .save(local_path)
)

In [None]:
# Read the silver tournaments Delta table back and display it as a sanity check.
got_df = spark.read.load('../../data/silver/esports_tournaments', format='delta')
got_df.show()

## Load Games Genre

In [None]:
# Read the bronze games-genre parquet data, then inspect its schema and first rows.
# The path is relative to the notebook, like every other bronze/silver path here;
# a leading '/' would make it absolute and resolve from the filesystem root.
df = spark.read.parquet('../../data/bronze/esports_games_genre.parquet')
df.printSchema()
df.show()

### Clean Data

In [None]:
# Number of rows that contain at least one null value.
na_rows = df.count() - df.dropna().count()
print(na_rows)

In [None]:
# Number of rows that are exact duplicates of another row.
duplicate_rows = df.count() - df.drop_duplicates().count()
print(duplicate_rows)

In [None]:
# Drop rows containing nulls first, then remove exact duplicate rows.
df = df.dropna().drop_duplicates()

### Save Data

In [None]:
# Write the cleaned genre data to the silver layer as a single-partition Delta
# table, replacing any previous contents, then display what was written.
local_path = "../../data/silver/esports_games_genre"
df.coalesce(1).write.save(local_path, format="delta", mode="overwrite", header="true")
df.show()

## Load Games Awarding Prize Money

In [None]:
# Read the bronze prize-money parquet data, then inspect its schema and first rows.
df = spark.read.format('parquet').load('../../data/bronze/esports_games_awarding_prize_money.parquet')
df.printSchema()
df.show()

### Clean Data

In [None]:
# Number of rows that contain at least one null value.
na_rows = df.count() - df.dropna().count()
print(na_rows)

In [None]:
# Number of rows that are exact duplicates of another row.
duplicate_rows = df.count() - df.drop_duplicates().count()
print(duplicate_rows)

In [None]:
# Drop rows containing nulls first, then remove exact duplicate rows.
df = df.dropna().drop_duplicates()

### Set Data Types

In [None]:
# Cast each prize-money column to its target Spark type.
# The chain is wrapped in parentheses rather than joined with backslash
# continuations, so no trailing `\` is left dangling at the end of the statement.
df_prep = (
    df.withColumn('GameId', df['GameId'].cast(IntegerType()))
    .withColumn('GameName', df['GameName'].cast(StringType()))
    .withColumn('TotalPlayers', df['TotalPlayers'].cast(IntegerType()))
    .withColumn('TotalTournaments', df['TotalTournaments'].cast(IntegerType()))
    .withColumn('TotalUSDPrize', df['TotalUSDPrize'].cast(FloatType()))
)

### Save Data

In [None]:
# Write the type-cast prize-money data as single-partition parquet, replacing
# any previous output, then display what was written.
# NOTE(review): the other silver tables are written as Delta to a path without
# the "_test" suffix - confirm whether parquet and this path are intentional.
local_path = "../../data/silver/esports_games_awarding_prize_money_test"
df_prep.coalesce(1).write.save(local_path, format="parquet", mode="overwrite", header="true")
df_prep.show()