In [1]:
#Import Libraries
import sqlite3
import pandas as pd
from collections import defaultdict
import matplotlib.pyplot as plt
import math
import numpy as np
from scipy.stats import gaussian_kde
import re
import numpy as np
from sklearn.linear_model import LogisticRegression

from scipy.stats import poisson

In [2]:
#Sqlite Connection
# Module-level connection used by the DB getter functions in this notebook.
# "NBA_db.db" is a relative path, so it is resolved against the kernel's
# current working directory.
conn = sqlite3.connect("NBA_db.db")

In [94]:
#getters for DB
def getAllPlayers():
    """Load the whole ``Players`` table.

    Returns:
        pandas.DataFrame: every row and column of ``Players``, read through
        the module-level ``conn`` connection.
    """
    query = "SELECT * from Players"
    players = pd.read_sql_query(query, conn)
    return players

def getPlayer(Column,Value):
    """Fetch the rows of ``Players`` whose ``Column`` equals ``Value``.

    Args:
        Column: name of the ``Players`` column to filter on. Identifiers cannot
            be bound as SQL parameters, so this is still spliced into the
            statement text and must come from trusted code, not user input.
        Value: value to match. It is converted to text and bound as a query
            parameter, mirroring the quoted-literal comparison used before.

    Returns:
        pandas.DataFrame: the matching rows (empty when nothing matches).
    """
    # Binding the value instead of formatting it into the SQL keeps names that
    # contain an apostrophe (e.g. "De'Aaron Fox") from producing a syntax error
    # and prevents the value from being interpreted as SQL.
    query = f"SELECT * from Players where {Column} = ?"
    return pd.read_sql_query(query, conn, params=(str(Value),))

def getBoxScore(Column,Value):
    """Fetch matching rows from both box-score tables.

    Args:
        Column: column name to filter on; it must exist in both
            ``BoxScoreBasic`` and ``BoxScoreAdvanced``. It is spliced into the
            SQL text (identifiers cannot be bound), so it must be trusted.
        Value: value to match. It is converted to text and bound as a query
            parameter, mirroring the quoted-literal comparison used before.

    Returns:
        tuple[pandas.DataFrame, pandas.DataFrame]: ``(basic, advanced)`` rows
        from ``BoxScoreBasic`` and ``BoxScoreAdvanced`` respectively.
    """
    # Bound parameter: values containing quotes (player names such as
    # "De'Aaron Fox") no longer break the statement or allow SQL injection.
    params = (str(Value),)
    basic = pd.read_sql_query(
        f"SELECT * from BoxScoreBasic where {Column} = ?", conn, params=params
    )
    advanced = pd.read_sql_query(
        f"SELECT * from BoxScoreAdvanced where {Column} = ?", conn, params=params
    )
    return basic, advanced

def getPlayByPlay(Token):
    """Fetch every ``PlayByPlay`` row whose ``Token`` equals the given token.

    Args:
        Token: token value to match exactly. It is converted to text and bound
            as a query parameter, mirroring the quoted-literal comparison used
            before.

    Returns:
        pandas.DataFrame: the matching play-by-play rows (empty if none).
    """
    # Bound parameter rather than string formatting: a token containing a
    # quote can no longer break the statement or inject SQL.
    query = "SELECT * from PlayByPlay where Token = ?"
    return pd.read_sql_query(query, conn, params=(str(Token),))

def getPlayByPlayIn(Column,values):
    """Fetch ``PlayByPlay`` rows whose ``Column`` is one of ``values``.

    Args:
        Column: column name, spliced into the SQL text as-is.
        values: sized sequence of values; each one is bound to its own ``?``
            placeholder in the ``IN (...)`` list.

    Returns:
        pandas.DataFrame: the matching rows.
    """
    # One "?" per value so the values themselves are bound, not formatted in.
    marks = ",".join("?" for _ in range(len(values)))
    query = "SELECT * FROM PlayByPlay WHERE {} IN ({})".format(Column, marks)
    return pd.read_sql(query, conn, params=values)

def getSchedule():
    """Load the whole ``Schedule`` table.

    Returns:
        pandas.DataFrame: every row and column of ``Schedule``, read through
        the module-level ``conn`` connection.
    """
    query = "SELECT * from Schedule"
    schedule = pd.read_sql_query(query, conn)
    return schedule

def get_play_by_type(play_type,token):
    """Fetch play-by-play rows whose home or away text contains ``play_type``.

    Args:
        play_type: one of the supported play keywords listed below.
        token: ``"*"`` to search every row, otherwise a string that the row's
            ``Token`` must contain (matched with ``LIKE '%token%'``).

    Returns:
        pandas.DataFrame: rows where ``AwayTeamPlay`` or ``HomeTeamPlay``
        matches ``LIKE '%play_type%'``.

    Raises:
        ValueError: if ``play_type`` is not one of the supported keywords.
    """
    # Supported play type keywords
    play_keywords = set([
        "makes 2-pt", "makes 3-pt", "rebound", "makes", "misses",
        "offensive rebound", "defensive rebound", "assists", "turnover",
        "foul", "makes free throw", "misses free throw", "steal", "block",
        "jump ball", "enters", "timeout", "violation",
    ])

    # Reject unknown play types before touching the database
    if play_type not in play_keywords:
        raise ValueError(f"Invalid play type: {play_type}. Choose from {play_keywords}")

    play_pattern = f"%{play_type}%"

    if token == "*":
        # No token filter: search every game
        query = """
        SELECT * FROM PlayByPlay
        WHERE (AwayTeamPlay LIKE ? OR HomeTeamPlay LIKE ?);
        """
        params = (play_pattern, play_pattern)
    else:
        query = """
        SELECT * FROM PlayByPlay
        WHERE Token LIKE ? 
        AND (AwayTeamPlay LIKE ? OR HomeTeamPlay LIKE ?);
        """
        params = (f"%{token}%", play_pattern, play_pattern)

    return pd.read_sql(query, conn, params=params)


In [4]:
#Helper Functions

def minutesPlayedHelper(mp):
    """Convert a ``"MM:SS"`` string into minutes as a float.

    Args:
        mp: string with minutes and seconds separated by ``":"``.

    Returns:
        float: minutes plus seconds / 60.
    """
    parts = mp.split(':')
    minutes = float(parts[0])
    seconds = float(parts[1])
    return minutes + seconds / 60

def timeHelper(t,qtr):
    """Convert a quarter clock reading into minutes elapsed since tip-off.

    Args:
        t: clock string ``"MM:SS"``, treated as time remaining out of 12
            minutes in the period.
        qtr: period label; only its first character is used and it is read as
            the 1-based period number.
            NOTE(review): a label whose first character is not the running
            period number (e.g. an overtime label) would be misplaced - confirm
            against the data.

    Returns:
        float: ``(period - 1) * 12 + (12 - remaining minutes)``.
    """
    clock = t.split(':')
    remaining = float(clock[0]) + (float(clock[1]) / 60)
    period_offset = (float(qtr[0]) - 1) * 12
    return period_offset + (12 - remaining)

def playerBoxScoreHelper(Player,Game_Type,column):
    """Return one box-score column for a player, restricted to one game type.

    The column is taken from the basic box score when it exists there,
    otherwise from the advanced one. Rows whose ``MP`` value does not contain
    ``":"`` (including missing ``MP``) are dropped.

    Args:
        Player: value matched against the ``Player`` column.
        Game_Type: value matched against the ``Game_Type`` column.
        column: name of the column to return. For ``"MP"`` the ``"MM:SS"``
            strings are converted to float minutes.

    Returns:
        pandas.Series: the requested column for the remaining rows.
    """
    basic, advanced = getBoxScore("Player",Player)
    source = basic if column in basic.columns.tolist() else advanced
    source = source[source.Game_Type == f"{Game_Type}"]
    # na=False: a missing MP would otherwise put NaN in the boolean mask and
    # make the row selection raise.
    played = source[source.MP.str.contains(':', na=False)]
    if column == "MP":
        # Convert on the way out instead of assigning into the filtered slice,
        # which triggered pandas' SettingWithCopy behaviour.
        return played["MP"].apply(minutesPlayedHelper)
    return played[column]

def playerNameHelper(name):
    """Resolve an abbreviated ``"F. Lastname"`` to full names in ``Players``.

    Args:
        name: string that splits on ``". "`` into exactly two parts, a first
            initial and a last name; any other shape raises ``ValueError``
            from the unpacking.

    Returns:
        list[str]: every ``Player`` value that starts with the initial and has
        the last name as one of its whitespace-separated words (possibly
        empty, possibly more than one).
    """
    initial, surname = name.split(". ")
    initial = initial.strip()
    roster = getAllPlayers()

    def is_candidate(full_name):
        # Surname must be a whole word of the full name. A name that starts
        # with "<initial>." also starts with "<initial>", so one prefix check
        # covers both spellings.
        return surname in full_name.split() and full_name.startswith(initial)

    candidates = roster[roster["Player"].apply(is_candidate)]
    return candidates["Player"].tolist()


In [78]:
#PlayByPlay Functions

# Returns all player intervals for the whole game
def playerIntervals(Token):
    """Build, per player, the list of ``[start, end]`` stints for one game.

    Times are the values produced by ``timeHelper``. Starters (``Game_Type ==
    "game"`` and ``BenchStatus == "S"`` in the basic box score) begin with an
    open stint ``[0, -1]``; ``-1`` marks an end that was never filled in.
    Stints are then updated from every "X enters the game for Y" play:

    * the leaving player's current stint is closed at the play time; if that
      player had no stint yet, one is created from the start of the play's
      quarter;
    * the entering player gets a new open stint starting at the play time;
    * a player who entered but was never recorded leaving has that stint
      closed at the end of the quarter in which they entered.

    Args:
        Token: game token passed to ``getBoxScore`` and ``getPlayByPlay``.

    Returns:
        dict[str, list[list]]: player name -> list of ``[start, end]`` pairs.

    Raises:
        IndexError: if ``playerNameHelper`` finds no full name for a player
            mentioned in a substitution.
    """
    marker = " enters the game for "
    stints = {}
    entered_in = {}  # player -> quarter label of their latest unmatched entry

    basic, _ = getBoxScore("Token",Token)
    starters = basic[(basic.Game_Type == "game") & (basic.BenchStatus == "S")]
    for starter in starters["Player"]:
        stints[starter] = [[0,-1]]

    pbp = getPlayByPlay(Token)
    is_sub = pbp.AwayTeamPlay.str.contains(marker) | pbp.HomeTeamPlay.str.contains(marker)

    for play in pbp[is_sub].itertuples():
        # The substitution text sits in whichever team column mentions it.
        text = play.AwayTeamPlay if marker in play.AwayTeamPlay else play.HomeTeamPlay
        names = [playerNameHelper(part)[0] for part in text.split(marker)]

        now = timeHelper(play.Time,play.Quarter)
        incoming, outgoing = names[0], names[1]

        # Close (or retroactively create) the stint of the player leaving.
        if outgoing in stints:
            stints[outgoing][-1][1] = now
        else:
            stints[outgoing] = [[timeHelper("12:0",play.Quarter),now]]
        if outgoing in entered_in:
            del entered_in[outgoing]

        # Open a new stint for the player entering.
        if incoming in stints:
            if incoming in entered_in:
                # Previous entry never saw an exit: end it with its quarter.
                stints[incoming][-1][1] = timeHelper("0:0",entered_in[incoming])
            stints[incoming].append([now,-1])
        else:
            stints[incoming] = [[now,-1]]
        entered_in[incoming] = play.Quarter

    # Anyone still marked as entered is closed at the end of that quarter.
    for name, quarter in entered_in.items():
        stints[name][-1][1] = timeHelper("0:0",quarter)
    return stints

# Returns the players whose time intervals overlap the input player's
def playersOnCourt(player,Token):
    """Find when every other player's stints overlap the given player's.

    Args:
        player: key into the ``playerIntervals(Token)`` mapping; an unknown
            player is treated as having no stints.
        Token: game token passed to ``playerIntervals``.

    Returns:
        dict[str, list[list]]: other player -> list of ``[start, end]``
        overlaps, kept only when ``start < end``. Players with no overlap are
        omitted.
        NOTE(review): a stint whose end is still ``-1`` never produces an
        overlap here - confirm that is intended.
    """
    all_stints = playerIntervals(Token)
    own_stints = all_stints.get(player, [])

    shared = {}
    for other, other_stints in all_stints.items():
        if other == player:
            continue

        # Pairwise intersection of the two players' stints.
        overlaps = [
            [max(own_start, other_start), min(own_end, other_end)]
            for own_start, own_end in own_stints
            for other_start, other_end in other_stints
            if max(own_start, other_start) < min(own_end, other_end)
        ]
        if overlaps:
            shared[other] = overlaps

    return shared

#Returns the players on court at a given time
def PlayersOnCourtAtTime(currentTime, token):
    """List the players who have a stint containing ``currentTime``.

    Args:
        currentTime: game time on the same scale as the interval bounds
            returned by ``playerIntervals``.
        token: game token passed to ``playerIntervals``.

    Returns:
        list[str]: players with ``start <= currentTime <= end`` for at least
        one stint, in the mapping's iteration order; the scan stops as soon
        as ten players have been collected.
    """
    on_court = []

    for name, stints in playerIntervals(token).items():
        if any(start <= currentTime <= end for start, end in stints):
            on_court.append(name)

        # Ten players fill the floor; no need to look further.
        if len(on_court) == 10:
            break

    return on_court

# Returns plays that occurred while the player was on court
def playsOnCourt(token,Player):
    """Filter a game's play-by-play to plays inside the player's stints.

    Args:
        token: game token passed to ``playerIntervals`` and ``getPlayByPlay``.
        Player: key into the ``playerIntervals(token)`` mapping; a missing
            player raises ``KeyError``.

    Returns:
        pandas.DataFrame: the plays whose ``timeHelper(Time, Quarter)`` value
        lies within (bounds inclusive) at least one of the player's stints.
    """
    stints = playerIntervals(token)[Player]
    plays = getPlayByPlay(token)

    def during_stint(row):
        return any(
            stint[0] <= timeHelper(row.Time, row.Quarter) <= stint[1]
            for stint in stints
        )

    on_court = plays.apply(during_stint, axis=1)
    return plays[on_court]

# Returns plays that occurred with two players on the court
def playOnCourtWithPlayer(player,onCourtPlayer,Token):
    """Collect the plays that happened while two players shared the court.

    Args:
        player: first player, passed to ``playersOnCourt``.
        onCourtPlayer: second player; must be a key of the overlap mapping
            returned by ``playersOnCourt(player, Token)``.
        Token: game token.

    Returns:
        pandas.DataFrame: one row per (overlap interval, play) pair whose
        ``timeHelper(Time, Quarter)`` value lies within the interval (bounds
        inclusive), ordered by interval and then by play order. Besides the
        play columns it carries an ``Index`` column holding the play's
        original row label. Empty when no play qualifies.

    Raises:
        KeyError: if the two players never overlapped in this game.
    """
    shared_stints = playersOnCourt(player,Token)[onCourtPlayer]
    plays = getPlayByPlay(Token)

    # Compute each play's game time once rather than once per interval.
    timed_plays = [(timeHelper(play.Time,play.Quarter), play) for play in plays.itertuples()]

    # Gather plain records and build the frame in a single step; the previous
    # row-by-row pd.concat copied the growing frame on every match.
    records = [
        play._asdict()
        for stint in shared_stints
        for when, play in timed_plays
        if stint[0] <= when <= stint[1]
    ]
    return pd.DataFrame(records)

#Returns plays for a particular player
def playByPlayer(token,player,play_type):
    """Fetch plays of one type that mention the given player.

    The full name is abbreviated to ``"F. Second"`` (first letter of the first
    word, then the second word) and looked up in both team play columns.

    Args:
        token: game token, or ``"*"`` for all games (see ``get_play_by_type``).
        player: full name with at least two whitespace-separated words;
            a single-word name raises ``IndexError``.
        play_type: play keyword accepted by ``get_play_by_type``.

    Returns:
        pandas.DataFrame: rows of ``get_play_by_type(play_type, token)`` whose
        ``HomeTeamPlay`` or ``AwayTeamPlay`` contains the abbreviated name.
    """
    name_parts = player.split()
    short_name = name_parts[0][0]+". "+name_parts[1]
    plays = get_play_by_type(play_type,token)
    # regex=False: the "." in "A. Davis" must be a literal dot, not a regex
    # wildcard that would also match e.g. "AJ Davis".
    # na=False: a missing play text counts as "no match" instead of putting
    # NaN into the boolean mask.
    in_home = plays.HomeTeamPlay.str.contains(short_name, regex=False, na=False)
    in_away = plays.AwayTeamPlay.str.contains(short_name, regex=False, na=False)
    return plays[in_home | in_away]

#Returns timestamps of a play type by a player
def playTypeTimeStamps(Player,play_type):
    """Collect, game by game, the times at which a player made a play type.

    Args:
        Player: full player name, passed to ``playByPlayer``.
        play_type: play keyword, passed to ``playByPlayer``.

    Returns:
        list[list[float]]: one inner list per distinct ``Token`` (in
        ``groupby`` order), holding ``timeHelper(Time, Quarter)`` for each of
        that game's matching plays.
    """
    plays = playByPlayer("*",Player,play_type)
    return [
        game_plays.apply(lambda row: timeHelper(row.Time, row.Quarter), axis=1).to_list()
        for _, game_plays in plays.groupby("Token")
    ]

def playDetails(play):
    """Parse one play-by-play description into a dict of structured fields.

    Each section below runs independently, keyed on a substring of ``play``;
    a section whose pattern does not match contributes nothing (no exception
    is raised for unrecognised wording). Captured values are the raw strings
    from the text unless noted.

    Possible keys:
        * jump ball: ``"Players Contesting"`` (list of two names) and
          ``"Player Possesion"`` (one-element list; key spelling kept as-is).
        * shot (``makes`` / ``misses``): ``"Player shot"``, ``"Shot Made"``
          (bool), ``"Shot"`` (e.g. ``"2-pt"``), ``"Shot Type"``, ``"Distance"``
          (only for ``from N ft`` shots) and ``"Player <word>"`` for a
          trailing ``(<word> by <name>)`` such as an assist or block.
        * free throw: ``"Player shot"``, ``"Shot Made"`` and either
          ``"Free throw Type"`` = ``"technical"`` or ``"Free Throw attempt"``
          / ``"Free Throw total"``.
        * rebound: ``"Rebound Type"``, ``"Player Rebound"``.
        * foul (when the text has no ``"Turnover"``): ``"Foul Type"``,
          ``"Player by"`` and, if present, ``"Player drawn"``.
        * substitution: ``"Player In"``, ``"Player Out"``.
        * full timeout: ``"Team"``.
        * turnover: ``"Player Turnover"``, ``"Turnover Type"`` and, for
          steals, ``"Player Steal"``.

    Args:
        play: the play description text.

    Returns:
        dict: the fields that could be extracted (possibly empty).
    """
    details = {}

    if "Jump ball" in play:
        match = re.findall(r'Jump ball:\s*(.*?)\s+vs\.\s+(.*?)\s*\((.*?) gains possession\)', play)
        if match:
            details["Players Contesting"] = [match[0][0],match[0][1]]
            details["Player Possesion"] = [match[0][2]]

    if "makes" in play or "misses" in play:
        if "at rim" in play:
            match = re.findall(r'([\w\.\- ]+) ([\w\.\- ]+) (\d-pt) ([\w\s]+) at rim(?: \(([\w\.\- ]+) by ([\w\.\- ]+)\))?',play)
            # Guarded like the other sections: wording the pattern does not
            # cover is skipped instead of raising IndexError.
            if match:
                shot = match[0]
                details["Player shot"] = shot[0]
                details["Shot Made"] = shot[1] == "makes"
                details["Shot"] = shot[2]
                details["Shot Type"] = shot[3]
                if shot[4] != "":
                    details[f"Player {shot[4]}"] = shot[5]
        elif "free throw" in play:
            if "technical" in play:
                match = re.findall(r'([\w\.\- ]+) ([\w\.\- ]+) technical free throw',play)
                if match:
                    details["Player shot"] = match[0][0]
                    details["Shot Made"] = match[0][1] == "makes"
                    details["Free throw Type"] = "technical"
            else:
                match = re.findall(r'([\w\.\- ]+) ([\w\.\- ]+) free throw (\d) of (\d)',play)
                if match:
                    details["Player shot"] = match[0][0]
                    details["Shot Made"] = match[0][1] == "makes"
                    details["Free Throw attempt"] = match[0][2]
                    details["Free Throw total"] = match[0][3]
        else:
            match = re.findall(r'([\w\.\- ]+) ([\w\.\- ]+) (\d-pt) ([\w\s]+) from (\d+) ft(?: \(([\w\.\- ]+) by ([\w\.\- ]+)\))?',play)
            if match:
                shot = match[0]
                details["Player shot"] = shot[0]
                details["Shot Made"] = shot[1] == "makes"
                details["Shot"] = shot[2]
                details["Shot Type"] = shot[3]
                details["Distance"] = shot[4]
                if shot[5] != "":
                    details[f"Player {shot[5]}"] = shot[6]

    if "rebound" in play:
        match = re.findall(r'([\w\.\- ]+) rebound by ([\w\.\- ]+)', play)
        if match:
            details["Rebound Type"] = match[0][0]
            details["Player Rebound"] = match[0][1]

    if "foul" in play and "Turnover" not in play:
        if "drawn" in play:
            match = re.findall(r'([\w\s]+) foul by ([\w\.\- ]+) \(drawn by ([\w\.\- ]+)\)', play)
        else:
            match = re.findall(r'([\w\s]+) foul by ([\w\.\- ]+)',play)

        if match:
            details["Foul Type"] = match[0][0]
            details["Player by"] = match[0][1]
            # Only the "drawn" pattern has a third capture group.
            if len(match[0]) > 2:
                details["Player drawn"] = match[0][2]

    if "enter" in play:
        match = re.findall(r'([\w\.\- ]+) enters the game for ([\w\.\- ]+)',play)
        if match:
            details["Player In"] = match[0][0]
            details["Player Out"] = match[0][1]

    if "timeout" in play:
        match = re.findall(r'([\w\.\- ]+) full timeout',play)
        if match:
            details["Team"] = match[0]

    if "Turnover" in play:
        if "steal" in play:
            # "Turnover by X (type; steal by Y)": the two halves are parsed
            # separately, and each is recorded only if its pattern matches.
            parts = play.split(";")
            match = re.findall(r'Turnover by ([\w\.\- ]+) \(([\w\.\- ]+)',parts[0])
            if match:
                details["Player Turnover"] = match[0][0]
                details["Turnover Type"] = match[0][1]
            if len(parts) > 1:
                match = re.findall(r' steal by ([\w\.\- ]+)\)',parts[1])
                if match:
                    details["Player Steal"] = match[0]
        else:
            match = re.findall(r'Turnover by ([\w\.\- ]+) \(([\w\.\- ]+)\)',play)
            if match:
                details["Player Turnover"] = match[0][0]
                details["Turnover Type"] = match[0][1]

    return details
#playDetails("Turnover by D. Å\xa0aric (bad pass; steal by K. Huerter)")
def playerCombinations(token):
    """List the distinct sets of players that were on court together.

    Every stint start and end from ``playerIntervals(token)`` is used as a
    sample time (the largest one excluded); at each sample the players with
    ``start <= time < end`` for some stint form one combination.

    Args:
        token: game token passed to ``playerIntervals``.

    Returns:
        list[set]: the unique combinations, in no particular order. A sample
        time at which nobody is active contributes an empty set.
    """
    stints_by_player = playerIntervals(token)

    boundaries = sorted({
        edge
        for stints in stints_by_player.values()
        for start, end in stints
        for edge in (start, end)
    })

    lineups = set()
    for moment in boundaries[:-1]:
        lineup = frozenset(
            name
            for name, stints in stints_by_player.items()
            if any(start <= moment < end for start, end in stints)
        )
        lineups.add(lineup)

    return [set(lineup) for lineup in lineups]

def playNext(player,play_type):
    """Fetch the plays that immediately follow a player's plays of one type.

    "Following" means ``PlayByPlayID + 1`` for every play returned by
    ``playByPlayer("*", player, play_type)``.

    Args:
        player: full player name.
        play_type: play keyword accepted by ``playByPlayer``.

    Returns:
        pandas.DataFrame: whatever ``getPlayByPlayIn`` returns for those ids.
    """
    matched = playByPlayer("*",player,play_type)
    following_ids = [play_id + 1 for play_id in list(matched.PlayByPlayID)]
    return getPlayByPlayIn("PlayByPlayID", following_ids)


IndexError: list index out of range

In [7]:

#Probability functions:

def poisson_probability(lmbda, k):
    """Poisson probability mass ``lmbda**k * e**(-lmbda) / k!``.

    Args:
        lmbda: expected number of events.
        k: non-negative integer event count (``math.factorial`` rejects
            anything else).

    Returns:
        The probability of observing exactly ``k`` events.
    """
    weight = lmbda**k * math.exp(-lmbda)
    return weight / math.factorial(k)

def probablityOfOccurance(Player,stat,currentStat,currentTime):
    """Poisson tail probability for a player's stat over the rest of a game.

    The mean of column ``stat`` over the player's ``Players`` rows is scaled
    by the share of a 48-minute game still to play, and used as the Poisson
    rate.

    Args:
        Player: value matched against the ``Player`` column.
        stat: name of the numeric column to average.
        currentStat: integer threshold; counts ``0..currentStat`` are summed.
        currentTime: minutes already played (out of 48).

    Returns:
        ``1 - P(X <= currentStat)`` for ``X`` Poisson-distributed with the
        scaled rate.
    """
    average = getPlayer("Player",Player)[stat].mean()
    remaining_share = (48.0 - currentTime) / 48.0
    expected_remaining = average * remaining_share

    cumulative = 0
    for count in range(currentStat + 1):
        cumulative += poisson_probability(expected_remaining, count)
    return 1 - cumulative

def probabilityOfOccurrenceWithPrevData(player, play_type, current_stat, current_time):
    """Probability of at least one more play of a type before the game ends.

    A per-minute rate from the player's history (total occurrences divided by
    48 minutes per recorded game) is averaged with the current game's rate
    (``current_stat / current_time``), multiplied by the minutes left out of
    48, and used as a Poisson rate.

    Args:
        player: full player name, passed to ``playTypeTimeStamps``.
        play_type: play keyword, passed to ``playTypeTimeStamps``.
        current_stat: occurrences so far in the current game.
        current_time: minutes elapsed in the current game.

    Returns:
        ``1 - P(X = 0)`` for ``X`` Poisson-distributed with that rate.
    """
    history = playTypeTimeStamps(player,play_type)
    occurrences = sum(len(game) for game in history)
    minutes_observed = len(history) * 48.0

    # Both rates fall back to 0 when there is nothing to divide by.
    if minutes_observed > 0:
        historical_rate = occurrences / minutes_observed
    else:
        historical_rate = 0
    if current_time > 0:
        live_rate = current_stat / current_time
    else:
        live_rate = 0

    minutes_left = 48.0 - current_time
    expected = (historical_rate + live_rate) / 2 * minutes_left
    return 1 - poisson.pmf(0, expected)

def probablityNextPlay(player,play):
    """Estimate what kind of play follows a player's plays of a given type.

    For every play returned by ``playByPlayer("*", player, play)`` the row
    with ``PlayByPlayID + 1`` is fetched and its text is scanned for the
    keywords below (home text first; the away text is used only when the home
    text contains none). A following play may count towards several keywords
    at once, so the returned values need not sum to 1.

    Args:
        player: full player name.
        play: play keyword accepted by ``playByPlayer``.

    Returns:
        dict[str, float]: keyword -> share of classified following plays that
        contain it. All values are ``0.0`` when no following play could be
        classified.
    """
    curr = playByPlayer("*",player, play)

    play_keywords = {
        "makes 2-pt": 0,
        "makes 3-pt": 0,
        "rebound": 0,
        "makes": 0,
        "misses": 0,
        "offensive rebound": 0,
        "defensive rebound": 0,
        "assists": 0,
        "turnover": 0,
        "foul": 0,
        "makes free throw": 0,
        "misses free throw": 0,
        "steal": 0,
        "block": 0,
        "jump ball": 0
    }

    def parse_play(play_str):
        """Return every keyword that occurs in the lower-cased play text."""
        play_str = str(play_str).lower()
        return [keyword for keyword in play_keywords if keyword in play_str]

    total = 0
    for _, row in curr.iterrows():
        # int(): hand sqlite3 a plain Python int rather than a NumPy scalar.
        following = getPlayByPlayIn("PlaybyPlayId", [int(row.PlayByPlayID) + 1])
        if following.empty:
            # No row with the next id; skip instead of indexing into an
            # empty result.
            continue

        next_play = None
        for team_play in ['HomeTeamPlay', 'AwayTeamPlay']:
            parsed = parse_play(following[team_play].values[0])
            if parsed:
                next_play = parsed
                break
        if next_play:
            total += 1
            for keyword in next_play:
                play_keywords[keyword] += 1

    if total == 0:
        # Nothing classified: report zeros rather than dividing by zero.
        return {keyword: 0.0 for keyword in play_keywords}

    return {keyword: count / total for keyword, count in play_keywords.items()}