In [0]:
import numpy as np
import pandas as pd 
from pyspark.sql.functions import *
from databricks import feature_store



In [0]:

#Alter this path to point to the location of where you have uploaded your train.csv
dbfs_path = '/FileStore/tables/titanic_train.csv'
df_train = spark.read.csv(dbfs_path, header = "True", inferSchema="True")

In [0]:
# Display training data
display(df_train)

passenger_id,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home.dest,survived
1216,3,"Smyth, Miss. Julia",female,,0,0,335432,7.7333,,Q,13,,,1
699,3,"Cacic, Mr. Luka",male,38.0,0,0,315089,8.6625,,S,,,Croatia,0
1267,3,"Van Impe, Mrs. Jean Baptiste (Rosalie Paula Govaert)",female,30.0,1,1,345773,24.15,,S,,,,0
449,2,"Hocking, Mrs. Elizabeth (Eliza Needs)",female,54.0,1,3,29105,23.0,,S,4,,"Cornwall / Akron, OH",1
576,2,"Veal, Mr. James",male,40.0,0,0,28221,13.0,,S,,,"Barre, Co Washington, VT",0
1083,3,"Olsen, Mr. Henry Margido",male,28.0,0,0,C 4001,22.525,,S,,173.0,,0
898,3,"Johnson, Mr. William Cahoone Jr",male,19.0,0,0,LINE,0.0,,S,,,,0
560,2,"Sinkkonen, Miss. Anna",female,30.0,0,0,250648,13.0,,S,10,,"Finland / Washington, DC",1
1079,3,"Ohman, Miss. Velin",female,22.0,0,0,347085,7.775,,S,C,,,1
908,3,"Jussila, Miss. Mari Aina",female,21.0,1,0,4137,9.825,,S,,,,0


In [0]:
#Renaming Columns

 

df_train = (df_train 
                 .withColumnRenamed("Pclass", "PassengerClass") 
                 .withColumnRenamed("SibSp", "SiblingsSpouses") 
                 .withColumnRenamed("Parch", "ParentsChildren")
                  .withColumnRenamed("name","Name")
                  .withColumnRenamed("cabin","Cabin")
                 
           )


In [0]:
# MAGIC %md # 4. Feature Engineering 🛠
# MAGIC Apply feature engineering steps 

In [0]:
df_train.select("Name").show()

+--------------------+
|                Name|
+--------------------+
|  Smyth, Miss. Julia|
|     Cacic, Mr. Luka|
|Van Impe, Mrs. Je...|
|Hocking, Mrs. Eli...|
|     Veal, Mr. James|
|Olsen, Mr. Henry ...|
|Johnson, Mr. Will...|
|Sinkkonen, Miss. ...|
|  Ohman, Miss. Velin|
|Jussila, Miss. Ma...|
|Widener, Mr. Harr...|
|Bucknell, Mrs. Wi...|
|Potter, Mrs. Thom...|
|"Hocking, Miss. E...|
|Abbott, Mr. Rossm...|
|Jensen, Mr. Niels...|
|Maybery, Mr. Fran...|
|  McCoy, Mr. Bernard|
|Bowerman, Miss. E...|
|Olsson, Mr. Oscar...|
+--------------------+
only showing top 20 rows



In [0]:
# These titles provides information on social status, profession, etc.
# Extract Title from Name, store in column "Title"
df = df_train.withColumn("Title",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))


In [0]:
# Sanitise and group titles
# 'Mlle', 'Mme', 'Ms' --> Miss
# 'Lady', 'Dona', 'Countess' --> Mrs
# 'Dr', 'Master', 'Major', 'Capt', 'Sir', 'Don' --> Mr
# 'Jonkheer' , 'Col' , 'Rev' --> Other
df = df.replace(['Mlle','Mme', 'Ms', 'Dr','Master','Major','Lady','Dona','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
                ['Miss','Miss','Miss','Mr','Mr', 'Mr', 'Mrs',  'Mrs', 'Mrs', 'Other',  'Other','Other','Mr','Mr','Mr'])


In [0]:
display(df)

passenger_id,PassengerClass,Name,sex,age,SiblingsSpouses,ParentsChildren,ticket,fare,Cabin,embarked,boat,body,home.dest,survived,Title
1216,3,"Smyth, Miss. Julia",female,,0,0,335432,7.7333,,Q,13,,,1,Miss
699,3,"Cacic, Mr. Luka",male,38.0,0,0,315089,8.6625,,S,,,Croatia,0,Mr
1267,3,"Van Impe, Mrs. Jean Baptiste (Rosalie Paula Govaert)",female,30.0,1,1,345773,24.15,,S,,,,0,Mrs
449,2,"Hocking, Mrs. Elizabeth (Eliza Needs)",female,54.0,1,3,29105,23.0,,S,4,,"Cornwall / Akron, OH",1,Mrs
576,2,"Veal, Mr. James",male,40.0,0,0,28221,13.0,,S,,,"Barre, Co Washington, VT",0,Mr
1083,3,"Olsen, Mr. Henry Margido",male,28.0,0,0,C 4001,22.525,,S,,173.0,,0,Mr
898,3,"Johnson, Mr. William Cahoone Jr",male,19.0,0,0,LINE,0.0,,S,,,,0,Mr
560,2,"Sinkkonen, Miss. Anna",female,30.0,0,0,250648,13.0,,S,10,,"Finland / Washington, DC",1,Miss
1079,3,"Ohman, Miss. Velin",female,22.0,0,0,347085,7.775,,S,C,,,1,Miss
908,3,"Jussila, Miss. Mari Aina",female,21.0,1,0,4137,9.825,,S,,,,0,Miss


In [0]:
# MAGIC ###  Passenger's Cabins
# Did they have a Cabin?

df = df.withColumn('Has_Cabin', df.Cabin.isNotNull())


In [0]:
# Create a new feature called "Family_size". This feature is the summation of ParentsChildren (parents/children) and SiblingsSpouses(siblings/spouses). It gives us a combined data so that we can check if survival rate have anything to do with family size of the passengers
df = df.withColumn("Family_Size", col('SiblingsSpouses') + col('ParentsChildren') + 1)

In [0]:
titanic_feature = df.select("Name","Cabin","Title","Has_Cabin","Family_Size")

In [0]:
display(titanic_feature)

Name,Cabin,Title,Has_Cabin,Family_Size
"Smyth, Miss. Julia",,Miss,False,1
"Cacic, Mr. Luka",,Mr,False,1
"Van Impe, Mrs. Jean Baptiste (Rosalie Paula Govaert)",,Mrs,False,3
"Hocking, Mrs. Elizabeth (Eliza Needs)",,Mrs,False,5
"Veal, Mr. James",,Mr,False,1
"Olsen, Mr. Henry Margido",,Mr,False,1
"Johnson, Mr. William Cahoone Jr",,Mr,False,1
"Sinkkonen, Miss. Anna",,Miss,False,1
"Ohman, Miss. Velin",,Miss,False,1
"Jussila, Miss. Mari Aina",,Miss,False,2


In [0]:
 #use Feature Store library to create new feature tables

First, create the database where the feature tables will be stored.

In [0]:
%sql 
CREATE DATABASE IF NOT EXISTS feature_store_titanic;



In [0]:
%sql
drop database feature_store_titanic CASCADE ;

d ## Instantiate a Feature Store client and create table

In [0]:
fs = feature_store.FeatureStoreClient()
fs.create_table(
    name="feature_store_titanic.titanic_passengers_features_2",
    primary_keys = ["Name","Cabin"],
    df = titanic_feature,
    description = "Titanic Passenger Features")

2023/02/23 15:54:58 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.feature_store_titanic.titanic_passengers_features_2'.
  yield prop, self.__getattribute__(prop)
Out[61]: <FeatureTable: keys=['Name', 'Cabin'], tags={}>

In [0]:
#remove the duplicate records found while creating the feature store table
#titanic_feature.groupBy("Name","cabin").agg(count("*").alias("count")).filter(col("count")>1).show()
titanic_feature=titanic_feature.where("Name <> 'Kelly, Mr. James'")
#titanic_feature.filter(col("Name")=="Kelly, Mr. James").show()
 

In [0]:
# Write the features DataFrame to the feature store table
# 'merge' - upserts rows in df into the feature table.
# 'overwrite' - updates whole table.
fs.write_table(
  name = "feature_store_titanic.titanic_passengers_features_2",
  df = titanic_feature,
  mode = "merge")

d
  ##Get feature table’s metadata

In [0]:
ft = fs.get_table("feature_store_titanic.titanic_passengers_features_2")
print (ft.keys)
print (ft.description)

['Name', 'Cabin']
Titanic Passenger Features
  print (ft.keys)


md
  ## Reads contents of feature table

In [0]:
df = fs.read_table("feature_store_titanic.titanic_passengers_features_2")
display(df)

df.count()

Name,Cabin,Title,Has_Cabin,Family_Size
"Smyth, Miss. Julia",,Miss,False,1
"Cacic, Mr. Luka",,Mr,False,1
"Van Impe, Mrs. Jean Baptiste (Rosalie Paula Govaert)",,Mrs,False,3
"Hocking, Mrs. Elizabeth (Eliza Needs)",,Mrs,False,5
"Veal, Mr. James",,Mr,False,1
"Olsen, Mr. Henry Margido",,Mr,False,1
"Johnson, Mr. William Cahoone Jr",,Mr,False,1
"Sinkkonen, Miss. Anna",,Miss,False,1
"Ohman, Miss. Velin",,Miss,False,1
"Jussila, Miss. Mari Aina",,Miss,False,2


Out[64]: 1505

md 
  #  ML model

md
  ## Create training dataset

In [0]:
from databricks.feature_store import FeatureLookup

titanic_features_table = "feature_store_titanic.titanic_passengers_features_2"

# We choose to only use 2 of the newly created features
titanic_features_lookups = [
    FeatureLookup( 
      table_name = titanic_features_table,
      feature_names = "Title",
      lookup_key = ["Name","Cabin"],
    ),
    FeatureLookup( 
      table_name = titanic_features_table,
      feature_names = "Has_Cabin",
      lookup_key = ["Name","Cabin"],
    ),
#     FeatureLookup( 
#       table_name = titanic_features_table,
#       feature_names = "Family_Size",
#       lookup_key = ["Name"],
#     ),
]

In [0]:
 titanic_features_lookups

Out[66]: [<FeatureLookup: feature_name=None, lookup_key=['Name'], output_name=None, table_name='feature_store_titanic.titanic_passengers_features_2', timestamp_lookup_key=None>,
 <FeatureLookup: feature_name=None, lookup_key=['Name', 'Cabin'], output_name=None, table_name='feature_store_titanic.titanic_passengers_features_2', timestamp_lookup_key=None>]

In [0]:
# Create the training set that includes the raw input data merged with corresponding features from both feature tables
exclude_columns = [ "Name","PassengerId","ParentsChildren","SiblingsSpouses","ticket","Cabin","home.dest"]
training_set = fs.create_training_set(
                df_train,
                feature_lookups = titanic_features_lookups,
                label = 'survived',
                exclude_columns = exclude_columns
                )

md
  ## Train Model

md
  Train a LightGBM model on the data, then log the model with MLFlow. The model will be packaged with feature metadata.

In [0]:

from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient
import lightgbm as lgb
import mlflow
from sklearn.metrics import accuracy_score

# Load the TrainingSet into a dataframe which can be passed into sklearn for training a model
training_df = training_set.load_df()


In [0]:

# End any existing runs (in the case this notebook is being run for a second time)
mlflow.end_run()

# Start an mlflow run, which is needed for the feature store to log the model
mlflow.start_run(run_name="lgbm_feature_store") 

data = training_df.toPandas()
data_dum = pd.get_dummies(data, drop_first=True)

# Extract features & labels
X = data_dum.drop(["survived"], axis=1)
y = data_dum.survived

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)

lgb_params = {
            'n_estimators': 50,
            'learning_rate': 1e-3,
            'subsample': 0.27670395476135673,
            'colsample_bytree': 0.6,
            'reg_lambda': 1e-1,
            'num_leaves': 50, 
            'max_depth': 8, 
            }

mlflow.log_param("hyper-parameters", lgb_params)
lgbm_clf  = lgb.LGBMClassifier(**lgb_params)
lgbm_clf.fit(X_train,y_train)
lgb_pred = lgbm_clf.predict(X_test)

accuracy=accuracy_score(lgb_pred, y_test)
print('LightGBM Model accuracy score: {0:0.4f}'.format(accuracy_score(y_test, lgb_pred)))
mlflow.log_metric('accuracy', accuracy)

LightGBM Model accuracy score: 0.6000


In [0]:
fs.log_model(
  lgbm_clf,
  artifact_path = "model_packaged",
  flavor = mlflow.sklearn,
  training_set = training_set,
  registered_model_name = "titanic_packaged_v1"
)

Successfully registered model 'titanic_packaged_v1'.
2023/02/23 16:52:12 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: titanic_packaged_v1, version 1
Created version '1' of model 'titanic_packaged_v1'.
