In [1]:
import warnings
# NOTE(review): this silences every Python warning for the whole session
# (including pyspark deprecation warnings); consider narrowing the filter.
warnings.filterwarnings('ignore')
import pandas as pd

# Raw training CSV in DBFS (presumably the Titanic passenger data, given the
# Survived/Pclass/Cabin columns used below).
TRAIN_CSV_PATH = "/FileStore/tables/train.csv"

# Built-in CSV source (Spark >= 2.0) instead of the legacy external
# "com.databricks.spark.csv" package name: the first row is the header and
# column types are inferred so numeric columns (Age, Fare, ...) load as numbers.
data = sqlContext.read.csv(TRAIN_CSV_PATH, header=True, inferSchema=True, sep=",")

In [2]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction, udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler



def parse(x):
  """Map a possibly-missing value to a numeric presence flag.

  Returns 0.0 when `x` is None and 1.0 for any other value (including empty
  strings and 0). Used below to turn the nullable Cabin column into a flag.
  """
  return 0.0 if x is None else 1.0
  
# Spark UDF wrapping parse(): produces a DoubleType column that is 1.0 where
# the input value is present and 0.0 where it is null.
myfunc = udf(parse, DoubleType())
def _feature_stages(categorical_cols, numeric_cols):
  """Build the Pipeline stages that turn raw columns into (label, features).

  For each categorical column: StringIndexer -> <col>Index, then
  OneHotEncoder -> <col>classVec. Then a StringIndexer turning "Survived"
  into "label", and finally a VectorAssembler combining the one-hot vectors
  and the numeric columns into "features". Stage order matters: each stage
  consumes columns produced by earlier ones.
  """
  stages = []
  for col in categorical_cols:
    stages.append(StringIndexer(inputCol=col, outputCol=col + "Index"))
    stages.append(OneHotEncoder(inputCol=col + "Index", outputCol=col + "classVec"))
  stages.append(StringIndexer(inputCol="Survived", outputCol="label"))
  # A list comprehension (not map(...) + list) so this works on Python 2 and 3.
  assembler_inputs = [col + "classVec" for col in categorical_cols] + list(numeric_cols)
  stages.append(VectorAssembler(inputCols=assembler_inputs, outputCol="features"))
  return stages


def preprocessing(dat):
  """Clean the raw frame and fit/apply the feature pipeline.

  Args:
    dat: DataFrame with at least Pclass, Age, Parch, SibSp, Fare, Cabin,
      Embarked, Sex and Survived columns.

  Returns:
    DataFrame with "label" and "features" followed by the cleaned modelling
    columns (Pclass, Age, Parch, SibSp, Fare, Cabind, Embarked, Sex, Survived).
  """
  # Replace Cabin by a 0/1 "has cabin" flag (via the myfunc UDF), keep only the
  # modelling columns, and drop every row with a null in any of them.
  # VectorAssembler cannot assemble null values, see
  # https://stackoverflow.com/questions/41362295/sparkexception-values-to-assemble-cannot-be-null
  # na.drop() here already covers the Age/Embarked non-null requirements.
  df = (dat.withColumn('Cabind', myfunc(dat['Cabin']))
           .select(['Pclass', 'Age', 'Parch', 'SibSp', 'Fare', 'Cabind',
                    'Embarked', 'Sex', 'Survived'])
           .na.drop())

  categoricalColumns = ['Sex', 'Embarked']
  # NOTE(review): Age is required to be non-null above (rows are dropped) but is
  # not part of the feature vector. Adding it would also change the feature
  # length that any hard-coded MLP input layer size depends on.
  numericCols = ["Pclass", "Parch", "SibSp", "Fare", "Cabind"]
  cols = df.columns

  # fit() computes the indexer vocabularies; transform() applies all stages.
  pipelineModel = Pipeline(stages=_feature_stages(categoricalColumns, numericCols)).fit(df)
  return pipelineModel.transform(df).select(["label", "features"] + cols)

In [3]:
# Clean the raw CSV frame and build the (label, features) dataset.
dataset = preprocessing(dat=data)

In [4]:
# Number of rows left after preprocessing dropped rows with nulls.
row_count = dataset.count()
row_count

In [10]:
# Random 70% / 30% train/test split; the fixed seed makes it reproducible.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Every model in this notebook is scored by classification accuracy.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Starting LogisticRegression estimator for the cross-validation below.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)



In [12]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid: 3 regParam x 3 elasticNetParam x 3 maxIter = 27 settings.
gridBuilder = ParamGridBuilder()
gridBuilder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
gridBuilder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
gridBuilder.addGrid(lr.maxIter, [1, 5, 10])
paramGrid = gridBuilder.build()

# 5-fold cross-validation: every setting is trained once per fold, so this
# cell can take a while.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)
cvModel = cv.fit(trainingData)


In [13]:
# Score the held-out test set; cvModel applies the best model found by
# cross-validation, so this measures accuracy on unseen data.
predictions = cvModel.transform(testData)
def getaccuracy(predictions, metric_evaluator=None):
  """Print the accuracy of a predictions DataFrame.

  Args:
    predictions: DataFrame with "prediction" and "label" columns, e.g. the
      output of cvModel.transform(testData).
    metric_evaluator: object with an evaluate(df) method used for scoring.
      Defaults to the notebook-level `evaluator` (accuracy), so existing
      one-argument calls behave as before.
  """
  if metric_evaluator is None:
    metric_evaluator = evaluator
  predictionAndLabels = predictions.select("prediction", "label")
  print("Test set accuracy = " + str(metric_evaluator.evaluate(predictionAndLabels)))

  
# Test-set accuracy of the best logistic regression.
getaccuracy(predictions=predictions)

In [14]:
from pyspark.ml.classification import DecisionTreeClassifier

# Single decision tree. The grid below sets maxDepth for each cross-validation
# candidate, so maxDepth=3 here is only the estimator's starting value.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label", maxDepth=3)

# 4 maxDepth x 3 maxBins = 12 candidate settings.
gridBuilder = ParamGridBuilder()
gridBuilder.addGrid(dt.maxDepth, [1, 2, 6, 10])
gridBuilder.addGrid(dt.maxBins, [20, 40, 80])
paramGrid = gridBuilder.build()

In [15]:
# 5-fold cross-validation of the decision-tree grid.
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)
cvModel = cv.fit(trainingData)


In [16]:
# Size of the best tree selected by cross-validation.
# print(...) with a single string argument behaves identically on Python 2 and 3,
# unlike the Python 2-only print statement (matches the form used in getaccuracy).
print("numNodes = " + str(cvModel.bestModel.numNodes))
print("depth = " + str(cvModel.bestModel.depth))

# Score the held-out test set with the best model found by cross-validation.
predictions = cvModel.transform(testData)

getaccuracy(predictions)

In [17]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest. An explicit seed makes the random tree construction
# reproducible across kernel restarts, like the fixed seeds used for the
# train/test split (100) and the MLP.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=100)

# 3 maxDepth x 2 maxBins x 2 numTrees = 12 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

cvModel = cv.fit(trainingData)

In [18]:
# Score the held-out test set with the best random forest from cross-validation.
predictions = cvModel.transform(testData)

In [19]:
# Test-set accuracy of the best random forest chosen by cross-validation.
getaccuracy(predictions=predictions)

In [20]:
# Confusion matrix as a {Row(label, prediction): count} dict: each distinct
# (label, prediction) pair is one cell. countByValue() counts the rows
# directly; the former zipWithIndex().countByKey() gave the same dict but
# needed an extra Spark job to compute the indices.
label_and_pred = predictions.select("label", "prediction")
label_and_pred.rdd.countByValue()


In [21]:

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Create the trainer and set its parameters.
# The input layer must equal the length of the assembled "features" vector, so
# read it from the data instead of hard-coding 8; it then stays correct if the
# feature set changes. The hidden layers (5, 4) and the 2-unit output layer
# are kept as before (presumably one output per Survived class).
num_features = len(trainingData.select("features").first()[0])
layers = [num_features, 5, 4, 2]
mlp = MultilayerPerceptronClassifier(layers=layers, seed=12)

# The architecture is fixed, so only maxIter is tuned. Listing the same
# layers several times in the grid would just repeat identical fits.
paramGrid = (ParamGridBuilder()
             .addGrid(mlp.maxIter, [20, 60, 80, 100])
             .build())

cv = CrossValidator(estimator=mlp, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

cvModel = cv.fit(trainingData)
# compute accuracy on the test set


In [22]:
# Evaluate the best multilayer perceptron from cross-validation on the test set.
predictions = cvModel.transform(testData)
getaccuracy(predictions=predictions)