In [None]:
import pandas as pd

In [None]:
df = pd.read_csv("nba_games.csv", index_col=0)

In [None]:
df

In [None]:
df = df.sort_values("date")

In [None]:
df = df.reset_index(drop=True)

In [None]:
del df["mp.1"]
del df["mp_opp.1"]
del df["index_opp"]

In [None]:
def add_target(group):
    group["target"] = group["won"].shift(-1)
    return group


In [None]:
df = df.groupby("team", group_keys=False).apply(add_target)

In [None]:
df[df["team"] == "WAS"]

In [None]:
df["target"][pd.isnull(df["target"])] = 2
df["target"] = df["target"].astype(int, errors="ignore")

In [None]:
df["won"].value_counts()

In [None]:
nulls = pd.isnull(df).sum()

In [None]:
nulls = nulls[nulls > 0]

In [None]:
valid_columns = df.columns[~df.columns.isin(nulls.index)]

In [None]:
valid_columns

In [None]:
df = df[valid_columns].copy()

In [None]:
df

In [None]:
from sklearn.linear_model import RidgeClassifier
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.model_selection import TimeSeriesSplit

rr = RidgeClassifier(alpha=1)

split = TimeSeriesSplit(n_splits=3)

sfs = SequentialFeatureSelector(rr, 
                                n_features_to_select=30, 
                                direction="forward",
                                cv=split,
                                n_jobs=1
                               )

In [None]:
removed_columns = ["season", "date", "won", "target", "team", "team_opp"]
selected_columns = df.columns[~df.columns.isin(removed_columns)]

In [None]:
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
df[selected_columns] = scaler.fit_transform(df[selected_columns])

In [None]:
df

In [None]:
sfs.fit(df[selected_columns], df["target"])

In [None]:
predictors = list(selected_columns[sfs.get_support()])

In [None]:
predictors

In [None]:
def backtest(data, model, predictors, start=2, step=1):
    all_predictions = []
    
    seasons = sorted(data["season"].unique())
    
    for i in range(start, len(seasons), step):
        season = seasons[i]
        train = data[data["season"] < season]
        test = data[data["season"] == season]
        
        model.fit(train[predictors], train["target"])
        
        preds = model.predict(test[predictors])
        preds = pd.Series(preds, index=test.index)
        combined = pd.concat([test["target"], preds], axis=1)
        combined.columns = ["actual", "prediction"]
        
        all_predictions.append(combined)
    return pd.concat(all_predictions)

In [None]:
predictions = backtest(df, rr, predictors)

In [None]:
from sklearn.metrics import accuracy_score

accuracy_score(predictions["actual"], predictions["prediction"])

In [None]:
df.groupby(["home"]).apply(lambda x: x[x["won"] == 1].shape[0] / x.shape[0])

In [None]:
df

In [None]:
selected_columns

In [None]:
df_rolling = df[list(selected_columns) + ["won", "team", "season"]]

def find_team_averages(team):
    rolling = team.rolling(10).mean()
    return rolling

df_rolling = df_rolling.groupby(["team", "season"], group_keys=False).apply(find_team_averages)

In [None]:
df_rolling

In [None]:
rolling_cols = [f"{col}_10" for col in df_rolling.columns]
df_rolling.columns = rolling_cols
df = pd.concat([df, df_rolling], axis=1)


In [None]:
df = df.dropna()

In [None]:
df

In [None]:
def shift_col(team, col_name):
    next_col = team[col_name].shift(-1)
    return next_col

def add_col(df, col_name):
    return df.groupby("team", group_keys=False).apply(lambda x: shift_col(x, col_name))

df["home_next"] = add_col(df, "home")
df["team_opp_next"] = add_col(df, "team_opp")
df["date_next"] = add_col(df, "date")

In [None]:
df

In [None]:
full = df.merge(
    df[rolling_cols + ["team_opp_next", "date_next", "team"]], 
        left_on=["team", "date_next"], 
        right_on=["team_opp_next", "date_next"])

In [None]:
full

In [None]:
full[["team_x", "team_opp_next_x", "team_y", "team_opp_next_y", "date_next"]]

In [None]:
removed_columns = list(full.columns[full.dtypes == "object"]) + removed_columns


In [None]:
removed_columns

In [None]:
selected_columns = full.columns[~full.columns.isin(removed_columns)]
sfs.fit(full[selected_columns], full["target"])

In [None]:
predictors = list(selected_columns[sfs.get_support()])

In [None]:
predictors

In [None]:
predictions = backtest(full, rr, predictors)

In [None]:
accuracy_score(predictions["actual"], predictions["prediction"])

In [None]:
"get data"

In [1]:
import os

SEASONS = list(range(2016,2023))

DATA_DIR = "data"
STANDINGS_DIR = os.path.join(DATA_DIR, "standings")
SCORES_DIR = os.path.join(DATA_DIR, "scores")

In [2]:
SEASONS

[2016, 2017, 2018, 2019, 2020, 2021, 2022]

In [3]:
# pip install pytest-playwright
# pip install --upgrade pip
# pip install playwright
# playwright install
# conda config --add channels conda-forge
# conda config --add channels microsoft
# conda install playwright
# playwright install

In [4]:
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeout
import time

In [5]:
async def get_html(url, selector, sleep=5, retries=3):
    html = None
    for i in range(1, retries+1):
        time.sleep(sleep * i)
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch()
                page = await browser.new_page()
                await page.goto(url)
                print(await page.title())
                html = await page.inner_html(selector)
        except PlaywrightTimeout:
            print(f"Timeout error on {url}")
            continue
        else:
            break
    return html

In [6]:
async def scrape_season(season):
    url = f"https://www.basketball-reference.com/leagues/NBA_{season}_games.html"
    html = await get_html(url, "#content .filter")
    
    soup = BeautifulSoup(html)
    links = soup.find_all("a")
    standings_pages = [f"https://www.basketball-reference.com{l['href']}" for l in links]
    
    for url in standings_pages:
        save_path = os.path.join(STANDINGS_DIR, url.split("/")[-1])
        if os.path.exists(save_path):
            continue
        
        html = await get_html(url, "#all_schedule")
        with open(save_path, "w+") as f:
            f.write(html)

In [7]:
for season in SEASONS:
    await scrape_season(season)

2015-16 NBA Schedule | Basketball-Reference.com
2015-16 NBA Schedule | Basketball-Reference.com


FileNotFoundError: [Errno 2] No such file or directory: 'data\\standings\\NBA_2016_games-october.html'

In [8]:
standings_files = os.listdir(STANDINGS_DIR)

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'data\\standings'

In [None]:
async def scrape_game(standings_file):
    with open(standings_file, 'r') as f:
        html = f.read()

    soup = BeautifulSoup(html)
    links = soup.find_all("a")
    hrefs = [l.get('href') for l in links]
    box_scores = [f"https://www.basketball-reference.com{l}" for l in hrefs if l and "boxscore" in l and '.html' in l]

    for url in box_scores:
        save_path = os.path.join(SCORES_DIR, url.split("/")[-1])
        if os.path.exists(save_path):
            continue

        html = await get_html(url, "#content")
        if not html:
            continue
        with open(save_path, "w+") as f:
            f.write(html)

In [None]:
import pandas as pd

for season in SEASONS:
    files = [s for s in standings_files if str(season) in s]
    
    for f in files:
        filepath = os.path.join(STANDINGS_DIR, f)
        
        await scrape_game(filepath)

In [None]:
"Parse data"

In [9]:
import os
import pandas as pd

SCORE_DIR = "data/scores"

In [10]:
box_scores = os.listdir(SCORE_DIR)
box_scores = [os.path.join(SCORE_DIR, f) for f in box_scores if f.endswith(".html")]

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'data/scores'

In [None]:
from bs4 import BeautifulSoup

def parse_html(box_score):
    with open(box_score) as f:
        html = f.read()

    soup = BeautifulSoup(html)
    [s.decompose() for s in soup.select("tr.over_header")]
    [s.decompose() for s in soup.select("tr.thead")]
    return soup

In [None]:
def read_season_info(soup):
    nav = soup.select("#bottom_nav_container")[0]
    hrefs = [a["href"] for a in nav.find_all('a')]
    season = os.path.basename(hrefs[1]).split("_")[0]
    return season

In [None]:
def read_line_score(soup):
    line_score = pd.read_html(str(soup), attrs = {'id': 'line_score'})[0]
    cols = list(line_score.columns)
    cols[0] = "team"
    cols[-1] = "total"
    line_score.columns = cols
    
    line_score = line_score[["team", "total"]]
    
    return line_score

In [None]:
def read_stats(soup, team, stat):
    df = pd.read_html(str(soup), attrs = {'id': f'box-{team}-game-{stat}'}, index_col=0)[0]
    df = df.apply(pd.to_numeric, errors="coerce")
    return df

In [None]:
games = []
base_cols = None
for box_score in box_scores:
    soup = parse_html(box_score)

    line_score = read_line_score(soup)
    teams = list(line_score["team"])

    summaries = []
    for team in teams:
        basic = read_stats(soup, team, "basic")
        advanced = read_stats(soup, team, "advanced")

        totals = pd.concat([basic.iloc[-1,:], advanced.iloc[-1,:]])
        totals.index = totals.index.str.lower()

        maxes = pd.concat([basic.iloc[:-1].max(), advanced.iloc[:-1].max()])
        maxes.index = maxes.index.str.lower() + "_max"

        summary = pd.concat([totals, maxes])
        
        if base_cols is None:
            base_cols = list(summary.index.drop_duplicates(keep="first"))
            base_cols = [b for b in base_cols if "bpm" not in b]
        
        summary = summary[base_cols]
        
        summaries.append(summary)
    summary = pd.concat(summaries, axis=1).T

    game = pd.concat([summary, line_score], axis=1)

    game["home"] = [0,1]

    game_opp = game.iloc[::-1].reset_index()
    game_opp.columns += "_opp"

    full_game = pd.concat([game, game_opp], axis=1)
    full_game["season"] = read_season_info(soup)
    
    full_game["date"] = os.path.basename(box_score)[:8]
    full_game["date"] = pd.to_datetime(full_game["date"], format="%Y%m%d")
    
    full_game["won"] = full_game["total"] > full_game["total_opp"]
    games.append(full_game)
    
    if len(games) % 100 == 0:
        print(f"{len(games)} / {len(box_scores)}")

In [None]:
games_df = pd.concat(games, ignore_index=True)

In [None]:
games_df

In [None]:
games_df.to_csv("nba_games.csv")