# Creation of Stored Procedures & UDTFs

The intent behind this notebook is to create reusable code for the prediction phase of the HOL.

We'll be re-using our feature engineering code, as well as creating some procedures and functions that pre- and post-process data for each round of the tournament.

In [None]:
import snowflake.snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark import Window
from snowflake.snowpark import functions as F   
from snowflake.snowpark.functions import udf, udtf
from snowflake.snowpark.types import IntegerType, FloatType, StringType, StructField, StructType, DateType
    
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import streamlit as st

import warnings
warnings.filterwarnings('ignore')

In [None]:
from snowflake.snowpark.context import get_active_session
# Active Snowpark session; the UDTF and stored procedure registrations below
# are all made through it.
session = get_active_session()

In [None]:
# FUNCTION used in the event of a draw in the knockout
# tried building a model but there's so much variability
# decided to use a sigmoid function with the advantage to the higher ranked team

def penalty_shootout(team_1, team_2, rank_delta) -> str:
    """Pick the winner of a drawn knockout game with a rank-weighted draw.

    ``rank_delta`` is clamped to +/-25, scaled by 0.05 and pushed through a
    sigmoid, giving a threshold between roughly 22 and 78 (exactly 50 for a
    delta of 0). A uniform integer from 1..100 is drawn and ``team_1`` wins
    when it lands above the threshold, so a negative delta favours
    ``team_1`` and a positive delta favours ``team_2``.

    Args:
        team_1: Value returned when the first team wins.
        team_2: Value returned when the second team wins.
        rank_delta: Rank difference as seen from ``team_1`` (the callers in
            this file pass team_1 rank minus team_2 rank). ``None`` or NaN is
            treated as 0, i.e. no advantage for either side.

    Returns:
        Either ``team_1`` or ``team_2``.
    """
    boundary = 25

    # A missing delta previously raised (None) or always handed the win to
    # team_2 (NaN compares false against everything); treat it as neutral.
    if rank_delta is None or np.isnan(rank_delta):
        rank_delta = 0

    adjustment_factor = max(min(rank_delta, boundary), -boundary) * 0.05
    sigmoid = 1 / (1 + np.exp(-adjustment_factor))
    threshold = 50 + (sigmoid - 0.5) * 100

    # Strict comparison: with draws from 1..100, `>=` gave team_1 51 winning
    # numbers out of 100 when the threshold is exactly 50 (equal ranks).
    random_number = np.random.randint(1, 101)
    if random_number > threshold:
        return team_1
    return team_2

In [None]:
# UDTF to combine the 2 predictions into 1 and return number of points
# this is used in the group stage when there is a points based system to determine qualification

output_struct = StructType([
    StructField("match_id", IntegerType()),
    StructField("team_name", StringType()),
    StructField("points", IntegerType())
])

@udtf(output_schema = output_struct,
    input_types = [FloatType(),StringType(), FloatType()],
    name = "calc_game_outcome", 
    session = session,
    is_permanent=True, 
    stage_location="@python_load",
    packages=["pandas"], 
    replace=True)
class calc_game_outcome:
    """Turn the two per-team predictions of one match into group-stage points.

    The UDTF is called partitioned by match id. Every input row carries
    ``(fixture_id, team_name, outcome)`` where ``outcome`` is 1 when that
    team is predicted to win and 0 when it is predicted to lose. Once the
    partition is complete, one ``(match_id, team_name, points)`` row is
    emitted for each of the first two input rows:

    * one team predicted to win and the other to lose -> 3 and 0 points
    * both predicted to win, or both predicted to lose -> a draw, 1 point each

    Any other outcome value produces no output rows for the match.
    """

    def __init__(self):
        # rows of the current partition, in arrival order
        self.fixture_id = []
        self.team_name = []
        self.outcome = []

    def process(self, fixture_id, team_name, outcome):
        """Buffer one prediction row; output is produced in end_partition."""
        self.fixture_id.append(fixture_id)
        self.team_name.append(team_name)
        self.outcome.append(outcome)

    def end_partition(self):
        """Yield the points rows for the buffered match.

        Raises:
            ValueError: if the partition holds exactly one row, so the match
                cannot be scored (this used to surface as a bare IndexError).
        """
        if not self.outcome:
            return
        if len(self.outcome) < 2:
            raise ValueError(
                "calc_game_outcome needs two prediction rows per match, "
                f"got {len(self.outcome)} for match {self.fixture_id[0]!r}"
            )

        # (first row outcome, second row outcome) -> points for (first, second)
        points_by_outcome = {
            (1, 0): (3, 0),   # first team wins
            (1, 1): (1, 1),   # both predicted to win  -> draw
            (0, 0): (1, 1),   # both predicted to lose -> draw
            (0, 1): (0, 3),   # second team wins
        }

        # float outcomes 1.0 / 0.0 hash and compare equal to the int keys
        awarded = points_by_outcome.get((self.outcome[0], self.outcome[1]))
        if awarded is None:
            return

        for idx, points in enumerate(awarded):
            yield (self.fixture_id[idx], self.team_name[idx], points)

In [None]:
# UDTF to combine the 2 predictions into 1 and return the winner
# This is used in the knock-out stage when the winning team is promoted to the next round
# NOTE: it also calls the penalty function in the event of a draw 

output_struct = StructType([
    StructField("match_id", IntegerType()),
    StructField("t1", StringType()),
    StructField("t2", StringType()),
    StructField("winner", StringType()),
    StructField("penalty_shootout", StringType()),
])

@udtf(output_schema = output_struct,
    input_types = [FloatType(), StringType(), FloatType(), FloatType()],
    name = "calc_game_outcome_knockout", 
    session = session,
    is_permanent=True, 
    stage_location="@python_load",
    packages=["pandas"], 
    replace=True)
class calc_game_outcome_knockout:
    """Turn the two per-team predictions of one knockout match into a winner.

    The UDTF is called partitioned by match id. Every input row carries
    ``(fixture_id, team_name, outcome, rank_delta)`` where ``outcome`` is 1
    when that team is predicted to win and 0 when it is predicted to lose.
    The first two rows of the partition are treated as team 1 and team 2 and
    a single ``(match_id, t1, t2, winner, penalty_shootout)`` row is emitted:

    * one team predicted to win and the other to lose -> that team wins and
      ``penalty_shootout`` is ``''``
    * both predicted to win, or both predicted to lose -> the game counts as
      a draw, ``penalty_shootout()`` picks the winner using the first row's
      ``rank_delta`` and the flag is ``'YES'``

    Any other outcome value produces no output row for the match.
    """

    def __init__(self):
        # rows of the current partition, in arrival order
        self.fixture_id = []
        self.team_name = []
        self.outcome = []
        self.rank_delta = []

    def process(self, fixture_id, team_name, outcome, rank_delta):
        """Buffer one prediction row; output is produced in end_partition."""
        self.fixture_id.append(fixture_id)
        self.team_name.append(team_name)
        self.outcome.append(outcome)
        self.rank_delta.append(rank_delta)

    def end_partition(self):
        """Yield the result row for the buffered match.

        Raises:
            ValueError: if the partition holds exactly one row, so there is
                no opponent (this used to surface as a bare IndexError).
        """
        if not self.outcome:
            return
        if len(self.outcome) < 2:
            raise ValueError(
                "calc_game_outcome_knockout needs two prediction rows per match, "
                f"got {len(self.outcome)} for match {self.fixture_id[0]!r}"
            )

        team_1, team_2 = self.team_name[0], self.team_name[1]
        first, second = self.outcome[0], self.outcome[1]

        # only clean 0/1 predictions produce a result
        if first not in (0, 1) or second not in (0, 1):
            return

        if first == second:
            # contradictory predictions count as a draw -> penalties
            rank_delta = self.rank_delta[0]
            if rank_delta is None:
                # no rank information: neither side gets an advantage
                rank_delta = 0.0
            winner = penalty_shootout(team_1, team_2, rank_delta)
            shootout = 'YES'
        elif first == 1:
            winner, shootout = team_1, ''
        else:
            winner, shootout = team_2, ''

        yield (self.fixture_id[0], team_1, team_2, winner, shootout)
    


In [None]:
# SPROC that contains all the feature engineering code
# this will take the fixture information and then calculate:
# > difference in rank
# > trailing performance for both teams
# > determine whether this is in a neutral location
#
# NOTE: for this model, neutral will be true for all teams except Germany who are the host nation 

def prep_prediction_data(session: snowflake.snowpark.Session, from_id: int, to_id: int) -> str:
    """Build the model input rows for a range of fixtures.

    For every match in ``fixture_live`` whose match number lies between
    ``from_id`` and ``to_id`` (inclusive) this looks up, for both teams, the
    most recent ``rankings`` entry dated on or before the match and the
    ``calc_performance`` summary of up to ten earlier games from ``results``
    (``calc_performance`` is a table function that is not defined in this
    notebook). Each fixture is then emitted twice - once from each team's
    point of view, with the rank difference negated for the second row -
    together with a ``NEUTRAL`` flag, and the result overwrites the temporary
    table ``data_for_predictions``.

    Args:
        session: Snowpark session used for all table access.
        from_id: Lowest match number to include.
        to_id: Highest match number to include.

    Returns:
        The literal string ``'done'``.
    """

    # get list of fixtures for the groups stages
    # (the filter runs on the aliased match number, which is exposed as ID)
    df_fixture = session.table("fixture_live").select(
            F.col('"MATCH NUMBER"').alias("id"),
            F.to_date(F.col('"DATE"'), "DD/MM/YYYY HH24:MI").alias("date"),
            F.col('"HOME TEAM"').alias('team_1'),
            F.col('"AWAY TEAM"').alias('team_2')
        ).filter(
            (F.col('"ID"') >= from_id) & (F.col('"ID"') <= to_id))
        
    # rank stuff first
    df_rank = session.table("rankings").sort(F.col("rank_date"), ascending=True)

    # every ranking of team_1 published on or before the match date
    df_match_rank_team_1 = df_fixture.join(
        df_rank,
        ((df_rank['rank_date'] <= df_fixture['date']) &
        (df_fixture['team_1'] == df_rank['country_full'])),
        'left'
    ).sort(['id', df_rank['rank_date'].desc()])

    # keep only the newest of those rankings per fixture
    window_spec = Window.partition_by('id').order_by(df_rank['rank_date'].desc())
    df_match_rank_team_1 = df_match_rank_team_1.select(
        F.row_number().over(window_spec).alias('row_number'),
        'id',
        'rank',
        'rank_date'
    ).filter(F.col('row_number') == 1)

    df_match_rank_team_1 = df_match_rank_team_1.withColumnRenamed('id', 'team_1_id') \
        .withColumnRenamed('rank', 'team_1_rank') \
        .drop('row_number', 'rank_date')

    # and next we'll want to do the same thing for the away team as well...
    df_match_rank_team_2 = df_fixture.join(
        df_rank,
        (
            (df_rank["rank_date"] <= df_fixture["date"])
            & (df_fixture["team_2"] == df_rank["country_full"])
        ),
        "left",
    ).sort(["id", F.desc("rank_date")])

    df_match_rank_team_2 = (
        df_match_rank_team_2.select(
            F.row_number()
            .over(Window.partitionBy("id").orderBy(F.desc("rank_date")))
            .alias("row_number"),
            "id",
            "rank",
            "rank_date",
        )
        .filter(F.col("row_number") == 1)
        .drop("row_number", "rank_date")
    )

    # Rename columns to reflect they are for the away team
    df_match_rank_team_2 = df_match_rank_team_2.withColumnRenamed(
        "id", "team_2_id"
    ).withColumnRenamed("rank", "team_2_rank")

    # one row per fixture: id, team_1_rank, team_2_rank
    df_match_rank = (
        df_match_rank_team_1.join(
            df_match_rank_team_2,
            df_match_rank_team_1["team_1_id"] == df_match_rank_team_2["team_2_id"],
            "left",
        )
        .select(
            df_match_rank_team_1["team_1_id"].alias("id"),
            df_match_rank_team_1["team_1_rank"],
            df_match_rank_team_2["team_2_rank"]
        )
        .sort("id")
    )

    # and now trailing performance

    # team_1 
    df_history = (
        session.table("results")
        .sort(F.col("id"), ascending=True)
    )

    # the ten most recent results (home or away) of team_1 before the match date
    df_team_1 = (
        df_fixture.join(
            df_history,
            (
                (
                    (df_fixture["team_1"] == df_history["home_team"]) | (df_fixture["team_1"] == df_history["away_team"])
                )
                & (df_history["date"] < df_fixture["date"])
            ),
            "left",
        )
        .select(
            df_fixture["id"].alias("id"),
            df_fixture["date"].alias("match_date"),
            df_fixture["team_1"],
            df_fixture["team_2"],
            # NOTE(review): this 'neutral' column is not selected after the
            # table function call below; the NEUTRAL flag that is saved is
            # computed at the end of the function.
            F.iff(
                df_fixture["team_1"] == F.lit('Germany'),0,
                F.iff(df_fixture["team_2"] == F.lit('Germany'),1,0)).alias('neutral'),
            df_history["home_team"].alias("home_team"),
            df_history["away_team"].alias("away_team"),
            df_history["date"].cast(DateType()).alias("history_date"),
            df_history["home_team_score"].alias("home_team_score"),
            df_history["away_team_score"].alias("away_team_score"),
        )
        .with_column(
            "is_home",
            F.when(df_fixture["team_1"] == df_history["home_team"], 1).otherwise(0),
        )
        .with_column(
            "row_number",
            F.row_number().over(Window.partition_by("id").order_by(F.col("history_date").desc())),
        )
        .filter(F.col("row_number") <= 10)
        .drop("row_number")
    )

    # calc_performance is looked up by name; it is not defined in this notebook
    perf_udtf = F.table_function("calc_performance")

    # summarise the history rows of each fixture, fed in history_date order
    df_team_1 = df_team_1.join_table_function(
        perf_udtf(
            df_team_1.col('id').cast(IntegerType()),
            df_team_1.col('history_date').cast(DateType()),
            df_team_1.col('home_team_score').cast(FloatType()),
            df_team_1.col('away_team_score').cast(FloatType()),
            df_team_1.col('is_home').cast(IntegerType())).over(partition_by='id',order_by=['history_date'])
        )\
        .select(
            F.col('id').alias('team_1_fixture_id'),
            F.col('goal_diff').alias('team_1_goal_diff'),
            F.col('ttl_wins').alias('team_1_ttl_wins'),
            F.col('ttl_losses').alias('team_1_ttl_losses'),
            F.col('ttl_draws').alias('team_1_ttl_draws')
        )

    # team 2
    # (same steps as for team_1, matching the history on team_2 instead)
    df_team_2 = (
        df_fixture.join(
            df_history,
            (
                (
                    (df_fixture["team_2"] == df_history["home_team"])
                    | (df_fixture["team_2"] == df_history["away_team"])
                )
                & (df_history["date"] < df_fixture["date"])
            ),
            "left",
        )
        .select(
            df_fixture["id"].alias("id"),
            df_fixture["date"].alias("match_date"),
            df_fixture["team_1"],
            df_fixture["team_2"],
            F.iff(
                df_fixture["team_1"] == F.lit('Germany'),0,
                F.iff(df_fixture["team_2"] == F.lit('Germany'),1,0)).alias('neutral'),
            df_history["home_team"].alias("home_team"),
            df_history["away_team"].alias("away_team"),
            df_history["date"].cast(DateType()).alias("history_date"),
            df_history["home_team_score"].alias("home_team_score"),
            df_history["away_team_score"].alias("away_team_score"),
        )
        .with_column(
            "is_home",
            F.when(df_fixture["team_2"] == df_history["home_team"], 1).otherwise(
                0
            ),
        )
        .with_column(
            "row_number",
            F.row_number().over(Window.partition_by("id").order_by(F.col("history_date").desc())),
        )
        .filter(F.col("row_number") <= 10)
        .drop("row_number")
    )

    perf_udtf = F.table_function("calc_performance")

    df_team_2 = df_team_2.join_table_function(
        perf_udtf(
            df_team_2.col('id').cast(IntegerType()),
            df_team_2.col('history_date').cast(DateType()),
            df_team_2.col('home_team_score').cast(FloatType()),
            df_team_2.col('away_team_score').cast(FloatType()),
            df_team_2.col('is_home').cast(IntegerType())).over(partition_by='id',order_by=['history_date'])
        ) \
        .select(
            F.col('id').alias('team_2_fixture_id'),
            F.col('goal_diff').alias('team_2_goal_diff'),
            F.col('ttl_wins').alias('team_2_ttl_wins'),
            F.col('ttl_losses').alias('team_2_ttl_losses'),
            F.col('ttl_draws').alias('team_2_ttl_draws')
        )

    # and the performance final join
    df_team_perf = df_team_1 \
        .join(
            df_team_2,
            (
                df_team_2.col('team_2_fixture_id') == df_team_1.col('team_1_fixture_id')
            ),
            'left'
        ).drop(
            'team_2_fixture_id'
        ).rename(
            F.col("team_1_fixture_id"), 'id'
        )

    # join the rank and performance together
    # (the ttl_draws columns are not carried into this select; missing values
    # are replaced by 0 at the end of the chain)
    df_fixture_full = df_fixture.join(
        df_match_rank,
        (
            df_fixture.col('id') == df_match_rank.col('id')
        ),
        'left'
    ).join(
        df_team_perf,
        (
            df_fixture.col('id') == df_team_perf.col('id')
        ),
        'left'
    ).select(
        df_fixture.col('id').alias('match_id'),
        df_team_perf.col('team_1_goal_diff'),
        df_team_perf.col('team_1_ttl_wins'),
        df_team_perf.col('team_1_ttl_losses'),
        df_team_perf.col('team_2_goal_diff'),
        df_team_perf.col('team_2_ttl_wins'),
        df_team_perf.col('team_2_ttl_losses'),
        (df_match_rank.col('team_1_rank') - df_match_rank.col('team_2_rank')).alias('team_1_vs_team_2_rank'),
    # NOTE(review): the select above exposes the fixture id as match_id while
    # this sort names 'id' - confirm it resolves as intended.
    ).sort(
        F.col('id'), ascending=True
    ).na.fill(0)

    # add back in the team names
    df_fixture_full = df_fixture_full.join(
        df_fixture,
        (df_fixture_full.col('match_id') == df_fixture.col('id')),
        'left'
    ).select(
        df_fixture["ID"],
        df_fixture["TEAM_1"],
        df_fixture["TEAM_2"],
        df_fixture_full["TEAM_1_TTL_WINS"],
        df_fixture_full["TEAM_1_TTL_LOSSES"],
        df_fixture_full["TEAM_1_GOAL_DIFF"],
        df_fixture_full["TEAM_2_TTL_WINS"],
        df_fixture_full["TEAM_2_TTL_LOSSES"],
        df_fixture_full["TEAM_2_GOAL_DIFF"],
        df_fixture_full["TEAM_1_VS_TEAM_2_RANK"]
    )

    # split the games up, so we run predictions for both teams
    # first view: the fixture as listed, team_1 is the home side
    df_fixture_first = df_fixture_full.select(
        F.col('id'),

        df_fixture_full.col('team_1').alias('team_1'),

        df_fixture_full.col('team_1_ttl_wins'),
        df_fixture_full.col('team_1_ttl_losses'),
        df_fixture_full.col('team_1_goal_diff'),

        df_fixture_full.col('team_2_ttl_wins'),
        df_fixture_full.col('team_2_ttl_losses'),
        df_fixture_full.col('team_2_goal_diff'),
        
        df_fixture_full.col('team_1_vs_team_2_rank')
    )

    # second view: the same fixture with the two teams' columns swapped
    df_fixture_second = df_fixture_full.select(
        F.col('id'),

        df_fixture_full.col('team_2').alias('team_1'),

        df_fixture_full.col('team_2_ttl_wins').alias('team_1_ttl_wins'),
        df_fixture_full.col('team_2_ttl_losses').alias('team_1_ttl_losses'),
        df_fixture_full.col('team_2_goal_diff').alias('team_1_goal_diff'),
        
        df_fixture_full.col('team_1_ttl_wins').alias('team_2_ttl_wins'),
        df_fixture_full.col('team_1_ttl_losses').alias('team_2_ttl_losses'),
        df_fixture_full.col('team_1_goal_diff').alias('team_2_goal_diff'),
        
        df_fixture_full.col('team_1_vs_team_2_rank').alias('temp_df_team_1_vs_team_2_rank')
    )

    # the rank difference changes sign when the teams are swapped
    df_fixture_second = df_fixture_second.withColumn('team_1_vs_team_2_rank', -1 * df_fixture_second.col('temp_df_team_1_vs_team_2_rank'))
    df_fixture_second = df_fixture_second.drop('temp_df_team_1_vs_team_2_rank')

    df_fixture_all = df_fixture_first.union(df_fixture_second)

    # NEUTRAL is 0 only on rows where team_1 is Germany, 1 everywhere else
    df_fixture_all = df_fixture_all.with_column(
        'NEUTRAL',
        F.iff(df_fixture_all.col('team_1') == 'Germany',0,1)
    ) 

    df_fixture_all.write.save_as_table('data_for_predictions',mode='overwrite', table_type='temp')

    return 'done'

# Register the function as the permanent stored procedure
# `prep_prediction_data`, replacing any existing one and running with the
# caller's rights.
session.sproc.register(
    func=prep_prediction_data, 
    name="prep_prediction_data", 
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location='python_load',
    session=session,
    replace=True,
    execute_as='caller'
)

In [None]:
# SPROC to take the output from the model predictions and build the results view
# This will also be used to determine who should be in the next round
#
# NOTE: This is only for processing predictions from the group stage, since these are handled differently to knockout

def process_group_predictions(session: snowflake.snowpark.Session) -> str:
    """Turn the group-stage predictions into a group table.

    Reads ``predictions``, scores every match with the ``calc_game_outcome``
    table function, sums the points per group and team (the group comes from
    the ``fixture`` table) and ranks the teams inside each group by points.
    The result is saved as the temporary table ``results_group_stage`` with
    the columns group, team_name, points, rank, qualified and place_code
    (rank followed by the last character of the group name, e.g. ``1A``).

    The top two teams of each group are marked ``qualified = 'YES'``; the
    four third-placed teams with the most points are then marked as
    qualified as well.

    Args:
        session: Snowpark session used for all table access.

    Returns:
        The literal string ``'done'``.
    """
    prediction = session.table('predictions')

    # combine the two per-team predictions of each match into points
    outcome_udtf = F.table_function("calc_game_outcome")

    df_round_1_outcome = prediction.join_table_function(
        outcome_udtf(
            prediction.col('id').cast(FloatType()),
            prediction.col('team_1').cast(StringType()),
            prediction.col('output_game_outcome').cast(FloatType())).over(partition_by='id')
        ).select(
            F.col('match_id'),
            F.col('team_name'),
            F.col('points')
        )

    # join the points calculations back to the original fixtures
    df_fixture = session.table('fixture')

    df_round_1_outcome = df_round_1_outcome.join(
        df_fixture,
        (df_round_1_outcome.col('match_id') == df_fixture.col('"MATCH NUMBER"')),
        'left'
    ).select(
        df_round_1_outcome.col('match_id'),
        df_round_1_outcome.col('team_name'),
        df_round_1_outcome.col('points'),
        df_fixture.col('"GROUP"').alias('group')
    )

    # total points per team, then position inside the group
    df_round_1_outcome = df_round_1_outcome.group_by(['group','team_name']).agg(F.sum('points').alias('points')).order_by(F.col('group'),F.col('points').desc())

    window_spec = Window.partition_by('group').order_by(df_round_1_outcome['points'].desc())
    df_round_1_outcome = df_round_1_outcome.with_column('rank', F.row_number().over(window_spec))
    df_round_1_outcome = df_round_1_outcome.with_column('qualified',F.iff(((df_round_1_outcome.col('rank') == 1 ) | (df_round_1_outcome.col('rank') == 2)),'YES',''))

    # place_code is built from this frame's own 'group' column; the original
    # borrowed the column from the fixture frame, which only worked because
    # both columns happen to share the name GROUP
    df_round_1_outcome = df_round_1_outcome.with_column(
        'place_code',
        F.concat(
            F.col('rank'),
            F.right(F.col('group'),1))
        )

    df_round_1_outcome.write.save_as_table('results_group_stage', mode='overwrite',table_type='temp')

    # now factor in the top 4 3rd place teams - really annoying to have to do this UEFA!
    df_3rd_place_teams = session.table('results_group_stage').filter(F.col('rank') == 3)

    window_spec = Window.order_by(df_3rd_place_teams['points'].desc())
    df_3rd_place_teams = df_3rd_place_teams.with_column(
        'rank_third',
        F.row_number().over(window_spec))

    df_3rd_place_teams = df_3rd_place_teams.filter(df_3rd_place_teams.col('rank_third') <= 4)

    # do a merge update to mark the 3rd place teams as qualifying
    target = session.table('results_group_stage')
    source = df_3rd_place_teams

    # Table.merge executes immediately and updates results_group_stage in
    # place, so the table does not need to be written again afterwards
    target.merge(
        source,
        (target['group'] == source['group']) & (target['team_name'] == source['team_name']),
        [
            F.when_matched().update(
                {'qualified': F.lit('YES')}
            )
        ]
    )

    return 'done'

# Register the function as the permanent stored procedure
# `process_group_predictions`, running with the caller's rights.
session.sproc.register(
    func=process_group_predictions, 
    name="process_group_predictions", 
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location='python_load',
    session=session,
    replace=True,
    execute_as='caller'
)



In [None]:
# SPROC to calculate what teams are in the first knockout round (round of 16)
# This is only used once, as there's only one situation where we go from group to knockout

def calculate_r16_games(session: snowflake.snowpark.Session) -> str:
    """Fill in the round-of-16 fixtures from the group-stage results.

    The 'Round of 16' rows of ``fixture`` are copied into the temporary table
    ``fixture_round_of_16``. Team placeholders that equal a ``place_code`` in
    ``results_group_stage`` (e.g. 1A, 2D) are replaced by the matching team
    name. Away-team placeholders that still start with '3' are then paired,
    in descending order, with the qualified third-placed teams ordered by
    descending place_code. Finally the home and away teams are copied into
    ``fixture_live`` by match number.

    Args:
        session: Snowpark session used for all table access.

    Returns:
        The literal string ``'done'``.
    """

    # scratch copy of the round-of-16 fixtures that can be updated in place
    df_round_r16 = (
        session.table("fixture")
        .select(
            F.col('"MATCH NUMBER"').alias("id"),
            F.to_date(F.col('"DATE"'), "DD/MM/YYYY HH24:MI").alias("date"),
            F.col('"HOME TEAM"').alias('team_1'),
            F.col('"AWAY TEAM"').alias('team_2'),
            F.col('"ROUND NUMBER"')
        )
        .filter(
            (F.col('"ROUND NUMBER"') == 'Round of 16')))

    df_round_r16.write.save_as_table('fixture_round_of_16', mode='overwrite',table_type='temp')

    # populate the teams we have explicit joins on (i.e. 1A, 2D), first for
    # the home side and then for the away side. Table.merge executes
    # immediately, so no follow-up action is needed to apply each update.
    for side in ('team_1', 'team_2'):
        target = session.table("fixture_round_of_16")
        source = session.table('results_group_stage')

        target.merge(
            source,
            (target[side] == source['place_code']),
            [
                F.when_matched().update(
                    {side: source.col('team_name')}
                )
            ]
        )

    # now we allocate 3rd place teams: number the remaining '3...' away
    # placeholders and the qualified third-placed teams, then pair them up
    df_round_r16 = session.table("fixture_round_of_16")
    df_round_r16 = df_round_r16.filter(F.left(df_round_r16.col('team_2'),1) == '3')

    window_spec = Window.order_by(df_round_r16['team_2'].desc())
    df_round_r16 = df_round_r16.with_column('team_3_rank_join',F.row_number().over(window_spec))

    df_groups_3rd = session.table('results_group_stage').filter(
        (F.left(F.col('place_code'),1) == '3') & (F.col('qualified') == 'YES'))

    window_spec = Window.order_by(df_groups_3rd['place_code'].desc())
    df_groups_3rd = df_groups_3rd.with_column('rank_join',F.row_number().over(window_spec))

    df_groups_3rd = df_round_r16.join(
        df_groups_3rd,
        (df_groups_3rd.col('rank_join') == df_round_r16.col('team_3_rank_join')),
        'left').select(    
            df_round_r16.col('id'),
            df_groups_3rd.col('team_name')    
        )

    target = session.table("fixture_round_of_16")
    source = df_groups_3rd

    target.merge(
        source,
        (target.col('id') == source.col('id')),
        [
            F.when_matched().update(
                {'team_2': source.col('team_name')}
            )
        ]
    )

    # publish the resolved teams to the live fixture list
    target = session.table('fixture_live')
    source = session.table('fixture_round_of_16')

    target.merge(
        source,
        (target.col('"MATCH NUMBER"') == source.col('id')),
        [
            F.when_matched().update(
                {
                    '"HOME TEAM"': source.col('team_1'),
                    '"AWAY TEAM"': source.col('team_2'),
                }
            )
        ]
    )

    return 'done'

# Register the function as the permanent stored procedure
# `calculate_r16_games`, running with the caller's rights.
session.sproc.register(
    func=calculate_r16_games, 
    name="calculate_r16_games", 
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location='python_load',
    session=session,
    replace=True,
    execute_as='caller'
)

In [None]:
# SPROC used for all knockout games
# It takes the predictions from the ML model and calculates 
# which teams won and should move to the next round

def process_knockout_predictions(session: snowflake.snowpark.Session) -> str:
    """Turn the knockout predictions into one winner per match.

    Each row of ``predictions`` is enriched with the rank difference of the
    same match and team from ``data_for_predictions``. The rows are then
    passed, partitioned by match id, to the ``calc_game_outcome_knockout``
    table function, and its output (match_id, team_1, team_2, winner,
    penalty_shootout) overwrites the temporary table ``results_ko_stage``.

    Args:
        session: Snowpark session used for all table access.

    Returns:
        The literal string ``'done'``.
    """

    prediction = session.table('predictions')
    fixture_rank = session.table('data_for_predictions')

    # data_for_predictions holds one row per match AND team, so the join has
    # to match on both. Joining on id alone paired every prediction with the
    # rows of both teams, doubling the rows handed to the table function,
    # which only looks at the first two rows of a partition.
    prediction = prediction.join(
        fixture_rank,
        (
            (prediction.col('id') == fixture_rank.col('id'))
            & (prediction.col('team_1') == fixture_rank.col('team_1'))
        ),
        'left'
    ).select(
        prediction.col('id').alias('id'),
        prediction.col('team_1').alias('team_1'),
        prediction.col('output_game_outcome').alias('output_game_outcome'),
        fixture_rank.col('team_1_vs_team_2_rank').alias('team_1_vs_team_2_rank')
    )

    outcome_udtf = F.table_function("calc_game_outcome_knockout")

    df_round = prediction.join_table_function(
        outcome_udtf(
            prediction.col('id').cast(FloatType()),
            prediction.col('team_1').cast(StringType()),
            prediction.col('output_game_outcome').cast(FloatType()),
            prediction.col('team_1_vs_team_2_rank').cast(FloatType())).over(partition_by='id')
        ).select(
            F.col('match_id'),
            F.col('t1').alias('team_1'),
            F.col('t2').alias('team_2'),
            F.col('winner'),
            F.col('penalty_shootout'),
        )

    df_round.write.save_as_table('results_ko_stage',mode='overwrite',table_type='temp')
    return 'done'

# Register the function as the permanent stored procedure
# `process_knockout_predictions`, running with the caller's rights.
session.sproc.register(
    func=process_knockout_predictions, 
    name="process_knockout_predictions", 
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location='python_load',
    session=session,
    replace=True,
    execute_as='caller')

In [None]:
# SPROC to calculate the next round of matches for the knockout phase
# It requires the round_name parameter

def calc_knockout_games(session: snowflake.snowpark.Session, round_name: str) -> str:
    """Write the pairings of the next knockout round into ``fixture_live``.

    The winners in ``results_ko_stage`` are numbered by descending team name
    and the ``fixture_live`` rows of ``round_name`` by ascending match
    number. Fixture ``i`` gets winner ``i`` as home team and winner
    ``i + n`` as away team, where ``n`` is the number of matches in the
    round (4, 2 or 1).

    Args:
        session: Snowpark session used for all table access.
        round_name: 'Quarter Finals', 'Semi Finals' or 'Final'.

    Returns:
        The literal string ``'done'``.

    Raises:
        ValueError: if ``round_name`` is not one of the three supported
            rounds (this used to fail later with an UnboundLocalError).
    """

    # offset between the home and the away winner = matches in the round
    matches_per_round = {
        'Quarter Finals': 4,
        'Semi Finals': 2,
        'Final': 1,
    }
    if round_name not in matches_per_round:
        raise ValueError(
            f"Unsupported round_name {round_name!r}; "
            f"expected one of {list(matches_per_round)}"
        )
    round_add = matches_per_round[round_name]

    df_round = session.table('results_ko_stage')

    # at this point Im just going to randomly match teams...
    window_spec = Window.order_by(df_round['winner'].desc())
    df_round = df_round.with_column('winner_rank_join',F.row_number().over(window_spec))

    df_fixture = session.table('fixture_live').filter(F.col('"ROUND NUMBER"') == round_name)

    window_spec = Window.order_by(df_fixture['"MATCH NUMBER"'].asc())
    df_fixture = df_fixture.with_column('rank_join',F.row_number().over(window_spec))

    # populate home_team column
    df_fixture = df_fixture.join(
        df_round,
        (df_round.col('winner_rank_join') == df_fixture.col('rank_join')),
        'left'
    ).select(
        df_fixture.col('"MATCH NUMBER"').alias('match_id'),
        df_round.col('winner').alias('home_team'),
        df_fixture.col('rank_join')
    )

    # populate away_team column
    df_fixture = df_fixture.join(
        df_round,
        (df_round.col('winner_rank_join') == (df_fixture.col('rank_join') + round_add)),
        'left'
    ).select(
        df_fixture.col('match_id').alias('match_id'),
        df_fixture.col('home_team'),
        df_round.col('winner').alias('away_team')
    )

    target = session.table('fixture_live')
    source = df_fixture

    # One merge sets both sides of each fixture. Table.merge executes
    # immediately, so nothing else is needed to apply the update.
    target.merge(
        source,
        (target['"MATCH NUMBER"'] == source['match_id']),
        [
            F.when_matched().update(
                {
                    '"HOME TEAM"': source.col('home_team'),
                    '"AWAY TEAM"': source.col('away_team'),
                }
            )
        ]
    )

    return 'done'

# Register the function as the permanent stored procedure
# `calc_knockout_games`, running with the caller's rights.
session.sproc.register(
    func=calc_knockout_games, 
    name="calc_knockout_games", 
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location='python_load',
    session=session,
    replace=True,
    execute_as='caller'
)

# Summary

Let's double check we have all the objects we created.

### Stored Procedures:

- ```CALCULATE_R16_GAMES()```
- ```CALC_KNOCKOUT_GAMES(VARCHAR)```
- ```PREP_PREDICTION_DATA(NUMBER, NUMBER)```
- ```PROCESS_GROUP_PREDICTIONS()```
- ```PROCESS_KNOCKOUT_PREDICTIONS()```

### UDTFs:
- ```CALC_GAME_OUTCOME(FLOAT, VARCHAR, FLOAT)```
- ```CALC_GAME_OUTCOME_KNOCKOUT(FLOAT, VARCHAR, FLOAT, FLOAT)```

(note - we also created a function and UDTF earlier, they will appear in the list too)

In [None]:
-- we should now see all our procs
-- (lists the user-defined stored procedures in the PUBLIC schema; the five
--  procedures named in the summary above should be among them)

show user procedures in schema public;

In [None]:
-- and our UDFs & UDTFs
-- (lists the user-defined functions in the PUBLIC schema; the two UDTFs
--  named in the summary above should be among them)

show user functions in schema public;