In [1]:
# Raw train/test CSV files under /FileStore/tables; sc.textFile yields one
# RDD element per text line (header line included).
train_datapath = '/FileStore/tables/train.csv'
test_datapath = '/FileStore/tables/test.csv'
traindata_rdd, testdata_rdd = sc.textFile(train_datapath), sc.textFile(test_datapath)

In [2]:
traindata_rdd.take(num=3)  # header line plus the first two raw training rows

In [3]:
testdata_rdd.take(num=3)  # header line plus the first two raw test rows

In [4]:
def _parse_passenger_csv(rdd, name_col='Name'):
  """Parse an RDD of raw passenger CSV lines (header first) into a DataFrame.

  Each line is split on commas after every double quote is removed, so a
  quoted name such as "Braund, Mr. Owen Harris" spreads over several raw
  fields.  Those fragments are re-joined and split once, at the first comma,
  into two output columns:

    * ``FirstName`` - the text before the first comma (for names written as
      "Surname, Title. Given" this is actually the surname, despite the label);
    * ``name_col``  - everything after the first comma, leading space kept
      (e.g. " Mr. Owen Harris").

  The name column is located by looking it up in the header, so the train and
  test files (which place it at different positions) share this code.  Names
  with no comma or with several commas still produce rows aligned with the
  header.  Assumes no field other than ``name_col`` contains a comma.

  Args:
    rdd: RDD of CSV text lines whose first element is the header line.
    name_col: header label of the passenger-name column.

  Returns:
    DataFrame whose columns are the header's columns with ``FirstName``
    inserted just before ``name_col``.  Every value is a string.

  Raises:
    ValueError: if ``name_col`` does not appear in the header.
  """
  header = rdd.first()
  colnames = header.replace('"', '').split(",")
  name_idx = colnames.index(name_col)
  n_cols = len(colnames)
  # Drop the header and any blank lines so every remaining row is data.
  body = rdd.filter(lambda r: r != header and r.strip() != '')

  def parseRow(row):
    fields = row.replace('"', '').split(",")
    # Raw fields the name spans: one, plus one per comma inside the name.
    name_width = max(1, len(fields) - n_cols + 1)
    full_name = ",".join(fields[name_idx:name_idx + name_width])
    surname, _, rest = full_name.partition(",")
    return tuple(fields[:name_idx] + [surname, rest] + fields[name_idx + name_width:])

  colnames.insert(name_idx, 'FirstName')
  return body.map(parseRow).toDF(colnames)


def parseTrain(rdd):
  """Parse the raw training-CSV RDD into a DataFrame (see _parse_passenger_csv)."""
  return _parse_passenger_csv(rdd)


def parseTest(rdd):
  """Parse the raw test-CSV RDD into a DataFrame (see _parse_passenger_csv)."""
  return _parse_passenger_csv(rdd)
 
# Turn both raw line RDDs into DataFrames (all columns are strings at this point).
traindata_df, testdata_df = parseTrain(traindata_rdd), parseTest(testdata_rdd)

In [5]:
traindata_df.show(n=3)  # sanity-check the parsed training columns

In [6]:
from pyspark.sql.functions import lit, col
# Tag each row with its origin so the combined frame can be split apart later.
traindata_df = traindata_df.withColumn('Mark', lit('train'))
# Test rows have no Survived value; use the placeholder '0' as a *string* so the
# column has the same type as the parsed (all-string) training column.  It is
# cast to double in a later cell.
testdata_df = (testdata_df.withColumn('Survived', lit('0'))
                          .withColumn('Mark', lit('test')))
# union matches columns by position, so reorder test columns to the train layout.
testdata_df = testdata_df[traindata_df.columns]
## Append Test data to Train data
# union() supersedes unionAll() (deprecated since Spark 2.0); both keep duplicates.
df = traindata_df.union(testdata_df)

In [7]:
df.show(n=20)  # first rows of the combined train + test frame

In [8]:
df.show(n=2)  # quick look at the combined frame

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
 
## created user defined function to extract title
def extract_title(name):
    """Return the text before the first '.' in `name`, with whitespace stripped.

    For a parsed name such as " Mr. Owen Harris" this gives "Mr".  A null name
    yields None instead of raising inside the Python UDF worker.
    """
    if name is None:
        return None
    return name.split('.')[0].strip()

getTitlename = udf(extract_title, StringType())
df = df.withColumn('Title', getTitlename(df['Name']))

df.select('Name','Title').show(3)

In [10]:
df.show(n=2)  # confirm the new Title column

In [11]:
# Every parsed field is a string; cast the numeric ones to double.  Values that
# cannot be parsed (e.g. empty fields) become null.
for numeric_col in ['Age', 'SibSp', 'Parch', 'Fare', 'Survived']:
    df = df.withColumn(numeric_col, df[numeric_col].cast('double'))
df.printSchema()

In [12]:
numVars = ['Survived','Age','SibSp','Parch','Fare']

def countNullvalues(df, var):
    """Return how many rows of `df` have a null in column `var`."""
    return df.where(df[var].isNull()).count()

# Null counts per numeric column *before* imputation.  These were computed
# (one Spark job per column) but never shown; display them.
missing_Variables = {var: countNullvalues(df, var) for var in numVars}
print(missing_Variables)

# Mean-impute Age and Fare; both means come from one aggregation job.
# NOTE: df holds train and test rows, so the means span both.
age_mean, fare_mean = df.groupBy().mean('Age', 'Fare').first()
df = df.na.fill({'Age': age_mean, 'Fare': fare_mean})
df.show(5)

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumnsnames = ["Sex"]
stages = [] # stages in our Pipeline
for categoricalColdata in categoricalColumnsnames:
  # Category Indexing with StringIndexer: <col> -> <col>Index.
  # (The loop variable is categoricalColdata; the previous `categoricalCol`
  # was undefined and raised NameError.)
  stringIndexer = StringIndexer(inputCol=categoricalColdata,
                                outputCol=categoricalColdata + "Index")
  # Use OneHotEncoder to convert the index into a binary vector: <col>classVec.
  # The VectorAssembler cell expects exactly this "<col>classVec" name.
  encoder = OneHotEncoder(inputCol=categoricalColdata + "Index",
                          outputCol=categoricalColdata + "classVec")
  # Add stages.  These are not run here, but will run all at once later on.
  stages += [stringIndexer, encoder]

In [14]:
# Index Survived into the "label" column the classifiers train on.  By default
# StringIndexer assigns index 0.0 to the most frequent value.
label_stringIdxname = StringIndexer(outputCol="label", inputCol="Survived")
stages.append(label_stringIdxname)

In [15]:
numericColsname = [ "Age", "SibSp", "Parch","Fare"]
# One-hot vectors first, then the raw numeric columns.  A list comprehension is
# used because map() returns a lazy iterator on Python 3, so `map(...) + list`
# raises TypeError there.
assemblerInputs = [c + "classVec" for c in categoricalColumnsnames] + numericColsname
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [16]:
df.show(n=20)  # same as the default row count; shows the imputed frame

In [17]:
colname = df.columns
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
# NOTE: fitting on df (train + test rows) lets the indexers see the test rows,
# including their placeholder Survived values.
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

# Keep relevant columns
selectedcols = ["label", "features"] + colname
dataset = dataset.select(selectedcols)
# Preview only a few rows; toPandas() on the whole frame collects every row to the driver.
dataset.limit(5).toPandas()

In [18]:
df.show(n=2)  # pre-pipeline frame for comparison with `dataset`

In [19]:
# Split the prepared rows back into labelled (train) and unlabelled (test) parts.
# Reference Mark through `dataset` itself.  Filtering with a column object taken
# from a different DataFrame (df.Mark) depends on lineage and may fail to resolve.
train_df = dataset.where(dataset['Mark'] == 'train')
test_df = dataset.where(dataset['Mark'] == 'test')

In [20]:
# Hold out 30% of the labelled rows for validation; fixed seed for repeatability.
(trainingsetdata, Validatesetdata) = train_df.randomSplit([0.70, 0.30], seed = 121)
# Single-argument print() gives the same output on Python 2 and Python 3.
print(trainingsetdata.count())
print(Validatesetdata.count())

In [21]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features")


# Evaluate model (BinaryClassificationEvaluator's default metric is areaUnderROC)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10,15])
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations on the training split only.  Validatesetdata is scored
# in the following cells, so it must not be seen during fitting/model selection
# (fitting on all of train_df leaked it and inflated validation scores).
cvModel = cv.fit(trainingsetdata)

In [22]:
# Score the held-out split with the best cross-validated LR model.  The result is
# bound to `predictions`, the name this and the following cells read.
predictions = cvModel.transform(Validatesetdata)
predictions.printSchema()

In [23]:
# Collect truth and prediction in ONE pass so each pair comes from the same row;
# two separate collect() calls only line up if Spark returns rows in the same
# order both times.  Compare against `label` rather than raw Survived because
# `prediction` is expressed in the StringIndexer's label-index space.
eval_rows = predictions.select('label', 'prediction').collect()
actual_datalist = [r.label for r in eval_rows]
prediction_datalist = [r.prediction for r in eval_rows]
actual_dataarray = [float(v) for v in actual_datalist]
predict_dataarray = [float(v) for v in prediction_datalist]

def accuracy_calculation(list1, list2):
  """Return the fraction of positions at which `list1` and `list2` are equal.

  Args:
    list1: sequence of true labels.
    list2: sequence of predicted labels, aligned element-by-element with list1.

  Returns:
    float in [0.0, 1.0]; 0.0 when no position matches.

  Raises:
    ValueError: if the sequences are empty or have different lengths.
  """
  length = len(list1)
  if length != len(list2):
    raise ValueError("accuracy_calculation: length mismatch (%d vs %d)"
                     % (length, len(list2)))
  if length == 0:
    raise ValueError("accuracy_calculation: empty input")
  matches = sum(1 for actual, predicted in zip(list1, list2) if actual == predicted)
  return float(matches) / length
# Validation accuracy of the best LR model, computed on the plain float lists
# (not on the collected Row objects).
accuracy_calculation(actual_dataarray, predict_dataarray)

In [24]:
print('Model Intercept: %s' % cvModel.bestModel.intercept)  # runs on Python 2 and 3

In [25]:
# One single-column row per coefficient of the best LR model; float() turns the
# numpy scalars into plain Python floats for schema inference.
weights = [(float(w),) for w in cvModel.bestModel.coefficients]
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
weightsDF.toPandas()

In [26]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
# Same weights and seed as the logistic-regression split.
(trainingDataset, ValidateDataset) = train_df.randomSplit([0.70, 0.30], seed = 121)
# Single-argument print() gives the same output on Python 2 and Python 3.
print(trainingDataset.count())
print(ValidateDataset.count())

In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1,2,6,10])
             .addGrid(dt.maxBins, [20,40,80])
             .build())
# `evaluator` is the BinaryClassificationEvaluator built in the LR cell.
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator)
# Fit on the training split only; ValidateDataset is held out and scored below
# (fitting on all of train_df leaked the validation rows into training).
cvModel = cv.fit(trainingDataset)

In [28]:
# Size of the selected tree (print() form runs on Python 2 and 3).
print("numNodes = %s" % cvModel.bestModel.numNodes)
print("depth = %s" % cvModel.bestModel.depth)
predictions_Dtree = cvModel.transform(ValidateDataset)

In [29]:
# Pair each true label with its prediction from a single collect(); compare
# against `label` because `prediction` is in the label-index space.
eval_rows = predictions_Dtree.select('label', 'prediction').collect()
actual_datalist = [r.label for r in eval_rows]
prediction_datalist = [r.prediction for r in eval_rows]
actual_dataarray = [float(v) for v in actual_datalist]
predict_dataarray = [float(v) for v in prediction_datalist]


# Validation accuracy of the best decision tree (the helper is accuracy_calculation).
accuracy_calculation(actual_dataarray, predict_dataarray)

In [30]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Seeded like the LR/DT splits so the run is reproducible.
(trainingDataset, ValidateDataset) = train_df.randomSplit([0.70, 0.30], seed = 121)

In [31]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 3 maxDepth x 2 maxBins x 2 numTrees = 12 candidate settings.
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 4, 6]) \
    .addGrid(rf.maxBins, [20, 60]) \
    .addGrid(rf.numTrees, [5, 20]) \
    .build()
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator)

# Cross-validate on the training split; every candidate forest is refit per
# fold, so this can take several minutes.
cvModel = cv.fit(trainingDataset)

In [32]:
predictions_RF = cvModel.transform(dataset=ValidateDataset)  # score held-out rows with the best forest

In [33]:
# Area under ROC of the random forest on its validation split.  Must use
# predictions_RF; `predictions` holds the logistic-regression output.
evaluator.evaluate(predictions_RF)

In [34]:
# Pair each true label with its prediction from a single collect(); compare
# against `label` because `prediction` is in the label-index space.
eval_rows = predictions_RF.select('label', 'prediction').collect()
actual_datalist = [r.label for r in eval_rows]
prediction_datalist = [r.prediction for r in eval_rows]
actual_dataarray = [float(v) for v in actual_datalist]
predict_dataarray = [float(v) for v in prediction_datalist]


# Validation accuracy of the best random forest (the helper is accuracy_calculation).
accuracy_calculation(actual_dataarray, predict_dataarray)

In [35]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [36]:
(trainingDataset, ValidateDataset) = train_df.randomSplit([0.70, 0.30], seed = 122)
# The first layer must equal the length of the assembled feature vector (one-hot
# Sex plus four numeric columns here), and the last the number of label classes
# (2).  A mismatch with the hard-coded 24 makes training fail, so derive it from the data.
num_features = len(trainingDataset.first()['features'])
layers = [num_features, 5, 4, 2]
mp = MultilayerPerceptronClassifier(layers = layers)

In [37]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 3 maxIter x 3 blockSize x 2 seed = 18 candidate settings.
# NOTE(review): tuning over `seed` just picks the luckiest initialisation;
# consider fixing the seed on the estimator instead.
paramGrid = ParamGridBuilder() \
    .addGrid(mp.maxIter, [50, 100, 200]) \
    .addGrid(mp.blockSize, [128, 256, 512]) \
    .addGrid(mp.seed, [1234, 3223]) \
    .build()
# Select by accuracy.  This rebinds `evaluator`: later cells that reuse it get
# this multiclass evaluator, not the earlier binary one.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

cv = CrossValidator(estimator=mp,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator)

# Cross-validate on the training split.
cvModel = cv.fit(trainingDataset)

In [38]:
# Score the MLP's held-out split (ValidateDataset; `ValidateData` was undefined).
predictions_mc = cvModel.transform(ValidateDataset)

In [39]:
evaluator.evaluate(dataset=predictions_mc)  # validation accuracy of the best MLP

In [40]:
bestModelcalculation = cvModel.bestModel
# Generate predictions for the validation split with the best MLP directly
# (CrossValidatorModel.transform delegates to this same model).
finalPredictions = bestModelcalculation.transform(ValidateDataset)
finalPredictions.printSchema()
# Evaluate best model
evaluator.evaluate(finalPredictions)