| Version | Date       | Developer | Remark             |
|---------|------------|-----------|--------------------|
| 1.0     | Feb-1-2025 | Johnson | Initial version: developed pipeline & functions for data engineering |

In [0]:
# !pip install synapseml
# !pip install shap

In [0]:
import numpy as np; np.__version__ = '1.24.0'
import shap
import pandas as pd  
import matplotlib.pyplot as plt  
import seaborn as sns  
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *


from sklearn.metrics import *
from sklearn.model_selection  import train_test_split,GridSearchCV,cross_val_score
from sklearn.model_selection  import train_test_split
from sklearn.ensemble import RandomForestRegressor






In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, MinMaxScaler, VectorAssembler

# Step 1: StringIndexer for categorical columns
sex_indexer = StringIndexer(inputCol="sex", outputCol="sex_index")
smoker_indexer = StringIndexer(inputCol="smoker", outputCol="smoker_index")
region_indexer = StringIndexer(inputCol="region", outputCol="region_index")

# Step 2: OneHotEncoder for region
region_encoder = OneHotEncoder(inputCol="region_index", outputCol="region_encoded")

# Step 3: VectorAssembler to combine "bmi" for MinMaxScaler
bmi_assembler = VectorAssembler(inputCols=["bmi"], outputCol="bmi_assembled")
age_assembler = VectorAssembler(inputCols=["age"], outputCol="age_assembled")

# Step 4: MinMaxScaler for normalization
bmi_scaler = MinMaxScaler(inputCol="bmi_assembled", outputCol="bmi_normalized")
age_scaler = MinMaxScaler(inputCol="age_assembled", outputCol="age_normalized")

# Step 5: Create Pipeline
pipeline = Pipeline(stages=[
    sex_indexer, smoker_indexer, region_indexer, region_encoder,  # Categorical encoding
    bmi_assembler, bmi_scaler,  # bmi normalization
    age_assembler, age_scaler   # age normalization
])

In [0]:
from pyspark.ml.feature import VectorAssembler

def vectorize_features(df_transformed):
    """Build the two-column (``label``, ``features``) modelling frame.

    The ``expenses`` column is renamed to ``label`` and the engineered
    columns listed below are packed into one ``features`` vector column.

    Args:
        df_transformed: DataFrame that carries the columns named in
            ``feature_columns`` together with ``expenses`` (presumably the
            output of the fitted preprocessing pipeline -- confirm at the
            call site).

    Returns:
        The DataFrame reduced to the ``label`` and ``features`` columns.
    """
    feature_columns = [
        "sex_index",
        "smoker_index",
        "region_encoded",
        "bmi_normalized",
        "age_normalized",
        "children",
    ]

    # The target is renamed before assembling so the final select can pick it
    # up under its new name.
    labelled = df_transformed.withColumnRenamed("expenses", "label")

    return (
        VectorAssembler(inputCols=feature_columns, outputCol="features")
        .transform(labelled)
        .select("label", "features")
    )


In [0]:
def plot_learning_curve(rf, para, start, end, step, cv=3, X=None, y=None):
    """Plot the mean cross-validated r2 score against one hyper-parameter.

    For every value in ``range(start, end, step)`` the parameter ``para`` is
    set on ``rf``, the estimator is scored with ``cross_val_score`` and the
    mean of the fold scores is plotted.

    Args:
        rf: Estimator exposing ``set_params``. It is modified in place and is
            left configured with the last value that was tried.
        para: Name of the parameter to sweep (forwarded to ``set_params``).
        start: First value of the sweep.
        end: Exclusive upper bound of the sweep.
        step: Increment between swept values.
        cv: Cross-validation setting forwarded to ``cross_val_score``.
        X: Training features. When omitted, the notebook-level ``X_train``
            variable is used, which was the only behaviour previously.
        y: Training target. When omitted, the notebook-level ``y_train``
            variable is used.

    Raises:
        NameError: If ``X`` or ``y`` is omitted and the corresponding
            notebook-level variable has not been defined.
    """
    # Explicit data wins; the notebook globals remain as a fallback so that
    # existing calls keep working unchanged.
    if X is None:
        X = X_train
    if y is None:
        y = y_train

    # Build the sweep once so the loop and the x-axis cannot drift apart.
    values = range(start, end, step)
    results = []
    for value in values:
        rf.set_params(**{para: value})
        scores = cross_val_score(rf, X, y, cv=cv, scoring='r2')
        results.append(scores.mean())

    plt.plot(values, results, color="red", label="r2")
    plt.xlabel(para)
    plt.ylabel("mean cross-validated r2")
    plt.legend()
    plt.title(f"{para}")
    plt.show()

In [0]:
def show_shap_summary(rf, X_train, X_test, feature_names):
    """Render a SHAP summary plot for ``rf`` evaluated on ``X_test``.

    Args:
        rf: Model handed to ``shap.Explainer``.
        X_train: Data handed to ``shap.Explainer`` as its second argument.
        X_test: Samples to explain.
        feature_names: Names forwarded to ``shap.summary_plot``.

    Returns:
        None; the plot is produced as a side effect of ``shap.summary_plot``.
    """
    # The additivity check is switched off when computing the SHAP values.
    shap.summary_plot(
        shap.Explainer(rf, X_train)(X_test, check_additivity=False),
        feature_names=feature_names,
    )

In [0]:
def show_shap_sample(X_train, X_test, feature_names, sample_idx, rf=None):
    """Build a SHAP force plot for a single sample of ``X_test``.

    Args:
        X_train: Data handed to ``shap.Explainer`` as its second argument.
        X_test: Samples to explain; must support ``X_test[sample_idx]``.
        feature_names: Names forwarded to ``shap.force_plot``.
        sample_idx: Index of the sample to visualise.
        rf: Model to explain. When omitted, the notebook-level ``rf``
            variable is used, which was the only behaviour previously.

    Returns:
        Whatever ``shap.force_plot`` returns for the selected sample.

    Raises:
        NameError: If ``rf`` is omitted and no notebook-level ``rf`` exists.
    """
    if rf is None:
        # Backward-compatible fallback: earlier versions silently read the
        # model from the notebook's global namespace.
        try:
            rf = globals()["rf"]
        except KeyError:
            raise NameError(
                "name 'rf' is not defined; pass the fitted model via rf=..."
            ) from None

    explainer = shap.Explainer(rf, X_train)
    # The additivity check is switched off when computing the SHAP values.
    shap_values = explainer(X_test, check_additivity=False)
    sample_values = shap_values[sample_idx]
    return shap.force_plot(
        explainer.expected_value,
        sample_values.values,
        X_test[sample_idx],
        feature_names=feature_names,
    )

In [0]:
def application_predict_expenses(path_saved, age, sex, bmi, children, smoker, region, expenses):
    """Score a single claim with a saved pipeline model and display its status.

    The saved ``PipelineModel`` is loaded from ``path_saved``, applied to a
    one-row DataFrame built from the arguments, and the rounded prediction is
    compared with the rounded ``expenses`` value.

    Args:
        path_saved: Location passed to ``PipelineModel.load``.
        age, sex, bmi, children, smoker, region: Values for the input columns
            of the same names.
        expenses: Claimed amount, compared against the model's prediction.

    Returns:
        None; the three result columns are shown via ``display``.
    """
    model = PipelineModel.load(path_saved)

    column_names = ["age", "sex", "bmi", "children", "smoker", "region", "expenses"]
    record = (age, sex, bmi, children, smoker, region, expenses)
    scored = model.transform(spark.createDataFrame([record], column_names))

    # A claim is flagged when the rounded prediction is below the rounded
    # claimed amount.
    prediction_below_claim = col("Expected_Expenses") < col("Actual_Expenses")
    claim_status = when(prediction_below_claim, "Abnormal Claim").otherwise("Normal Claim")

    report = (
        scored
        .withColumn("Expected_Expenses", round("prediction", 2))
        .withColumn("Actual_Expenses", round("expenses", 2))
        .withColumn("Claim_Status", claim_status)
    )

    display(report.select("Claim_Status", "Expected_Expenses", "Actual_Expenses"))

In [0]:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("example").getOrCreate()


# def display(df):
#     """
#     Mimics Databricks' display() function by converting a Spark DataFrame to Pandas 
#     and displaying it nicely in Jupyter notebooks.
    
#     Args:
#         df (pyspark.sql.DataFrame): The Spark DataFrame to display.
#     """
#     try:
#         from IPython.display import display as ipy_display
#         ipy_display(df.toPandas())  # Convert to Pandas and display
#     except:
#         print(df.show())  # Fallback to show() if Pandas conversion fails