In [1]:
# Load the raw cervical-cancer risk-factor CSV. The header row supplies the column names;
# no schema inference is requested, so every column arrives as a string (cast to double later).
csv_path = "/FileStore/tables/risk_factors_cervical_cancer.csv"
dataDF = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(header=True, delimiter=",")
    .load(csv_path)
)

In [2]:
# Quick visual inspection of the freshly loaded (all-string) data via the notebook's display() helper.
display(dataDF)

In [3]:
from pyspark.sql.functions import when, lit, col

# Columns removed before modelling:
#  - the two "time since diagnosis" columns are missing ('?') for most rows; keeping them would
#    make the '?'-row filter in the next cell throw away most of the data;
#  - Hinselmann, Schiller and Citology are presumably the results of other diagnostic tests
#    (alternative targets alongside Biopsy) rather than risk factors -- confirm against the
#    dataset description.
dropped_columns = [
    "STDs: Time since first diagnosis",
    "STDs: Time since last diagnosis",
    "Hinselmann",
    "Schiller",
    "Citology",
]
dataDF = dataDF.drop(*dropped_columns)

In [4]:
from pyspark.sql.types import DoubleType

# Keep only rows that have no '?' (the file's missing-value marker) in any column.
# The per-column conditions are AND-ed, so a row survives only if every column passes;
# a null cell makes its condition null, which filter() treats as false, so such rows go too.
keep_row = None
for name in dataDF.columns:
    not_missing = ~col(name).isin(['?'])
    keep_row = not_missing if keep_row is None else (keep_row & not_missing)
if keep_row is not None:
    dataDF = dataDF.filter(keep_row)

# Every column was read as a string; cast all of them to double (same names, same order)
# so they can be assembled into an ML feature vector.
dataDF = dataDF.select([dataDF[name].cast(DoubleType()).alias(name) for name in dataDF.columns])

In [5]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Biopsy is the prediction target; later cells refer to it as "label".
dataDF = dataDF.withColumnRenamed("Biopsy", "label")

# Features are every column EXCEPT the target. The target must be excluded explicitly:
# assembling "label" into "features" leaks the answer into the model and makes every
# downstream metric meaningless.
featureCols = [c for c in dataDF.columns if c != "label"]

# StringIndexer (default ordering: descending frequency) maps the label values to indices;
# the classifier is trained on, and predicts in, this indexed space ("indexedLabel").
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
vecAssembler = VectorAssembler(inputCols=featureCols, outputCol="features")
# maxIter / regParam are tuned by the cross-validation grid in the next cells.
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="features")
pipeline = Pipeline(stages=[vecAssembler, labelIndexer, lr])

In [6]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 3 x 3 hyper-parameter grid for Model 1's logistic regression:
# iteration budget x regularization strength (9 combinations).
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [10, 50, 100])
    .addGrid(lr.regParam, [0.1, 0.01, 0.001])
    .build()
)

In [7]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Pick hyper-parameters by 5-fold cross-validated area under the ROC curve (evaluator default).
# labelCol must be the indexed label the classifier trains and predicts on: the evaluator's
# default ("label") only agrees with it when StringIndexer happens to map 0.0->0.0 and 1.0->1.0.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(labelCol="indexedLabel"),
    numFolds=5,
    seed=10,  # reproducible fold assignment, same seed as the train/test split
)

In [8]:
# Number of rows left after dropping '?' rows -- the data actually available for train/test.
dataDF.count()

In [9]:
# Hold out ~20% of the rows as a final test set; the fixed seed makes the split reproducible.
splits = dataDF.randomSplit([0.8, 0.2], seed=10)
training, test = splits

In [10]:
# Sanity check: column names of the training split.
training.columns

In [11]:
# Grid search with cross-validation on the training split only; the returned model wraps the
# pipeline refitted with the best-scoring (maxIter, regParam) combination.
cvModel = crossval.fit(training)

In [12]:
# Score the held-out test set with the best cross-validated pipeline and preview a few rows.
predictions = cvModel.transform(test)
preview_cols = ["prediction", "indexedLabel", "features"]
predictions.select(*preview_cols).show(n=5)

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Test-set accuracy, compared in the indexed-label space the model predicts in.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = {:g} ".format(test_error))

In [14]:
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics takes (prediction, label) pairs. "prediction" is an index produced via
# StringIndexer, so it must be compared with "indexedLabel", not the raw "label" column
# (the two only coincide when the indexer's mapping happens to be the identity).
predictionsAndLabels = predictions.select("prediction", "indexedLabel").rdd
metrics = MulticlassMetrics(predictionsAndLabels)

In [15]:
# Per-class metrics are reported for indexed label 1.0. StringIndexer gives 0.0 to the most
# frequent label, so 1.0 is the minority class -- presumably a positive biopsy; confirm with the
# fitted StringIndexer's labels if the class balance changes.
# The label must be passed explicitly: the label-less precision()/recall()/fMeasure() overloads
# only returned overall (accuracy-equivalent) values and were removed in Spark 3.0.
positiveLabel = 1.0

# Overall statistics
precision = metrics.precision(positiveLabel)
recall = metrics.recall(positiveLabel)
f1Score = metrics.fMeasure(positiveLabel)
print("Summary Stats")
print("Accuracy = %s" % metrics.accuracy)
print("Precision (label %s) = %s" % (positiveLabel, precision))
print("Recall (label %s) = %s" % (positiveLabel, recall))
print("F1 Score (label %s) = %s" % (positiveLabel, f1Score))

# Weighted stats: per-class metrics averaged with weights proportional to class frequency.
# weightedRecall / weightedFalsePositiveRate are properties; weightedFMeasure is a method.
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

In [16]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import PCA

# Model 2: logistic regression on the first 3 principal components of the scaled features.
#
# Every Model-2 object gets its own (suffixed) name so Model 1's `lr`, `pipeline`, `paramGrid`,
# `crossval`, `predictions` and `metrics` stay intact, and Model 2 gets its own CrossValidator:
# Model 1's cross-validator wraps Model 1's pipeline and grid, so fitting it would simply
# re-fit Model 1. A classifier is used because the evaluators below need class predictions,
# and BinaryClassificationEvaluator needs the rawPrediction column only classifiers emit.
# NOTE(review): the LinearRegression import is kept in case other cells use it; Model 2 does not.

# PCA picks directions of maximum variance, so scale features to unit variance first;
# otherwise columns with large numeric ranges dominate the components.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
pca = PCA(k=3, inputCol="scaledFeatures", outputCol="pcaFeatures")
lr2 = LogisticRegression(labelCol="indexedLabel", featuresCol="pcaFeatures")
pipeline2 = Pipeline(stages=[vecAssembler, labelIndexer, scaler, pca, lr2])

# Same 3 x 3 grid as Model 1, but bound to Model 2's estimator.
paramGrid2 = (
    ParamGridBuilder()
    .addGrid(lr2.maxIter, [10, 50, 100])
    .addGrid(lr2.regParam, [0.1, 0.01, 0.001])
    .build()
)
crossval2 = CrossValidator(
    estimator=pipeline2,
    estimatorParamMaps=paramGrid2,
    evaluator=BinaryClassificationEvaluator(labelCol="indexedLabel"),
    numFolds=5,
    seed=10,  # reproducible folds, same seed as the train/test split
)
cvModel2 = crossval2.fit(training)
predictions2 = cvModel2.transform(test)

# Test-set accuracy, in the indexed-label space the model predicts in.
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy2 = evaluator2.evaluate(predictions2)
print("Test Error = %g " % (1.0 - accuracy2))

# "prediction" is an indexed label, so pair it with indexedLabel (not the raw label).
metrics2 = MulticlassMetrics(predictions2.select("prediction", "indexedLabel").rdd)

# Per-class metrics for indexed label 1.0 (the minority class under StringIndexer's
# frequency ordering -- presumably a positive biopsy; confirm via the fitted indexer's labels).
# The label is passed explicitly because the label-less overloads were removed in Spark 3.0.
positiveLabel = 1.0
precision2 = metrics2.precision(positiveLabel)
recall2 = metrics2.recall(positiveLabel)
f1Score2 = metrics2.fMeasure(positiveLabel)
print("Summary Stats")
print("Accuracy = %s" % metrics2.accuracy)
print("Precision (label %s) = %s" % (positiveLabel, precision2))
print("Recall (label %s) = %s" % (positiveLabel, recall2))
print("F1 Score (label %s) = %s" % (positiveLabel, f1Score2))

# Weighted stats: per-class metrics averaged with weights proportional to class frequency.
print("Weighted recall = %s" % metrics2.weightedRecall)
print("Weighted F(1) Score = %s" % metrics2.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics2.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics2.weightedFalsePositiveRate)