In [1]:
import requests
from bs4 import BeautifulSoup, SoupStrainer
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
import numpy as np

In [2]:


class DataScraper:
    def __init__(self, years):
        self.years = years
        self.player_urls = [f'https://www.pro-football-reference.com/years/{year}/fantasy.htm' for year in years]
        self.team_url = 'https://www.pro-football-reference.com/years/{}/'
        self.session = requests.Session()

    @lru_cache(maxsize=None)
    def scrape_data(self, url):
        response = self.session.get(url)
        response_text = response.text

        parse_only = SoupStrainer('table')
        soup = BeautifulSoup(response_text, 'lxml', parse_only=parse_only)
        tables = soup.find_all('table')

        df_list = [pd.read_html(str(tables[i]))[0] for i in range(min(len(tables), 2))]
        if df_list:
            year = url.split('/')[-2]  # Extract the year from the URL
            df = pd.concat(df_list)
            df['Year'] = str(year)
            return df

    def scrape_player_data(self):
        with ThreadPoolExecutor() as executor:
            player_data_frames = list(executor.map(self.scrape_data, self.player_urls))
        return pd.concat([df for df in player_data_frames if df is not None], ignore_index=True)

    def scrape_team_data(self):
        with ThreadPoolExecutor() as executor:
            team_urls = [self.team_url.format(year) for year in self.years]
            team_data_frames = list(executor.map(self.scrape_data, team_urls))
        return pd.concat([df for df in team_data_frames if df is not None], ignore_index=True)

In [3]:
class DataPreprocessor:
    
    dictionary = {'New York Giants': 'NYG',
 'Las Vegas Raiders': 'LVR',
 'Los Angeles Chargers': 'LAC',
 'Denver Broncos': 'DEN',
 'Green Bay Packers': 'GNB',
 'Jacksonville Jaguars': 'JAX',
 'Washington Redskins': 'WAS',
 'Los Angeles Rams': 'LAR',
 'Arizona Cardinals': 'ARI',
 'Carolina Panthers': 'CAR',
 'Baltimore Ravens': 'BAL',
 'New York Jets': 'NYJ',
 'Miami Dolphins': 'MIA',
 'Minnesota Vikings': 'MIN',
 'Oakland Raiders': 'OAK',
 'Chicago Bears': 'CHI',
 'New England Patriots': 'NWE',
 'Tennessee Titans': 'TEN',
 'New Orleans Saints': 'NOR',
 'Cleveland Browns': 'CLE',
 'Tampa Bay Buccaneers': 'TAM',
 'Buffalo Bills': 'BUF',
 'Cincinnati Bengals': 'CIN',
 'Houston Texans': 'HOU',
 'San Francisco 49ers': 'SFO',
 'Atlanta Falcons': 'ATL',
 'Washington Football Team': 'WAS',
 'Indianapolis Colts': 'IND',
 'Seattle Seahawks': 'SEA',
 'Pittsburgh Steelers': 'PIT',
 'Dallas Cowboys': 'DAL',
 'Detroit Lions': 'DET',
 'Philadelphia Eagles': 'PHI',
 'Kansas City Chiefs': 'KAN'}
    
    def __init__(self, data):
        self.data = data
        
    def try_convert_to_float(self, x):
        try:
            return float(x)
        except:
            if x == '' or pd.isna(x):
                return np.nan
            else:
                return x

    def convert_columns_to_float(self):
        # Convert all columns to float if possible
        self.data = self.data.applymap(self.try_convert_to_float)
        self.data = self.data.astype(float, errors='ignore')

    def flatten_multiindex_header(self):
        # Flatten multi-level column index to single level column index if it exists
        if isinstance(self.data.columns, pd.MultiIndex):
            level1 = self.data.columns.get_level_values(0)
            level2 = self.data.columns.get_level_values(1)

            # Count duplicates in level2
            duplicates = level2.value_counts() > 1

            # Create new column names
            new_columns = []
            for col_level1, col_level2 in zip(level1, level2):
                if duplicates[col_level2]:
                    new_columns.append(f'{col_level1}_{col_level2}')
                elif col_level2 == '':
                    new_columns.append(col_level1)
                else:
                    new_columns.append(col_level2)

            # Replace the MultiIndex header with the new flattened header
            self.data.columns = new_columns
        return self.data
 
    def calculate_yards_per_attempt(self):
    # Calculate yards per rushing attempt
        if 'Rushing_Att' in self.data.columns and 'Rushing_Yds' in self.data.columns:
            self.data['Y/A'] = self.data.apply(lambda x: 0 if (pd.isna(x['Rushing_Att']) or x['Rushing_Att'] == 0) else x['Rushing_Yds'] / x['Rushing_Att'], axis=1)

    def calculate_yards_per_reception(self):
    # Calculate yards per reception
        if 'Rec' in self.data.columns and 'Receiving_Yds' in self.data.columns:
            self.data['Y/R'] = self.data.apply(lambda x: 0 if (pd.isna(x['Rec']) or x['Rec'] == 0) else x['Receiving_Yds'] / x['Rec'], axis=1)
            
    def handle_missing_values(self, thresh=0.5):
    
        # Drop rows where most of the columns have string data
        self.data = self.data[~(np.sum(np.vectorize(isinstance)(self.data.values, str), axis=1) > thresh * len(self.data.columns))]

        def should_fill(col):
            na_and_zero_count = (col.apply(lambda x: pd.isna(x) or x == 0)).sum()
            return na_and_zero_count / len(col) >= 0.7

        should_fill_mask = self.data.apply(should_fill)
        fill_mask = self.data.apply(lambda col: col.isna() & should_fill_mask[col.name])
        self.data = self.data.mask(fill_mask, 0.0)

        # Drop columns where most of the rows are null
        self.data = self.data.drop(columns=self.data.columns[self.data.isnull().mean() > thresh], errors='ignore')

        # Drop rows where most of the columns are null
        self.data = self.data.dropna(thresh=thresh * len(self.data.columns))

        # Drop rows where PPR data is null (if present in dataframe)
        if 'PPR' in self.data.columns:
            self.data = self.data.dropna(subset=['PPR'])

         # Drop rows where Rk > 350 if Rk is a column
        if 'Rk' in self.data.columns:
            self.data = self.data[self.data['Rk'] <= 400]

    def replace_team_names(self, dictionary):
        for key, value in dictionary.items():
            mask = self.data['Tm'].str.startswith(key)
            if mask.any():
                self.data.loc[mask, 'Tm'] = value
        return self.data
        
    def preprocess_data(self):
        self.convert_columns_to_float()
        self.flatten_multiindex_header()
        self.handle_missing_values()
        self.replace_team_names(dictionary=self.dictionary)  # pass dictionary argument
        self.calculate_yards_per_attempt()
        self.calculate_yards_per_reception()
        # Call other preprocessing methods in the correct order
        return self.data

In [4]:
class MergeAndProcess:
    def __init__(self, player_data, team_data):
        self.player_data = player_data
        self.team_data = team_data

    def merge(self):
        # Create a dictionary to map team data to new columns in player data
        team_data_map = {}
        for col in self.team_data.columns:
            if col not in ['Year', 'Tm']:
                if col in self.player_data.columns:
                    team_data_map[col] = col
                else:
                    team_data_map[col] = f'Team_{col}'

        # Create new columns in player data for team data
        for col in team_data_map.values():
            self.player_data[col] = np.nan

        # Map team data to new columns in player data based on team and year
        self.player_data.set_index(['Year', 'Tm'], inplace=True)
        self.team_data.set_index(['Year', 'Tm'], inplace=True)
        self.player_data.update(self.team_data.rename(columns=team_data_map))

        # Flatten the column index
        if isinstance(self.player_data.columns, pd.MultiIndex):
            level1 = self.player_data.columns.get_level_values(0)
            level2 = self.player_data.columns.get_level_values(1)
            new_columns = []
            for col_level1, col_level2 in zip(level1, level2):
                if col_level1.startswith('Team_'):
                    new_columns.append(col_level2)
                else:
                    new_columns.append(col_level1)
            self.player_data.columns = new_columns

        return self.player_data.reset_index()

    def process(self):
        
        # Merge player and team data
        merged_data = self.merge()
        
        # Add next year PPR
        merged_data['next_year_PPR'] = merged_data.groupby('Player')['PPR'].shift(-1)

        # Add PPR per game
        merged_data['PPR_per_game'] = np.nan
        try:
            merged_data['PPR_per_game'] = merged_data['PPR'] / merged_data['G']
        except:
            pass
        
        # Convert column types
        for col in merged_data.columns:
            if merged_data[col].dtype == 'object':
                try:
                    merged_data[col] = pd.to_numeric(merged_data[col], errors='raise').astype('float')
                except ValueError:
                    merged_data[col] = merged_data[col].astype('string')
            else:
                dtype = merged_data[col].dtype
                merged_data[col] = merged_data[col].astype(dtype)

        # Drop missing values
        merged_data = merged_data.dropna()
        
        # Reset index
        merged_data = merged_data.reset_index(drop=True)
        
        # Replace non-alphanumeric characters in player names
        merged_data['Player'] = merged_data['Player'].str.replace(r'[^\w\s]+', '')
        
        return merged_data

In [5]:
years = list(range(2013, 2023))

In [6]:
scraper = DataScraper(years)
player_data = scraper.scrape_player_data()
team_data = scraper.scrape_team_data()

In [7]:
player_preprocessor = DataPreprocessor(player_data)
player_data = player_preprocessor.preprocess_data()

In [8]:
team_preprocessor = DataPreprocessor(team_data)
team_data = team_preprocessor.preprocess_data()

In [9]:
# create instance of MergeAndProcess class
merger = MergeAndProcess(player_data, team_data)

# call process_data method to merge and process data
merged_data = merger.process()

  merged_data['Player'] = merged_data['Player'].str.replace(r'[^\w\s]+', '')


In [10]:
# Split the merged_data DataFrame into training/validation (-2020) and testing (2021)
train_val_data = merged_data[merged_data['Year'] < merged_data['Year'].max()]
test_data = merged_data[merged_data['Year'] == merged_data['Year'].max()]

In [11]:
merged_data.to_csv('merged_data.csv', index=True)