# 🚀 Spaceship Titanic

## Description:
Welcome to the year 2912, where your data science skills are needed to solve a cosmic mystery. We've received a transmission from four light-years away and things aren't looking good.

The Spaceship Titanic was an interstellar passenger liner launched a month ago. With almost 13,000 passengers on board, the vessel set out on its maiden voyage transporting emigrants from our solar system to three newly habitable exoplanets orbiting nearby stars.

While rounding Alpha Centauri en route to its first destination—the torrid 55 Cancri E—the unwary Spaceship Titanic collided with a spacetime anomaly hidden within a dust cloud. Sadly, it met a similar fate as its namesake from 1000 years before. Though the ship stayed intact, almost half of the passengers were transported to an alternate dimension!

To help rescue crews and retrieve the lost passengers, you are challenged to predict which passengers were transported by the anomaly using records recovered from the spaceship’s damaged computer system.

Help save them and change history!

## ⚙️ Setup

In [None]:
pip install --upgrade --force-reinstall git+https://github.com/FlorianTeich/kaggle_tools

In [None]:
import kaggle_tools

# Display the base path returned by kaggle_tools; later cells build their
# input/ and working/ paths from this value.
kaggle_tools.get_data_path()

In [None]:
!pip install sentence-transformers pygwalker kedro mlflow kedro-viz pyspark==3.3.0 xgboost==2.0.2 hyperopt

## 📊 Exploratory Data Analysis

In [None]:
# Fetch the competition data via kaggle_tools (presumably into
# <data path>/input/spaceship-titanic, which later cells read — TODO confirm).
kaggle_tools.get_dataset()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *


# Local Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName("spaceship-titanic")
    .getOrCreate()
)

# Raw training CSV: every column arrives as a string, so the label is cast to
# boolean immediately.
train_csv_path = str(kaggle_tools.get_data_path()) + "/input/spaceship-titanic/train.csv"
train = (
    spark.read.option("header", True)
    .csv(train_csv_path)
    .withColumn("Transported", col("Transported").cast(BooleanType()))
)
train.show()

## ✨ Feature Extraction

In [None]:
import mlflow
import shutil
import subprocess
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sentence_transformers import SentenceTransformer
from functools import partial
from pyspark import keyword_only
from pyspark.ml import UnaryTransformer, Pipeline, Transformer, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, VectorSizeHint
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf


# Start the MLflow tracking UI as a background process. The Popen handle is
# discarded, so nothing in this notebook stops the server again.
subprocess.Popen(args=["mlflow", "ui"])

# Smoke test: load the sentence-embedding model and encode two sample sentences.
model = SentenceTransformer(model_name_or_path="sentence-transformers/all-MiniLM-L6-v2")
sentences = [
    "This is an example sentence",
    "Each sentence is converted",
]
embeddings = model.encode(sentences)

        
class StringTransformer(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Embed a string column with the all-MiniLM-L6-v2 sentence-transformer.

    Every value of ``inputCol`` is converted with ``str()`` and encoded into a
    list of floats, stored as ``array<float>`` in ``outputCol``. Null inputs are
    encoded as the text held in ``default_value``. The default is ``"None"``,
    which is exactly what ``str(None)`` yields.

    The model is loaded on the driver inside ``_transform`` and captured by the
    UDF closure, so it is serialized together with the UDF.
    """

    # Declared at class level so the string typeConverter applies to both
    # explicitly set values and the default.
    default_value = Param(
        Params._dummy(),
        "default_value",
        "text to embed in place of null input values",
        typeConverter=TypeConverters.toString,
    )

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, default_value=None):
        super(StringTransformer, self).__init__()
        # "None" matches str(None), so null inputs keep producing the same
        # embedding as when they were stringified directly.
        self._setDefault(default_value="None")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, default_value=None):
        """Set any of the keyword params that were explicitly passed."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setDefaultValue(self, value):
        """Set the text embedded in place of null inputs."""
        return self._set(default_value=value)

    def getDefaultValue(self):
        """Return the text embedded in place of null inputs."""
        return self.getOrDefault(self.default_value)

    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        from pyspark.sql.functions import col, udf
        from pyspark.sql.types import ArrayType, FloatType
        from sentence_transformers import SentenceTransformer

        in_col = self.getInputCol()
        out_col = self.getOutputCol()
        # Resolve the param on the driver so the UDF closure captures a plain
        # value instead of this transformer. str() below maps an explicit None
        # to "None", the same text the default produces.
        null_text = self.getDefaultValue()

        model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

        def embed(value):
            text = str(null_text if value is None else value)
            return model.encode(text, show_progress_bar=False).tolist()

        embed_udf = udf(embed, ArrayType(FloatType()))
        return dataset.withColumn(out_col, embed_udf(col(in_col)))
    
        
class ColumnCastTransformer(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Cast the raw (string) Spaceship Titanic CSV columns to typed columns.

    ``inputCol``/``outputCol`` exist only so the class fits the usual
    Transformer API; the columns and target types are fixed in ``_transform``.
    A null boolean ``Transported`` column is added when the input has none,
    so frames without a label get the same schema as the training data.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set any of the keyword params that were explicitly passed."""
        return self._set(**self._input_kwargs)

    # Explicit setters are required in Spark >= 3.0.
    def setInputCol(self, value):
        """Assign :py:attr:`inputCol`."""
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """Assign :py:attr:`outputCol`."""
        return self._set(outputCol=value)

    def _transform(self, dataset):
        from pyspark.sql.functions import lit

        if "Transported" not in dataset.columns:
            dataset = dataset.withColumn("Transported", lit(None).cast(BooleanType()))

        # (column, target type) pairs; each column is replaced in place.
        casts = (
            ("CryoSleep", BooleanType()),
            ("VIP", BooleanType()),
            ("Age", DoubleType()),
            ("RoomService", DoubleType()),
            ("FoodCourt", DoubleType()),
            ("ShoppingMall", DoubleType()),
            ("Spa", DoubleType()),
            ("VRDeck", DoubleType()),
        )
        for column_name, target_type in casts:
            dataset = dataset.withColumn(column_name, dataset[column_name].cast(target_type))
        return dataset


class ListToArrayTransformer(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Turn a numeric array column into a Spark ML vector column.

    Despite the class name, the output is produced by ``array_to_vector``,
    i.e. a ``pyspark.ml.linalg`` vector suitable for ``VectorAssembler``.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set any of the keyword params that were explicitly passed."""
        return self._set(**self._input_kwargs)

    # Explicit setters are required in Spark >= 3.0.
    def setInputCol(self, value):
        """Assign :py:attr:`inputCol`."""
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """Assign :py:attr:`outputCol`."""
        return self._set(outputCol=value)

    def _transform(self, dataset):
        target_col = self.getOutputCol()
        source_col = self.getInputCol()
        converted = array_to_vector(source_col)
        return dataset.withColumn(target_col, converted)
    
# Typed columns first (also adds a null "Transported" column when missing).
cct = ColumnCastTransformer()

# Categorical columns: frequency-ordered indices (invalid values kept in an
# extra bucket), then one-hot encoding.
homeplanetIndexer = StringIndexer(
    inputCol="HomePlanet",
    outputCol="HomePlanet_processed",
    stringOrderType="frequencyDesc",
    handleInvalid="keep",
)
destinationIndexer = StringIndexer(
    inputCol="Destination",
    outputCol="Destination_processed",
    stringOrderType="frequencyDesc",
    handleInvalid="keep",
)
ohe_homeplanet = OneHotEncoder(inputCol="HomePlanet_processed", outputCol="HomePlanet_onehot")
ohe_destination = OneHotEncoder(inputCol="Destination_processed", outputCol="Destination_onehot")

# Free-text columns: sentence embeddings, converted to ML vectors whose size is
# declared as 384 (assumed embedding width of all-MiniLM-L6-v2).
name_ = StringTransformer(inputCol="Name", outputCol="Name_Processed")
cabin_ = StringTransformer(inputCol="Cabin", outputCol="Cabin_Processed")
name_vec_ = ListToArrayTransformer(inputCol="Name_Processed", outputCol="Name_Processed_vec")
cabin_vec_ = ListToArrayTransformer(inputCol="Cabin_Processed", outputCol="Cabin_Processed_vec")
sizeHintName = VectorSizeHint(inputCol="Name_Processed_vec", size=384)
sizeHintCabin = VectorSizeHint(inputCol="Cabin_Processed_vec", size=384)

# Everything concatenated into a single "features" vector.
vecAssembler = VectorAssembler(
    inputCols=[
        "Name_Processed_vec",
        "Cabin_Processed_vec",
        "CryoSleep",
        "HomePlanet_onehot",
        "Destination_onehot",
        "Age",
        "VIP",
        "RoomService",
        "FoodCourt",
        "ShoppingMall",
        "Spa",
        "VRDeck",
    ],
    outputCol="features",
    handleInvalid="keep",
)

pipeline = Pipeline(stages=[
    cct,
    name_,
    cabin_,
    homeplanetIndexer,
    destinationIndexer,
    ohe_homeplanet,
    ohe_destination,
    name_vec_,
    cabin_vec_,
    sizeHintName,
    sizeHintCabin,
    vecAssembler,
])

In [None]:
# Fit the feature pipeline on the training data and persist it. overwrite()
# replaces a pipeline saved by a previous run. Unlike a bare `except: pass`
# around rmtree, it does not hide unrelated errors.
pipeline_dir = str(kaggle_tools.get_data_path()) + "/working/pipeline"
pipelineModel = pipeline.fit(train)
pipelineModel.write().overwrite().save(pipeline_dir)

In [None]:
# Point MLflow at the tracking server (presumably the `mlflow ui` process started
# earlier, which listens on 127.0.0.1:5000 by default) BEFORE selecting the
# experiment. This way the experiment is resolved on the same backend that runs
# are logged to.
mlflow.set_tracking_uri("http://127.0.0.1:5000")
# set_experiment creates the experiment when it does not exist yet.
mlflow.set_experiment("titanic")

data_root = str(kaggle_tools.get_data_path())
train = (
    spark.read.option("header", True)
    .csv(data_root + "/input/spaceship-titanic/train.csv")
    .withColumn("Transported", col("Transported").cast(BooleanType()))
)
test = spark.read.option("header", True).csv(data_root + "/input/spaceship-titanic/test.csv")
sample_submission = spark.read.option("header", True).csv(
    data_root + "/input/spaceship-titanic/sample_submission.csv"
)

pipelineModel = PipelineModel.load(data_root + "/working/pipeline")

# Transformed frames go to relative parquet paths. mode("overwrite") replaces
# output left at exactly these paths by a previous run; Spark's default mode
# would fail because the path already exists.
processed_train_df = pipelineModel.transform(train)
processed_train_df.write.mode("overwrite").parquet("working/train_data")

processed_test_df = pipelineModel.transform(test)
processed_test_df.write.mode("overwrite").parquet("working/test_data")

In [None]:
# Read transformed data from files
# Reload the transformed train/test frames from their parquet output.
processed_train_df = spark.read.format("parquet").load("working/train_data")
processed_test_df = spark.read.format("parquet").load("working/test_data")

In [None]:
# Bring the assembled feature vectors and labels back to the driver as Rows.
_selected_cols = ("features", "Transported")
feats_test = processed_test_df.select(*_selected_cols).collect()
feats_train = processed_train_df.select(*_selected_cols).collect()

In [None]:
# Stack the collected Rows into NumPy arrays for XGBoost: each "features" value
# is densified with toArray(); the labels become a 1-D array.
X_train_and_val_transformed = np.stack([row["features"].toArray() for row in feats_train])
X_test = np.stack([row["features"].toArray() for row in feats_test])
y_train_and_val = np.stack([row["Transported"] for row in feats_train])

## 🏋️ Training

In [None]:
from hyperopt import fmin, tpe, hp, anneal, Trials
from sklearn.model_selection import KFold, cross_val_score

random_state = 42
n_iter = 2  # number of hyperparameter configurations to evaluate
num_folds = 5

# Hold out 10% of the labelled rows for validation; X_train/y_train are what
# the cross-validated search works on.
X_train, X_val, y_train, y_val = train_test_split(
    X_train_and_val_transformed,
    y_train_and_val,
    test_size=0.1,
    random_state=random_state,
)
kf = KFold(n_splits=num_folds, shuffle=True, random_state=random_state)

def gb_acc_cv(params, random_state=random_state, cv=kf, X=X_train, y=y_train):
    """Return the mean cross-validated accuracy of an XGBClassifier.

    Args:
        params: mapping with "n_estimators", "max_depth" and "learning_rate".
            The first two are cast to int because hyperopt's quniform samples
            are floats.
        random_state: seed passed to the classifier.
        cv: cross-validation splitter.
        X, y: features and labels to cross-validate on.

    The defaults are bound when this function is defined, so later
    reassignments of the module-level names do not affect them.

    Returns:
        Mean accuracy over the folds (higher is better). This is a score, not a
        loss: hyperopt's fmin minimises its objective, so negate this value
        before using it there.
    """
    model_params = {
        "n_estimators": int(params["n_estimators"]),
        "max_depth": int(params["max_depth"]),
        "learning_rate": params["learning_rate"],
    }

    # A fresh, unfitted XGBoost classifier for this parameter sample.
    model = XGBClassifier(random_state=random_state, **model_params)

    # Same folds for every sample so the scores are comparable.
    fold_scores = cross_val_score(model, X, y, cv=cv, scoring="accuracy", n_jobs=-1)
    return fold_scores.mean()

# Hyperparameter search space. quniform samples are floats; gb_acc_cv casts
# n_estimators and max_depth to int.
space = {
    "n_estimators": hp.quniform("n_estimators", 10, 500, 1),
    "max_depth": hp.quniform("max_depth", 2, 20, 1),
    "learning_rate": hp.loguniform("learning_rate", -5, 0),  # exp(-5) .. 1
}

# Trials records every evaluated point and its loss.
trials = Trials()

# hyperopt.fmin MINIMISES its objective. gb_acc_cv returns an accuracy (higher
# is better), so the objective is its negation. Passing gb_acc_cv directly
# would steer the search towards the least accurate configuration.
best = fmin(
    fn=lambda sample: -gb_acc_cv(sample),
    space=space,
    algo=anneal.suggest,
    max_evals=n_iter,
    trials=trials,
    rstate=np.random.default_rng(random_state),  # reproducible sampling
)
# Undo the negation to get the best cross-validated accuracy without
# re-running the cross-validation.
best_cv_accuracy = -trials.best_trial["result"]["loss"]

mlflow.set_tracking_uri("http://127.0.0.1:5000")
# Select (creating if needed) the experiment on this tracking server, so the run
# below is logged against an experiment that exists on the same backend.
mlflow.set_experiment("titanic")
with mlflow.start_run():
    dataset_train = mlflow.data.from_numpy(np.hstack([X_train, np.expand_dims(y_train, 1)]))
    dataset_val = mlflow.data.from_numpy(np.hstack([X_val, np.expand_dims(y_val, 1)]))
    # Refit the best configuration on the training split and score it on the
    # held-out validation split.
    model = XGBClassifier(
        random_state=random_state,
        n_estimators=int(best["n_estimators"]),
        max_depth=int(best["max_depth"]),
        learning_rate=best["learning_rate"],
    )
    model.fit(X_train, y_train)
    sa_test_score = accuracy_score(y_val, model.predict(X_val))
    mlflow.xgboost.log_model(model, artifact_path="model")
    mlflow.log_params(model.get_params())
    mlflow.log_metric("accuracy", sa_test_score)
    mlflow.log_input(dataset_train, context="training")
    mlflow.log_input(dataset_val, context="validation")

print("Best Acc {:.3f} params {}".format(best_cv_accuracy, best))

In [None]:
# Refit on all labelled rows (train + validation) with the selected
# hyperparameters, then predict the test set.
# NOTE(review): preds is only displayed here; this cell writes no submission file.
final_params = dict(
    n_estimators=int(best["n_estimators"]),
    max_depth=int(best["max_depth"]),
    learning_rate=best["learning_rate"],
)
model = XGBClassifier(random_state=random_state, **final_params)
model.fit(X_train_and_val_transformed, y_train_and_val)
preds = model.predict(X_test)
preds