In [None]:
import copy
import io
import os

import numpy as np
import pandas as pd
import plotly
import plotly.graph_objects as go
from hyperopt import Trials, fmin, hp, tpe
from PIL import Image

import helper.event_data as ed_help
import helper.io as io_help

# import helper functions coming with this project<
import helper.plotly as py_help
import helper.tracking_data as td_help
import helper.Visualizer as viz_help

if os.getcwd().split(os.sep)[-1] == "notebooks":
    os.chdir("../")


# this is very useful as it makes sure that always all columns and rows of a data frame are displayed
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)

In [None]:
import importlib

importlib.reload(viz_help)

In [None]:
game = 1
df_events, df_formations = io_help.read_metrica_event_data(game=game)
df_track = io_help.read_tracking_data(game=game, clean=True)
df_track = td_help.add_position_delta(df_track)
df_track["vx"] = df_track["dx"] / df_track["dt"]
df_track["vy"] = df_track["dy"] / df_track["dt"]

# Add heatmap to visualizer

In [None]:
# initialize the tracking data visualizer
viz = viz_help.trackingDataVisualizer(df_track, df_events)
viz.set_sequence_by_frames(start_frame=300, end_frame=400)
viz.add_player_number()
fig = viz.get_figure()
# when watching a video we always need to call the figure like this, as validation is extremely slow for videos
plotly.offline.iplot(fig, validate=False, auto_play=False)

In [None]:
lst_vals = []
x = np.arange(106)
y = np.arange(69)
for i in np.arange(len(y)):
    lst_vals.append(list())
    for j in np.arange(len(x)):
        lst_vals[i].append(i + j)

z = np.array(lst_vals)

viz.add_heatmap([x] * 101, [y] * 101, [z] * 101)
pic = viz.get_single_picture(321)
pic.show()

In [None]:
"""
import time
start_time = time.time()
for frame in np.arange(301, 350):
    pic = viz.get_single_picture(frame) 
    fname = os.path.join(PATH, f"images//{frame}.png")
    pic.write_image(fname)
    #img_bytes = pic.to_image(format="png")
print("--- %s seconds ---" % (time.time() - start_time))
image = Image.open(io.BytesIO(img_bytes))
display(image)
ffmpeg.input(os.path.join(PATH, "images","*.png"), pattern_type="glob", framerate=25).output(os.path.join(PATH, "test.mp4")).run()
"""

# Pitch control model

In [None]:
def default_model_params(time_to_control_veto=3):
    """
    default_model_params()
    
    Returns the default parameters that define and evaluate the model. See Spearman 2018 for more details.
    
    Parameters
    -----------
    time_to_control_veto: If the probability that another team or player can get to the ball and control it is less than 10^-time_to_control_veto, ignore that player.
    
    
    Returns
    -----------
    
    params: dictionary of parameters required to determine and calculate the model
    
    """
    # key parameters for the model, as described in Spearman 2018
    params = {}
    # model parameters
    params[
        "max_player_accel"
    ] = 7.0  # maximum player acceleration m/s/s, not used in this implementation
    params["max_player_speed"] = 5.0  # maximum player speed m/s
    params[
        "reaction_time"
    ] = 0.7  # seconds, time taken for player to react and change trajectory. Roughly determined as vmax/amax
    params[
        "tti_sigma"
    ] = 0.45  # Standard deviation of sigmoid function in Spearman 2018 ('s') that determines uncertainty in player arrival time
    params[
        "kappa_def"
    ] = 1.72  # 1. # kappa parameter in Spearman 2018 (=1.72 in the paper) that gives the advantage defending players to control ball, I have set to 1 so that home & away players have same ball control probability
    params["lambda_att"] = 4.3  # ball control parameter for attacking team
    params["lambda_def"] = (
        4.3 * params["kappa_def"]
    )  # ball control parameter for defending team
    params["average_ball_speed"] = 30  # 15. # average ball travel speed in m/s
    # numerical parameters for model evaluation
    params["int_dt"] = 0.04  # integration timestep (dt)
    params["max_int_time"] = 1  # 10 # upper limit on integral time
    params[
        "model_converge_tol"
    ] = 0.01  # assume convergence when PPCF>0.99 at a given location.
    # The following are 'short-cut' parameters. We do not need to calculated PPCF explicitly when a player has a sufficient head start.
    # A sufficient head start is when the a player arrives at the target location at least 'time_to_control' seconds before the next player
    params["time_to_control_att"] = (
        time_to_control_veto
        * np.log(10)
        * (np.sqrt(3) * params["tti_sigma"] / np.pi + 1 / params["lambda_att"])
    )
    params["time_to_control_def"] = (
        time_to_control_veto
        * np.log(10)
        * (np.sqrt(3) * params["tti_sigma"] / np.pi + 1 / params["lambda_def"])
    )
    return params

In [None]:
params = default_model_params()

In [None]:
df_frame = df_track[(df_track["frame"] == 346)]

In [None]:
class Player(object):
    """
    methods include:
    -----------
    simple_time_to_intercept(r_final): time take for player to get to target position (r_final) given current position
    probability_intercept_ball(T): probability player will have controlled ball at time T given their expected 
                                   time_to_intercept
    """

    # player object holds position, velocity, time-to-intercept and pitch control contributions for each player
    def __init__(self, row, params):
        self.id = row["playerId"]
        self.teamname = row["team"]
        self.playername = f"{row['team']}_{row['playerId']}"
        self.vmax = params[
            "max_player_speed"
        ]  # player max speed in m/s. Could be individualised
        self.reaction_time = params[
            "reaction_time"
        ]  # player reaction time in 's'. Could be individualised
        self.tti_sigma = params[
            "tti_sigma"
        ]  # standard deviation of sigmoid function (see Eq 4 in Spearman, 2018)
        self.get_position(row)
        self.get_velocity(row)
        self.PPCF = 0.0  # initialise this for later
        self.PIP = 0.0  # pass interception probability
        self.t_int = None

        self.V_max = 7.8
        self.alpha = 1.3
        self.sigma = 0.45
        self.player_height = 2

    def get_position(self, row):
        self.position = np.array([row["xPos"], row["yPos"]])
        self.inframe = not np.any(np.isnan(self.position))

    def get_velocity(self, row):
        self.velocity = np.array([row["vx"], row["vy"]])
        if np.any(np.isnan(self.velocity)):
            self.velocity = np.array([0.0, 0.0])

    def simple_time_to_intercept(self, r_final):
        self.PPCF = 0.0  # initialise this for later
        # Time to intercept assumes that the player continues moving at current velocity for 'reaction_time' seconds
        # and then runs at full speed to the target position.
        r_reaction = self.position + self.velocity * self.reaction_time
        self.time_to_intercept = (
            self.reaction_time + np.linalg.norm(r_final - r_reaction) / self.vmax
        )
        return self.time_to_intercept

    def probability_intercept_ball(self, T):
        # probability of a player arriving at target location at time 'T' given their expected time_to_intercept (time of arrival), as described in Spearman 2018
        f = 1 / (
            1.0
            + np.exp(
                -np.pi / np.sqrt(3.0) / self.tti_sigma * (T - self.time_to_intercept)
            )
        )
        return f

    def compute_center_and_radius(self, t):
        center = (
            self.position
            + (1 - math.exp(-self.alpha * t * 0.04)) / self.alpha * self.velocity
        )
        radius = self.V_max * (
            t * 0.04 - (1 - math.exp(-self.alpha * t * 0.04)) / self.alpha
        )
        return center, radius

    def interception_probability(self, t, ball_pos):

        center, radius = self.compute_center_and_radius(t)

        if len(ball_pos) == 2:
            reach = np.linalg.norm(center - ball_pos) < radius

        if len(ball_pos) == 3:
            reach = (np.linalg.norm(center - ball_pos[:2]) < radius) and (
                ball_pos[2] < self.player_height
            )

        if reach:
            if self.t_int is None:
                if self.id == 21:
                    if t >= 0:
                        self.t_int = t
                else:
                    self.t_int = t

            if self.t_int is not None:
                P_int = 1 / (
                    1.0
                    + np.exp(
                        -np.pi / np.sqrt(3.0) / self.sigma * (t - self.t_int) * 0.04
                    )
                )
            else:
                P_int = 0

        else:
            if self.t_int is not None:
                self.t_int = None
            P_int = 0

        return P_int

In [None]:
def get_team_players(df_frame, team):
    team_players = []
    for _, row in df_frame.iterrows():
        if row["team"] == team:
            team_players.append(Player(row, params))
    return team_players

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Apr 19 14:52:19 2020
Module for calculating a Pitch Control surface using MetricaSports's tracking & event data.
Pitch control (at a given location on the field) is the probability that a team will gain 
possession if the ball is moved to that location on the field. 
Methdology is described in "Off the ball scoring opportunities" by William Spearman:
http://www.sloansportsconference.com/wp-content/uploads/2018/02/2002.pdf
GitHub repo for this code can be found here:
https://github.com/Friends-of-Tracking-Data-FoTD/LaurieOnTracking
Data can be found at: https://github.com/metrica-sports/sample-data
Functions
----------
calculate_pitch_control_at_target(): calculate the pitch control probability for the attacking and defending teams at a specified target position on the ball.
generate_pitch_control_for_event(): this function evaluates pitch control surface over the entire field at the moment
of the given event (determined by the index of the event passed as an input)
Classes
---------
The 'player' class collects and stores trajectory information for each player required by the pitch control calculations.
@author: Laurie Shaw (@EightyFivePoint)
"""

import numpy as np


def calculate_pitch_control_at_target(
    target_position, attacking_players, defending_players, ball_start_pos, params
):
    """ calculate_pitch_control_at_target
    
    Calculates the pitch control probability for the attacking and defending teams at a specified target position on the ball.
    
    Parameters
    -----------
        target_position: size 2 numpy array containing the (x,y) position of the position on the field to evaluate 
                         pitch control
        attacking_players: list of 'player' objects (see player class above) for the players on the attacking team (team in possession)
        defending_players: list of 'player' objects (see player class above) for the players on the defending team
        ball_start_pos: Current position of the ball (start position for a pass). If set to NaN, function will assume that 
                        the ball is already at the target position.
        params: Dictionary of model parameters (default model parameters can be generated using default_model_params() )
        
    Returrns
    -----------
        PPCFatt: Pitch control probability for the attacking team
        PPCFdef: Pitch control probability for the defending team ( 1-PPCFatt-PPCFdef <  params['model_converge_tol'] )
    """
    # calculate ball travel time from start position to end position.
    if ball_start_pos is None or any(
        np.isnan(ball_start_pos)
    ):  # assume that ball is already at location
        ball_travel_time = 0.0
    else:
        # ball travel time is distance to target position from current ball position divided assumed average ball speed
        ball_travel_time = (
            np.linalg.norm(target_position - ball_start_pos)
            / params["average_ball_speed"]
        )

    # first get arrival time of 'nearest' attacking player (nearest also dependent on current velocity)
    tau_min_att = np.nanmin(
        [p.simple_time_to_intercept(target_position) for p in attacking_players]
    )
    tau_min_def = np.nanmin(
        [p.simple_time_to_intercept(target_position) for p in defending_players]
    )

    # check whether we actually need to solve equation 3
    if (
        tau_min_att - max(ball_travel_time, tau_min_def)
        >= params["time_to_control_def"]
    ):
        # if defending team can arrive significantly before attacking team, no need to solve pitch control model
        return 0.0, 1.0
    elif (
        tau_min_def - max(ball_travel_time, tau_min_att)
        >= params["time_to_control_att"]
    ):
        # if attacking team can arrive significantly before defending team, no need to solve pitch control model
        return 1.0, 0.0
    else:
        # solve pitch control model by integrating equation 3 in Spearman et al.
        # first remove any player that is far (in time) from the target location
        attacking_players = [
            p
            for p in attacking_players
            if p.time_to_intercept - tau_min_att < params["time_to_control_att"]
        ]
        defending_players = [
            p
            for p in defending_players
            if p.time_to_intercept - tau_min_def < params["time_to_control_def"]
        ]

        # set up integration arrays
        dT_array = np.arange(
            ball_travel_time - params["int_dt"],
            ball_travel_time + params["max_int_time"],
            params["int_dt"],
        )

        PPCFatt = np.zeros_like(dT_array)
        PPCFdef = np.zeros_like(dT_array)
        # integration equation 3 of Spearman 2018 until convergence or tolerance limit hit (see 'params')
        ptot = 0.0
        i = 1
        while 1 - ptot > params["model_converge_tol"] and i < dT_array.size:
            T = dT_array[i]
            for player in attacking_players:
                # calculate ball control probablity for 'player' in time interval T+dt
                dPPCFdT = (
                    (1 - PPCFatt[i - 1] - PPCFdef[i - 1])
                    * player.probability_intercept_ball(T)
                    * params["lambda_att"]
                )
                # make sure it's greater than zero
                assert (
                    dPPCFdT >= 0
                ), "Invalid attacking player probability (calculate_pitch_control_at_target)"
                player.PPCF += (
                    dPPCFdT * params["int_dt"]
                )  # total contribution from individual player
                PPCFatt[
                    i
                ] += (
                    player.PPCF
                )  # add to sum over players in the attacking team (remembering array element is zero at the start of each integration iteration)
            for player in defending_players:
                # calculate ball control probablity for 'player' in time interval T+dt
                dPPCFdT = (
                    (1 - PPCFatt[i - 1] - PPCFdef[i - 1])
                    * player.probability_intercept_ball(T)
                    * params["lambda_def"]
                )
                # make sure it's greater than zero
                assert (
                    dPPCFdT >= 0
                ), "Invalid defending player probability (calculate_pitch_control_at_target)"
                player.PPCF += (
                    dPPCFdT * params["int_dt"]
                )  # total contribution from individual player
                PPCFdef[
                    i
                ] += player.PPCF  # add to sum over players in the defending team
            ptot = PPCFdef[i] + PPCFatt[i]  # total pitch control probability
            i += 1
        if i >= dT_array.size:
            pass
            # print("Integration failed to converge: %1.3f" % (ptot) )
        return PPCFatt[i - 1], PPCFdef[i - 1]

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Apr 19 14:52:19 2020
Module for calculating a Pitch Control surface using MetricaSports's tracking & event data.
Pitch control (at a given location on the field) is the probability that a team will gain 
possession if the ball is moved to that location on the field. 
Methdology is described in "Off the ball scoring opportunities" by William Spearman:
http://www.sloansportsconference.com/wp-content/uploads/2018/02/2002.pdf
GitHub repo for this code can be found here:
https://github.com/Friends-of-Tracking-Data-FoTD/LaurieOnTracking
Data can be found at: https://github.com/metrica-sports/sample-data
Functions
----------
calculate_pitch_control_at_target(): calculate the pitch control probability for the attacking and defending teams at a specified target position on the ball.
generate_pitch_control_for_event(): this function evaluates pitch control surface over the entire field at the moment
of the given event (determined by the index of the event passed as an input)
Classes
---------
The 'player' class collects and stores trajectory information for each player required by the pitch control calculations.
@author: Laurie Shaw (@EightyFivePoint)
"""

import numpy as np


def calculate_pitch_control_at_target(
    target_position, attacking_players, defending_players, ball_start_pos, params
):
    """ calculate_pitch_control_at_target
    
    Calculates the pitch control probability for the attacking and defending teams at a specified target position on the ball.
    
    Parameters
    -----------
        target_position: size 2 numpy array containing the (x,y) position of the position on the field to evaluate 
                         pitch control
        attacking_players: list of 'player' objects (see player class above) for the players on the attacking team (team in possession)
        defending_players: list of 'player' objects (see player class above) for the players on the defending team
        ball_start_pos: Current position of the ball (start position for a pass). If set to NaN, function will assume that 
                        the ball is already at the target position.
        params: Dictionary of model parameters (default model parameters can be generated using default_model_params() )
        
    Returrns
    -----------
        PPCFatt: Pitch control probability for the attacking team
        PPCFdef: Pitch control probability for the defending team ( 1-PPCFatt-PPCFdef <  params['model_converge_tol'] )
    """
    # calculate ball travel time from start position to end position.
    if ball_start_pos is None or any(
        np.isnan(ball_start_pos)
    ):  # assume that ball is already at location
        ball_travel_time = 0.0
    else:
        # ball travel time is distance to target position from current ball position divided assumed average ball speed
        ball_travel_time = (
            np.linalg.norm(target_position - ball_start_pos)
            / params["average_ball_speed"]
        )

    # first get arrival time of 'nearest' attacking player (nearest also dependent on current velocity)
    tau_min_att = np.nanmin(
        [p.simple_time_to_intercept(target_position) for p in attacking_players]
    )
    tau_min_def = np.nanmin(
        [p.simple_time_to_intercept(target_position) for p in defending_players]
    )

    # solve pitch control model by integrating equation 3 in Spearman et al.
    # first remove any player that is far (in time) from the target location
    attacking_players = [
        p
        for p in attacking_players
        if p.time_to_intercept - tau_min_att < params["time_to_control_att"]
    ]
    defending_players = [
        p
        for p in defending_players
        if p.time_to_intercept - tau_min_def < params["time_to_control_def"]
    ]

    # set up integration arrays
    dT_array = np.arange(
        ball_travel_time - params["int_dt"],
        ball_travel_time + params["max_int_time"],
        params["int_dt"],
    )

    PPCFatt = np.zeros_like(dT_array)
    PPCFdef = np.zeros_like(dT_array)
    # integration equation 3 of Spearman 2018 until convergence or tolerance limit hit (see 'params')
    ptot = 0.0
    i = 1
    while 1 - ptot > params["model_converge_tol"] and i < dT_array.size:
        T = dT_array[i]
        for player in attacking_players:
            # calculate ball control probablity for 'player' in time interval T+dt
            dPPCFdT = (
                (1 - PPCFatt[i - 1] - PPCFdef[i - 1])
                * player.probability_intercept_ball(T)
                * params["lambda_att"]
            )
            # make sure it's greater than zero
            assert (
                dPPCFdT >= 0
            ), "Invalid attacking player probability (calculate_pitch_control_at_target)"
            player.PPCF += (
                dPPCFdT * params["int_dt"]
            )  # total contribution from individual player
            PPCFatt[
                i
            ] += (
                player.PPCF
            )  # add to sum over players in the attacking team (remembering array element is zero at the start of each integration iteration)
        for player in defending_players:
            # calculate ball control probablity for 'player' in time interval T+dt
            dPPCFdT = (
                (1 - PPCFatt[i - 1] - PPCFdef[i - 1])
                * player.probability_intercept_ball(T)
                * params["lambda_def"]
            )
            # make sure it's greater than zero
            assert (
                dPPCFdT >= 0
            ), "Invalid defending player probability (calculate_pitch_control_at_target)"
            player.PPCF += (
                dPPCFdT * params["int_dt"]
            )  # total contribution from individual player
            PPCFdef[i] += player.PPCF  # add to sum over players in the defending team
        ptot = PPCFdef[i] + PPCFatt[i]  # total pitch control probability
        i += 1
    return PPCFatt[i - 1], PPCFdef[i - 1]

In [None]:
def generate_pitch_control_for_event(
    event_id,
    events,
    tracking_home,
    tracking_away,
    params,
    field_dimen=(106.0, 68.0,),
    n_grid_cells_x=50,
):
    """ generate_pitch_control_for_event
    
    Evaluates pitch control surface over the entire field at the moment of the given event (determined by the index of the event passed as an input)
    
    Parameters
    -----------
        event_id: Index (not row) of the event that describes the instant at which the pitch control surface should be calculated
        events: Dataframe containing the event data
        tracking_home: tracking DataFrame for the Home team
        tracking_away: tracking DataFrame for the Away team
        params: Dictionary of model parameters (default model parameters can be generated using default_model_params() )
        field_dimen: tuple containing the length and width of the pitch in meters. Default is (106,68)
        n_grid_cells_x: Number of pixels in the grid (in the x-direction) that covers the surface. Default is 50.
                        n_grid_cells_y will be calculated based on n_grid_cells_x and the field dimensions
        
    Returrns
    -----------
        PPCFa: Pitch control surface (dimen (n_grid_cells_x,n_grid_cells_y) ) containing pitch control probability for the attcking team.
               Surface for the defending team is just 1-PPCFa.
        xgrid: Positions of the pixels in the x-direction (field length)
        ygrid: Positions of the pixels in the y-direction (field width)
    """
    # get the details of the event (frame, team in possession, ball_start_position)
    pass_frame = events.loc[event_id]["Start Frame"]
    pass_team = events.loc[event_id].Team
    ball_start_pos = np.array(
        [events.loc[event_id]["Start X"], events.loc[event_id]["Start Y"]]
    )
    # break the pitch down into a grid
    n_grid_cells_y = int(n_grid_cells_x * field_dimen[1] / field_dimen[0])
    xgrid = np.linspace(-field_dimen[0] / 2.0, field_dimen[0] / 2.0, n_grid_cells_x)
    ygrid = np.linspace(-field_dimen[1] / 2.0, field_dimen[1] / 2.0, n_grid_cells_y)
    # initialise pitch control grids for attacking and defending teams
    PPCFa = np.zeros(shape=(len(ygrid), len(xgrid)))
    PPCFd = np.zeros(shape=(len(ygrid), len(xgrid)))
    # initialise player positions and velocities for pitch control calc (so that we're not repeating this at each grid cell position)
    if pass_team == "Home":
        attacking_players = initialise_players(
            tracking_home.loc[pass_frame], "Home", params
        )
        defending_players = initialise_players(
            tracking_away.loc[pass_frame], "Away", params
        )
    elif pass_team == "Away":
        defending_players = initialise_players(
            tracking_home.loc[pass_frame], "Home", params
        )
        attacking_players = initialise_players(
            tracking_away.loc[pass_frame], "Away", params
        )
    else:
        assert False, "Team in possession must be either home or away"
    # calculate pitch pitch control model at each location on the pitch
    for i in range(len(ygrid)):
        for j in range(len(xgrid)):
            target_position = np.array([xgrid[j], ygrid[i]])
            PPCFa[i, j], PPCFd[i, j] = calculate_pitch_control_at_target(
                target_position,
                attacking_players,
                defending_players,
                ball_start_pos,
                params,
            )
    # check probabilitiy sums within convergence
    checksum = np.sum(PPCFa + PPCFd) / float(n_grid_cells_y * n_grid_cells_x)
    assert 1 - checksum < params["model_converge_tol"], "Checksum failed: %1.3f" % (
        1 - checksum
    )
    return PPCFa, xgrid, ygrid

In [None]:
defending_players = get_team_players(df_frame, "Home")
attacking_players = get_team_players(df_frame, "Away")
df_ball = df_frame[df_frame["playerId"] == -1].iloc[0]
ball_position = np.array([df_ball["xPos"], df_ball["yPos"]])
target_position = np.array([30, 34])

In [None]:
lst_off_vals = []
lst_def_vals = []
x = np.arange(106)
y = np.arange(69)
for y_pos in np.arange(len(y)):
    lst_off_vals.append(list())
    lst_def_vals.append(list())
    for x_pos in np.arange(len(x)):
        target_position = np.array([x_pos, y_pos])
        control = calculate_pitch_control_at_target(
            target_position, attacking_players, defending_players, ball_position, params
        )
        lst_off_vals[y_pos].append(control[0])
        lst_def_vals[y_pos].append(control[1])

z = np.array(lst_off_vals)
total_reach = z + np.array(lst_def_vals)

z = (z / total_reach - 0.5) * total_reach

In [None]:
# initialize the tracking data visualizer
viz = viz_help.trackingDataVisualizer(df_track, df_events)
viz.set_sequence_by_frames(start_frame=300, end_frame=400)
viz.add_player_number()

In [None]:
viz.add_heatmap([x] * 101, [y] * 101, [z] * 101)
pic = viz.get_single_picture(346)
pic.show()

# Sumpter

In [None]:
import math

In [None]:
class PassProbabilityModeler:
    def __init__(self, three_dim=True):

        self.dt = 0.04

        # player params
        self.lambda_player = 4.3
        self.player_height = 2

        self.eps = 0.00001

        # ball params
        self.m = 0.42
        self.rho = 1.225
        self.drag = 0.25
        self.area = 0.038
        self.dt = 0.04
        self.mu = 0.55
        self.g = 9.81
        self.ball_factor = -1 / self.m * self.rho * self.drag * self.area

        self.ball_stop_times = self._get_stopping_time_by_resistance()

        if three_dim:
            self.min_angles_overpass = self._get_minimum_required_angle_to_overpass()

        # frame variables
        self.defending_players = None
        self.attacking_players = None
        self.ball_position = [0, 0]

        # pass variables
        self.ball_pos_per_frame = None

    def set_current_frame(self, df_frame, team_ball_position):

        self.frame = df_frame.copy()

        def_team = "Home" if team_ball_position == "Away" else "Away"
        att_team = "Away" if team_ball_position == "Away" else "Home"

        self.defending_players = get_team_players(df_frame, def_team)
        self.attacking_players = get_team_players(df_frame, att_team)

        df_ball = df_frame[df_frame["playerId"] == -1].iloc[0]
        self.ball_position = np.array([df_ball["xPos"], df_ball["yPos"]])

    def compute_ball_position_at_every_time(self, angle, speed):

        tmp_r = copy.deepcopy(self.ball_position)

        if type(angle) != float and type(angle) != int:

            if angle[1] < self.eps:
                angle = angle[0]

            else:
                angle[1] = np.pi / 2 - angle[1]

                pass_direction = np.array(
                    [
                        math.cos(angle[0]) * math.sin(angle[1]),
                        math.sin(angle[0]) * math.sin(angle[1]),
                        math.cos(angle[1]),
                    ]
                )
                tmp_r = np.array(list(tmp_r) + [0.0])

        if type(angle) == float or type(angle) == int:
            pass_direction = np.array([math.cos(angle), math.sin(angle)])

        tmp_v = speed * pass_direction

        r = [tmp_r]
        v = [tmp_v]
        a = []

        if type(angle) == float or type(angle) == int:

            t_max = self.ball_stop_times[speed]

            for i in np.arange(t_max):

                if i < 2 / 3 * t_max:
                    tmp_a = self.ball_factor * np.linalg.norm(tmp_v) * tmp_v
                else:
                    tmp_a = -1 * self.mu * self.g * pass_direction

                tmp_v = tmp_v + tmp_a * self.dt
                tmp_r = tmp_r + tmp_v * self.dt
                r.append(tmp_r)

        else:

            for i in np.arange(249):

                a_direction = self.ball_factor * np.linalg.norm(tmp_v[:2]) * tmp_v[:2]
                a_height = -1 * self.g

                tmp_a = np.array(list(a_direction) + [a_height])

                tmp_v = tmp_v + tmp_a * self.dt
                tmp_r = tmp_r + tmp_v * self.dt
                tmp_r[2] = np.max([tmp_r[2], 0.0])
                r.append(tmp_r)

        return r

    def _ball_stopped_by_resistance(self, speed, T):

        tmp_r = np.array([0.0, 0.0])
        pass_direction = np.array([1, 0])

        tmp_v = speed * pass_direction

        for i in np.arange(T):

            if i < 2 / 3 * T:
                tmp_a = self.ball_factor * np.linalg.norm(tmp_v) * tmp_v
            else:
                tmp_a = -1 * self.mu * self.g * pass_direction

            if any(np.abs(tmp_v) - np.abs(tmp_a * self.dt) < -1 * self.eps):
                return True

            tmp_v = tmp_v + tmp_a * self.dt
            tmp_r = tmp_r + tmp_v * self.dt

        return False

    def _get_stopping_time_by_resistance(self, min_speed=1, max_speed=20):

        dict_t_max = {}

        for speed in np.arange(min_speed, max_speed + 1):

            min_val = 0
            max_val = 300
            mid_val = int((max_val + min_val) / 2)

            while True:
                stopped_ball = self._ball_stopped_by_resistance(speed, mid_val)
                if stopped_ball:
                    max_val = mid_val
                    mid_val = int((max_val + min_val) / 2)
                else:
                    min_val = mid_val
                    mid_val = int((max_val + min_val) / 2)

                if min_val == mid_val:
                    break

            dict_t_max[speed] = mid_val

        return dict_t_max

    def _get_minimum_required_angle_to_overpass(self, min_speed=1, max_speed=20):

        dict_z_angle = dict()

        for speed in np.arange(min_speed, max_speed + 1):

            r = model.compute_ball_position_at_every_time([0, np.pi / 3], speed)

            if max([pos[2] for pos in r]) < self.player_height:
                min_angle = np.pi / 3
            else:
                for angle in np.arange(np.pi / (3 * 30), np.pi / 3, np.pi / (3 * 30)):
                    r = model.compute_ball_position_at_every_time([0, angle], speed)

                    if max([pos[2] for pos in r]) > self.player_height:
                        min_angle = angle - np.pi / (3 * 30)
                        break

            dict_z_angle[speed] = min_angle

        return dict_z_angle

    def get_pass_success_probability(self, angle, speed, specific_players=None):

        if type(specific_players) == int:
            specific_players = [specific_players]

        for player in self.attacking_players:
            player.PIP = 0

        for player in self.defending_players:
            player.PIP = 0

        # compute ball positions at the different times
        r = self.compute_ball_position_at_every_time(angle, speed)

        pass_prob_att = np.zeros(250)
        pass_prob_def = np.zeros(250)
        r = np.array(r + [r[-1]] * (250 - len(r)))

        ball_in_play = True
        total_proba = 0

        t = 1

        while 1 - total_proba > self.eps and ball_in_play and t < 250:

            ball_pos = r[t]

            for player in self.attacking_players:

                if specific_players is not None and player.id not in specific_players:
                    continue

                P_int = player.interception_probability(t, ball_pos)
                dPdT = (
                    (1 - pass_prob_att[t - 1] - pass_prob_def[t - 1])
                    * P_int
                    * self.lambda_player
                )

                player.PIP += dPdT * self.dt
                pass_prob_att[t] += player.PIP

            for player in self.defending_players:

                P_int = player.interception_probability(t, ball_pos)
                dPdT = (
                    (1 - pass_prob_att[t - 1] - pass_prob_def[t - 1])
                    * P_int
                    * self.lambda_player
                )

                player.PIP += dPdT * self.dt
                pass_prob_def[t] += player.PIP

            if (
                (ball_pos[0] > 105)
                or (ball_pos[0] < 0)
                or (ball_pos[1] > 68)
                or (ball_pos[1] < 0)
            ):
                ball_in_play = False
                pass_prob_def[t] = 1 - pass_prob_att[t - 1]
                pass_prob_att[t] = pass_prob_att[t - 1]

            total_proba = pass_prob_att[t] + pass_prob_def[t]

            t += 1

        poss_intercept = pass_prob_att + pass_prob_def > self.eps

        # only get the times when there is a chance for a player to touch the ball
        rel_a = pass_prob_att[poss_intercept]
        rel_d = pass_prob_def[poss_intercept]
        rel_r = r[poss_intercept]
        rel_t = np.arange(250)[poss_intercept] * 0.04

        # make sure the total interception probability is at most 1
        last_prob = rel_d[-1] + rel_a[-1]
        rel_d[-1] /= last_prob
        rel_a[-1] /= last_prob

        # get the probability for the ball to be touched in every frame
        prob_a = np.insert(np.diff(rel_a), 0, rel_a[0])
        prob_d = np.insert(np.diff(rel_d), 0, rel_d[0])

        # save all the relevant information in a data frame
        df_interception = pd.DataFrame()
        df_interception["defProba"] = prob_d
        df_interception["attProba"] = prob_a
        df_interception["time"] = rel_t
        df_interception["xPos"] = rel_r[:, 0]
        df_interception["yPos"] = rel_r[:, 1]

        if type(angle) != float and type(angle) != int:
            df_interception["zPos"] = rel_r[:, 2]
            df_interception["angle"] = angle[0]
            df_interception["angleHeight"] = angle[1]
        else:
            df_interception["angle"] = angle

        df_interception["speed"] = speed

        return rel_a[-1], df_interception

# Identify ground passes

In [None]:
df_pass = df_events[
    (df_events["eventName"] == "Pass")
    & (df_events["startFrame"] < df_events["endFrame"])
    & (df_events["startFrame"] >= 5)
].copy()

df_pass = df_pass[
    [
        "id",
        "matchId",
        "startFrame",
        "endFrame",
        "eventName",
        "subEventName",
        "team",
        "playerId",
        "toPlayerId",
        "accurate",
    ]
].copy()

In [None]:
df_ball = df_track[df_track["playerId"] == -1][
    ["frame", "xPos", "yPos", "ballInPlay"]
].copy()

df_ball.columns = ["startFrame", "xPosStart", "yPosStart", "ballInPlayStart"]
df_pass = pd.merge(df_pass, df_ball, how="left")

df_ball.columns = ["endFrame", "xPosEnd", "yPosEnd", "ballInPlayEnd"]
df_pass = pd.merge(df_pass, df_ball, how="left")
df_pass = df_pass[df_pass["xPosEnd"].notnull()].copy()

In [None]:
lst_passes = list()

for att_team in ["Home", "Away"]:

    df_players_att = df_track[df_track["team"] == att_team].copy()
    df_pass_team = df_pass[df_pass["team"] == att_team].copy()
    df_pass_team.rename(columns={"endFrame": "frame"}, inplace=True)

    df_close_att = td_help.closest_player_to_point(
        df_players_att, df_pass_team, ["xPosEnd", "yPosEnd"], "closestPlayerAttack"
    )

    df_players_def = df_track[
        (df_track["team"] != att_team) & (df_track["playerId"] != -1)
    ].copy()

    df_close_def = td_help.closest_player_to_point(
        df_players_def, df_pass_team, ["xPosEnd", "yPosEnd"], "closestPlayerDefense"
    )

    df_close = pd.merge(
        df_close_att[["startFrame", "closestPlayerAttack"]],
        df_close_def[["startFrame", "closestPlayerDefense"]],
    )

    df_pass_team = pd.merge(df_pass_team, df_close)

    lst_passes.append(df_pass_team)

df_pass = pd.concat(lst_passes)
df_pass = df_pass[df_pass["ballInPlayStart"] == 1].copy()
df_pass.reset_index(inplace=True, drop=True)

In [None]:
model = PassProbabilityModeler()

In [None]:
df_pass_acc = df_pass[df_pass["accurate"] == 1].copy()

In [None]:
pass_probs = list()

print(f"Total passes: {len(df_pass_acc)}")

for i, row in df_pass_acc.iterrows():

    if i % 50 == 0:
        print(i)

    df_frame = df_track[df_track["frame"] == row["startFrame"]].copy()
    model.set_current_frame(df_frame, row["team"])

    vec = [row["xPosEnd"] - row["xPosStart"], row["yPosEnd"] - row["yPosStart"]]
    vec = vec / np.linalg.norm(vec)
    angle = math.atan2(vec[1], vec[0])
    player = int(row["toPlayerId"])

    best_prob = 0
    for speed in np.arange(1, 21):
        prob, _ = model.get_pass_success_probability(angle, speed, player)

        if prob > best_prob:
            best_prob = prob
            best_speed = speed

    pass_probs.append([row["id"], row["startFrame"], best_speed, best_prob])

df_acc_pass_ground = pd.DataFrame(
    pass_probs, columns=["id", "startFrame", "bestSpeedGround", "probGround"]
)

In [None]:
pass_probs = list()

print(f"Total passes: {len(df_pass_acc)}")

for i, row in df_pass_acc.iterrows():

    if i % 50 == 0:
        print(i)

    df_frame = df_track[df_track["frame"] == row["startFrame"]].copy()

    model.set_current_frame(df_frame, row["team"])

    vec = [row["xPosEnd"] - row["xPosStart"], row["yPosEnd"] - row["yPosStart"]]
    vec = vec / np.linalg.norm(vec)
    ground_angle = math.atan2(vec[1], vec[0])
    player = int(row["toPlayerId"])

    best_prob = 0
    for speed in np.arange(1, 21):

        min_angle = model.min_angles_overpass[speed]
        for height_angle in np.arange(min_angle, np.pi / 3, np.pi / (3 * 30)):
            prob, _ = model.get_pass_success_probability(
                [ground_angle, height_angle], speed, player
            )

            if prob > best_prob:
                best_prob = prob
                best_speed = speed
                best_angle = height_angle

    pass_probs.append([row["id"], row["startFrame"], best_speed, best_angle, best_prob])

df_acc_pass_high = pd.DataFrame(
    pass_probs,
    columns=["id", "startFrame", "bestSpeedHigh", "bestAngleHigh", "probHigh"],
)

In [None]:
df_all = pd.merge(df_acc_pass_ground, df_acc_pass_high, on=["id", "startFrame"])
df_all["prior"] = 1 - np.sqrt((df_all["probHigh"] - df_all["probGround"]).clip(lower=0))
df_all["bayesProb"] = (
    df_all["probGround"]
    * df_all["prior"]
    / (
        df_all["probGround"] * df_all["prior"]
        + df_all["probHigh"] * (1 - df_all["prior"])
    )
)

In [None]:
interesting = [47995, 124536, 1110]

In [None]:
check = [117240, 117826]

In [None]:
df_inacc_pass = df_pass[
    (df_pass["accurate"] == 0) & (df_pass["endFrame"] > df_pass["startFrame"])
].copy()

In [None]:
len(df_inacc_pass)

In [None]:
df_inacc_pass.head()

In [None]:
df_ball = df_track[df_track["playerId"] == -1].copy()

In [None]:
att_team = "Away"

df_players_att = df_track[df_track["team"] == att_team].copy()
df_pass_att = df_inacc_pass[df_inacc_pass["team"] == att_team].copy()
df_point = df_pass_att[["startFrame", "endFrame"]].copy()
df_point.rename(columns={"endFrame": "frame"}, inplace=True)
df_point = pd.merge(
    df_point, df_ball[["frame", "xPos", "yPos", "ballInPlay"]], how="left", on="frame"
)

df_close_att = td_help.closest_player_to_point(
    df_players_att, df_point, ["xPos", "yPos"], "closestPlayerAttack"
)

df_players_def = df_track[
    (df_track["team"] != att_team) & (df_track["playerId"] != -1)
].copy()
df_close_def = td_help.closest_player_to_point(
    df_players_def, df_point, ["xPos", "yPos"], "closestPlayerDefense"
)

df_close = pd.merge(
    df_close_att[["startFrame", "closestPlayerAttack"]],
    df_close_def[["startFrame", "closestPlayerDefense"]],
)

In [None]:
sers = pd.merge(df_pass, df_close)
sers.head()

In [None]:
pass_probs = list()

print(f"Total passes: {len(sers)}")

for i, row in sers.iterrows():

    if i % 50 == 0:
        print(i)

    df_frame = df_track[df_track["frame"] == row["startFrame"]].copy()

    if min(df_frame["ballInPlay"]) == 0:
        continue

    model.set_current_frame(df_frame, row["team"])

    vec = [
        row["posAfterXMeters"] - model.ball_position[0],
        row["posAfterYMeters"] - model.ball_position[1],
    ]
    vec = vec / np.linalg.norm(vec)
    angle = math.atan2(vec[1], vec[0])
    player = int(row["closestPlayerAttack"])

    best_prob = 0
    for speed in np.arange(1, 21):

        model.get_pass_success_probability(angle, speed, player)

        def_player = [
            player
            for player in model.defending_players
            if player.id == int(row["closestPlayerDefense"])
        ][0]
        prob = def_player.PIP

        if prob > best_prob:
            best_prob = prob
            best_speed = speed

    pass_probs.append([row["id"], row["startFrame"], best_speed, best_prob])

df_inacc_pass_probs_ground = pd.DataFrame(
    pass_probs, columns=["id", "startFrame", "bestSpeed", "bestProb"]
)

In [None]:
df_inacc_pass_probs_ground.columns = [
    "id",
    "startFrame",
    "bestSpeedGround",
    "probGround",
]

In [None]:
pass_probs = list()

print(f"Total passes: {len(sers)}")

for i, row in sers.iterrows():

    if i % 50 == 0:
        print(i)

    df_frame = df_track[df_track["frame"] == row["startFrame"]].copy()

    if min(df_frame["ballInPlay"]) == 0:
        continue

    model.set_current_frame(df_frame, row["team"])

    vec = [
        row["posAfterXMeters"] - model.ball_position[0],
        row["posAfterYMeters"] - model.ball_position[1],
    ]
    vec = vec / np.linalg.norm(vec)
    ground_angle = math.atan2(vec[1], vec[0])
    player = int(row["closestPlayerAttack"])

    best_prob = 0
    for speed in np.arange(1, 21):

        min_angle = model.min_angles_overpass[speed]

        for height_angle in np.arange(min_angle, np.pi / 3, np.pi / (3 * 30)):
            model.get_pass_success_probability(
                [ground_angle, height_angle], speed, player
            )

            def_player = [
                player
                for player in model.defending_players
                if player.id == int(row["closestPlayerDefense"])
            ][0]
            prob = def_player.PIP

            if prob > best_prob:
                best_prob = prob
                best_speed = speed
                best_angle = height_angle

    pass_probs.append([row["id"], row["startFrame"], best_speed, best_angle, best_prob])

df_inacc_pass_probs_high = pd.DataFrame(
    pass_probs, columns=["id", "startFrame", "bestSpeed", "bestHeightAngle", "bestProb"]
)

In [None]:
df_inacc_pass_probs_high.columns = [
    "id",
    "startFrame",
    "bestSpeedHigh",
    "bestAngleHigh",
    "probHigh",
]

In [None]:
df_all_inacc = pd.merge(
    df_inacc_pass_probs_ground, df_inacc_pass_probs_high, on=["id", "startFrame"]
)
df_all_inacc["prior"] = 1 - np.sqrt(
    (df_all_inacc["probHigh"] - df_all_inacc["probGround"]).clip(lower=0)
)
df_all_inacc["bayesProb"] = (
    df_all_inacc["probGround"]
    * df_all_inacc["prior"]
    / (
        df_all_inacc["probGround"] * df_all_inacc["prior"]
        + df_all_inacc["probHigh"] * (1 - df_all_inacc["prior"])
    )
)

In [None]:
df_all_inacc.head(40)

In [None]:
# initialize the tracking data visualizer
frame = 63980
viz = viz_help.trackingDataVisualizer(df_track, df_events)
viz.set_sequence_by_frames(start_frame=frame, end_frame=frame + 150)
viz.add_player_number()
fig = viz.get_figure()
# when watching a video we always need to call the figure like this, as validation is extremely slow for videos
plotly.offline.iplot(fig, validate=False, auto_play=False)

In [None]:
import plotly.express as px

fig = px.histogram(df_all, x="bayesProb", nbins=20)
fig.show()

In [None]:
row = sers[sers["startFrame"] == 4709].iloc[0]

In [None]:
row

In [None]:
df_frame = df_track[df_track["frame"] == row["startFrame"]].copy()
    model.set_current_frame(df_frame, row["team"])

    vec = [row["posAfterXMeters"] - model.ball_position[0], row["posAfterYMeters"] - model.ball_position[1]]
    vec = vec / np.linalg.norm(vec)
    angle = math.atan2(vec[1], vec[0])
    #angle = [angle, np.pi/6]
    #player = int(row["toPlayerId"])
    player = int(row["closestPlayerAttack"])
    
    best_prob = 0
    """
    for speed in np.arange(1, 21):
        prob, _ = model.get_pass_success_probability(angle, speed, player)
        
        print(prob)

        if prob > best_prob:
            best_prob = prob
            best_speed = speed
    """
    speed = 17
    
    prob, sers  = model.get_pass_success_probability(angle, speed, player)
    
    prob

In [None]:
sers

In [None]:
for tmp_player in model.defending_players:
    print(tmp_player.id)
    print(tmp_player.PIP)
    print("###")

In [None]:
sers

In [None]:
sers

In [None]:
for player in model.attacking_players:
    if player.id == 27:
        tmp_player = player
    print(player.id)
    print(player.PIP)

In [None]:
tmp_player.velocity

In [None]:
tmp_player.position

In [None]:
tmp_player.compute_center_and_radius(75)

In [None]:
sers

In [None]:
prob

In [None]:
row

In [None]:
df_pass.iloc[218]

In [None]:
df_pass_probs = pd.DataFrame(
    pass_probs, columns=["id", "startFrame", "bestSpeed", "bestProb"]
)

In [None]:
df_pass_probs

In [None]:
pass_probs

In [None]:
pass_probs

In [None]:
pass_probs

In [None]:
row

In [None]:
model.set_current_frame(df_frame, "Away")

In [None]:
vec = [51.45 - 47.25, 36.04 - 46.24]
vec = vec / np.linalg.norm(vec)
angle = math.atan2(vec[1], vec[0])

player = 21

In [None]:
angle

In [None]:
model.get_pass_success_probability(angle, 15, player)

Identify players that can reach the ball

In [None]:
nb_angles = 30
angles = np.arange(0, 2 * np.pi, 2 * np.pi / nb_angles)
speed_vals = np.arange(1, 21)

test_indices = np.arange(1, nb_angles + 1)
np.random.shuffle(test_indices)
test_indices = [0] + list(test_indices)

In [None]:
all_players = [player.id for player in model.attacking_players]
reachable_players = list()

for player in all_players:

    print(player)
    break_val = 0

    vec = model.attacking_players[0].position - model.ball_position
    vec = vec / np.linalg.norm(vec)
    tmp_angle = math.atan2(vec[1], vec[0])
    tmp_angles = np.array([tmp_angle] + list(angles))

    for i in test_indices:
        angle = tmp_angles[i]
        if break_val == 1:
            break
        for speed in speed_vals:
            prob, _ = model.get_pass_success_probability(angle, speed, player)
            if prob > 0.05:
                reachable_players.append([player, angle, speed])
                break_val = 1
                break

In [None]:
reachable_players

In [None]:
def get_pass_impact(df):
    df["distToGoal"] = np.sqrt(
        df["xPos"] * df["xPos"] + (df["yPos"] - 34) * (df["yPos"] - 34)
    )
    df["positionImpact"] = 1 / df["distToGoal"]
    pass_impact = sum(df["positionImpact"] * df["attProba"])
    return pass_impact

In [None]:
def opt_func(params):

    min_prob = 0
    attack_prob, df_interception = model.get_pass_success_probability(
        params[0], params[1]
    )

    if attack_prob >= min_prob:
        pass_impact = get_pass_impact(df_interception)
    else:
        pass_impact = 0
    return -1 * pass_impact

In [None]:
def opt_func_2(params):

    min_prob = 0
    attack_prob, df_interception = model.get_pass_success_probability(
        params[0], params[1], specific_players=21
    )

    if attack_prob >= min_prob:
        pass_impact = get_pass_impact(df_interception)
    else:
        pass_impact = 0
    return -1 * pass_impact

In [None]:
def opt_func_3(params, player):

    attack_prob, df_interception = model.get_pass_success_probability(
        params[0], params[1], player
    )
    return -1 * attack_prob

In [None]:
model = PassProbabilityModeler()

In [None]:
model.set_current_frame(df_frame, "Away")

In [None]:
def optimize_full_team(opt_func, nb_iterations):

    tpe_algo = tpe.suggest
    space = [
        hp.uniform("angle", 0, 2 * np.pi),
        hp.choice("speed", np.arange(1, 21, dtype=int)),
    ]

    lst_results = []

    # TODO: Use optuna for early stopping
    for i in np.arange(1):
        tpe_trials = Trials()
        tpe_best = fmin(
            fn=opt_func,
            space=space,
            algo=tpe_algo,
            trials=tpe_trials,
            max_evals=nb_iterations,
        )

        tmp_results = pd.DataFrame(
            {
                "loss": [x["loss"] for x in tpe_trials.results],
                "iteration": tpe_trials.idxs_vals[0]["angle"],
                "angle": tpe_trials.idxs_vals[1]["angle"],
                "speed": tpe_trials.idxs_vals[1]["speed"],
            }
        )

        tmp_results["iteration"] = (100 * i) + tmp_results["iteration"]

        lst_results.append(tmp_results)

        if min([x["loss"] for x in tpe_trials.results]) < -1 + 0.001:
            break

    df_results = pd.concat(lst_results)
    df_results["speed"] += 1
    df_results.sort_values("loss", inplace=True)

    return df_results

In [None]:
player = 18

In [None]:
sers = optimize_full_team(lambda x: opt_func_3(x, player), 500)
sers["player"] = player
sers.rename(columns={"loss": "passingProb"}, inplace=True)
sers["passingProb"] = -1 * sers["passingProb"]
sers.drop_duplicates("player", inplace=True)

In [None]:
sers.drop_duplicates("player", inplace=True)

In [None]:
sers

In [None]:
sers

In [None]:
model.get_pass_success_probability(1.030188, 17, 15)

In [None]:
sers.head()

In [None]:
sers

In [None]:
math.cos(0.24)

In [None]:
sers.head()

In [None]:
model.get_pass_success_probability(3.2395, 14, specific_players=21)

In [None]:
tpe_results[tpe_results["loss"] < -0.0178]

In [None]:
np.max(d)

In [None]:
for player in model.attacking_players:
    print(player.playername)
    print(player.PIP)
for player in model.defending_players:
    print(player.playername)
    print(player.PIP)

In [None]:
model.ball_pos_per_frame

In [None]:
model.defending_players[0].id

In [None]:
# player params
V_max = 7.8
alpha = 1.3
lambda_player = 4.3
sigma = 0.45

eps = 0.00001

# ball params
m = 0.42
rho = 1.225
drag = 0.25
area = 0.038
dt = 0.04
mu = 0.55
g = 9.81

In [None]:
def find_t_max(T, r0, v0, angle):

    tmp_r = copy.deepcopy(r0)
    tmp_v = copy.deepcopy(v0)

    factor = -1 / m * rho * drag * area

    for i in np.arange(T):

        if i < 2 / 3 * T:
            tmp_a = factor * np.linalg.norm(tmp_v) * tmp_v
        else:
            tmp_a = -1 * mu * g * angle

        if any(np.abs(tmp_v) - np.abs(tmp_a * dt) < -1 * eps):
            return 1

        tmp_v = tmp_v + tmp_a * dt
        tmp_r = tmp_r + tmp_v * dt

    return -1


def get_t_max_for_different_speed(min_speed=1, max_speed=30):

    dict_t_max = {}
    for speed in np.arange(min_speed, max_speed + 1):

        r0 = np.array([0.0, 0.0])
        angle = np.array([1, 0])
        v0 = speed * angle

        min_val = 0
        max_val = 300
        mid_val = int((max_val + min_val) / 2)

        while True:
            tmp_val = find_t_max(mid_val, r0, v0, angle)
            if tmp_val == 1:
                max_val = mid_val
                mid_val = int((max_val + min_val) / 2)
            elif tmp_val == -1:
                min_val = mid_val
                mid_val = int((max_val + min_val) / 2)

            if min_val == mid_val:
                break

        dict_t_max[speed] = mid_val

    return dict_t_max

In [None]:
sers = get_t_max_for_different_speed()

In [None]:
sers

In [None]:
def compute_ball_position_at_every_time(r0, v0, t_max):
    tmp_r = copy.deepcopy(r0)
    tmp_v = copy.deepcopy(v0)

    factor = -1 / m * rho * drag * area

    r = [tmp_r]
    v = [tmp_v]
    a = []

    for i in np.arange(t_max):

        if i < 2 / 3 * t_max:
            tmp_a = factor * np.linalg.norm(tmp_v) * tmp_v
        else:
            tmp_a = -1 * mu * g * angle

        tmp_v = tmp_v + tmp_a * dt
        tmp_r = tmp_r + tmp_v * dt

        r.append(tmp_r)

    return r

In [None]:
dict_t_max = get_t_max_for_different_speed()

In [None]:
df_frame[df_frame["playerId"].isin([-1, 18])]

In [None]:
defending_players = get_team_players(df_frame, "Home")
attacking_players = get_team_players(df_frame, "Away")

df_ball = df_frame[df_frame["playerId"] == -1].iloc[0]
ball_position = np.array([df_ball["xPos"], df_ball["yPos"]])

In [None]:
def get_pass_success_probability(
    angle, speed, ball_position, attacking_players, defending_players, dict_t_max
):

    for player in attacking_players:
        player.PIP = 0

    for player in defending_players:
        player.PIP = 0

    # compute ball positions at the different times
    pass_direction = [math.cos(angle), math.sin(angle)]
    pass_direction = pass_direction / np.linalg.norm(pass_direction)
    v0 = speed * pass_direction
    r = compute_ball_position_at_every_time(ball_position, v0, dict_t_max[speed])

    pass_prob_att = np.zeros(250)
    pass_prob_def = np.zeros(250)
    r = np.array(r + [r[-1]] * (250 - len(r)))

    ball_in_play = True
    total_proba = 0

    t = 1

    while 1 - total_proba > 0.0001 and ball_in_play:

        ball_pos = r[t]

        for player in attacking_players:

            P_int = player.interception_probability(t, ball_pos)
            dPdT = (
                (1 - pass_prob_att[t - 1] - pass_prob_def[t - 1])
                * P_int
                * lambda_player
            )

            player.PIP += dPdT * dt
            pass_prob_att[t] += player.PIP

        for player in defending_players:

            P_int = player.interception_probability(t, ball_pos)
            dPdT = (
                (1 - pass_prob_att[t - 1] - pass_prob_def[t - 1])
                * P_int
                * lambda_player
            )

            player.PIP += dPdT * dt
            pass_prob_def[t] += player.PIP

        if (
            (ball_pos[0] > 105)
            or (ball_pos[0] < 0)
            or (ball_pos[1] > 68)
            or (ball_pos[1] < 0)
        ):
            ball_in_play = False
            pass_prob_def[t] = 1 - pass_prob_att[t - 1]
            pass_prob_att[t] = pass_prob_att[t - 1]

        total_proba = pass_prob_att[t] + pass_prob_def[t]

        t += 1

    return pass_prob_def, pass_prob_att, r, attacking_players, defending_players

In [None]:
d, a, r, attack, defense = get_pass_success_probability(
    0, 10, ball_position, attacking_players, defending_players, dict_t_max
)

In [None]:
for player in attack:
    print(player.playername)
    print(player.PIP)
for player in defense:
    print(player.playername)
    print(player.PIP)
print(r)

In [None]:
def get_pass_interception_points(angle, speed, df_frame, dict_t_max, min_prob=0):

    d, a, r = get_pass_success_probability(angle, speed, df_frame, dict_t_max)

    poss_intercept = a + d > 0.00001
    rel_a = a[poss_intercept]
    rel_d = d[poss_intercept]
    rel_r = r[poss_intercept]
    rel_t = np.arange(250)[poss_intercept] * 0.04

    last_prob = rel_d[-1] + rel_a[-1]
    rel_d[-1] /= last_prob
    rel_a[-1] /= last_prob

    if rel_a[-1] < min_prob:
        return rel_a[-1], None

    prob_a = np.insert(np.diff(rel_a), 0, rel_a[0])
    prob_d = np.insert(np.diff(rel_d), 0, rel_d[0])

    df_pass = pd.DataFrame()
    df_pass["defProba"] = prob_d
    df_pass["attProba"] = prob_a
    df_pass["time"] = rel_t
    df_pass["xPos"] = rel_r[:, 0]
    df_pass["yPos"] = rel_r[:, 1]
    df_pass["angle"] = angle
    df_pass["speed"] = speed

    return rel_a[-1], df_pass

In [None]:
a, b = get_pass_interception_points(2, 10, df_frame, dict_t_max, min_prob=0.1)

In [None]:
b

In [None]:
def get_pass_impact(df):
    df["distToGoal"] = np.sqrt(
        df["xPos"] * df["xPos"] + (df["yPos"] - 34) * (df["yPos"] - 34)
    )
    df["positionImpact"] = 1 / df["distToGoal"]
    pass_impact = sum(df["positionImpact"] * df["attProba"])
    return pass_impact

# Hyperopt

In [None]:
from hyperopt import Trials, fmin, hp, tpe

In [None]:
def opt_func_pass_prob(x):
    min_prob = 0.1
    attack_prob, sers = get_pass_interception_points(
        x, 10, df_frame, dict_t_max, min_prob=0.1
    )
    return -1 * attack_prob

    if attack_prob >= min_prob:
        pass_impact = get_pass_impact(sers)
    else:
        pass_impact = 0
    return -1 * pass_impact

In [None]:
def opt_func(params):
    min_prob = 0.1
    attack_prob, sers = get_pass_interception_points(
        params[0], params[1], df_frame, dict_t_max, min_prob=0.1
    )

    if attack_prob >= min_prob:
        pass_impact = get_pass_impact(sers)
    else:
        pass_impact = 0
    return -1 * pass_impact

In [None]:
tpe_algo = tpe.suggest
tpe_trials = Trials()
space = [
    hp.uniform("angle", 0, 2 * np.pi),
    hp.choice("speed", np.arange(1, 21, dtype=int)),
]

# Run 2000 evals with the tpe algorithm
tpe_best = fmin(
    fn=opt_func, space=space, algo=tpe_algo, trials=tpe_trials, max_evals=1000
)

print(tpe_best)
angle = tpe_best["angle"]
speed = tpe_best["speed"]
print(angle)
print(speed)

In [None]:
space = [
    hp.uniform("angle", 0, 2 * np.pi),
    hp.choice("speed", np.arange(1, 21, dtype=int)),
]

In [None]:
# Run 2000 evals with the tpe algorithm
tpe_best = fmin(
    fn=opt_func, space=space, algo=tpe_algo, trials=tpe_trials, max_evals=1000
)

print(tpe_best)
angle = tpe_best["angle"]
speed = tpe_best["speed"]
print(angle)
print(speed)

tpe_results = pd.DataFrame(
    {
        "loss": [x["loss"] for x in tpe_trials.results],
        "iteration": tpe_trials.idxs_vals[0]["angle"],
        "angle": tpe_trials.idxs_vals[1]["angle"],
        "speed": tpe_trials.idxs_vals[1]["speed"],
    }
)

tpe_results["speed"] += 1

print(tpe_results.head())

import plotly.express as px

fig = px.histogram(tpe_results, x="x", nbins=50)
fig.show()

In [None]:
for i in [0.95, 1, 1.05]:
    for j in [-1, 0, 1]:
        print(i)
        print(j)
        print(opt_func([angle * i, speed + j]))
        print("#############")

In [None]:
tpe_results = pd.DataFrame(
    {
        "loss": [x["loss"] for x in tpe_trials.results],
        "iteration": tpe_trials.idxs_vals[0]["angle"],
        "angle": tpe_trials.idxs_vals[1]["angle"],
        "speed": tpe_trials.idxs_vals[1]["speed"],
    }
)

tpe_results["speed"] += 1

tpe_results.head()

In [None]:
tpe_results["speed"].max()

In [None]:
len(tpe_results)

In [None]:
get_pass_interception_points(-3, 10, df_frame, dict_t_max, min_prob=2)[0]

In [None]:
math.cos(0.7)

In [None]:
math.sin(0.7)

In [None]:
opt_func(6)

In [None]:
math.sin(3.5)

In [None]:
import plotly.express as px

fig = px.histogram(tpe_results, x="x", nbins=50)
fig.show()