In [1]:
# %%writefile ../src/salary_predict/pyspark_data_loader_preprocessor.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def create_spark_session():
    """Create a Spark session."""
    spark = SparkSession.builder.appName("SalaryPrediction").getOrCreate()
    return spark

def load_data(file_path, spark):
    """Load data using PySpark."""
    try:
        data = spark.read.csv(file_path, header=True, inferSchema=True)
        logger.info(f"Data loaded. Schema: {data.printSchema()}")
        return data
    except Exception as e:
        logger.error(f"Failed to load data from {file_path}: {e}")
        raise

def format_season(data):
    """Format the Season column."""
    try:
        # Assuming the 'Season' column contains strings like '2018-19', extract the first 4 characters
        data = data.withColumn('Season', col('Season').substr(1, 4).cast('int'))
        logger.info(f"Seasons in data: {data.select('Season').distinct().collect()}")
        return data
    except Exception as e:
        logger.error(f"Failed to format season data: {e}")
        raise


def clean_data(data):
    """Clean the dataset."""
    try:
        columns_to_drop = ['Injury_Periods', '2nd Apron', 'Wins', 'Losses']
        data_clean = data.drop(*columns_to_drop)
        
        percentage_cols = ['3P%', '2P%', 'FT%', 'TS%']
        for col_name in percentage_cols:
            mean_value = data_clean.agg({col_name: 'mean'}).collect()[0][0]
            data_clean = data_clean.fillna(mean_value, subset=[col_name])
        
        data_clean = data_clean.na.drop()
        logger.info(f"Data cleaned. Remaining shape: ({data_clean.count()}, {len(data_clean.columns)})")
        return data_clean
    except Exception as e:
        logger.error(f"Failed to clean data: {e}")
        raise



def engineer_features(data):
    """Engineer new features for the dataset."""
    try:
        data = data.withColumn('PPG', col('PTS') / col('GP'))
        data = data.withColumn('APG', col('AST') / col('GP'))
        data = data.withColumn('RPG', col('TRB') / col('GP'))
        data = data.withColumn('SPG', col('STL') / col('GP'))
        data = data.withColumn('TOPG', col('TOV') / col('GP'))
        data = data.withColumn('Availability', col('GP') / 82)
        data = data.withColumn('SalaryPct', col('Salary') / col('Salary_Cap_Inflated'))
        data = data.withColumn('Efficiency', 
                               (col('PTS') + col('TRB') + col('AST') + col('STL') + col('BLK')) / 
                               (col('FGA') + col('FTA') + col('TOV') + 1))
        data = data.withColumn('ValueOverReplacement', col('VORP') / (col('Salary') + 1))
        data = data.withColumn('ExperienceSquared', col('Years of Service') ** 2)
        data = data.withColumn('Days_Injured_Percentage', col('Total_Days_Injured') / col('GP'))
        data = data.withColumn('WSPG', col('WS') / col('GP'))
        data = data.withColumn('DWSPG', col('DWS') / col('GP'))
        data = data.withColumn('OWSPG', col('OWS') / col('GP'))
        data = data.withColumn('PFPG', col('PF') / col('GP'))
        data = data.withColumn('ORPG', col('ORB') / col('GP'))
        data = data.withColumn('DRPG', col('DRB') / col('GP'))
        
        columns_to_drop = ['GP', '2PA', 'OBPM', 'BPM', 'DBPM', '2P', 'GS', 'PTS', 'AST', 'TRB', 'STL', 'BLK',
                           'TOV', 'MP', 'FG', 'FGA', 'FG%', '3P', '3PA', '2P', '2PA', 'FT', 'FTA', 'ORB', 'DRB', 
                           'TRB', 'TS%', 'ORB%', 'DRB%', 'TRB%', 'AST%', 'STL%', 'BLK%', 'TOV%', 'USG%', 
                           'Luxury Tax', '1st Apron', 'BAE', 'Standard /Non-Taxpayer', 'Taxpayer', 
                           'Team Room /Under Cap', 'WS', 'DWS', 'WS/48', 'PF', 'OWS', 'Injured']
        
        data = data.drop(*columns_to_drop)
        logger.info("New features added.")
        return data
    except Exception as e:
        logger.error(f"Failed to engineer features: {e}")
        raise

def encode_injury_risk(data):
    """Encode Injury_Risk into numerical categories."""
    risk_mapping = {'Low': 0, 'Medium': 1, 'High': 2}
    data = data.replace({'Injury_Risk': risk_mapping})
    return data, risk_mapping

def filter_seasons(data, predict_season):
    """Filter the dataset into prior seasons and the target season for prediction."""
    prior_seasons_data = data.filter(col('Season') < predict_season)
    target_season_data = data.filter(col('Season') == predict_season)
    logger.info(f"Data filtered. Prior seasons shape: ({prior_seasons_data.count()}, {len(prior_seasons_data.columns)}), Target season shape: ({target_season_data.count()}, {len(target_season_data.columns)})")
    return target_season_data, prior_seasons_data

if __name__ == "__main__":
    try:
        spark = create_spark_session()
        file_path = '../data/processed/nba_player_data_final_inflated.csv'
        predict_season = 2023

        # Load, preprocess, and engineer features using PySpark
        data = load_data(file_path, spark)
        data = format_season(data)
        target_season_data, prior_seasons_data = filter_seasons(data, predict_season)
        prior_seasons_data = clean_data(prior_seasons_data)
        prior_seasons_data = engineer_features(prior_seasons_data)

        logger.info("Data preprocessing completed with PySpark. Ready for further processing.")
        
        # Optionally, you can convert PySpark DataFrame to pandas DataFrame if needed
        pandas_df = prior_seasons_data.toPandas()
        print(pandas_df.head())
        
    except Exception as e:
        logger.critical(f"Critical error in PySpark data processing pipeline: {e}")
        raise


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/18 13:54:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/18 13:54:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2024-08-18 13:54:12,600 - INFO - Data loaded. Schema: None


root
 |-- Season: string (nullable = true)
 |-- Player: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Team: string (nullable = true)
 |-- TeamID: double (nullable = true)
 |-- Years of Service: double (nullable = true)
 |-- GP: double (nullable = true)
 |-- GS: double (nullable = true)
 |-- MP: double (nullable = true)
 |-- FG: double (nullable = true)
 |-- FGA: double (nullable = true)
 |-- FG%: double (nullable = true)
 |-- 3P: double (nullable = true)
 |-- 3PA: double (nullable = true)
 |-- 3P%: double (nullable = true)
 |-- 2P: double (nullable = true)
 |-- 2PA: double (nullable = true)
 |-- 2P%: double (nullable = true)
 |-- eFG%: double (nullable = true)
 |-- FT: double (nullable = true)
 |-- FTA: double (nullable = true)
 |-- FT%: double (nullable = true)
 |-- ORB: double (nullable = true)
 |-- DRB: double (nullable = true)
 |-- TRB: double (nullable = true)
 |-- AST: double (nullable = true)
 |-- STL: double (nullable =

2024-08-18 13:54:13,166 - INFO - Seasons in data: [Row(Season=2018), Row(Season=2023), Row(Season=2022), Row(Season=2019), Row(Season=2020), Row(Season=2021)]
2024-08-18 13:54:13,569 - INFO - Data filtered. Prior seasons shape: (2294, 66), Target season shape: (476, 66)
24/08/18 13:54:14 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2024-08-18 13:54:14,421 - INFO - Data cleaned. Remaining shape: (2274, 62)
2024-08-18 13:54:14,594 - INFO - New features added.
2024-08-18 13:54:14,595 - INFO - Data preprocessing completed with PySpark. Ready for further processing.


   Season           Player        Position   Age Team        TeamID  \
0    2018     Aaron Gordon         Forward  23.0  ORL  1.610613e+09   
1    2018    Aaron Holiday           Guard  22.0  IND  1.610613e+09   
2    2018      Abdel Nader         Forward  25.0  OKC  1.610613e+09   
3    2018       Al Horford  Center-Forward  33.0  BOS  1.610613e+09   
4    2018  Al-Farouq Aminu         Forward  28.0  POR  1.610613e+09   

   Years of Service   3P%   2P%  eFG%  ...  Efficiency  ValueOverReplacement  \
0               4.0  0.35  0.50  0.51  ...    1.519836          7.873684e-08   
1               0.0  0.34  0.46  0.48  ...    1.365439          5.223348e-08   
2               2.0  0.32  0.51  0.50  ...    1.471223         -1.451123e-07   
3              11.0  0.36  0.60  0.59  ...    1.966341          1.244438e-07   
4               8.0  0.34  0.51  0.51  ...    1.877235          2.156069e-07   

   ExperienceSquared  Days_Injured_Percentage      WSPG     DWSPG     OWSPG  \
0            

In [2]:
# %%writefile ../src/pyspark_model_trainer.py

import joblib
import xgboost as xgb
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def create_spark_session():
    """Create a Spark session."""
    spark = SparkSession.builder.appName("SalaryPrediction").getOrCreate()
    return spark

def load_data(file_path, spark):
    """Load data using PySpark."""
    try:
        data = spark.read.csv(file_path, header=True, inferSchema=True)
        logger.info(f"Data loaded. Schema: {data.printSchema()}")
        return data
    except Exception as e:
        logger.error(f"Failed to load data from {file_path}: {e}")
        raise

def vectorize_data(data, features):
    """Convert the feature columns to a single vector column."""
    assembler = VectorAssembler(inputCols=features, outputCol="features")
    data = assembler.transform(data)
    return data

def scale_features(data):
    """Scale the features using StandardScaler."""
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
    scaler_model = scaler.fit(data)
    data = scaler_model.transform(data)
    return data, scaler_model

def train_random_forest(data, target_col):
    """Train a Random Forest model using PySpark."""
    rf = RandomForestRegressor(featuresCol="scaledFeatures", labelCol=target_col)

    # Create a parameter grid for hyperparameter tuning
    param_grid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [100, 200, 300]) \
        .addGrid(rf.maxDepth, [5, 10, 15]) \
        .build()

    # Set up cross-validator
    crossval = CrossValidator(estimator=rf, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(labelCol=target_col), numFolds=5)

    # Train the model using cross-validation
    cv_model = crossval.fit(data)
    return cv_model.bestModel

def evaluate_model(model, data, target_col):
    """Evaluate the model using RMSE, MAE, and R2 metrics."""
    predictions = model.transform(data)
    evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="prediction")

    rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
    mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
    r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

    logger.info(f"Evaluation Metrics - RMSE: {rmse}, MAE: {mae}, R2: {r2}")
    return rmse, mae, r2

def save_model(model, model_save_path):
    """Save the trained model."""
    model.write().overwrite().save(model_save_path)
    logger.info(f"Model saved at {model_save_path}")

if __name__ == "__main__":
    try:
        spark = create_spark_session()

        # Load the preprocessed data using PySpark
        file_path = '../data/processed/nba_player_data_final_inflated.csv'
        predict_season = 2023

        # Load, preprocess, and engineer features using PySpark
        data = load_data(file_path, spark)
        data = format_season(data)
        target_season_data, prior_seasons_data = filter_seasons(data, predict_season)
        prior_seasons_data = clean_data(prior_seasons_data)
        
        # Engineer features and check if they were added correctly
        prior_seasons_data = engineer_features(prior_seasons_data)
        logger.info(f"Columns after feature engineering: {prior_seasons_data.columns}")

        # Define the features and target column
        features = ['PPG', 'APG', 'RPG', 'SPG', 'TOPG', 'Years of Service', 'PER', 'VORP', 'WSPG', 'OWSPG']
        target_col = 'SalaryPct'

        # Vectorize and scale the features
        prior_seasons_data = vectorize_data(prior_seasons_data, features)
        prior_seasons_data, scaler_model = scale_features(prior_seasons_data)

        # Train the Random Forest model
        model = train_random_forest(prior_seasons_data, target_col)

        # Evaluate the model on the target season data
        target_season_data = vectorize_data(target_season_data, features)
        target_season_data = scaler_model.transform(target_season_data)
        evaluate_model(model, target_season_data, target_col)

        # Save the trained model
        # model_save_path = 'data/models/salary_prediction_rf'
        # save_model(model, model_save_path)

    except Exception as e:
        logger.critical(f"Critical error in PySpark model training pipeline: {e}")
        raise


2024-08-18 13:54:15,985 - INFO - Data loaded. Schema: None
2024-08-18 13:54:16,106 - INFO - Seasons in data: [Row(Season=2018), Row(Season=2023), Row(Season=2022), Row(Season=2019), Row(Season=2020), Row(Season=2021)]


root
 |-- Season: string (nullable = true)
 |-- Player: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Team: string (nullable = true)
 |-- TeamID: double (nullable = true)
 |-- Years of Service: double (nullable = true)
 |-- GP: double (nullable = true)
 |-- GS: double (nullable = true)
 |-- MP: double (nullable = true)
 |-- FG: double (nullable = true)
 |-- FGA: double (nullable = true)
 |-- FG%: double (nullable = true)
 |-- 3P: double (nullable = true)
 |-- 3PA: double (nullable = true)
 |-- 3P%: double (nullable = true)
 |-- 2P: double (nullable = true)
 |-- 2PA: double (nullable = true)
 |-- 2P%: double (nullable = true)
 |-- eFG%: double (nullable = true)
 |-- FT: double (nullable = true)
 |-- FTA: double (nullable = true)
 |-- FT%: double (nullable = true)
 |-- ORB: double (nullable = true)
 |-- DRB: double (nullable = true)
 |-- TRB: double (nullable = true)
 |-- AST: double (nullable = true)
 |-- STL: double (nullable =

2024-08-18 13:54:16,268 - INFO - Data filtered. Prior seasons shape: (2294, 66), Target season shape: (476, 66)
2024-08-18 13:54:16,715 - INFO - Data cleaned. Remaining shape: (2274, 62)
2024-08-18 13:54:16,844 - INFO - New features added.
2024-08-18 13:54:16,845 - INFO - Columns after feature engineering: ['Season', 'Player', 'Position', 'Age', 'Team', 'TeamID', 'Years of Service', '3P%', '2P%', 'eFG%', 'FT%', 'PER', 'VORP', 'Salary', 'Total_Days_Injured', 'Injury_Risk', 'Salary Cap', 'Salary_Cap_Inflated', 'PPG', 'APG', 'RPG', 'SPG', 'TOPG', 'Availability', 'SalaryPct', 'Efficiency', 'ValueOverReplacement', 'ExperienceSquared', 'Days_Injured_Percentage', 'WSPG', 'DWSPG', 'OWSPG', 'PFPG', 'ORPG', 'DRPG']
24/08/18 13:54:19 WARN DAGScheduler: Broadcasting large task binary with size 1058.3 KiB
24/08/18 13:54:20 WARN DAGScheduler: Broadcasting large task binary with size 1875.7 KiB
24/08/18 13:54:20 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
24/08/18 13:54:21 WAR