In [None]:
import snowflake.snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark import Window
from snowflake.snowpark import functions as F   
from snowflake.snowpark.functions import udf, udtf
from snowflake.snowpark.types import IntegerType, FloatType, StringType, StructField, StructType, DateType
    
import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings('ignore')


In [None]:

from snowflake.snowpark.context import get_active_session
# Pick up the Snowpark session that is already active instead of opening one.
session = get_active_session()

# Describe this notebook (origin, name, version) and attach that description
# to the session as its query tag.
app_tag = dict(
    origin="sf_sit",
    name="hol_sport_predict",
    version='{major: 1, minor: 0}',
)
session.query_tag = app_tag

In [None]:
# Ask Snowflake for the current user; the query returns one row with one
# column. The value is interpolated into a per-user table name later on.
# NOTE(review): presumably the user name is a valid unquoted identifier —
# confirm, since it is spliced into a table name as-is.
_current_user_row = session.sql('select current_user()').collect()[0]
user_name = _current_user_row[0]


In [None]:
# Match history: rows of the 'results' table dated on or after 1 January 1994
# whose 'tournament' is anything other than 'Friendly', ordered by 'id'.
is_recent_match = F.col("date") >= "1994-01-01"
is_not_friendly = F.col("tournament") != "Friendly"

df_match_history = (
    session.table("results")
    .filter(is_recent_match & is_not_friendly)
    .sort(F.col("id"))
)

# Rankings: the whole 'rankings' table, ordered by 'rank_date' ascending.
df_rank = session.table("rankings").sort(F.col("rank_date"), ascending=True)

In [None]:
# Preview up to 10 rows of the filtered match history
df_match_history.limit(10)


In [None]:

# Attach to every match the ranking each side held going into it.
#
# Joining matches to rankings on "rank_date <= match date" explodes the table
# (one row per match and earlier ranking), so for each match we keep only the
# row with the most recent rank date.


def _latest_rank_per_match(team_col, id_alias, rank_alias):
    """Return one row per match holding the latest ranking of one side.

    Args:
        team_col: Name of the ``df_match_history`` column identifying the
            team to look up ("home_team" or "away_team").
        id_alias: Name given to the match ``id`` column in the result.
        rank_alias: Name given to the ``rank`` column in the result.

    Returns:
        A Snowpark DataFrame with the columns ``id_alias`` and ``rank_alias``.
        Because the join is a left join, ``rank_alias`` is null for matches
        with no ranking row dated on or before the match date.
    """
    matches_with_ranks = df_match_history.join(
        df_rank,
        (
            (df_rank["rank_date"] <= df_match_history["date"])
            & (df_match_history[team_col] == df_rank["country_full"])
        ),
        "left",
    )

    # No explicit sort of the joined rows is needed: the window below orders
    # each match's candidate rankings itself.
    newest_rank_first = Window.partition_by("id").order_by(df_rank["rank_date"].desc())

    return (
        matches_with_ranks.select(
            F.row_number().over(newest_rank_first).alias("row_number"),
            "id",
            "rank",
            "rank_date",
        )
        .filter(F.col("row_number") == 1)
        .drop("row_number", "rank_date")
        .with_column_renamed("id", id_alias)
        .with_column_renamed("rank", rank_alias)
    )


# The same lookup for the home side and for the away side
df_match_rank_home = _latest_rank_per_match("home_team", "home_id", "home_team_rank")
df_match_rank_away = _latest_rank_per_match("away_team", "away_id", "away_team_rank")

# now we join these 2 dataframes together on the match id
df_match_rank = (
    df_match_rank_home.join(
        df_match_rank_away,
        df_match_rank_home["home_id"] == df_match_rank_away["away_id"],
        "left",
    )
    .select(
        df_match_rank_home["home_id"].alias("id"),
        df_match_rank_home["home_team_rank"],
        df_match_rank_away["away_team_rank"],
    )
    .sort("id")
)

# get rid of the rows with null values; na.drop() checks every column, so it
# already covers both rank columns and no separate is_not_null filter is needed
df_match_rank = df_match_rank.na.drop()

df_match_rank.limit(10)

In [None]:
# UDTF to calculate the trailing performance of a team, with a given set of games
# It emits one row per partition containing:
# - ttl_wins:   no. of wins
# - ttl_losses: no. of losses
# - ttl_draws:  no. of draws
# - goal_diff:  goal difference (goals scored - goals conceded)

output_struct = StructType([
    StructField("ttl_wins", FloatType()),
    StructField("ttl_losses", FloatType()),
    StructField("ttl_draws", FloatType()),
    StructField("goal_diff",FloatType())
])

@udtf(output_schema = output_struct,
    input_types = [IntegerType(),DateType(), FloatType(), FloatType(), IntegerType()],
    name = "calc_performance", 
    session = session,
    is_permanent=True, 
    stage_location="@python_load",
    packages=["pandas"], 
    replace=True)
class calc_ttl_performance:
    """Accumulate the games of one partition and summarise them in one row.

    ``process`` only buffers its arguments; all computation happens in
    ``end_partition``.

    Despite their names, ``goals_for`` and ``goals_against`` are treated as
    the home side's and the away side's score of each game (the callers in
    this notebook pass ``home_team_score`` and ``away_team_score``);
    ``is_home`` (1 or not 1) says which of the two sides the team was on.
    """

    def __init__(self):
        # One list per input column, filled row by row in process()
        self.fixture_id = []
        self.date = []
        self.goals_for = []
        self.goals_against = []
        self.is_home = []

    def process(self, fixture_id, date, goals_for, goals_against, is_home):
        """Buffer one input row; nothing is emitted per row."""
        self.fixture_id.append(fixture_id)
        self.date.append(date)
        self.goals_for.append(goals_for)
        self.goals_against.append(goals_against)
        self.is_home.append(is_home)

    def end_partition(self):
        """Yield ``(ttl_wins, ttl_losses, ttl_draws, goal_diff)`` as floats."""
        df = pd.DataFrame(zip(self.fixture_id, self.date, self.goals_for, self.goals_against, self.is_home), 
                            columns=['fixture_id', 'date', 'home_team_goals', 'away_team_goals', 'is_home'])

        was_home = df['is_home'] == 1
        was_away = df['is_home'] != 1
        home_side_won = df['home_team_goals'] > df['away_team_goals']
        away_side_won = df['home_team_goals'] < df['away_team_goals']

        # Translate home/away scores into the team's own scored/conceded goals
        df['goals_scored'] = np.where(was_home, df['home_team_goals'], df['away_team_goals'])
        df['goals_conceded'] = np.where(was_home, df['away_team_goals'], df['home_team_goals'])

        # A win/loss depends on which side the team played on
        df['wins'] = np.where((home_side_won & was_home) | (away_side_won & was_away), 1, 0)
        df['draws'] = np.where(df['home_team_goals'] == df['away_team_goals'], 1, 0)
        df['losses'] = np.where((away_side_won & was_home) | (home_side_won & was_away), 1, 0)

        ttl_wins = np.sum(df['wins'])
        ttl_losses = np.sum(df['losses'])
        ttl_draws = np.sum(df['draws'])
        goal_diff = np.sum(df['goals_scored']) - np.sum(df['goals_conceded'])

        if np.isnan(goal_diff):
            goal_diff = 0

        # The output schema declares FloatType for every column, so hand back
        # native Python floats instead of numpy scalar types.
        yield (float(ttl_wins), float(ttl_losses), float(ttl_draws), float(goal_diff))

In [None]:
# Matches to build features for: 'results' rows dated on or after
# 1994-01-01 whose tournament is not 'Friendly', ordered by id.
recent_competitive = (F.col("date") >= "1994-01-01") & (F.col("tournament") != "Friendly")

df_match_history = (
    session.table("results")
    .filter(recent_competitive)
    .sort(F.col("id"), ascending=True)
)

# Every row of 'results', unfiltered, ordered by id. Built from its own
# session.table() call rather than derived from df_match_history.
df_history = session.table("results").sort(F.col("id"), ascending=True)

In [None]:
# Home-team form.
# Pair every match with all earlier games from the full history in which its
# home team took part (on either side), then keep only the most recent ones.
# NOTE(review): the original comments mention the prior 5 games, but the
# filter below keeps the 15 most recent per match — confirm which is intended.

home_team_took_part = (
    (df_match_history["home_team"] == df_history["home_team"])
    | (df_match_history["home_team"] == df_history["away_team"])
)
played_before_match = df_history["date"] < df_match_history["date"]
latest_game_first = Window.partition_by("id").order_by(F.col("history_date").desc())

df_home_team = (
    df_match_history.join(
        df_history,
        home_team_took_part & played_before_match,
        "left",
    )
    .select(
        # columns describing the match itself
        df_match_history["id"].alias("id"),
        df_match_history["date"].alias("match_date"),
        df_match_history["home_team"].alias("team_1"),
        df_match_history["away_team"].alias("team_2"),
        df_match_history["neutral"].alias("neutral"),
        # columns describing the earlier game
        df_history["home_team"].alias("home_team"),
        df_history["away_team"].alias("away_team"),
        df_history["date"].cast(DateType()).alias("history_date"),
        df_history["home_team_score"].alias("home_team_score"),
        df_history["away_team_score"].alias("away_team_score"),
    )
    # 1 when the match's home team was the home side of the earlier game
    .with_column(
        "is_home",
        F.when(df_match_history["home_team"] == df_history["home_team"], 1).otherwise(0),
    )
    .with_column("row_number", F.row_number().over(latest_game_first))
    .filter(F.col("row_number") <= 15)
    .drop("row_number")
)

# Feed the retained games to the UDTF, one partition per match id

perf_udtf = F.table_function("calc_performance")

home_performance = perf_udtf(
    df_home_team.col('id').cast(IntegerType()),
    df_home_team.col('history_date').cast(DateType()),
    df_home_team.col('home_team_score').cast(FloatType()),
    df_home_team.col('away_team_score').cast(FloatType()),
    df_home_team.col('is_home').cast(IntegerType()),
).over(partition_by='id', order_by=['history_date'])

df_home_team = df_home_team.join_table_function(home_performance).select(
    F.col('id').alias('home_fixture_id'),
    F.col('goal_diff').alias('home_goal_diff'),
    F.col('ttl_wins').alias('home_ttl_wins'),
    F.col('ttl_losses').alias('home_ttl_losses'),
)

df_home_team.limit(10)

In [None]:
# Away-team form.
# Pair every match with all earlier games from the full history in which its
# away team took part (on either side), keep only the most recent ones, and
# pass them to the UDTF partitioned by match id.
# NOTE(review): the original comments mention 5 games per partition, but the
# filter below keeps the 15 most recent per match — confirm which is intended.

away_team_took_part = (
    (df_match_history["away_team"] == df_history["home_team"])
    | (df_match_history["away_team"] == df_history["away_team"])
)
played_before_match = df_history["date"] < df_match_history["date"]
latest_game_first = Window.partition_by("id").order_by(F.col("history_date").desc())

df_away_team = (
    df_match_history.join(
        df_history,
        away_team_took_part & played_before_match,
        "left",
    )
    .select(
        # columns describing the match itself
        df_match_history["id"].alias("id"),
        df_match_history["date"].alias("match_date"),
        df_match_history["home_team"].alias("team_1"),
        df_match_history["away_team"].alias("team_2"),
        df_match_history["neutral"].alias("neutral"),
        # columns describing the earlier game
        df_history["home_team"].alias("home_team"),
        df_history["away_team"].alias("away_team"),
        df_history["date"].cast(DateType()).alias("history_date"),
        df_history["home_team_score"].alias("home_team_score"),
        df_history["away_team_score"].alias("away_team_score"),
    )
    # 1 when the match's away team was the home side of the earlier game
    .with_column(
        "is_home",
        F.when(df_match_history["away_team"] == df_history["home_team"], 1).otherwise(0),
    )
    .with_column("row_number", F.row_number().over(latest_game_first))
    .filter(F.col("row_number") <= 15)
    .drop("row_number")
)

perf_udtf = F.table_function("calc_performance")

away_performance = perf_udtf(
    df_away_team.col('id').cast(IntegerType()),
    df_away_team.col('history_date').cast(DateType()),
    df_away_team.col('home_team_score').cast(FloatType()),
    df_away_team.col('away_team_score').cast(FloatType()),
    df_away_team.col('is_home').cast(IntegerType()),
).over(partition_by='id', order_by=['history_date'])

df_away_team = df_away_team.join_table_function(away_performance).select(
    F.col('id').alias('away_fixture_id'),
    F.col('goal_diff').alias('away_goal_diff'),
    F.col('ttl_wins').alias('away_ttl_wins'),
    F.col('ttl_losses').alias('away_ttl_losses'),
)

df_away_team.limit(10)

In [None]:
# Put the home-side and away-side performance of each fixture on one row.
# The result is keyed by 'id' so it can be joined in later as features.
same_fixture = df_away_team.col('away_fixture_id') == df_home_team.col('home_fixture_id')

df_team_perf = (
    df_home_team.join(df_away_team, same_fixture, 'left')
    .drop('away_fixture_id')
    .rename(F.col("home_fixture_id"), 'id')
)

df_team_perf.limit(10)

In [None]:
# UDF to bin the outcome into:
# home win = 1
# away win or draw = 0
# score missing (either input is NULL) = NULL

@udf(name='convert_score',  
     is_permanent=True,
     replace=True,
     session=session,
     stage_location='python_load')
def convert_score(x:int, y:int) -> int: 
    """Classify a result from the home side's point of view.

    Args:
        x: Goals scored by the home team.
        y: Goals scored by the away team.

    Returns:
        1 if the home team scored more than the away team, 0 for a draw or
        an away win, and None when either score is missing.
    """
    # A SQL NULL reaches the handler as None; comparing None with ">" would
    # raise TypeError and fail the whole query, so pass the NULL through.
    if x is None or y is None:
        return None

    if x > y : # home win
        return 1

    return 0
     

In [None]:
# 'results' rows dated on or after 1994-01-01 whose tournament is not
# 'Friendly', ordered by id descending.
# NOTE: the following cell rebuilds df_match_history_full from the same
# query before using it.
df_match_history_full = (
    session.table('results')
    .filter((F.col("date") >= "1994-01-01") & (F.col("tournament") != "Friendly"))
    .sort(F.col('id'), ascending=False)
)
     

In [None]:
# Build the final feature table: one row per match with the rank difference,
# both sides' trailing performance and the binned outcome.

# Base rows: 'results' dated on or after 1994-01-01, tournament not
# 'Friendly', ordered by id descending.
df_match_history_full = (
    session.table('results')
    .filter((F.col("date") >= "1994-01-01") & (F.col("tournament") != "Friendly"))
    .sort(F.col('id'), ascending=False)
)

# Derived columns, expressed against the base frame before it is reassigned
rank_difference = (
    df_match_rank.col('home_team_rank') - df_match_rank.col('away_team_rank')
).cast(IntegerType())

binned_outcome = F.call_function(
    'convert_score',
    df_match_history_full.col('home_team_score').cast(IntegerType()),
    df_match_history_full.col('away_team_score').cast(IntegerType()),
)

# Left-join ranks and performance onto the matches by id, project the feature
# columns, order by id and drop every row that contains a null.
df_match_history_full = (
    df_match_history_full
    .join(
        df_match_rank,
        df_match_history_full.col('id') == df_match_rank.col('id'),
        'left',
    )
    .join(
        df_team_perf,
        df_match_history_full.col('id') == df_team_perf.col('id'),
        'left',
    )
    .select(
        df_match_history_full.col('id').alias('id'),
        df_match_history_full.col('neutral'),
        df_team_perf.col('home_goal_diff').alias('team_1_goal_diff'),
        df_team_perf.col('home_ttl_wins').alias('team_1_ttl_wins'),
        df_team_perf.col('home_ttl_losses').alias('team_1_ttl_losses'),
        df_team_perf.col('away_goal_diff').alias('team_2_goal_diff'),
        df_team_perf.col('away_ttl_wins').alias('team_2_ttl_wins'),
        df_team_perf.col('away_ttl_losses').alias('team_2_ttl_losses'),
        rank_difference.alias('team_1_vs_team_2_rank'),
        binned_outcome.alias('game_outcome'),
    )
    .sort(F.col('id'), ascending=True)
    .na.drop()
)

# save a specific version for this user, replacing any earlier one
# NOTE(review): user_name is spliced into the table name unquoted — confirm
# it is always a valid identifier.
final_table_name = f'final_data_{user_name}'
df_match_history_full.write.save_as_table(final_table_name, mode='overwrite')

session.table(final_table_name).limit(10)
     