In [None]:
# Load the Titanic CSV: the first row holds column names and column types
# are inferred from the data.
titanicdata = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("titanic.csv")
)

In [None]:
# Render the loaded DataFrame for a quick visual check.
# NOTE(review): `display` is not imported in this notebook; presumably it is
# the notebook environment's built-in renderer (e.g. Databricks) - confirm.
display(titanicdata)

In [None]:
from pyspark.ml.classification import (RandomForestClassifier,DecisionTreeClassifier)

In [None]:
# Keep only the label ('Survived') and the columns later used as model inputs.
Data2 = titanicdata.select(
    'Survived',
    'Pclass',
    'Sex',
    'Age',
    'SibSp',
    'Parch',
    'Fare',
    'Embarked',
)

In [None]:
# Discard every row that has a null in any of the selected columns.
Data3 = Data2.dropna()

In [None]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)

In [None]:
# 'Sex' is a string column: index it to numeric categories first, then
# one-hot encode the index into a vector column.
gender_indexer = StringIndexer().setInputCol('Sex').setOutputCol('genderindex')
gender_encoded = OneHotEncoder().setInputCol('genderindex').setOutputCol('genderVec')

In [None]:
# Same two-step encoding for the 'Embarked' string column:
# string -> category index -> one-hot vector.
embark_indexer = StringIndexer().setInputCol('Embarked').setOutputCol('Embarkedindex')
embark_encoded = OneHotEncoder().setInputCol('Embarkedindex').setOutputCol('EmbarkedVec')

In [None]:
# Combine the numeric columns and the two one-hot vectors into the single
# 'features' vector column that the classifiers read.
assembler = VectorAssembler(
    inputCols=[
        'Pclass',
        'genderVec',
        'Age',
        'SibSp',
        'Parch',
        'Fare',
        'EmbarkedVec',
    ],
    outputCol='features',
)

In [None]:
# Split the cleaned rows 70/30 into training and test sets.
# A fixed seed keeps the split - and therefore every metric computed from
# it further down - repeatable across kernel restarts and re-runs.
train_df, test_df = Data3.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Decision tree that predicts 'Survived' from the assembled 'features' vector.
dtc = DecisionTreeClassifier(
    labelCol='Survived',
    featuresCol='features',
)

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Decision-tree pipeline: index both string columns, one-hot encode them,
# assemble the feature vector, then train the classifier.
pipeline = Pipeline().setStages([
    gender_indexer,
    embark_indexer,
    gender_encoded,
    embark_encoded,
    assembler,
    dtc,
])

In [None]:
# Fit every stage of the decision-tree pipeline on the training split only.
fit_model = pipeline.fit(train_df)

In [None]:
# Run the fitted decision-tree pipeline over the held-out test split.
test_results = fit_model.transform(test_df)

In [None]:
# Show the transformed test rows produced by the decision-tree pipeline.
display(test_results)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [None]:
# Accuracy evaluator comparing the 'prediction' column with the 'Survived' label.
acc_eval = MulticlassClassificationEvaluator(
    labelCol="Survived",
    predictionCol="prediction",
    metricName='accuracy',
)

In [None]:
# Accuracy of the decision-tree pipeline on the held-out test split.
acc_eval.evaluate(test_results)

In [None]:
from sklearn.metrics import classification_report,confusion_matrix
# Pull label and prediction to the driver in ONE job so each y_true[i] is
# guaranteed to belong to the same row as y_pred[i]. Two independent
# collect() calls would re-run the pipeline twice and pair rows purely by
# position across jobs.
label_prediction_rows = test_results.select('Survived', 'prediction').collect()
# Unwrap the Row objects into plain integer class labels so sklearn receives
# flat 1-D sequences (predictions arrive as 0.0 / 1.0 doubles).
y_true = [int(row['Survived']) for row in label_prediction_rows]
y_pred = [int(row['prediction']) for row in label_prediction_rows]
print(classification_report(y_true, y_pred))

In [None]:
# Random forest (30 trees, depth <= 8) predicting 'Survived' from 'features'.
# The seed pins the forest's internal randomness so repeated runs train the
# same model and report the same metrics.
rfc = RandomForestClassifier(
    labelCol='Survived',
    featuresCol='features',
    numTrees=30,
    maxDepth=8,
    seed=42,
)

In [None]:
# Random-forest pipeline: the same preprocessing stages as the decision-tree
# pipeline, with the random forest as the final estimator.
pipeline2 = Pipeline().setStages([
    gender_indexer,
    embark_indexer,
    gender_encoded,
    embark_encoded,
    assembler,
    rfc,
])

In [None]:
# Fit every stage of the random-forest pipeline on the training split only.
fit_model2 = pipeline2.fit(train_df)

In [None]:
# Run the fitted random-forest pipeline over the held-out test split.
test_results2 = fit_model2.transform(test_df)

In [None]:
# Show the transformed test rows produced by the random-forest pipeline.
display(test_results2)

In [None]:
# Accuracy of the random-forest pipeline on the held-out test split.
acc_eval.evaluate(test_results2)

In [None]:
from sklearn.metrics import classification_report,confusion_matrix
# Pull label and prediction to the driver in ONE job so each y_true[i] is
# guaranteed to belong to the same row as y_pred[i]. Two independent
# collect() calls would re-run the pipeline twice and pair rows purely by
# position across jobs.
label_prediction_rows = test_results2.select('Survived', 'prediction').collect()
# Unwrap the Row objects into plain integer class labels so sklearn receives
# flat 1-D sequences (predictions arrive as 0.0 / 1.0 doubles).
y_true = [int(row['Survived']) for row in label_prediction_rows]
y_pred = [int(row['prediction']) for row in label_prediction_rows]
print(classification_report(y_true, y_pred))

In [None]:
# Preprocessing-only pipeline (no classifier stage): it just produces the
# 'features' column.
pipeline3 = Pipeline().setStages([
    gender_indexer,
    embark_indexer,
    gender_encoded,
    embark_encoded,
    assembler,
])

In [None]:
# Fit the preprocessing-only pipeline on the training split.
fit_model3 = pipeline3.fit(train_df)

In [None]:
# Apply the fitted preprocessing to the training split; the result is the
# input for cross-validation.
model_data = fit_model3.transform(train_df)

In [None]:
# Show the preprocessed training rows.
display(model_data)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Fresh, untuned random forest used as the cross-validation estimator.
# NOTE: this rebinds `rfc`; `pipeline2`, built earlier, keeps its own
# reference to the previous estimator and is unaffected.
# The seed makes each candidate forest reproducible.
rfc = RandomForestClassifier(labelCol="Survived", featuresCol="features", seed=42)
# Score candidates by area under the ROC curve, computed from the
# classifier's raw prediction column. These are the evaluator's defaults,
# spelled out so the selection metric is visible to the reader.
evaluator = BinaryClassificationEvaluator(
    labelCol="Survived",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
# 3 depths x 3 forest sizes = 9 parameter combinations.
paramGrid = (ParamGridBuilder()
             .addGrid(rfc.maxDepth, [4, 6, 8])
             .addGrid(rfc.numTrees, [30,50,80])
             .build())
# Seed the fold assignment as well, so avgMetrics and the selected best
# model are repeatable across runs.
cv = CrossValidator(
    estimator=rfc,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)
# Run 3-fold cross-validation over the already-preprocessed training data.
cvModel = cv.fit(model_data)

In [None]:
# Cross-validated evaluator metric averaged over the folds, one value per
# parameter combination in paramGrid.
cvModel.avgMetrics

In [None]:
# NOTE: despite the name, this is the best fitted model of the
# CrossValidator's estimator (`rfc`, a RandomForestClassifier), not a Pipeline.
bestPipeline = cvModel.bestModel

In [None]:
# Inspect the parameters (and their values) attached to the selected best model.
bestPipeline.extractParamMap()