In [None]:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import when, col, regexp_extract, expr, trim, lit, to_timestamp, collect_list, struct, avg
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, TimestampType
import json
import pandas as pd
from pyspark.sql.window import Window

In [None]:
def cleanse_df(df):
    """Cleanse a raw NHL game-stats Spark DataFrame and derive per-game features.

    Steps, in order: normalise two team names, drop 'GF/GP' and 'GA/GP',
    replace '--' placeholders in the special-teams columns, null out 'N/A',
    cast the stat columns to numeric types, derive 'Save %', 'Shooting %' and
    'Corsi%', resolve the opponent abbreviation embedded in 'Game Date' to a
    full team name through the ``teamname_abbreviations`` PostgreSQL table,
    derive 'Home Team' / 'Away Team', parse 'Game Date' into a timestamp and
    flag whether the home team won.

    Uses the module-level ``spark`` session for the JDBC lookup.

    Args:
        df: Spark DataFrame containing the raw columns referenced below
            ('Team Name', 'Game Date', 'GP', 'W', ...).

    Returns:
        Spark DataFrame with the cleansed and derived columns.
    """
    # Normalise team names: drop the accent and the period.
    team_name_fixes = (
        ('Montréal Canadiens', 'Montreal Canadiens'),
        ('St. Louis Blues', 'St Louis Blues'),
    )
    for raw_name, clean_name in team_name_fixes:
        df = df.withColumn('Team Name',
                           when(col('Team Name') == raw_name, clean_name)
                           .otherwise(col('Team Name')))

    # Drop 'GF/GP' and 'GA/GP' since they are the same as 'GF' and 'GA'
    df = df.drop('GF/GP').drop('GA/GP')

    # Replace the "--" placeholder: 100 for the penalty-kill columns,
    # 0 for the power-play columns.
    placeholder_defaults = {'PK%': 100, 'Net PK%': 100, 'PP%': 0, 'Net PP%': 0}
    for col_name, default in placeholder_defaults.items():
        df = df.withColumn(col_name,
                           when(col(col_name) == '--', default)
                           .otherwise(col(col_name)))

    # Replace "N/A" with nulls
    df = df.replace('N/A', None)

    # Whole-number stats are cast to integers. Percentages and per-game rates
    # are cast to doubles: an integer cast would truncate e.g. a P% of 0.5 to 0
    # and distort every value derived from it.
    count_columns = ['GP', 'W', 'L', 'P', 'RW', 'ROW', 'SO_win', 'GF', 'GA']
    rate_columns = ['P%', 'PP%', 'PK%', 'Net PP%', 'Net PK%',
                    'Shots/GP', 'SA/GP', 'FOW%']
    for col_name in count_columns:
        df = df.withColumn(col_name, col(col_name).cast(IntegerType()))
    for col_name in rate_columns:
        df = df.withColumn(col_name, col(col_name).cast(DoubleType()))

    # Calculate "Save %" and "Shooting %"
    df = df.withColumn('Save %', (col('SA/GP') - col('GA')) / col('SA/GP'))
    df = df.withColumn('Shooting %', col('GF') / col('Shots/GP'))

    # Calculate Corsi %
    df = df.withColumn('Corsi%', (col('Shots/GP') / (col('Shots/GP') + col('SA/GP'))) * 100)

    # Extract the three-character opponent abbreviation that follows "@ " or
    # "vs " in 'Game Date' (empty string when neither is present).
    df = df.withColumn('Against Team',
                       regexp_extract(col('Game Date'), r'(?:@|vs) (\w{3})', 1))

    # JDBC connection properties for PostgreSQL
    db_properties = {
        "user": "User_1",
        "password": "postgres",
        "driver": "org.postgresql.Driver"
    }

    # JDBC URL for PostgreSQL
    jdbc_url = "jdbc:postgresql://192.168.1.246:5432/nhl_master_data"

    # Abbreviation -> full team name lookup table
    teamname_abbreviations_df = spark.read.jdbc(url=jdbc_url, table="teamname_abbreviations", properties=db_properties)

    # Left join so rows with an unknown abbreviation are kept (with a null name)
    df = df.join(
        teamname_abbreviations_df,
        col("Against Team") == col("abbreviation"),
        "left"
    ).drop(teamname_abbreviations_df["abbreviation"])

    # Replace the abbreviation with the full team name
    df = df.withColumn("Against Team", col("Full Name"))

    # Keep only the columns used downstream
    df = df.select("Team Name", "Game Date", "GP", "W", "L", "T", "OT", "P", "P%", "RW", "ROW", "SO_win", "GF", "GA", "PP%", "PK%", "Net PP%", "Net PK%", "Shots/GP", "SA/GP", "FOW%", "Season", "Type", "Save %", "Shooting %", "Corsi%", "Against Team")

    # An "@" in 'Game Date' marks an away game for 'Team Name'; anything else
    # is treated as a home game.
    is_away_game = col('Game Date').contains('@')
    df = df.withColumn('Home Team', when(is_away_game, col('Against Team')).otherwise(col('Team Name')))
    df = df.withColumn('Away Team', when(is_away_game, col('Team Name')).otherwise(col('Against Team')))

    # Keep only the yyyy/MM/dd part of 'Game Date' and convert it to a timestamp
    df = df.withColumn('Game Date', regexp_extract(col('Game Date'), r'\d{4}/\d{2}/\d{2}', 0))
    df = df.withColumn("Game Date", to_timestamp("Game Date", "yyyy/MM/dd"))

    # 'Home Team W' is 1 when the row's team is the home team and won, or is
    # the away team and did not win; otherwise 0.
    df = df.withColumn('Home Team W', when((col('Team Name') == col('Home Team')) & (col('W') == 1), 1)
                       .when((col('Team Name') == col('Away Team')) & (col('W') == 0), 1)
                       .otherwise(0))

    return df

In [None]:
def calculate_n_days_rolling_stats(group_df, n_values):
    """Add rolling-average columns for 'P%' and 'Corsi%' to a team DataFrame.

    For every ``n`` in ``n_values`` and each averaged column, a column named
    ``<column>_NDays_Rolling_Avg_<n>`` is added (or overwritten). Its value is
    the average over the ``n`` rows preceding the current row, within the same
    'Team Name' and ordered by 'Game Date'; the current row is excluded.

    Args:
        group_df: Spark DataFrame with 'Team Name', 'Game Date', 'P%' and
            'Corsi%' columns.
        n_values: iterable of window sizes, counted in rows.

    Returns:
        The DataFrame with the rolling-average columns added.
    """
    averaged_columns = ('P%', 'Corsi%')
    result = group_df

    for span in n_values:
        # Frame covering the `span` rows immediately before the current one
        preceding_rows = (
            Window.partitionBy('Team Name')
            .orderBy(col('Game Date'))
            .rowsBetween(-span, -1)
        )
        for source_name in averaged_columns:
            result = result.withColumn(
                f'{source_name}_NDays_Rolling_Avg_{span}',
                avg(source_name).over(preceding_rows),
            )

    return result

def cast_numeric_columns(row, schema):
    """Return a positional Row whose int values in DoubleType fields are floats.

    Values are read from ``row`` by field name, in the order of
    ``schema.fields``; every other value is passed through unchanged.
    """
    def as_schema_value(field):
        raw = row[field.name]
        wants_float = isinstance(raw, int) and isinstance(field.dataType, DoubleType)
        return float(raw) if wants_float else raw

    return Row(*[as_schema_value(field) for field in schema.fields])

def update_teams_tables(grouped, schema, db_string):
    """Append new game rows, with rolling averages, to each team's PostgreSQL table.

    For every team in ``grouped`` the collected rows are rebuilt into a
    DataFrame, zero-valued placeholder rolling columns are added so the shape
    matches the stored table, the stored rows are read from
    ``team_<team name, lower-cased, spaces as underscores>`` and unioned in,
    rolling averages are recomputed over the combined rows, and the rows whose
    'Game Date' occurs in the new data are appended back to the same table.

    Args:
        grouped: Spark DataFrame with a 'Team Name' column and a 'group_df'
            column holding that team's rows as a list of structs.
        schema: StructType used to rebuild each team's rows as a DataFrame.
        db_string: JDBC URL of the database holding the team tables; used for
            both the read and the write.

    NOTE(review): the final filter keeps every merged row whose 'Game Date'
    matches a new row, so a date already stored in the table would be appended
    again - confirm whether batches can be replayed.
    """
    # Create (or reuse) a SparkSession
    spark = SparkSession.builder \
        .appName("Update Teams Tables") \
        .getOrCreate()

    # Window sizes and source columns of the rolling averages. The placeholder
    # columns below are derived from these so their names and order line up
    # with what calculate_n_days_rolling_stats produces.
    rolling_windows = [35, 40, 50]
    rolling_sources = ['P%', 'Corsi%']

    db_properties = {
        "user": "User_1",
        "password": "postgres",
        "driver": "org.postgresql.Driver"
    }

    # collect() brings the per-team groups to the driver as a list of Rows
    for team_row in grouped.collect():
        team = team_row['Team Name']
        game_rows = team_row['group_df']

        # Nothing to write for a team without rows
        if not game_rows:
            continue

        table_name = f"team_{team.replace(' ', '_').lower()}"

        # Ints destined for DoubleType fields must be floats before the
        # DataFrame is created against the schema.
        typed_rows = [cast_numeric_columns(game_row, schema) for game_row in game_rows]
        group_df = spark.createDataFrame(typed_rows, schema)

        # Replace zeros with float(0) in columns of DoubleType
        for field in schema.fields:
            if isinstance(field.dataType, DoubleType):
                group_df = group_df.withColumn(
                    field.name,
                    when(col(field.name) == 0, float(0)).otherwise(col(field.name)))

        # Placeholder rolling columns; union() below matches columns by
        # position, so the new rows need the same trailing columns as the table.
        for n in rolling_windows:
            for source in rolling_sources:
                group_df = group_df.withColumn(
                    f"{source}_NDays_Rolling_Avg_{n}",
                    lit(float(0)).cast(DoubleType()))

        # Read existing data from the team's table
        existing_data = spark.read.format("jdbc").options(
            url=db_string,
            dbtable=table_name,
            user="User_1",
            password="postgres").load()

        # Union existing data with new data. No explicit sort is needed: the
        # rolling window defines its own ordering by 'Game Date'.
        merged_data = existing_data.union(group_df)

        # Recompute the rolling averages over old + new rows
        merged_data = calculate_n_days_rolling_stats(merged_data, rolling_windows)

        # Keep only rows whose 'Game Date' is present in the new data
        group_df_dates = [date_row['Game Date']
                          for date_row in group_df.select('Game Date').distinct().collect()]
        merged_data = merged_data.filter(col('Game Date').isin(group_df_dates))

        # Append to the same database the existing rows were read from
        merged_data.write.jdbc(url=db_string, table=table_name, mode="append", properties=db_properties)

In [None]:
def create_model_data(df):
    """Build home-minus-away rolling-average features and append them to PostgreSQL.

    The rolling-average columns of every team present in ``df`` are read from
    that team's ``team_<name>`` table, joined onto the home-perspective and
    away-perspective rows of ``df`` by 'Team Name' and 'Game Date', prefixed
    with ``Home_`` / ``Away_``, and combined into one row per game. For each
    rolling column a ``Difference_<column>`` (home minus away) is computed.
    The result is shown and appended to the ``update_model_data`` table.

    Uses the module-level ``spark`` session. Returns ``None``; when ``df``
    contains no teams nothing is read or written.

    Args:
        df: cleansed Spark DataFrame with 'Team Name', 'Game Date',
            'Home Team', 'Away Team' and 'Home Team W' columns.
    """
    from pyspark.sql.functions import col as spark_col

    key_columns = ['Game Date', 'Home Team', 'Away Team', 'Home Team W']

    # Distinct team names present in this batch
    team_names = [row["Team Name"] for row in df.select("Team Name").distinct().collect()]
    if not team_names:
        # No teams means no tables to read; previously this fell through to an
        # AttributeError on None.columns.
        return

    # Rows seen from the home team's and from the away team's perspective
    home_teams = df.filter(df['Team Name'] == df['Home Team'])
    away_teams = df.filter(df['Team Name'] == df['Away Team'])

    # Read each team's table and union them
    db_string = "jdbc:postgresql://192.168.1.246:5432/nhl_team_data"
    combined_team_df = None
    for team_name in team_names:
        table_name = 'team_' + team_name.replace(" ", "_").lower().replace(".", "")
        team_df = spark.read.format("jdbc") \
            .option("url", db_string) \
            .option("dbtable", table_name) \
            .option("user", "User_1") \
            .option("password", "postgres") \
            .load()
        if combined_team_df is None:
            combined_team_df = team_df
        else:
            combined_team_df = combined_team_df.union(team_df)

    # Keep the join keys plus every column with "Rolling" in its name
    rolling_names = [name for name in combined_team_df.columns if 'Rolling' in name]
    combined_team_df = combined_team_df.select('Team Name', 'Game Date', *rolling_names)

    def side_features(side_df, prefix):
        """Join rolling columns onto one side's rows and prefix their names."""
        joined = side_df.join(combined_team_df, on=['Team Name', 'Game Date'], how='left')
        side_rolling = [name for name in joined.columns if 'Rolling' in name]
        # 'Team Name' is dropped here by not selecting it
        joined = joined.select(*key_columns, *side_rolling)
        for name in side_rolling:
            joined = joined.withColumnRenamed(name, prefix + name)
        return joined

    home_merged_df = side_features(home_teams, 'Home_')
    away_merged_df = side_features(away_teams, 'Away_')

    # One row per game: pair the home-side row with the away-side row
    rolling_df = home_merged_df.join(away_merged_df, on=key_columns, how="inner")

    # Difference_<column> = Home_<column> - Away_<column>
    for window in (35, 40, 50):
        for stat in ('P%', 'Corsi%'):
            base_name = f'{stat}_NDays_Rolling_Avg_{window}'
            rolling_df = rolling_df.withColumn(
                'Difference_' + base_name,
                spark_col('Home_' + base_name) - spark_col('Away_' + base_name))

    # Select final columns
    difference_names = [name for name in rolling_df.columns if 'Difference' in name]
    rolling_df = rolling_df.select(key_columns + difference_names)

    # Convert 'Home Team W' column to 'bigint'
    rolling_df = rolling_df.withColumn('Home Team W', spark_col('Home Team W').cast('bigint'))

    rolling_df.show()

    # Save the model data
    db_properties = {
        "user": "User_1",
        "password": "postgres",
        "driver": "org.postgresql.Driver"
    }

    db_url = "jdbc:postgresql://192.168.1.246:5432/nhl_master_data"

    rolling_df.write.jdbc(url=db_url, table="update_model_data", mode="append", properties=db_properties)

In [None]:
# Process Streamed Kafka Data
def process_kafka_data(batch_df, batch_id):
    """Handle one streaming micro-batch of Kafka messages.

    Each message value is parsed as JSON into a pandas DataFrame, converted to
    a Spark DataFrame, cleansed with ``cleanse_df``, grouped per team and
    passed to ``update_teams_tables``; ``create_model_data`` is then run on the
    cleansed rows. Uses the module-level ``spark`` session.

    Args:
        batch_df: micro-batch DataFrame with a string ``value`` column.
        batch_id: micro-batch identifier supplied by Spark (unused).
    """
    # Collect once: a separate isEmpty() check would evaluate the batch a
    # second time, and an empty list already short-circuits everything below.
    messages = [row.value for row in batch_df.collect()]
    if not messages:
        return

    # Schema of the per-team rows handed to update_teams_tables. It is the same
    # for every message, so it is built once per batch.
    field_types = [
        ("Team Name", StringType),
        ("Game Date", TimestampType),
        ("GP", LongType),
        ("W", LongType),
        ("L", LongType),
        ("T", StringType),
        ("OT", StringType),
        ("P", LongType),
        ("P%", DoubleType),
        ("RW", LongType),
        ("ROW", LongType),
        ("SO_win", LongType),
        ("GF", LongType),
        ("GA", LongType),
        ("PP%", DoubleType),
        ("PK%", DoubleType),
        ("Net PP%", DoubleType),
        ("Net PK%", DoubleType),
        ("Shots/GP", DoubleType),
        ("SA/GP", DoubleType),
        ("FOW%", DoubleType),
        ("Season", StringType),
        ("Type", StringType),
        ("Save %", DoubleType),
        ("Shooting %", DoubleType),
        ("Corsi%", DoubleType),
        ("Against Team", StringType),
        ("Home Team", StringType),
        ("Away Team", StringType),
        ("Home Team W", LongType),
    ]
    schema = StructType([
        StructField(field_name, field_type(), nullable=True)
        for field_name, field_type in field_types
    ])

    # Database holding the per-team tables
    db_string = "jdbc:postgresql://192.168.1.246:5432/nhl_team_data"

    for message in messages:
        # Parse JSON string into dictionary
        data_dict = json.loads(message)

        # Dictionary -> pandas -> Spark DataFrame
        raw_df = spark.createDataFrame(pd.DataFrame.from_dict(data_dict))
        raw_df.show()

        # Cleanse data from Kafka
        cleansed_df = cleanse_df(raw_df)

        # One row per team, with that team's rows collected into 'group_df'
        grouped = cleansed_df.groupBy('Team Name').agg(
            collect_list(struct(cleansed_df.columns)).alias('group_df'))

        # Update teams' tables
        update_teams_tables(grouped, schema, db_string)

        # Create model data
        create_model_data(cleansed_df)

In [None]:
# Build (or reuse) the SparkSession, pulling in the Kafka connector package
spark = (
    SparkSession.builder
    .appName("Kafka_NHL_tream")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .getOrCreate()
)

# Kafka connection settings
kafka_bootstrap_servers = "192.168.1.246:9092"
kafka_topic = "spark_test"

# Subscribe to the topic as a streaming DataFrame
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .load()
)

# Keep only the message value, converted from binary to string
df = df.selectExpr("CAST(value AS STRING)")

# Hand every micro-batch to process_kafka_data
query = (
    df.writeStream
    .outputMode("append")
    .foreachBatch(process_kafka_data)
    .start()
)

# Block until the streaming query terminates
query.awaitTermination()