[Dataset](https://www.kaggle.com/c/titanic)

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml import Transformer, Estimator
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, OneHotEncoder, StringIndexer, VectorAssembler

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE

In [None]:
#Sets the Spark master URL to connect to "spark://master:7077" to run on a Spark standalone cluster.
spark = SparkSession.builder.master("spark://cuong-Vostro-3578:7077").appName("Data preprocessing").getOrCreate()

# Read data

In [None]:
df = spark.read.option("inferSchema", "true").option("header", "true").csv("Data/titanic.csv")

# Basic exploration

In [None]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

## Shape of data

In [None]:
print((df.count(), len(df.columns)))

(891, 12)


## Data description

| Variable    | Definition  | Key |
| ----------- | ----------- |     |
| survival    | Survival    |   0 = No, 1 = Yes  |
| pclass      | Ticket class|1 = 1st, 2 = 2nd, 3 = 3rd|
| sex         | Sex         |     |
| Age         | Age in years|     |
| sibsp   | # of siblings / spouses aboard the Titanic|     |
| parch   | # of parents / children aboard the Titanic|     |
| ticket         | Ticket number	         |     |
| fare         | Passenger fare	         |     |
| cabin         | Cabin number	         |     |
| embarked         | Port of Embarkation|C = Cherbourg, Q = Queenstown, S = Southampton|

## Check duplicate

In [None]:
df.groupBy(df.columns)\
    .count()\
    .where(F.col('count') > 1)\
    .select(F.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+



## Check missing value

In [None]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



## Type of columns

In [None]:
df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

## Summary of numeric columns

In [None]:
columnNumeric = list(set([item[0] for item in df.dtypes if item[1].startswith(('int', 'double'))]) - set('Survived'))
df.select(columnNumeric).summary("count", "min", "25%", "50%", "75%", "max").show()

+-------+----+------+-----+--------+-----+--------+-----------+
|summary| Age|Pclass|SibSp|Survived|Parch|    Fare|PassengerId|
+-------+----+------+-----+--------+-----+--------+-----------+
|  count| 714|   891|  891|     891|  891|     891|        891|
|    min|0.42|     1|    0|       0|    0|     0.0|          1|
|    25%|20.0|     2|    0|       0|    0|  7.8958|        223|
|    50%|28.0|     3|    0|       0|    0| 14.4542|        446|
|    75%|38.0|     3|    1|       1|    0|    31.0|        669|
|    max|80.0|     3|    8|       1|    6|512.3292|        891|
+-------+----+------+-----+--------+-----+--------+-----------+



## Count distinct values of categorical columns

In [None]:
columnCat = [item[0] for item in df.dtypes if item[1].startswith(('string'))]
df.agg(*(F.countDistinct(F.col(c)).alias(c) for c in columnCat)).show()

+----+---+------+-----+--------+
|Name|Sex|Ticket|Cabin|Embarked|
+----+---+------+-----+--------+
| 891|  2|   681|  147|       3|
+----+---+------+-----+--------+



## Check imbalanced classes?

In [None]:
df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



# Split trainning set and validation set

In [None]:
train = df.sampleBy("Survived", fractions={0: 0.7, 1: 0.7}, seed=42)
val = df.subtract(train)
train.groupBy("Survived").count().show()
val.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  241|
|       0|  418|
+--------+-----+

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  101|
|       0|  131|
+--------+-----+



# Feature engineering

Extract title from `Name`, `Cabin`. Add  `Family_Size`, `Fare_Per_Person` and drop unused columns: `PassengerId`, `Ticket`, `SibSp`, `Parch`

In [None]:
class ColAdderDropper(Transformer, Estimator):
    def __init__(self, num_top_titles=1):
        self.num_top_titles = num_top_titles
        
    def _fit(self, X_df):
        title_col = X_df.withColumn('Name', F.regexp_extract(F.col('Name'), r'([a-zA-z]+)\.',1))
        self.title_counts_ = title_col.groupBy('Name').count().orderBy('count', ascending=False)
        titles = self.title_counts_.select('Name').collect()
        self.top_titles_ = titles[:max(1, min(self.num_top_titles, len(titles)))]
        self.top_titles_ = [ row.Name for row in self.top_titles_]

    def _transform(self, X_df):
        top_titles_ = self.top_titles_
        res = X_df.withColumn('Name', F.regexp_extract(F.col('Name'), r'([a-zA-z]+)\.',1))
        fn = F.udf(lambda x: x if x in top_titles_ else 'Others')
        res = res.withColumn('Name', fn(res['Name']))
        
        fn_1 = F.udf(lambda x: x[0] if x != None else 'Unknown')
        res = res.withColumn('Cabin', fn_1(res['Cabin']))    
        
        res = res.withColumn('Family_Size', sum(res[col] for col in ['SibSp','Parch']))
        
        res = res.withColumn('Fare_Per_Person',res['Fare'] / (res['Family_Size']+1))
                
        res = res.drop('PassengerId','Ticket','SibSp','Parch')
        return res

In [None]:
col_adderdropper = ColAdderDropper(num_top_titles=5)
col_adderdropper.fit(df)
col_adderdropper.transform(df).show(5)

+--------+------+----+------+----+-------+-------+--------+-----------+---------------+
|Survived|Pclass|Name|   Sex| Age|   Fare|  Cabin|Embarked|Family_Size|Fare_Per_Person|
+--------+------+----+------+----+-------+-------+--------+-----------+---------------+
|       0|     3|  Mr|  male|22.0|   7.25|Unknown|       S|          1|          3.625|
|       1|     1| Mrs|female|38.0|71.2833|      C|       C|          1|       35.64165|
|       1|     3|Miss|female|26.0|  7.925|Unknown|       S|          0|          7.925|
|       1|     1| Mrs|female|35.0|   53.1|      C|       S|          1|          26.55|
|       0|     3|  Mr|  male|35.0|   8.05|Unknown|       S|          0|           8.05|
+--------+------+----+------+----+-------+-------+--------+-----------+---------------+
only showing top 5 rows



## Handling numerical data

In [None]:
num_cols = ['Age', 'Family_Size', 'Fare_Per_Person','Fare']
imputer_num = Imputer(strategy='mean', inputCols=num_cols, outputCols=[c for c in num_cols])

## Handling categorical data

In [None]:
unorder_cate_cols = ['Sex', 'Embarked', 'Name','Cabin']
label_cate_cols = ['Sex_string_encoded', 'Embarked_string_encoded', 'Name_string_encoded','Cabin_string_encoded']
onehot_cate_cols = ['Sex_one_hot', 'Embarked_one_hot', 'Name_one_hot','Cabin_one_hot']
order_cate_cols = ['Pclass']

stage_string = [StringIndexer(inputCol= c, outputCol= c+"_string_encoded", handleInvalid="keep") for c in unorder_cate_cols]

imputer_cate = Imputer(strategy='mode', inputCols=label_cate_cols+order_cate_cols, \
                       outputCols=[c for c in label_cate_cols+order_cate_cols])

stage_one_hot = [OneHotEncoder(inputCol= c+"_string_encoded", outputCol= c+ "_one_hot") for c in unorder_cate_cols]

Drop unused colums after Handling categorical data

In [None]:
class ColDropper(Transformer):
    def _transform(self, X_df):   
        res = X_df
        res = res.drop('Sex', 'Embarked', 'Name','Cabin', 'Sex_string_encoded', 'Embarked_string_encoded', 'Name_string_encoded','Cabin_string_encoded')
        return res
col_dropper = ColDropper()

## Put them together

In [None]:
vector_assembler = VectorAssembler(inputCols = num_cols+order_cate_cols+onehot_cate_cols, outputCol= "features")

## Modeling 

In [None]:
rf = RandomForestClassifier(labelCol = "Survived", featuresCol = "features", numTrees=250, subsamplingRate=0.5,\
                            minInstancesPerNode=3, maxDepth=4, seed=42)

## Create full pipeline

In [None]:
stage = [col_adderdropper] + [imputer_num] + stage_string + [imputer_cate]\
+ stage_one_hot + [col_dropper] + [vector_assembler] + [rf]
 
full_pipeline = Pipeline(stages=stage)

## Evaluation

In [None]:
model = full_pipeline.fit(train)
y_train =  model.transform(train)
y_val = model.transform(val)

evaluator= BCE(labelCol = "Survived", rawPredictionCol="probability", metricName= "areaUnderROC")
train_accuracy = evaluator.evaluate(y_train)
val_accuracy = evaluator.evaluate(y_val)

print(f"Train accuracy : {train_accuracy}")
print(f"Validation accuracy : {val_accuracy}")

Train accuracy : 0.8714387817903864
Validation accuracy : 0.9064696545990486


In [None]:
test = spark.read.option("inferSchema", "true").option("header", "true").csv("Data/test.csv")

In [None]:
y_test = model.transform(test)

PassengerId = test.select("PassengerId")
Survived = y_test.select("prediction")
Survived = Survived.withColumn("prediction", F.round(Survived["prediction"]).cast('integer'))

PassengerId = PassengerId.withColumn("id", F.monotonically_increasing_id())
Survived = Survived.withColumn("id", F.monotonically_increasing_id())

result = PassengerId.join(Survived, "id", "outer").drop("id").orderBy("PassengerId")
result = result.withColumnRenamed("prediction", "Survived")

result.toPandas().to_csv('Data/submission.csv', index=False)

Score on Kaggle: 0.79186 (top 8%)

In [None]:
spark.stop()

# Reference

[Feature engineering](https://triangleinequality.wordpress.com/2013/09/08/basic-feature-engineering-with-the-titanic-data/)