In [None]:
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_poisson_deviance
import numpy as np


In [None]:

# Silence NumPy's divide-by-zero RuntimeWarning for the whole session:
# ImportData takes np.log of columns that contain zeros.
np.seterr(divide="ignore")

def ImportData(FreqFile, SevFile):
  """Load the frequency and severity CSV files into one policy-level frame.

  The severity rows are summed per ``IDpol`` and left-joined onto the
  frequency rows, so policies without any severity row get a ``ClaimAmount``
  of 0. Derived columns are then added: ``PurePremium``, ``Frequency``,
  ``Severity`` and the natural log of each of those plus ``ClaimNb`` and
  ``ClaimAmount``.

  Args:
    FreqFile: Path or file-like object accepted by ``pd.read_csv``; must
      contain ``IDpol``, ``ClaimNb`` and ``Exposure`` columns.
    SevFile: Path or file-like object accepted by ``pd.read_csv``; must
      contain ``IDpol`` and ``ClaimAmount`` columns.

  Returns:
    DataFrame indexed by integer ``IDpol``. Every +/-inf value (for example
    the log of a zero) is replaced by NaN.
  """
  dfFreq = pd.read_csv(FreqFile, header='infer')
  dfSev = pd.read_csv(SevFile, header='infer')
  dfFreq["IDpol"] = dfFreq["IDpol"].astype(int)
  dfFreq = dfFreq.set_index("IDpol")
  # Collapse multiple severity rows of one policy into a single total.
  dfSev = dfSev.groupby("IDpol").sum()
  df = dfFreq.join(dfSev, how="left")
  # Assign the result back: fillna(inplace=True) on a column selection is a
  # chained assignment that does not update `df` under pandas Copy-on-Write.
  df["ClaimAmount"] = df["ClaimAmount"].fillna(0)
  # unquote string fields (object columns as well as dedicated string dtypes)
  for column_name in df.columns:
    column = df[column_name]
    if pd.api.types.is_object_dtype(column) or pd.api.types.is_string_dtype(column):
      df[column_name] = column.str.strip("'")
  df["PurePremium"] = df["ClaimAmount"] / df["Exposure"]
  df["Frequency"] = df["ClaimNb"] / df["Exposure"]
  # fmax(..., 1) avoids dividing by zero for policies without claims.
  df["Severity"] = df["ClaimAmount"] / np.fmax(df["ClaimNb"], 1)
  # log(0) is expected here; suppress the warning locally instead of relying
  # on a global np.seterr call having been executed first.
  with np.errstate(divide="ignore"):
    for source_column in ("PurePremium", "Frequency", "Severity", "ClaimNb", "ClaimAmount"):
      df["Log_" + source_column] = np.log(df[source_column])
  df = df.replace([np.inf, -np.inf], np.nan)
  return df


In [None]:
def ExploratoryAnalysis(df, mlflow, mountpoint, targetColumns, title):
  """Build one sweetviz profiling report per target column and log it.

  Args:
    df: Dataset passed to ``sweetviz.analyze`` (labelled 'Train').
    mlflow: Object exposing ``log_artifact(local_path, artifact_path)``.
    mountpoint: Root directory under which the HTML reports are written.
    targetColumns: Iterable of column names; each one is profiled as the
      target feature of its own report.
    title: Artifact path handed to ``mlflow.log_artifact``.
  """
  for targetColumn in targetColumns:
    print(f"Profiling dataset for {targetColumn}...")
    my_report = sweetviz.analyze([df, 'Train'], target_feat=targetColumn)
    # Use the `mountpoint` argument. The earlier spelling `mountPoint` did not
    # match the parameter, so it raised NameError or read an unrelated global.
    ReportPath = f'{mountpoint}/MyDrive/ColabNotebooks/InsuranceClaims/Reports/{targetColumn}_profile_Report.html'
    my_report.show_html(ReportPath)
    mlflow.log_artifact(ReportPath, title)

In [None]:
def FeatureEngineering_Pre(df):
  """Return a cleaned copy of ``df``; the input frame is left untouched.

  Steps, in order:
    * merge six listed ``Region`` values into "Other";
    * merge ``VehBrand`` B13 and B14 into "B13+B14";
    * cap ``DrivAge`` at 83 and ``VehAge`` at 50;
    * set ``ClaimNb`` to 0 where ``ClaimAmount`` is 0;
    * clip ``ClaimNb`` to 4, ``Exposure`` to 1 and ``ClaimAmount`` to 200000.
  """
  engineered = df.copy()

  merged_regions = [
      "Auvergne",
      "Limousin",
      "Corse",
      "Champagne-Ardenne",
      "Alsace",
      "Franche-Comte",
  ]
  in_merged_region = engineered["Region"].isin(merged_regions)
  engineered.loc[in_merged_region, "Region"] = "Other"

  in_merged_brand = engineered["VehBrand"].isin(["B13", "B14"])
  engineered.loc[in_merged_brand, "VehBrand"] = "B13+B14"

  # Cap the age columns at their ceiling value.
  for column, ceiling in (("DrivAge", 83), ("VehAge", 50)):
    engineered.loc[engineered[column] >= ceiling, column] = ceiling

  # A claim count without any claim amount is reset to zero claims.
  without_amount = engineered["ClaimAmount"] == 0
  with_claims = engineered["ClaimNb"] >= 1
  engineered.loc[without_amount & with_claims, "ClaimNb"] = 0

  for column, ceiling in (("ClaimNb", 4), ("Exposure", 1), ("ClaimAmount", 200000)):
    engineered[column] = engineered[column].clip(upper=ceiling)

  return engineered

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder
from sklearn.preprocessing import StandardScaler, KBinsDiscretizer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline


def FeatureEngineering(df):
  """Clean ``df`` and encode it into a model-ready feature matrix.

  The frame is first passed through ``FeatureEngineering_Pre``; the column
  transformer is then fitted on that cleaned frame so the encoded matrix and
  the returned frame describe the same data.

  Encoding:
    * ``VehAge``, ``DrivAge``: ``KBinsDiscretizer`` with 10 bins;
    * ``VehBrand``, ``VehPower``, ``VehGas``, ``Region``, ``Area``: one-hot;
    * ``BonusMalus``: passed through unchanged;
    * ``Density``: natural log followed by ``StandardScaler``;
    * every other column is dropped.

  Args:
    df: Raw policy frame containing the columns listed above plus those
      required by ``FeatureEngineering_Pre``.

  Returns:
    Tuple ``(X, df2, columnNames)``: the encoded feature matrix, the cleaned
    frame it was built from, and an array of output column names in the same
    order as the columns of ``X``.
  """
  df2 = FeatureEngineering_Pre(df)

  log_scale_transformer = make_pipeline(
    FunctionTransformer(func=np.log), StandardScaler()
  )

  preprocessor = ColumnTransformer(
      transformers=[
          ("binned_numeric", KBinsDiscretizer(n_bins=10), ["VehAge", "DrivAge"]),
          (
              "onehot_categorical",
              OneHotEncoder(),
              ["VehBrand", "VehPower", "VehGas", "Region", "Area"],
          ),
          ("passthrough_numeric", "passthrough", ["BonusMalus"]),
          ("log_scaled_numeric", log_scale_transformer, ["Density"]),
      ],
      remainder="drop",
  )

  # Fit on the cleaned frame (df2), not the raw input: otherwise the merged
  # levels and capped ages from the pre-step never reach X, and X disagrees
  # with the df2 that is returned alongside it.
  X = preprocessor.fit_transform(df2)

  # Look the fitted transformers up by name rather than by list position.
  binned_names = preprocessor.named_transformers_["binned_numeric"].get_feature_names_out()
  onehot_names = preprocessor.named_transformers_["onehot_categorical"].get_feature_names_out()
  # The last two transformers each emit exactly one column.
  columnNames = np.concatenate(
      (binned_names, onehot_names, np.array(["BonusMalus", "Density"])), axis=0
  )

  return X, df2, columnNames





In [None]:
def score_model_metrics(df_test, X_test, VariableWeight, VarTarget, estimator):
  """Return the weighted (MAE, MSE) of a model on the test set, rounded to 3 dp.

  Args:
    df_test: Mapping/frame holding the target and weight columns.
    X_test: Feature matrix passed to each estimator's ``predict``.
    VariableWeight: Name of the column used as ``sample_weight``.
    VarTarget: Name of the column holding the observed target.
    estimator: Sequence of fitted models. With one element its prediction is
      used directly; otherwise the predictions of the first two elements are
      multiplied together.

  Returns:
    Tuple ``(mean_absolute_error, mean_squared_error)``.
  """
  single_model = len(estimator) == 1
  y_pred = estimator[0].predict(X_test)
  if not single_model:
    # Combined prediction: product of the two models' outputs.
    y_pred = y_pred * estimator[1].predict(X_test)

  mae = mean_absolute_error(
      df_test[VarTarget], y_pred, sample_weight=df_test[VariableWeight]
  )
  mse = mean_squared_error(
      df_test[VarTarget], y_pred, sample_weight=df_test[VariableWeight]
  )
  return round(mae, 3), round(mse, 3)

In [None]:
from sklearn.metrics import auc

def lorenz_curve(y_true, y_pred, exposure):
    """Compute a Lorenz curve of observed amounts ordered by predicted risk.

    Samples are sorted by increasing ``y_pred``; ``y_true * exposure`` is then
    accumulated in that order and normalised by its total.

    Returns:
        Tuple ``(x, y, index)`` where ``x`` is an evenly spaced grid on [0, 1]
        with one point per sample, ``y`` is the normalised cumulative amount,
        and ``index`` is ``1 - 2 * auc(x, y)``.
    """
    observed = np.asarray(y_true).astype(float)
    predicted = np.asarray(y_pred)
    weights = np.asarray(exposure)

    # order samples by increasing predicted risk:
    order = np.argsort(predicted)
    cumulative = np.cumsum(observed[order] * weights[order])
    cumulative /= cumulative[-1]

    sample_fraction = np.linspace(0, 1, len(cumulative))
    index = 1 - 2 * auc(sample_fraction, cumulative)
    return sample_fraction, cumulative, index


1


In [None]:

from sklearn.model_selection import GridSearchCV

def algorithm_pipeline(X_train_data, X_test_data, y_train_data, y_test_data, 
                       model, param_grid, cv=10, scoring_fit='neg_mean_squared_error',
                       do_probabilities = False):
    """Run an exhaustive grid search on the training data and predict the test set.

    Args:
        X_train_data, y_train_data: Data the grid search is fitted on.
        X_test_data: Data the fitted search predicts for.
        y_test_data: Accepted for signature compatibility; not used.
        model: Estimator to tune.
        param_grid: Parameter grid handed to ``GridSearchCV``.
        cv: Cross-validation setting forwarded to ``GridSearchCV``.
        scoring_fit: Scoring name forwarded to ``GridSearchCV``.
        do_probabilities: If true use ``predict_proba``, else ``predict``.

    Returns:
        Tuple ``(fitted_search, predictions)``.
    """
    search_settings = dict(
        estimator=model,
        param_grid=param_grid,
        cv=cv,
        n_jobs=-1,
        scoring=scoring_fit,
        verbose=2,
    )
    fitted_search = GridSearchCV(**search_settings).fit(X_train_data, y_train_data)

    predict_fn = fitted_search.predict_proba if do_probabilities else fitted_search.predict
    return fitted_search, predict_fn(X_test_data)

In [None]:
from sklearn.model_selection import RandomizedSearchCV

def algorithm_pipeline_random(X_train_data, X_test_data, y_train_data, y_test_data, 
                       model, param_grid, cv=10, scoring_fit='neg_mean_squared_error',
                       do_probabilities = False, iterations=10000):
    """Run a randomized hyper-parameter search and predict the test set.

    Args:
        X_train_data, y_train_data: Data the search is fitted on.
        X_test_data: Data the fitted search predicts for.
        y_test_data: Accepted for signature compatibility; not used.
        model: Estimator to tune.
        param_grid: Passed to ``RandomizedSearchCV`` as ``param_distributions``.
        cv: Cross-validation setting forwarded to ``RandomizedSearchCV``.
        scoring_fit: Scoring name forwarded to ``RandomizedSearchCV``.
        do_probabilities: If true use ``predict_proba``, else ``predict``.
        iterations: Number of sampled parameter settings (``n_iter``).

    Returns:
        Tuple ``(fitted_search, predictions)``.
    """
    search_settings = dict(
        estimator=model,
        param_distributions=param_grid,
        cv=cv,
        n_jobs=-1,
        scoring=scoring_fit,
        verbose=2,
        n_iter=iterations,
    )
    fitted_search = RandomizedSearchCV(**search_settings).fit(X_train_data, y_train_data)

    predict_fn = fitted_search.predict_proba if do_probabilities else fitted_search.predict
    return fitted_search, predict_fn(X_test_data)