In [1]:
import pyspark
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from datetime import datetime, timedelta
from pyspark.sql.types import *
import random
# Appended after the star imports (order kept as-is) so nothing re-exported by
# them can shadow these names.
import os
from pyspark.sql.window import Window

In [None]:
# credentials_location = "/home/abdol/AFCON_2023_DE_Stats/code/mage-spark/keys/my-creds.json"

In [2]:
# Path to the GCP service-account key file passed to the Spark/Hadoop GCS auth
# settings. It can be overridden with GOOGLE_APPLICATION_CREDENTIALS so the
# notebook is not tied to one machine; the default keeps the original path.
credentials_location = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS", "/home/abdol/keys/afcon_project/my-creds.json"
)

In [3]:
# Connector jars handed to Spark through spark.jars (a comma-separated list).
GCS_connector = "/home/abdol/AFCON_2023_DE_Stats/lib/gcs-connector-hadoop3-2.2.5.jar"
GBQ_connector = "/home/abdol/AFCON_2023_DE_Stats/lib/spark-3.3-bigquery-0.36.1.jar"
conf_jars = ",".join((GCS_connector, GBQ_connector))

In [None]:
# temp_GCS_Bucket = "cloud_bucket_dbt"

In [4]:
# The data-lake files and the temporary GCS bucket setting share one bucket.
bucket_name = "afcon_datalake"
temp_GCS_Bucket = bucket_name

In [5]:
# Spark configuration: local mode, the GCS + BigQuery connector jars,
# service-account auth for GCS, and `temporaryGcsBucket` (presumably the staging
# bucket the BigQuery connector uses for writes).
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", conf_jars)
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
    .set("temporaryGcsBucket", temp_GCS_Bucket)
)

In [None]:
# Echo every configured "key: value" pair, one per line, as a sanity check
# before the SparkContext is created.
for conf_key, conf_value in conf.getAll():
    print(f"{conf_key}: {conf_value}")

In [6]:
# Start the SparkContext from the configuration assembled above.
sc = SparkContext(conf=conf)

In [7]:
# Handle to the JVM-side Hadoop Configuration (through the private `_jsc`
# attribute) so the GCS filesystem settings can be set directly.
hadoop_conf = sc._jsc.hadoopConfiguration()

In [8]:
# Register the GCS connector classes as the handlers for gs:// paths and point
# the connector at the service-account key file (applied in this order).
for hadoop_key, hadoop_value in (
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", credentials_location),
    ("fs.gs.auth.service.account.enable", "true"),
):
    hadoop_conf.set(hadoop_key, hadoop_value)

In [9]:
# SparkSession on top of the running SparkContext, reusing its configuration.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [10]:
# Load the matches table from the data lake.
# NOTE(review): the object name is spelled 'acfon_matches' — presumably that is
# how it is stored in the bucket; confirm before "fixing" the spelling.
matches_df = spark.read.parquet(f"gs://{bucket_name}/acfon_matches.parquet")

In [None]:
# matches_df.count()

In [None]:
# matches_df.printSchema()

In [11]:
# Expose the matches as the SQL view `Matches` for the dimension queries below.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable (same semantics, and it is what the events fact cell uses).
matches_df.createOrReplaceTempView('Matches')



In [None]:
# Display the column names of the matches table.
matches_df.columns

In [None]:
# matches_df.show(n=5,truncate=False)

In [None]:
# Load the per-match events table from the data lake.
events_df = spark.read.parquet(f"gs://{bucket_name}/match_events.parquet")

In [None]:
# The Spark session is no longer stopped here: every following cell (views,
# dimensions, fact table, BigQuery writes) needs it, so stopping it at this point
# broke Restart & Run All. The session is stopped once, in the final cell.

In [None]:
# events_df.count()

In [None]:
# events_df.printSchema()

In [None]:
# events_df.filter(events_df.player_id.isNotNull()).show(n=1,truncate=False)

In [None]:
# Expose the events as the SQL view `events` (replaces the deprecated
# registerTempTable; same semantics).
events_df.createOrReplaceTempView('events')

In [None]:
# spark.sql("""
#         SELECT DISTINCT shot_outcome 
#         FROM events
#     """).show(truncate=False)

In [None]:
# events_df.filter(events_df.player_id.isNull()).count()

In [None]:
# events_df.filter(events_df.player_id.isNotNull()).count()

##### Define dimensions

##### team dimension

In [None]:
# One row per distinct home team.
# NOTE(review): only home_team is read, so a team that never played as the home
# side would be missing — confirm whether away teams should be unioned in.
Team_dim = matches_df.select('home_team').distinct()

In [None]:
# Team_dim.count()

In [None]:
# Use the generic column name `team` so it matches the events data.
Team_dim = Team_dim.withColumnRenamed(existing='home_team', new='team')

In [None]:
# Attach each team's id from the events data; the left join keeps teams that
# never appear in the events, with a NULL team_id.
event_team_ids = events_df.select('team_id', 'team').distinct()
Team_dim = Team_dim.join(event_team_ids, on='team', how='leftouter')

In [None]:
# Store team ids as integers.
Team_dim = Team_dim.withColumn('team_id', col('team_id').cast(IntegerType()))

In [None]:
# Team_dim.count()

In [None]:
# Team_dim.printSchema()

In [None]:
# Put the key column first.
Team_dim = Team_dim.select(['team_id', 'team'])

In [None]:
# Team_dim.filter(Team_dim.team_id.isNull()).show()

In [None]:
# Ids already taken by teams found in the events data (NULL ids are skipped);
# ids generated for the remaining teams must avoid these.
excluded_teams = [
    row['team_id']
    for row in Team_dim.where(col('team_id').isNotNull()).select('team_id').collect()
]

In [None]:
# Ship the taken team ids to executors once, as a broadcast variable read by the
# random-id helper below.
excluded_teams_ids = spark.sparkContext.broadcast(excluded_teams)

In [None]:
def generate_random_int_not_in_list(team_id, x_list=None, low=1, high=1000):
    """Return ``team_id`` as an int, or a random id not in ``x_list`` when it is missing.

    Args:
        team_id: Existing id (anything ``int()`` accepts) or ``None``.
        x_list: Ids that must not be returned. Defaults to the broadcast
            ``excluded_teams_ids`` value, read at call time. The previous default
            (``excluded_teams_ids.value`` as a default argument) was evaluated once,
            at definition time, which froze the list into the function and made the
            broadcast pointless.
        low, high: Inclusive range the random id is drawn from (defaults keep the
            original 1..1000 range).

    Returns:
        ``int(team_id)`` when ``team_id`` is not None, otherwise an int drawn
        uniformly from ``[low, high]`` that is not in ``x_list``.

    Raises:
        ValueError: if every id in ``[low, high]`` is excluded (the previous
            rejection-sampling loop would spin forever in that case).

    Note: draws for different missing teams are independent, so two missing
    teams can still receive the same id.
    """
    if team_id is not None:
        return int(team_id)
    if x_list is None:
        x_list = excluded_teams_ids.value
    excluded = set(x_list)
    free_ids = [candidate for candidate in range(low, high + 1) if candidate not in excluded]
    if not free_ids:
        raise ValueError(f"no free id left in [{low}, {high}]")
    return random.choice(free_ids)


In [None]:
# The wrapped function returns random values, so mark the UDF non-deterministic:
# Spark treats UDFs as deterministic by default and may eliminate or repeat
# invocations during optimization.
generate_random_int_udf = udf(generate_random_int_not_in_list, IntegerType()).asNondeterministic()

In [None]:
# Give every team without an id in the events data a deterministic surrogate id:
# max(known team_id) + 1, + 2, ... in alphabetical order of team name.
# This replaces the random-id UDF, whose ids could collide with each other and were
# re-drawn each time the lazy Team_dim plan was re-evaluated. Team_dim is joined
# twice into Events_fact below, so one team could end up with different ids.
max_known_team_id = Team_dim.agg({"team_id": "max"}).first()[0] or 0
missing_team_rank = row_number().over(
    Window.partitionBy(col("team_id").isNull()).orderBy("team")
)
Team_dim = Team_dim.withColumn(
    "team_id",
    coalesce(col("team_id"), (lit(max_known_team_id) + missing_team_rank).cast(IntegerType())),
)

In [None]:
# Team_dim.show(truncate=False)

##### Stadiums dimension

In [12]:
# One row per distinct stadium in the matches table.
Staduim = matches_df.select('stadium').distinct()

In [13]:
# Surrogate key for the stadium dimension (unique, not necessarily consecutive).
Staduim = Staduim.withColumn(colName='Staduim_ID', col=monotonically_increasing_id())

In [14]:
# Key column first.
Staduim = Staduim.select(['Staduim_ID', 'stadium'])

In [None]:
# Staduim.show(truncate=False)

##### Referee Dimension

In [None]:
# One row per distinct referee in the matches table.
Referee = matches_df.select('referee').distinct()

In [None]:
# Surrogate key for the referee dimension.
Referee = Referee.withColumn(colName='referee_id', col=monotonically_increasing_id())

In [None]:
# Key column first.
Referee = Referee.select(['referee_id', 'referee'])

In [None]:
# Referee.show(truncate=False)

##### Manager Dimension

In [None]:
# One row per distinct home-team manager entry.
# NOTE(review): only home_managers is read; managers who only appear on the away
# side would be missing — confirm.
Manager = matches_df.select('home_managers').distinct()

In [None]:
# Surrogate key for the manager dimension.
Manager = Manager.withColumn(colName="manager_id", col=monotonically_increasing_id())

In [None]:
# Generic column name, with the key column first so that the layout matches the
# other dimensions (Team_dim, Staduim, Referee).
Manager = Manager.withColumnRenamed('home_managers', 'manager') \
                 .select('manager_id', 'manager')

In [None]:
# Manager.count()

In [None]:
# Manager.show(truncate=False)

##### Player Dimension

In [None]:
# One row per distinct (player_id, player) pair seen in the events.
Player_dim = events_df.select('player_id', 'player').distinct()

In [None]:
# Player_dim.show()

In [None]:
# Player_dim.printSchema()

In [None]:
# Store player ids as integers.
Player_dim = Player_dim.withColumn('player_id', col('player_id').cast(IntegerType()))

In [None]:
# print('Count of distinct players in tournment before drop null: {}'.format(Player_dim.count()))

In [None]:
# Events without a player (NULL player_id) do not belong in the player dimension.
Player_dim = Player_dim.na.drop(subset=['player_id'])

In [None]:
# print('Count of distinct players in tournment after drop null: {}'.format(Player_dim.count()))

##### Date Dimension

In [None]:
# def get_max_min_date_from_src():
#     max_min_date_list = spark.sql("""
#                         SELECT 
#                                 MIN(match_date) AS min_match_date,
#                                 MAX(match_date) AS max_match_date
#                         FROM 
#                                 Matches
#             """).collect()
    
    
#     start_date = to_date(lit(max_min_date_list[0]['min_match_date']), "%Y-%m-%d")
#     max_date = to_date(lit(max_min_date_list[0]['max_match_date']), "%Y-%m-%d")
    

    
#     dates = spark.range(start_date.subtract(days=1), max_date.add(days=1), 1).select("id").alias("date_id")
    
#     dates = dates.withColumn("date", date_add(start_date, col("date_id") - 1))

    
#     dates = dates.withColumn("year", year(col("date")))
#     dates = dates.withColumn("month", month(col("date")))
#     dates = dates.withColumn("day_of_week", dayofweek(col("date")))
#     dates = dates.withColumn("id", monotonically_increasing_id())                        
#     return dates

In [None]:
# get_max_min_date_from_src().show()

##### Event fact table

In [None]:
# Re-read the events from the same bucket the dimensions were built from
# (bucket_name). The previous hard-coded 'cloud_bucket_dbt' path pointed at a
# different bucket, so the fact table and the Team/Player dimensions could come
# from different copies of the data.
events_df = spark.read.parquet(f"gs://{bucket_name}/match_events.parquet")

In [None]:
# events_df.printSchema()

In [None]:
# Keep only events attributed to a player, then drop the player name (names live
# in Player_dim).
Events_fact = events_df.where(col('player_id').isNotNull()).drop('player')

In [None]:
### get the team ids from Team dimension
# Replace the events' own team_id with the Team dimension's id so the fact table
# uses the same keys as Team_dim (including ids filled in there).
# Join key is spelled 'team' to match the column exactly; 'Team' only resolved
# because Spark's analyzer is case-insensitive by default.
Events_fact = (
    Events_fact
    .drop('team_id')
    .join(Team_dim, on='team', how='leftouter')
    .drop('team')
    # Kept as team_event_id until after the possession-team join, then renamed back.
    .withColumnRenamed('team_id', 'team_event_id')
)

In [None]:
# Events_fact.printSchema()

In [None]:
### get possession team ids from team dimension
# Join a renamed projection of Team_dim rather than referencing Team_dim.team in
# the condition. Team_dim is already in Events_fact's lineage (team join above),
# and referencing its columns in a second join is the self-join pattern Spark may
# reject as ambiguous. The using-join on 'possession_team' yields the same columns
# as before: event columns without possession_team, plus possession_team_id last.
possession_team_dim = Team_dim.select(
    col('team_id').alias('possession_team_id'),
    col('team').alias('possession_team'),
)
Events_fact = Events_fact.join(possession_team_dim, on='possession_team', how='leftouter') \
                         .drop('possession_team')

In [None]:
# Restore the final column name for the event's team key.
Events_fact = Events_fact.withColumnRenamed(existing='team_event_id', new='team_id')

In [None]:
# The 'source' column is not part of the fact table.
Events_fact = Events_fact.drop('source')

In [None]:
## Hash-based Surrogate Key for events fact table
# Columns whose combined values identify an event; they are hashed into event_id.
hash_cols = ["player_id","match_id","team_id","type","timestamp"]

In [None]:
def create_hash(player_id, match_id, team_id, event, timestamp):
    """Return the natural-key string for one event: the five values joined by '_'.

    Kept for ad-hoc use and as the reference for the key layout; the event_id
    column below is computed with native Spark functions instead.
    """
    return f"{player_id}_{match_id}_{team_id}_{event}_{timestamp}"

create_hash_udf = udf(create_hash, StringType())

# Same key layout as create_hash, built from native Spark functions so rows are
# not shipped to a Python worker one by one. NULLs are rendered as 'None',
# mirroring the f-string above, so a NULL cannot shift the positions of the other
# fields. Only md5 of this string is stored.
event_key = concat_ws("_", *[coalesce(col(c).cast("string"), lit("None")) for c in hash_cols])
Events_fact = Events_fact.withColumn("event_id", md5(event_key))

In [None]:
# Expose the current Events_fact as the SQL view `events_fact` for the
# duplicate-check queries below.
Events_fact.createOrReplaceTempView('events_fact')

In [None]:
# spark.sql("""SELECT *
#     FROM (
#             SELECT 
#                 row_number() OVER(PARTITION BY EVENT_ID ORDER BY EVENT_ID) AS ROW_NUM,
#                 *
#             FROM EVENTS_FACT)L1
#     WHERE L1.ROW_NUM > 1""").show()

In [None]:
## DROP DUPLICATE EVENTS
# Keep one row per event_id. The previous row_number() window was ordered by its
# own partition key, so the surviving row was arbitrary anyway. dropDuplicates
# expresses that directly and does not add a helper ROW_NUM column.
Events_fact = Events_fact.dropDuplicates(['event_id'])

In [None]:
# Events_fact.count()

In [None]:
# Prefix the generic event columns so their names are unambiguous in the warehouse.
for old_name, new_name in (
    ('type', 'event_type'),
    ('timestamp', 'event_timestamp'),
    ('minute', 'event_minute'),
    ('location', 'event_location'),
):
    Events_fact = Events_fact.withColumnRenamed(old_name, new_name)

In [None]:
# Events_fact.printSchema()

In [None]:
# Final column set (and order) of the events fact table.
events_fact_columns = [
    'event_id',
    'player_id',
    'match_id',
    'team_id',
    'event_type',
    'event_timestamp',
    'event_minute',
    'event_location',
    'play_pattern',
    'position',
    'pass_outcome',
    'pass_cross',
    'pass_goal_assist',
    'pass_shot_assist',
    'foul_committed_type',
    'foul_committed_card',
    'foul_committed_offensive',
    'foul_won_defensive',
    'foul_committed_penalty',
    'foul_won_penalty',
    'shot_outcome',
    'interception_outcome',
    'possession',
    'possession_team_id',
]
Events_fact = Events_fact.select(*events_fact_columns)

In [None]:
# x-coordinate boundaries between the pitch thirds.
# Assumes event_location is an [x, y] array with x increasing toward the
# opponent's goal on a 120-unit-long pitch (StatsBomb convention) — TODO confirm.
ATTACK_ZONE_MIN_X = 80
MIDFIELD_ZONE_MIN_X = 40

# Bucket each event into a third of the pitch by its x coordinate. Events without
# a location now get a NULL zone instead of falling through to "Defense".
event_x = col("event_location")[0]
Events_fact = Events_fact.withColumn(
    "zone",
    when(event_x >= ATTACK_ZONE_MIN_X, "Attack")
    .when(event_x >= MIDFIELD_ZONE_MIN_X, "Midfield")
    .when(event_x < MIDFIELD_ZONE_MIN_X, "Defense"),
)

In [None]:
# Events_fact.show(n=5,truncate=False)

In [None]:
# Events_fact.printSchema()

In [None]:
# Final column types for the BigQuery table.
# NOTE(review): if event_timestamp is a time-of-day string (e.g. "00:01:02.345"),
# the timestamp cast may attach a date that is not the match date — confirm.
for column_name, target_type in (
    ('player_id', IntegerType()),
    ('match_id', IntegerType()),
    ('event_timestamp', TimestampType()),
    ('event_location', StringType()),
):
    Events_fact = Events_fact.withColumn(column_name, col(column_name).cast(target_type))

In [None]:
# Write the events fact table to BigQuery, replacing any existing table.
# NOTE(review): dataset 'trips_data_all' differs from 'afcon_events', which the
# stadium dimension is written to — confirm this is the intended target.
Events_fact.write \
    .format("bigquery") \
    .mode("overwrite") \
    .option("table", "trips_data_all.events_fact_spark_generated") \
    .save()

In [17]:
# Write the stadium dimension to BigQuery, replacing any existing table, with
# the parent project set explicitly.
Staduim.write \
    .format("bigquery") \
    .mode("overwrite") \
    .options(parentProject="data-engineering-afcon-2023",
             table="data-engineering-afcon-2023.afcon_events.stad_dim") \
    .save()

In [18]:
# Stop the Spark session now that all writes are done.
spark.stop()