In [110]:
from dotenv import load_dotenv
load_dotenv()
import os
# Re-export SPARK_HOME only when it is actually defined. Assigning None to
# os.environ raises TypeError, which would abort this cell with an unhelpful
# message whenever neither the shell nor the .env file provides the variable.
spark_home = os.getenv('SPARK_HOME')
if spark_home is not None:
    os.environ['SPARK_HOME'] = spark_home

# Interpreter / launcher settings for PySpark.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = 'lab'
os.environ['PYSPARK_PYTHON'] = "python"

In [111]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col

class DataLoader:
    """Reads a CSV file into a Spark DataFrame and filters rows by value ranges.

    ``__init__`` stores the CSV path in ``file_path`` and a ``SparkSession``
    (local master, obtained with ``getOrCreate``) in ``spark``.
    """

    def __init__(self, file_path: str):
        """Remember the CSV path and get (or create) the Spark session.

        Args:
            file_path: Path of the CSV file that ``load_data`` reads.
        """
        self.file_path = file_path
        self.spark = SparkSession.builder.master('local[*]').appName('Mediacal_cost_prediction').getOrCreate()

    def load_data(self) -> DataFrame:
        """Read the CSV at ``self.file_path`` using its header row and an inferred schema.

        Returns:
            The loaded DataFrame.
        """
        return self.spark.read.csv(self.file_path, header=True, inferSchema=True)

    def remove_outliers(self, df: DataFrame, outliers_dict: dict) -> DataFrame:
        """Keep only the rows whose values lie inside every configured range.

        Args:
            df: DataFrame to filter.
            outliers_dict: Maps a column name to a ``(min_val, max_val)`` pair;
                both bounds are inclusive. An empty mapping (or ``None``)
                means "no filtering".

        Returns:
            The filtered DataFrame, or ``df`` itself when no ranges are given.
        """
        # Without this guard an empty mapping would crash on conditions[0] below.
        if not outliers_dict:
            return df

        conditions = [
            (col(col_name) >= min_val) & (col(col_name) <= max_val)
            for col_name, (min_val, max_val) in outliers_dict.items()
        ]
        # AND all per-column range checks into a single filter expression.
        combined_condition = conditions[0]
        for condition in conditions[1:]:
            combined_condition = combined_condition & condition
        return df.filter(combined_condition)

In [112]:
from pyspark.sql.functions import col, randn, when, lit, rand
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
from typing import List
from pyspark.sql import functions as F
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

class FeatureEngineer:
    """Chainable wrapper that applies Spark ML feature transformations to a DataFrame.

    Every transformation method replaces ``self.df`` with the transformed frame
    and returns ``self``; ``get_DataFrame`` hands back the current frame.
    """

    def __init__(self, df: DataFrame):
        """Start from the given DataFrame."""
        self.df = df

    def string_indexer(self, input_cols: List[str], output_cols: List[str]) -> "FeatureEngineer":
        """Fit a ``StringIndexer`` on the current frame and apply it."""
        fitted_indexer = StringIndexer(inputCols=input_cols, outputCols=output_cols).fit(self.df)
        self.df = fitted_indexer.transform(self.df)
        return self

    def One_hot_encoder(self, input_col: List[str], output_col: List[str]) -> "FeatureEngineer":
        """Fit a ``OneHotEncoder`` on the current frame and apply it."""
        fitted_encoder = OneHotEncoder(inputCols=input_col, outputCols=output_col).fit(self.df)
        self.df = fitted_encoder.transform(self.df)
        return self

    def convert_to_binary(self, cols_to_binary: List[str]) -> "FeatureEngineer":
        """Cast each listed column to ``int`` in place (same column name)."""
        for name in cols_to_binary:
            as_int = col(name).cast('int')
            self.df = self.df.withColumn(name, as_int)
        return self

    def charges_to_flaot(self):
        """Cast the ``charges`` column to ``double``."""
        as_double = col('charges').cast('double')
        self.df = self.df.withColumn('charges', as_double)
        return self

    def assemble_numerical_features(self, cols: List[str], output_col_name: str) -> "FeatureEngineer":
        """Combine ``cols`` into one vector column named ``output_col_name``."""
        self.df = VectorAssembler(inputCols=cols, outputCol=output_col_name).transform(self.df)
        return self

    def normalize_fetures(self, numerical_col_vector: str, output_name: str) -> "FeatureEngineer":
        """Fit a ``StandardScaler`` (centering and unit std) on a vector column and apply it."""
        fitted_scaler = StandardScaler(
            inputCol=numerical_col_vector,
            outputCol=output_name,
            withStd=True,
            withMean=True,
        ).fit(self.df)
        self.df = fitted_scaler.transform(self.df)
        return self

    def get_DataFrame(self) -> DataFrame:
        """Return the current (transformed) DataFrame."""
        return self.df

In [113]:
from pyspark.mllib.evaluation import RegressionMetrics

class ModelHandler:
    """Fits a regressor on the training split and reports metrics for both splits."""

    def __init__(self, train, test):
        """Keep references to the training and testing DataFrames."""
        self.train, self.test = train, test

    def train_model(self, regressor):
        """Fit ``regressor`` on the training split and return the fitted model."""
        return regressor.fit(self.train)

    def convert_to_rdd(self, model):
        """Score both splits and return ``(train_rdd, test_rdd)``.

        Each RDD holds ``(prediction, charges)`` tuples; rows with nulls in
        either column are dropped first.
        """
        scored_rdds = []
        for split in (self.train, self.test):
            scored = model.transform(split)
            pairs = scored.select('prediction', 'charges').dropna()
            scored_rdds.append(pairs.rdd.map(tuple))
        train_pairs, test_pairs = scored_rdds
        return train_pairs, test_pairs

    def evaluate_model(self, model_name, rdd_train_data, rdd_test_data) -> None:
        """Print MSE, RMSE, MAE and R2 for the training and testing predictions."""
        # Build both metric objects up front, before anything is printed.
        train_metrics = RegressionMetrics(rdd_train_data)
        test_metrics = RegressionMetrics(rdd_test_data)

        print(f"\nModel name: {model_name}")
        sections = (
            ("Training Data Metrics:", train_metrics),
            ("\nTesting Data Metrics:", test_metrics),
        )
        for heading, metrics in sections:
            print(heading)
            print(f"  MSE: {metrics.meanSquaredError}")
            print(f"  RMSE: {metrics.rootMeanSquaredError}")
            print(f"  MAE: {metrics.meanAbsoluteError}")
            print(f"  R2: {metrics.r2}")

In [114]:
from typing import List
from pyspark.sql.functions import col

class PipelineManager:
    """Runs loading, outlier filtering and feature engineering for one CSV file."""

    def __init__(self, data_path):
        """Store the path of the CSV file to process."""
        self.data_path = data_path

    def process_data(self, outliers_dict: dict,
                     string_indexer_cols: List[str],
                     string_indexer_output: List[str],
                     cols_to_binary: List[str],
                     cols_for_one_hot: List[str],
                     output_for_one_hot: List[str],
                     assemble_numerical_cols: List[str],
                     assmble_numerical_output_name: str,
                     cols_to_normalize: str,
                     normalized_output: str,
                     final_cols: List[str],
                     final_output: str) -> "DataFrame":
        """Load the data and apply the full preprocessing sequence.

        Args:
            outliers_dict: Column name -> ``(min, max)`` range passed to
                ``DataLoader.remove_outliers``.
            string_indexer_cols: Input columns for the string indexer.
            string_indexer_output: Output column names for the string indexer,
                one per input column (it is forwarded as ``output_cols``).
            cols_to_binary: Columns to cast to ``int``.
            cols_for_one_hot: Input columns for the one-hot encoder.
            output_for_one_hot: Output column names for the one-hot encoder.
            assemble_numerical_cols: Columns assembled into one vector.
            assmble_numerical_output_name: Name of that assembled vector column.
            cols_to_normalize: Name of the single vector column to scale
                (forwarded as ``numerical_col_vector``).
            normalized_output: Name of the scaled vector column.
            final_cols: Columns assembled into the final feature vector.
            final_output: Name of the final feature vector column.

        Returns:
            The fully transformed DataFrame.
        """
        data_loader = DataLoader(self.data_path)
        df = data_loader.load_data()
        df = data_loader.remove_outliers(df, outliers_dict)

        # Order matters: index strings first, then cast/encode the indexed
        # columns, scale the assembled numeric vector, and finally assemble
        # everything into the final feature vector.
        features_engineer = FeatureEngineer(df)
        features_engineer.string_indexer(input_cols=string_indexer_cols, output_cols=string_indexer_output)
        features_engineer.convert_to_binary(cols_to_binary=cols_to_binary)
        features_engineer.charges_to_flaot()
        features_engineer.One_hot_encoder(input_col=cols_for_one_hot, output_col=output_for_one_hot)
        features_engineer.assemble_numerical_features(cols=assemble_numerical_cols,
                                                      output_col_name=assmble_numerical_output_name)
        features_engineer.normalize_fetures(numerical_col_vector=cols_to_normalize, output_name=normalized_output)
        features_engineer.assemble_numerical_features(cols=final_cols, output_col_name=final_output)
        return features_engineer.get_DataFrame()

In [115]:
from dotenv import load_dotenv
load_dotenv()
import os
# Re-export SPARK_HOME only when it is actually defined. Assigning None to
# os.environ raises TypeError, which would abort this cell with an unhelpful
# message whenever neither the shell nor the .env file provides the variable.
spark_home = os.getenv('SPARK_HOME')
if spark_home is not None:
    os.environ['SPARK_HOME'] = spark_home

# Interpreter / launcher settings for PySpark.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = 'python'
os.environ['PYSPARK_PYTHON'] = "python"

from pyspark.ml.regression import GBTRegressor


def main(data_path: str = "../data/medical.csv", seed: int = 42):
    """Preprocess the dataset, train a GBT regressor and print its metrics.

    Args:
        data_path: CSV file handed to ``PipelineManager``.
        seed: Seed for the train/test split, so the printed metrics are
            reproducible from run to run.
    """
    pipeline_manager = PipelineManager(data_path)
    df = pipeline_manager.process_data(
        outliers_dict={"charges": (0, 35000), "bmi": (0, 45)},
        string_indexer_cols=["sex", "smoker", "region"],
        string_indexer_output=["sex_index", "smoker_index", "region_index"],
        cols_to_binary=["sex_index", "smoker_index"],
        cols_for_one_hot=['region_index'],
        output_for_one_hot=["region_one_hot"],
        assemble_numerical_cols=["bmi", "children", "age"],
        assmble_numerical_output_name='numerical_cols_vector',
        cols_to_normalize='numerical_cols_vector',
        normalized_output="scaled_numerical_cols_vector",
        final_cols=['sex_index', 'smoker_index', 'region_one_hot', 'scaled_numerical_cols_vector'],
        final_output='final_features_vector',
    )

    # An unseeded split assigns rows differently on every run, which makes the
    # reported metrics impossible to compare between runs.
    train, test = df.randomSplit([0.75, 0.25], seed=seed)

    model_handler = ModelHandler(train, test)
    # train_model returns the fitted model, not the estimator it was given.
    model = model_handler.train_model(
        regressor=GBTRegressor(featuresCol='final_features_vector', labelCol='charges')
    )
    pred_train_rdd, pred_test_rdd = model_handler.convert_to_rdd(model)
    model_handler.evaluate_model("GBT Regressor", pred_train_rdd, pred_test_rdd)
    

# Entry-point guard: call main() only when this code runs as the main module,
# not when it is imported from elsewhere.
if __name__ == "__main__":
    main()
    
    


Model name: GBT Regressor
Training Data Metrics:
  MSE: 10145338.263716992
  RMSE: 3185.1747618799495
  MAE: 1827.1931553927914
  R2: 0.8282466917987635

Testing Data Metrics:
  MSE: 13439840.613933718
  RMSE: 3666.038817843275
  MAE: 2183.592502942301
  R2: 0.7021241270836316
