In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

from pyspark.sql import functions as F

sqlContext = SQLContext(sc)

# Columns of the input file, in file order. Every column is declared as a
# nullable double; the file's last column (class) is loaded under the name
# 'label'.
schema = StructType([
    StructField(column_name, DoubleType(), True)
    for column_name in (
        'id',
        'clump_thickness',
        'unif_cell_size',
        'unif_cell_shape',
        'marg_adhesion',
        'single_epith_cell_size',
        'bare_nuclei',
        'bland_chrom',
        'norm_nucleoli',
        'mitoses',
        'label',
    )
])

# Read the CSV with the explicit schema, drop every row that contains a null,
# then binarise the label: 4.0 becomes 1.0 and any other value becomes 0.0.
df = (
    sqlContext.read
    .schema(schema)
    .format('csv')
    .load('breast-cancer-wisconsin.data.txt')
    .dropna()
    .withColumn('label', F.when(F.col('label') == 4.0, 1.0).otherwise(0.0))
)

#df.take(200)

In [2]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

#label_indexer = StringIndexer(inputCol = 'class', outputCol = 'label')


# Pack the nine measurement columns into a single 'features' vector column.
# 'id' and 'label' are not listed, so they are not part of the feature vector.
assembler = VectorAssembler(
    inputCols = [
        'clump_thickness',
        'unif_cell_size',
        'unif_cell_shape',
        'marg_adhesion',
        'single_epith_cell_size',
        'bare_nuclei',
        'bland_chrom',
        'norm_nucleoli',
        'mitoses',
    ],
    outputCol = 'features',
)



In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Seed for the train/test split. Without it randomSplit draws a fresh random
# split on every run, so the metrics printed below are not repeatable.
SPLIT_SEED = 42

# Estimator under test. The commented lines are drop-in alternatives that use
# the same label/features columns.
#classifier = RandomForestClassifier(labelCol = 'label', featuresCol = 'features')
classifier = DecisionTreeClassifier(labelCol = 'label', featuresCol = 'features')
#classifier = GBTClassifier(labelCol = 'label', featuresCol = 'features')
#classifier = LinearSVC(labelCol = 'label', featuresCol = 'features')

# Assemble the feature vector, then fit the classifier on it.
pipeline = Pipeline(stages = [assembler, classifier])

# 80/20 split with a fixed seed so the same input data yields the same split.
(train, test) = df.randomSplit([0.8, 0.2], seed = SPLIT_SEED)

model = pipeline.fit(train)

# Score the held-out rows and evaluate against the same 'label' column the
# classifier was trained on.
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol = 'label')

auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
aupr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print("The AUROC is %s and the AUPR is %s." % (auroc, aupr))



The AUROC is 0.9634371395617071 and the AUPR is 0.9701314162028448.


In [12]:
#predictions.select(['label', 'prediction']).toPandas().head(50)
# Row counts per label value: rows labelled 0.0 first, then rows labelled 1.0.
negative_rows = df.filter(df['label'] == 0.0).count()
positive_rows = df.filter(df['label'] == 1.0).count()
print(negative_rows, positive_rows)

444 239
