![](pngs/nba_html_to_mongodb.png)
1. Define Foundation
2. Player Data to MongoDB Cloud
<br> - 1/2 MVP + Defensive Player of the Year
<br> - 2/2 All NBA Team + All Defensive Team
3. Team Data to MongoDB Cloud
---

### ➤ 1 Define Foundation 

In [1]:
import os
import sys
from urllib.parse import urljoin

import numpy as np
import pandas as pd
import pymongo
from bs4 import BeautifulSoup

In [2]:
MAIN_URL = r"https://www.basketball-reference.com/"
ALPHABET_URL = r"https://www.basketball-reference.com/players/"

DATA_PATH = r"C:\Users\knaue\Documents\Data\NBA"
PLAYER_HTML_PATH = os.path.join(DATA_PATH, "PLAYER_HTML")
AWARD_HTML_PATH = os.path.join(DATA_PATH, "AWARD_HTML")
SEASON_HTML_PATH = os.path.join(DATA_PATH, "SEASON_HTML")

PLAYER_PATH = os.path.join(DATA_PATH, "Player_Urls.csv")
AWARD_PATH = os.path.join(DATA_PATH, "Award_Urls.csv")
SEASON_PATH = os.path.join(DATA_PATH, "Season_Urls.csv")

PARSER = 'lxml'
ONLY_ACTIVE_PLAYER = None 

USERNAME = "" # YOUR USERNAME
PASSWORD = "" # YOUR PASSWORD
DB_NAME = "nba"
# PLAYER
COLLECTION_PLAYER = "player"
PLAYER_TABLE_IDS = ["per_game", "playoffs_per_game", "advanced", "playoffs_advanced", "totals"] # field names in document
PLAYER_FIELD_STANDARD_LIST = ["name", "position", "height", "weight", "hall_of_fame", "active"]
PLAYER_FIELD_CHAMPION = "champion"
PLAYER_FIELD_MVP = "mvp"
PLAYER_FIELD_DPOY = "dpoy"
PLAYER_FIELD_ALL_NBA = "all_nba"
PLAYER_FIELD_ALL_DEFENSIVE = "all_defensive"
# TEAM SEASONS
COLLECTION_TEAM = "team"
TEAM_TABLE_IDS = ['per_game-team', 'per_game-opponent', 'advanced-team']
TEAM_FIELD_PLAYOFF = "playoff"
TEAM_FIELD_CONFERENCE = "conference"

In [3]:
def season_to_int(cell_value: str):
    if cell_value[-2:] == "00":
        return (int(cell_value[:2]) + 1)*100
    else:
        return int(cell_value[:2] + cell_value[-2:])   

In [4]:
class MongoDBAgent:
    name = "MongoDBAgent"

    def __init__(self, con_string: str, db: str):
        self.__client = pymongo.MongoClient(con_string)
        self.__db = self.__client[db]
        self.__connect_db()


    def __connect_db(self):
        self.__client.server_info()


    def find(self, collection_str: str, query: dict, count=False):
        collection = self.__db[collection_str]
        documents = collection.find(query)
        if count: return collection.count_documents(query)
        if collection.count_documents(query) == 0: return None
        return documents


    def insert_one(self, collection_str: str, data: dict):
        collection = self.__db[collection_str]
        return_statement = collection.insert_one(data)


    def update_one(self, collection_str: str, filter: dict, data):
        collection = self.__db[collection_str]
        collection.update_one(filter=filter, update=data) 

### ➤ 2 Player Data to MongoDB Cloud

In [5]:
mongodb_agent = MongoDBAgent(con_string=f"mongodb+srv://{USERNAME}:{PASSWORD}@maincluster.grb4d.mongodb.net/test", db=DB_NAME)

In [6]:
df_player = pd.read_csv(PLAYER_PATH, encoding="utf-8-sig", index_col=False)

if ONLY_ACTIVE_PLAYER != None:
    df_player = df_player[df_player['Active'] == ONLY_ACTIVE_PLAYER]

i=0
for player, pos, ht, wt, h_o_f, active, url, path in df_player[['Player', 'Pos', 'Ht', 'Wt', 'Hall_of_Fame', 'Active', 'Url', 'Path']].values:
    i += 1
    sys.stdout.write(f"\r{i}/{len(df_player)}...")
    
    # Normal Stats
    for field, value in zip(PLAYER_FIELD_STANDARD_LIST, [player, pos, ht, wt, h_o_f, active]):
        mongodb_agent.update_one(collection_str=COLLECTION_PLAYER, filter={"player_id": url}, data={ "$set": {field: value} })
    
    with open(path, 'r', encoding="utf-8-sig") as f:
        content = BeautifulSoup(f.read(), PARSER)
        for table_id in PLAYER_TABLE_IDS: 
            table = content.find("table", id=table_id)
            if table == None: continue
            
            # Filter Row/Columns
            df_table = pd.read_html(str(table))[0]
            df_table = df_table[df_table['Season'].notna()]
            df_table.drop([col for col in df_table.columns if "Unnamed:" in col], axis="columns", inplace=True)
            
            # Season
            df_table = df_table[df_table['Season'].str.contains('-')] 
            df_table['Season'] = df_table['Season'].apply(lambda x: season_to_int(x))
            
            # Lg
            df_table['Lg'] = df_table['Lg'][df_table['Lg'] == "NBA"]
            
            # Tm
            team_ids = []
            for tr in table.find("tbody").find_all("tr")[:len(df_table)]:
                td = tr.find("td", attrs={"data-stat":"team_id"})
                
                if td == None:
                    team_ids.append(urljoin(MAIN_URL, "DidNotPlay")) 
                    continue
                if td.a == None:
                    team_ids.append(urljoin(MAIN_URL, td.text))
                    continue
                team_ids.append(urljoin(MAIN_URL, td.a['href']))
             
            df_table.insert(loc=3, column='Tm_id', value=team_ids)
            
            # Insert/Update
            player_count = mongodb_agent.find(collection_str=COLLECTION_PLAYER, query={"player_id": url}, count=True) 
            if player_count == 0:
                mongodb_agent.insert_one(collection_str=COLLECTION_PLAYER, data={"player_id": url, table_id: df_table.to_dict("records")}) 
            else: 
                mongodb_agent.update_one(collection_str=COLLECTION_PLAYER, filter={"player_id": url}, data={ "$set": {table_id: df_table.to_dict("records")} })

            # Champions
            if table_id == "playoffs_per_game": 
                for span in table.find("tbody").findAll("span", class_="sr_ring"):
                    mongodb_agent.update_one(collection_str=COLLECTION_PLAYER, filter={"player_id": url}, data={ "$addToSet": {PLAYER_FIELD_CHAMPION: {"Season": season_to_int(span.previous)} } })    
            
        f.close()

5023/5023...

### ➤ 2 Player Data to MongoDB Cloud
#### 1/2 MVP + Defensive Player of the Year

In [7]:
df_award = pd.read_csv(AWARD_PATH, encoding="utf-8-sig")

i=0
for season, path in df_award[['Season', 'Voting_Path']].values[:-2]:
    i += 1
    sys.stdout.write(f"\r{i}/{len(df_award) - 2}...")
    
    with open(path, 'r', encoding="utf-8-sig") as f:
        content = BeautifulSoup(f.read(), PARSER)
        
        for table_id in ["mvp", "dpoy"]:
            table = content.find("table", id=table_id)
            
            if table == None and table_id == "mvp": 
                table = content.find("table", id=f"nba_{table_id}")
            if table == None: 
                continue
            
            df_table = pd.read_html(str(table))[0]
            df_table = df_table.droplevel(0, axis=1)
            df_table = df_table[['Rank', 'Player', 'Share']]
            if df_table['Rank'].dtype != np.int64:
                df_table['Rank'] = df_table['Rank'].apply(lambda cell: int(cell.replace("T", "")))
            
            player_urls = []
            for td in table.find("tbody").find_all("td", attrs={"data-stat":"player"}):
                player_urls.append(urljoin(MAIN_URL, td.a['href']))
                
            df_table['Player_Urls'] = player_urls
            for rk, share, url in df_table[['Rank', 'Share', 'Player_Urls']].values:
                if table_id == "mvp":
                    mongodb_agent.update_one(collection_str=COLLECTION_PLAYER, filter={"player_id": url}, data={ "$addToSet": {PLAYER_FIELD_MVP: {"Season": int(season), "Rank": rk, "Share": share}} })
                else:
                    mongodb_agent.update_one(collection_str=COLLECTION_PLAYER, filter={"player_id": url}, data={ "$addToSet": {PLAYER_FIELD_DPOY: {"Season": int(season), "Rank": rk, "Share": share}} })
    f.close()

67/67...

### ➤ 2 Player Data to MongoDB Cloud
#### 2/2 All NBA Team + All Defensive Team

In [8]:
df_award = pd.read_csv(AWARD_PATH, encoding="utf-8-sig")
for field, path in df_award[['Season', 'Voting_Path']].values[-2:]:
    with open(path, 'r', encoding="utf-8-sig") as f:
        content = BeautifulSoup(f.read(), PARSER)
        if field == "All_NBA":
            print(field)
            table = content.find("table", id="awards_all_league")
        elif field == "All_DEFENSIVE":
            print(f"\n{field}")
            table = content.find("table", id="awards_all_defense") 
            
        df_table = pd.read_html(str(table))[0]
        
        df_table = df_table[(df_table['Season'].notna()) & (df_table['Lg'] == "NBA") & ((df_table['Tm'] == "1st") | (df_table['Tm'] == "2nd") | (df_table['Tm'] == "3rd"))]
        df_table.drop([col for col in df_table.columns if col in ['Lg', 'Tm', 'Voting']], axis="columns", inplace=True)
        df_table['Season'] = df_table['Season'].apply(lambda x: season_to_int(x))
        
        list_tr = table.find("tbody").findAll("tr")
        list_tr = [tr for idx, tr in enumerate(list_tr) if idx in list(df_table.index)]
        
        j=0
        for tr, season in zip(list_tr, df_table['Season'].values):
                j += 1
                sys.stdout.write(f"\r{j}/{len(df_table)}...")
                
                start = 1
                end = start + 5
                while tr.find("td", class_="left", attrs={"data-stat":str(start)}) == None: 
                    start = end
                    end = start + 5
                
                for i in range(start,end,1): 
                        td = tr.find("td", class_="left", attrs={"data-stat":str(i)})
                        if td == None: 
                            print("break")
                            break
                        url = urljoin(MAIN_URL, td.a['href'])
                        if field == "All_NBA":
                            mongodb_agent.update_one(collection_str=COLLECTION_PLAYER, filter={"player_id": url}, data={ "$addToSet": {PLAYER_FIELD_ALL_NBA: {"Season": int(season)} } })
                        if field == "All_DEFENSIVE":  
                            mongodb_agent.update_one(collection_str=COLLECTION_PLAYER, filter={"player_id": url}, data={ "$addToSet": {PLAYER_FIELD_ALL_DEFENSIVE: {"Season": int(season)} } })                
        f.close()

All_NBA
180/180...
All_DEFENSIVE
108/108...

### ➤ 3 Team Data to MongoDB Cloud

In [9]:
def get_season_summary(season: int, lg: str, url: str):
    html_path = os.path.join(SEASON_HTML_PATH, url.replace("/", "{").replace(":", "}"))
    with open(html_path, 'r', encoding="utf-8-sig") as f:
        content = BeautifulSoup(f.read(), PARSER)
        
        # Conference 
        for conference, table_id in [("East", "divs_standings_E"), ("West", "divs_standings_W")]:
            table = content.find("table", id=table_id)
            
            # Before 1970
            if table == None: 
                curr_conference = conference 
                table = content.find("table", id="divs_standings_") 
                
                for tr in table.find("tbody").findAll("tr"):
                    if tr['class'][0] == "thead" and "East" in tr.text: curr_conference = "East"
                    elif tr['class'][0] == "thead" and "West" in tr.text: curr_conference = "West" 
                    
                    if tr['class'][0] == "full_table":
                        mongodb_agent.update_one(collection_str=COLLECTION_TEAM, filter={"team_id": urljoin(MAIN_URL, tr.a['href'])}, data={ "$set": {TEAM_FIELD_CONFERENCE: curr_conference} })  
                break
            
            # Until 1971
            for th in table.find("tbody").findAll("th", class_="left", attrs={"scope":"row", "data-stat": "team_name"}):
                mongodb_agent.update_one(collection_str=COLLECTION_TEAM, filter={"team_id": urljoin(MAIN_URL, th.a['href'])}, data={ "$set": {TEAM_FIELD_CONFERENCE: conference} })
        
        
        for table_id in TEAM_TABLE_IDS: 
            table = content.find("table", id=table_id)
            df_team = pd.read_html(str(table))[0]

            # Change advanced-team columns
            if table_id == 'advanced-team': 
                df_team.columns = [col[1] if 'Unnamed:' in col[0] else '|'.join([str(level_col) for level_col in col]) for col in df_team.columns]
            
            # Filter Row/Columns
            df_team.drop([col for col in df_team.columns if "Unnamed:" in col], axis="columns", inplace=True)
            del df_team['Rk']
            del df_team['Team']
            df_team = df_team[:-1]
            
            # Change per_game-opponent columns
            if table_id == 'per_game-opponent': 
                df_team.columns = [f"{col}_opp" for col in df_team.columns]
                
            # Get team url -> team id    
            teams_url = []
            for td in table.find("tbody").findAll("td", class_="left", attrs={"data-stat":"team"}):
                teams_url.append(urljoin(MAIN_URL, td.a['href']))
                
            if len(teams_url) != len(df_team):
                ValueError()
            
            # Insert/Update
            for team_url in teams_url:
                team_count = mongodb_agent.find(collection_str=COLLECTION_TEAM, query={"team_id": team_url}, count=True) 
                if team_count == 0:
                    mongodb_agent.insert_one(collection_str=COLLECTION_TEAM, data={"team_id": team_url, "season": int(season), "lg": lg}) 
                
            for team_url, team_dict in zip(teams_url, df_team.to_dict("records")):
                mongodb_agent.update_one(collection_str=COLLECTION_TEAM, filter={"team_id": team_url}, data={ "$set": {table_id: team_dict} })

        f.close()

In [10]:
def get_season_standing(champ: str, url: str):
    html_path = os.path.join(SEASON_HTML_PATH, url.replace("/", "{").replace(":", "}"))
    table_id = "expanded_standings"
    with open(html_path, 'r', encoding="utf-8-sig") as f:
        content = BeautifulSoup(f.read(), PARSER)
        table = content.find("table", id=table_id)
        df_team = pd.read_html(str(table))[0]
        
        # Filter Row/Columns
        df_team = df_team.droplevel(0, axis=1)
        df_team.drop([col for col in df_team.columns if col not in ['Rk', 'Team', 'Overall', 'Home', 'Road', 'Pre', 'Post', '≤3', '≥10', 'Oct', 'Nov', 'Dec', 'Jan', 'Feb', 'Mar', 'Apr']],
                     axis="columns",
                     inplace=True)
        
        # Season
        df_team.rename(columns={'Rk':'Rk_Season'}, inplace=True)
        
        # Champion
        champ_list = df_team['Team'].str.contains(champ).to_list()
        
        # Team
        team_list = df_team.pop(item="Team")
        
        # Get team url -> team id
        teams_url = []
        for td in table.find("tbody").findAll("td", class_="left", attrs={"data-stat":"team_name"}):
            teams_url.append(urljoin(MAIN_URL, td.a['href']))
            
        if len(teams_url) != len(df_team) != len(champ_list) != len(team_list):
            ValueError()
        
        # Insert/Update
        for team_url, team_dict, champ_bool, team_name in zip(teams_url, df_team.to_dict("records"), champ_list, team_list):
            mongodb_agent.update_one(collection_str=COLLECTION_TEAM, filter={"team_id": team_url}, data={ "$set": {"name": team_name}})
            mongodb_agent.update_one(collection_str=COLLECTION_TEAM, filter={"team_id": team_url}, data={ "$set": {"champion": champ_bool}})
            mongodb_agent.update_one(collection_str=COLLECTION_TEAM, filter={"team_id": team_url}, data={ "$set": {table_id: team_dict} })
        
        f.close()

In [11]:
def champion_in_percent(cell: float):
    if cell == 0: return 0 
    if cell == 1: return 100
    if cell == 2: return 50
    if cell == 3 or cell == 4: return 25
    if cell >= 5 and cell <= 8: return 12.5
    if cell >= 9 and cell <= 16: return 6.25
    if cell >= 17 and cell <= 32: return 3.125

In [12]:
def get_win(cell: str)-> int:
    if pd.isna(cell):
        return 0
    return int(cell.split('-')[0])

In [13]:
def get_playoff_standing(url: str):
    html_path = os.path.join(SEASON_HTML_PATH, url.replace("/", "{").replace(":", "}"))
    table_id = "expanded_standings"
    with open(html_path, 'r', encoding="utf-8-sig") as f:
        content = BeautifulSoup(f.read(), PARSER)
        table = content.find("table", id=table_id)
        df_team = pd.read_html(str(table))[0]

        # Filter Row/Columns
        df_team = df_team.droplevel(0, axis=1)
        df_team.drop([col for col in df_team.columns if col not in ['Rk', 'Overall']],
                     axis="columns", 
                     inplace=True)
        
        # Champion Percent
        df_team['Champion_Percent'] = df_team['Rk'].apply(lambda cell: champion_in_percent(cell=cell))
        
        # Champion Win share
        df_team['Overall'] = df_team['Overall'].apply(lambda cell: get_win(cell))
        max_wins = df_team['Overall'].values[0]
        df_team['Overall'] = df_team['Overall'].apply(lambda cell: cell/max_wins)
        df_team.rename(columns={'Overall':'Champion_Win_Share'}, inplace=True)
        
        # Get team url -> team id
        teams_url = []
        for td in table.find("tbody").findAll("td", class_="left", attrs={"data-stat":"team_name"}):
            teams_url.append(urljoin(MAIN_URL, td.a['href']))
            
        if len(teams_url) != len(df_team):
            ValueError()
        
        # Insert/Update
        for team_url, team_dict in zip(teams_url, df_team.to_dict("records")):
            mongodb_agent.update_one(collection_str=COLLECTION_TEAM, filter={"team_id": team_url}, data={ "$set": {TEAM_FIELD_PLAYOFF: team_dict} })
        
        f.close()

In [14]:
df = pd.read_csv(SEASON_PATH, encoding="utf-8-sig")

# SEASON SUMMARY
print("Season Summary")
i = 0
for url in df['Url_Season_Summary'].unique(): 
    i += 1
    sys.stdout.write(f"\r{i}/{len(df['Url_Season_Summary'].unique())}...")
    season = df.loc[df['Url_Season_Summary'] == url, "Season"].values[0]
    lg = df.loc[df['Url_Season_Summary'] == url, "Lg"].values[0]
    get_season_summary(season=season, lg=lg, url=url)
    

# SEASON STANDINGS  
print("\nSeason Standings")
i = 0     
for url in df['Url_Season_Standings'].unique():
    i += 1
    sys.stdout.write(f"\r{i}/{len(df['Url_Season_Standings'].unique())}...")
    champ = df.loc[df['Url_Season_Standings'] == url, "Champion"].values[0]
    get_season_standing(champ=champ, url=url)
    

# PLAYOFF STANDINGS    
print("\nPlayoff Standings")
i = 0     
for url in df['Url_Playoff_Standings'].unique():
    i += 1
    sys.stdout.write(f"\r{i}/{len(df['Url_Playoff_Standings'].unique())}...")
    get_playoff_standing(url=url)

Season Summary
73/73...
Season Standings
73/73...
Playoff Standings
73/73...