### 1. Step 1 - Verbose

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Column names and Spark types of the training schema, in declaration order.
_trainColumns = [("PassengerId", IntegerType()),
                 ("Survived", DoubleType()),
                 ("Pclass", IntegerType()),
                 ("Name", StringType()),
                 ("Sex", StringType()),
                 ("Age", DoubleType()),
                 ("SibSp", IntegerType()),
                 ("Parch", IntegerType()),
                 ("Ticket", StringType()),
                 ("Fare", DoubleType()),
                 ("Cabin", StringType()),
                 ("Embarked", StringType())]

# Training schema: every column is declared nullable.
customSchema = StructType([StructField(colName, colType, True)
                           for colName, colType in _trainColumns])

# Test schema: identical to the training schema minus the "Survived" label column.
customSchema2 = StructType([field for field in customSchema.fields
                            if field.name != "Survived"])

# Both files carry a header row and are parsed with the explicit schemas above.
train = sqlc.read.csv("train.csv", schema=customSchema, header=True)
test = sqlc.read.csv("test.csv", schema=customSchema2, header=True)

### 1. Step 1 - Short


In [None]:
# Same training schema as the verbose StructType version, written as a DDL string.
# Age and Fare are declared as double (not float) so that both versions of this
# step produce identical column types.
customSchema = ('PassengerId int, Survived double, Pclass int, Name string, Sex string, '
                'Age double, SibSp int, Parch int, Ticket string, Fare double, '
                'Cabin string, Embarked string')
train = sqlc.read.csv("train.csv", header=True, schema=customSchema)

### Step 2 

In [None]:
from pyspark.sql import functions as F

# Calculating summary statistics and turning it into Pandas DF
train_desc = train.describe().toPandas().set_index('summary')

# Computing correlations between Survived and some features
print({col: train.stat.corr('Survived', col) for col in ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']})

# Checking which columns have NULL values
print({col: train.where(train[col].isNull()).count() for col in train.columns})

# Taking the mean age from the Pandas DF
ageMean = float(train_desc.loc['mean', 'Age'])
print(ageMean)

# Most frequent Embarked value in the training set.
# Rows with a NULL Embarked are removed first: groupby() keeps NULL as a group of
# its own, and a NULL mode could not be used as a fill value below.
# Ties on the count are broken by the Embarked value so the result is deterministic.
embarked_mode = (train
                 .where(train['Embarked'].isNotNull())
                 .groupby('Embarked')
                 .count()
                 .orderBy(F.desc('count'), 'Embarked')
                 .take(1)[0]
                 .Embarked)
print(embarked_mode)

# Filling Age (training mean) and Embarked (training mode) in both train and test datasets
trainFilled = train.na.fill({'Age': ageMean, 'Embarked': embarked_mode})
testFilled = test.na.fill({'Age': ageMean, 'Embarked': embarked_mode})

# Mean age per sex and passenger class.
# Column names are spelled exactly as in the schema so the query does not depend
# on case-insensitive column resolution.
train.groupby('Sex', 'Pclass').agg(F.mean('Age')).show()

### Step 3


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler

# Applying Estimators and Transformers
# Every stage is fitted and/or applied to the training data right away,
# so that each intermediate DataFrame can be inspected.

# Embarked (string) -> nEmbarked (numeric index)
indexer1 = StringIndexer(inputCol="Embarked",
                         outputCol="nEmbarked",
                         handleInvalid='skip')
embarkedIndexerModel = indexer1.fit(trainFilled)
indexed1 = embarkedIndexerModel.transform(trainFilled)

# Sex (string) -> nSex (numeric index)
indexer2 = StringIndexer(inputCol="Sex",
                         outputCol="nSex",
                         handleInvalid='skip')
sexIndexerModel = indexer2.fit(indexed1)
indexed2 = sexIndexerModel.transform(indexed1)

# nEmbarked (index) -> vEmbarked (one-hot vector)
# NOTE(review): transform() is called on the encoder without fit(); newer Spark
# versions may require fitting the encoder first - confirm against the Spark
# version in use.
encoder1 = OneHotEncoder(inputCol="nEmbarked", outputCol="vEmbarked")
encoded1 = encoder1.transform(indexed2)

# Using a VectorAssembler to put together all feature columns
featureColumns = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'nSex', 'vEmbarked']
assembler = VectorAssembler(inputCols=featureColumns, outputCol='vFeatures')
assembled = assembler.transform(encoded1)

# Keeping only the label and the assembled feature vector
assembled2 = assembled.select("Survived", "vFeatures")

### Step 3 with R Formula


In [None]:
from pyspark.ml.feature import RFormula

# RFormula configured to write the features to 'vFeatures' and to use 'Survived' as label
rformula = (RFormula()
            .setFormula('Survived ~ Pclass + Age + SibSp + Parch + Fare + Sex + Embarked')
            .setFeaturesCol('vFeatures')
            .setLabelCol('Survived'))

# Fit on the filled training data and apply it to that same data
rmodel = rformula.fit(trainFilled)
features = rmodel.transform(trainFilled)

# Preview the first five rows of the feature vector and the label
features.select('vFeatures', 'Survived').limit(5).toPandas()

### Step 4

In [None]:
from pyspark.ml.feature import StandardScaler

# Scaler that reads 'vFeatures' and writes 'scaledFeat', with both centering
# (withMean) and scaling (withStd) switched on
scaler = StandardScaler(inputCol="vFeatures",
                        outputCol="scaledFeat",
                        withStd=True,
                        withMean=True)

# Fit on the assembled training features, then apply to the same DataFrame
scalerModel = scaler.fit(assembled2)
scaled = scalerModel.transform(assembled2)

### Step 5

In [None]:
from pyspark.ml.pipeline import Pipeline

# Variant A: the hand-built feature stages followed by the scaler
manualStages = [indexer1, indexer2, encoder1, assembler, scaler]
pipeline = Pipeline(stages=manualStages)

# Variant B: RFormula followed by the scaler.
# This rebinds `pipeline`, so variant B is the one that gets fitted below.
rformulaStages = [rformula, scaler]
pipeline = Pipeline(stages=rformulaStages)

# Fit the pipeline on the filled training data and transform that same data
model = pipeline.fit(trainFilled)
scaled = model.transform(trainFilled)

### Step 6

In [None]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.classification import RandomForestClassificationModel, LogisticRegressionModel

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Trains a RF classifier on the scaled features and makes predictions.
# The seed is fixed so that re-running the notebook grows the same forest.
rfC = RandomForestClassifier(labelCol='Survived',
                             featuresCol='scaledFeat',
                             numTrees=50,
                             seed=42)

model = rfC.fit(scaled)

predictions = model.transform(scaled)

# Logistic regression on the same features. Its output columns get lr_* names
# so it can share a pipeline (and a DataFrame) with the random forest.
lr = LogisticRegression(labelCol='Survived',
                        featuresCol='scaledFeat',
                        rawPredictionCol='lr_raw',
                        probabilityCol='lr_prob',
                        predictionCol='lr_pred')

# Full pipeline: feature preparation, scaling, then both classifiers
pipeline2 = Pipeline(stages=[indexer1,
                             indexer2,
                             encoder1,
                             assembler,
                             scaler,
                             rfC,
                             lr])

model2 = pipeline2.fit(trainFilled)
predictions = model2.transform(trainFilled)

# Defines an evaluator based on the metric areaUnderROC.
# It reads 'rawPrediction'; the logistic regression writes to 'lr_raw' instead,
# so this presumably scores the random forest output - confirm.
evaluator = (BinaryClassificationEvaluator()
             .setLabelCol("Survived")
             .setRawPredictionCol("rawPrediction")
             .setMetricName("areaUnderROC"))

# Evaluate the predictions (made on the training data)
roc = evaluator.evaluate(predictions)
print(roc)

# Accuracy computed from the 'prediction' column
ev2 = (MulticlassClassificationEvaluator()
       .setLabelCol('Survived')
       .setPredictionCol('prediction')
       .setMetricName('accuracy'))

acc = ev2.evaluate(predictions)
print(acc)

# Summary of the logistic regression, which is the last pipeline stage.
# Values are printed explicitly: a bare expression in the middle of a cell
# produces no output.
lrmodel = model2.stages[-1]
summary = lrmodel.summary
print(summary.accuracy)
print(summary.areaUnderROC)

# The random forest is the second-to-last pipeline stage
rfmodel = model2.stages[-2]
print(rfmodel.featureImportances)
print(rfmodel.toDebugString)

### Step 6 using HandySpark


In [None]:
from handyspark.extensions import BinaryClassificationMetrics

# Build the HandySpark binary-classification metrics from the predictions DataFrame,
# reading the score from the 'probability' column and the label from 'Survived'.
# NOTE(review): 'probability' is presumably the random forest output, since the
# logistic regression stage is configured to write to 'lr_prob' - confirm.
bcm = BinaryClassificationMetrics(scoreAndLabels=predictions,
                                  scoreCol='probability',
                                  labelCol='Survived')

# Confusion matrix for the value 0.7 (presumably the score threshold - confirm),
# followed by the ROC curve plot
bcm.print_confusion_matrix(.7)
bcm.plot_roc_curve()

### Step 7 - Filling missing Fare values with a user-defined function (mean fare per class)

In [None]:
from pyspark.sql.functions import udf

# Mean Fare per passenger class, computed on the training set and keyed by Pclass.
# Values are converted to plain Python floats; groups with a NULL class or a NULL
# mean are left out.
mean_fare_by_pclass = {row[0]: float(row[1])
                       for row in trainFilled.groupby('Pclass').mean('Fare').collect()
                       if row[0] is not None and row[1] is not None}

# The same means as a list ordered by ascending Pclass (kept for backward compatibility)
fares_by_class = [mean_fare_by_pclass[k] for k in sorted(mean_fare_by_pclass)]


@udf('double')
def fillFare(pclass, fare):
    """Return `fare`, or the training-set mean Fare of `pclass` when `fare` is missing.

    The return type is double so the Fare column is not narrowed to a 32-bit float.
    The mean is looked up by class value rather than by list position, so the
    classes do not need to be exactly 1..n.

    Args:
        pclass: passenger class of the row (may be None).
        fare: fare of the row (may be None).

    Returns:
        `fare` when it is not None; otherwise the mean Fare for `pclass`, or None
        when no mean is known for that class (the value then stays NULL).
    """
    if fare is not None:
        return fare
    return mean_fare_by_pclass.get(pclass)


testFilled = testFilled.withColumn('Fare', fillFare('Pclass', 'Fare'))

### Step 7 (alternative) - Filling the missing Fare with SQL and predicting on the test set

In [None]:
# Make the test set a "table"
testFilled.createOrReplaceTempView('test')

# Runs one SQL query per column to get the number of null values in the test set
print({col: sqlc.sql("select * from test where " + col + " is null").count() for col in testFilled.columns})

# Show the rows with a null Fare, if any.
# The result is printed explicitly: a bare expression in the middle of a cell
# produces no output.
print(sqlc.sql("select * from test where Fare is null").toPandas())

# Since the Fare is highly dependent on the class, it makes more sense to use the average for the given class
# But we need to take the average from the TRAINING set
# NOTE(review): the class is hard-coded to 3, and the fill below applies this
# value to every null Fare regardless of the row's class - confirm that all
# null fares belong to class 3.
trainFilled.createOrReplaceTempView('train')
avgFare = sqlc.sql("select mean(Fare) from train where Pclass = 3").take(1)[0][0]
print(avgFare)

# Fill the missing value with the calculated average
testFilled = testFilled.na.fill({'Fare': avgFare})

# Apply the fitted pipeline to the test set and preview the first five rows
predictions_test = model2.transform(testFilled)
predictions_test.limit(5).toPandas()

### Step 8 

In [None]:
# Read the true labels for the test set. No schema is given to the reader, so
# both columns are cast explicitly: PassengerId to int, matching the type
# declared for the test set (the join key), and Survived to double.
answers = sqlc.read.csv('titanic_answers.csv', header=True)
answers = answers.select(F.col('PassengerId').cast('int').alias('PassengerId'),
                         F.col('Survived').cast('double').alias('Survived'))

# Attach the true label to each test prediction
pred_answer = predictions_test.join(answers, on='PassengerId')

# Area under ROC and accuracy on the test set, using the evaluators defined for
# the training predictions
roc = evaluator.evaluate(pred_answer)
print(roc)
acc = ev2.evaluate(pred_answer)
print(acc)