In [None]:
from pyspark.sql.functions import isnan, when, count, col
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

1. Sample code number            id number
   2. Clump Thickness               1 - 10
   3. Uniformity of Cell Size       1 - 10
   4. Uniformity of Cell Shape      1 - 10
   5. Marginal Adhesion             1 - 10
   6. Single Epithelial Cell Size   1 - 10
   7. Bare Nuclei                   1 - 10
   8. Bland Chromatin               1 - 10
   9. Normal Nucleoli               1 - 10
  10. Mitoses                       1 - 10
  11. Class:                        (2 for benign, 4 for malignant)

In [None]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, LongType

schema = StructType([
  StructField("id", StringType(), False),
  StructField("clump_thickness", DoubleType(), False),
  StructField("uniformity_cell_size", DoubleType(), False),
  StructField("uniformity_cell_shape", DoubleType(), False),
  StructField("marginal_adhesion", DoubleType(), False),
  StructField("single_epithelial_cell_size", DoubleType(), False),
  StructField("bare_nuclei", DoubleType(), False),
  StructField("bland_chromatin", DoubleType(), False),
  StructField("normal_nucleoli", DoubleType(), False),
  StructField("mitoses", DoubleType(), False),
  StructField("cancer", StringType(), False),
])

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
# %fs ls /FileStore/tables/breast_cancer_wisconsin.csv

path,name,size,modificationTime
dbfs:/FileStore/tables/breast_cancer_wisconsin.csv,breast_cancer_wisconsin.csv,19889,1652709929000


In [None]:
df_cancer = spark.read.format("csv").schema(schema).load("dbfs:/FileStore/tables/breast_cancer_wisconsin.csv")

In [None]:
df_cancer.head()

Out[390]: Row(id='1000025', clump_thickness=5.0, uniformity_cell_size=1.0, uniformity_cell_shape=1.0, marginal_adhesion=1.0, single_epithelial_cell_size=2.0, bare_nuclei=1.0, bland_chromatin=3.0, normal_nucleoli=1.0, mitoses=1.0, cancer='2')

In [None]:
df_cancer = df_cancer.fillna(0)

In [None]:
df_cancer.toPandas().head()

Unnamed: 0,id,clump_thickness,uniformity_cell_size,uniformity_cell_shape,marginal_adhesion,single_epithelial_cell_size,bare_nuclei,bland_chromatin,normal_nucleoli,mitoses,cancer
0,1000025,5.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0,1.0,2
1,1002945,5.0,4.0,4.0,5.0,7.0,10.0,3.0,2.0,1.0,2
2,1015425,3.0,1.0,1.0,1.0,2.0,2.0,3.0,1.0,1.0,2
3,1016277,6.0,8.0,8.0,1.0,3.0,4.0,3.0,7.0,1.0,2
4,1017023,4.0,1.0,1.0,3.0,2.0,1.0,3.0,1.0,1.0,2


In [None]:
df_cancer.count()

Out[393]: 699

In [None]:
#df_cancer = df_cancer.replace({2.0: 0.0, 4.0: 1.0 }, subset = ["label"])
df_cancer.toPandas().head()

Unnamed: 0,id,clump_thickness,uniformity_cell_size,uniformity_cell_shape,marginal_adhesion,single_epithelial_cell_size,bare_nuclei,bland_chromatin,normal_nucleoli,mitoses,cancer
0,1000025,5.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0,1.0,2
1,1002945,5.0,4.0,4.0,5.0,7.0,10.0,3.0,2.0,1.0,2
2,1015425,3.0,1.0,1.0,1.0,2.0,2.0,3.0,1.0,1.0,2
3,1016277,6.0,8.0,8.0,1.0,3.0,4.0,3.0,7.0,1.0,2
4,1017023,4.0,1.0,1.0,3.0,2.0,1.0,3.0,1.0,1.0,2


In [None]:
# Number of positive examples
df_cancer[df_cancer["cancer"]=='2'].count()

Out[395]: 458

In [None]:
# Number of negative examples
df_cancer[df_cancer["cancer"]=='4'].count()

Out[396]: 241

In [None]:
#df_cancer.filter(df_cancer["label"]==0.0).count()

In [None]:
df_cancer.select([count(when(isnan(c), c)).alias(c) for c in df_cancer.columns]).show()

+---+---------------+--------------------+---------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+------+
| id|clump_thickness|uniformity_cell_size|uniformity_cell_shape|marginal_adhesion|single_epithelial_cell_size|bare_nuclei|bland_chromatin|normal_nucleoli|mitoses|cancer|
+---+---------------+--------------------+---------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+------+
|  0|              0|                   0|                    0|                0|                          0|          0|              0|              0|      0|     0|
+---+---------------+--------------------+---------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+------+



In [None]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="cancer", outputCol="label")
stages = [label_stringIdx]

In [None]:
label_stringIdx.getOutputCol()

Out[400]: 'label'

In [None]:
assemblerInputs = df_cancer.columns[1:10]

In [None]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [None]:
assemblerInputs

Out[403]: ['clump_thickness',
 'uniformity_cell_size',
 'uniformity_cell_shape',
 'marginal_adhesion',
 'single_epithelial_cell_size',
 'bare_nuclei',
 'bland_chromatin',
 'normal_nucleoli',
 'mitoses']

In [None]:
df_cancer.columns

Out[404]: ['id',
 'clump_thickness',
 'uniformity_cell_size',
 'uniformity_cell_shape',
 'marginal_adhesion',
 'single_epithelial_cell_size',
 'bare_nuclei',
 'bland_chromatin',
 'normal_nucleoli',
 'mitoses',
 'cancer']

In [None]:
stages += [assembler]

In [None]:
stages

Out[406]: [StringIndexer_1d7cf6cf85a1, VectorAssembler_334e76a1761e]

In [None]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(df_cancer)
preppedDataDF = pipelineModel.transform(df_cancer)

In [None]:
preppedDataDF.toPandas().head()

  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,id,clump_thickness,uniformity_cell_size,uniformity_cell_shape,marginal_adhesion,single_epithelial_cell_size,bare_nuclei,bland_chromatin,normal_nucleoli,mitoses,cancer,label,features
0,1000025,5.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0,1.0,2,0.0,"[5.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0]"
1,1002945,5.0,4.0,4.0,5.0,7.0,10.0,3.0,2.0,1.0,2,0.0,"[5.0, 4.0, 4.0, 5.0, 7.0, 10.0, 3.0, 2.0, 1.0]"
2,1015425,3.0,1.0,1.0,1.0,2.0,2.0,3.0,1.0,1.0,2,0.0,"[3.0, 1.0, 1.0, 1.0, 2.0, 2.0, 3.0, 1.0, 1.0]"
3,1016277,6.0,8.0,8.0,1.0,3.0,4.0,3.0,7.0,1.0,2,0.0,"[6.0, 8.0, 8.0, 1.0, 3.0, 4.0, 3.0, 7.0, 1.0]"
4,1017023,4.0,1.0,1.0,3.0,2.0,1.0,3.0,1.0,1.0,2,0.0,"[4.0, 1.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 1.0]"


In [None]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
#display(lrModel, preppedDataDF, "ROC")

In [None]:
display(lrModel, preppedDataDF, "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.9999997760989064
0.0,0.0263157894736842,0.9999997760989064
0.0,0.0526315789473684,0.9999995681991344
0.0,0.0789473684210526,0.9999940519518832
0.0,0.1052631578947368,0.9999935717171476
0.0,0.131578947368421,0.9999873537710642
0.0,0.1578947368421052,0.9999870512590772
0.0,0.1842105263157894,0.9999424942024002
0.0,0.2105263157894736,0.9999306438461848
0.0,0.2368421052631578,0.9998257613825956


In [None]:
display(lrModel, preppedDataDF)

fitted values,residuals
2.26752399387699,-0.906151437343818
-4.321639064350649,-0.0131041043977416
-6.195280726993061,-0.002034881522468
-6.021996635756776,-0.0024189577246384
-3.4554535128316672,-0.0306066394681897
-4.410679865174761,-0.0120011403966263
-6.021996635756776,-0.0024189577246384
-6.021996635756776,-0.0024189577246384
-1.8251575627490524,-0.1388161564425237
7.063581854659081,0.0008549759685957392


In [None]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = preppedDataDF.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

496
203


## Logistic Regression

In [None]:
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [None]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [None]:
predictions.toPandas().head(5)

  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,id,clump_thickness,uniformity_cell_size,uniformity_cell_shape,marginal_adhesion,single_epithelial_cell_size,bare_nuclei,bland_chromatin,normal_nucleoli,mitoses,cancer,label,features,rawPrediction,probability,prediction
0,1002945,5.0,4.0,4.0,5.0,7.0,10.0,3.0,2.0,1.0,2,0.0,"[5.0, 4.0, 4.0, 5.0, 7.0, 10.0, 3.0, 2.0, 1.0]","[-2.5580705431065436, 2.5580705431065436]","[0.07188616653442371, 0.9281138334655763]",1.0
1,1018099,1.0,1.0,1.0,1.0,2.0,10.0,3.0,1.0,1.0,2,0.0,"[1.0, 1.0, 1.0, 1.0, 2.0, 10.0, 3.0, 1.0, 1.0]","[1.8859352405161376, -1.8859352405161376]","[0.8682913742058727, 0.13170862579412734]",0.0
2,1018561,2.0,1.0,2.0,1.0,2.0,1.0,3.0,1.0,1.0,2,0.0,"[2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0]","[5.227536157072055, -5.227536157072055]","[0.9946619162530372, 0.005338083746962763]",0.0
3,1031608,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,2,0.0,"[2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0]","[5.956516924091725, -5.956516924091725]","[0.9974177714977072, 0.002582228502292838]",0.0
4,1033078,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,5.0,2,0.0,"[2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 5.0]","[4.333165343034546, -4.333165343034546]","[0.9870441243816102, 0.012955875618389845]",0.0


In [None]:
trainingSummary = lrModel.summary

In [None]:
trainingSummary.accuracy

Out[417]: 0.969758064516129

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Out[418]: 0.9951229855810009

In [None]:
evaluator.getMetricName()

Out[419]: 'areaUnderROC'

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")

In [None]:
evaluator.evaluate(predictions)

Out[421]: 0.9607718405428328

In [None]:
predictionAndTarget = predictions.select(['label', 'prediction'])

In [None]:
auc = evaluator.evaluate(predictionAndTarget)

In [None]:
auc

Out[424]: 0.9607718405428328

In [None]:
#from pyspark.mllib.evaluation import BinaryClassificationMetrics
#metrics = BinaryClassificationMetrics(predictions)

In [None]:
# Compute raw scores on the test set
#predictionAndLabels = testData.rdd.map(lambda lp: (float(lrModel.predict(lp.features)), lp.label))

# Instantiate metrics object
#metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under precision-recall curve
#print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
#print("Area under ROC = %s" % metrics.areaUnderROC)

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

results = predictions.select(['prediction', 'label'])
predictionAndLabels=results.rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)



In [None]:
metrics.areaUnderROC

Out[428]: 0.9607718405428328

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix
from pyspark.mllib.evaluation import BinaryClassificationMetrics

def getMetrics(predictions):
  results = predictions.select(['prediction', 'label'])
  predictionAndLabels=results.rdd
  metrics = BinaryClassificationMetrics(predictionAndLabels)
  return metrics

def getConfusion_matrix(predictions):
  y_true = predictions.select("label")
  y_true = y_true.toPandas()
  y_pred = predictions.select("prediction")
  y_pred = y_pred.toPandas()
  cnf_matrix = confusion_matrix(y_true, y_pred)
  return cnf_matrix

def getInfo(predictions):
  arr = getConfusion_matrix(predictions)
  
  FPR = arr[1][0]/sum(arr[1]) # False Positive Rate
  FNR = arr[0][1]/sum(arr[0]) # False Negative Rate
  PPV = arr[0][0]/np.sum(arr, axis=0)[0] #Precision
  TPR = arr[0][0]/np.sum(arr, axis=1)[0] # recall
  ACC = sum(np.diag(arr))/np.sum(arr) # Accuracy
  AUC = getMetrics(predictions).areaUnderROC # AUC
  
  print('+'*100)
  print('\tFalse Positive Rate: ', FPR)
  print('\tFalse Negative Rate: ', FNR)
  print('\n\tPrecision: ', PPV)
  print('\tRecall: ', TPR)
  print('\tAccuracy: ', ACC)
  print('\n\tAUC: ', AUC)
  print('+'*100)

In [None]:
getInfo(predictions)

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
	False Positive Rate:  0.05555555555555555
	False Negative Rate:  0.022900763358778626

	Precision:  0.9696969696969697
	Recall:  0.9770992366412213
	Accuracy:  0.9655172413793104

	AUC:  0.9607718405428328
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


## DecisionTree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features") #, maxDepth=10
dtModel = dt.fit(trainingData)
predictions_dt = dtModel.transform(testData)

In [None]:
getInfo(predictions_dt)

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
	False Positive Rate:  0.05555555555555555
	False Negative Rate:  0.030534351145038167

	Precision:  0.9694656488549618
	Recall:  0.9694656488549618
	Accuracy:  0.9605911330049262

	AUC:  0.956955046649703
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


In [None]:
# Gets the value of maxDepth or its default value.
dtModel.getMaxDepth()

Out[433]: 5

## Random forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features") #, numTrees=10
rfModel = rf.fit(trainingData)
predictions_rf = rfModel.transform(testData)

In [None]:
getInfo(predictions_rf)

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
	False Positive Rate:  0.0
	False Negative Rate:  0.030534351145038167

	Precision:  1.0
	Recall:  0.9694656488549618
	Accuracy:  0.9802955665024631

	AUC:  0.9847328244274809
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


In [None]:
# Gets the value of numTrees or its default value.
rfModel.getNumTrees

Out[436]: 20

> Mô hình **Random Forest** cho AUC lớn nhất là **0.98**