### University of Virginia
### DS 5110: Big Data Systems

### Classification of Wisconsin Breast Cancer Database
### Last updated: June 17, 2021

**Instructions** 

In this project, you will work with the Wisconsin Breast Cancer dataset.  You will train a logistic regression model to predict the diagnosis.  First, you will work through this example, **filling in the missing cells.**  Then you will make modifications and run the code, collecting results at the bottom of the notebook.

The following experiments should be conducted:
1.  Three features were used in the original model.  **Build the model using all features.** Before training the model, scale the features with the `StandardScaler` transformer.  Then train the model and report the accuracy and confusion matrix, **measured on the test set.**

**Hint**: While the data is in a dataframe, this might be helpful:
```
from pyspark.ml.feature import StandardScaler
```

2. Repeat step (1), including an intercept.
3. Repeat step (1), using `randomSplit([0.7, 0.3])` but NO intercept.
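
As a rough illustration of what the scaling step in experiment 1 does: with its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The following is a minimal pure-Python sketch of that arithmetic on made-up rows. `scale_to_unit_std` is a hypothetical helper for illustration, not part of PySpark.

```python
import statistics

def scale_to_unit_std(rows):
    """Divide each column by its sample standard deviation (no centering)."""
    columns = list(zip(*rows))
    stds = [statistics.stdev(col) for col in columns]
    # A constant column has std 0; map it to 0.0 rather than dividing by zero.
    return [
        [value / std if std > 0 else 0.0 for value, std in zip(row, stds)]
        for row in rows
    ]

# Toy rows, not taken from the dataset
rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 60.0]]
scaled = scale_to_unit_std(rows)
print(scaled)
```

After scaling, every non-constant column has unit sample standard deviation, but its mean is generally not zero.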

**Total Possible Points: 10**

In [1]:
# load modules
from pyspark.sql import SparkSession
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.evaluation import MulticlassMetrics

import os

In [2]:
# param init 
infile = 'wisc_breast_cancer_w_fields.csv'

spark = SparkSession \
    .builder \
    .appName("Wisc BRCA") \
    .getOrCreate()

In [3]:
# read data into dataframe
df = spark.read.csv(infile, inferSchema=True, header = True)

**(1 PT)** Combine fields *f1*, *f2*, *f3* into a single *features* column using `VectorAssembler`  
Name the resulting dataframe *transformed*

In [4]:
assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features") 
transformed = assembler.transform(df)

Select the *diagnosis* and *features* fields for modeling.  
The remaining steps use RDDs, so convert the selection to an RDD.

In [5]:
dataRdd = transformed.select("diagnosis", "features").rdd.map(tuple)

In [6]:
# look at some data
dataRdd.take(2)

[('M', DenseVector([17.99, 10.38, 122.8])),
 ('M', DenseVector([20.57, 17.77, 132.9]))]

In [8]:
# map label to binary values, then convert to LabeledPoint
lp = dataRdd.map(lambda row:(1 if row[0]=='M' else 0, Vectors.dense(row[1])))    \
                    .map(lambda row: LabeledPoint(row[0], row[1]))

In [9]:
# look at some data
lp.take(2)

[LabeledPoint(1.0, [17.99,10.38,122.8]),
 LabeledPoint(1.0, [20.57,17.77,132.9])]

**(1 PT)** Split data approximately into training (60%) and test (40%) using `seed=314`  
Name the resulting RDDs *training* and *test*, respectively.

In [10]:
training, test = lp.randomSplit([0.6, 0.4], seed=314)

In [11]:
# count records in datasets
(training.count(), test.count(), lp.count())

(356, 213, 569)

In [12]:
(training.count()/lp.count(), test.count()/lp.count(), lp.count()/lp.count())

(0.6256590509666081, 0.37434094903339193, 1.0)
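
The observed fractions (about 0.626 and 0.374) differ from the requested 0.6 and 0.4 because `randomSplit` assigns each record to a split at random instead of cutting at an exact count. A minimal pure-Python sketch of that idea follows. `random_split` is a hypothetical helper, and its seed will not reproduce Spark's assignment of records.

```python
import random

def random_split(items, weights, seed):
    """Assign each item independently to one split, with probability
    proportional to that split's weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds in [0, 1], e.g. [0.6, 1.0] for weights [0.6, 0.4]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            # The last split catches any draw left over by rounding error.
            if draw < upper or index == len(bounds) - 1:
                splits[index].append(item)
                break
    return splits

train_part, test_part = random_split(range(569), [0.6, 0.4], seed=314)
# The sizes always sum to 569; the ratio is only approximately 60/40.
print(len(train_part), len(test_part))
```

Fixing the seed makes the split repeatable from run to run, which is why the notebook passes `seed=314`.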

**(1 PT)** Train a `LogisticRegressionWithLBFGS` model, naming it *model*

In [13]:
model = LogisticRegressionWithLBFGS.train(training)

**(1 PT)**  Evaluate the model by computing the accuracy on the **test data**. Print the accuracy.

In [14]:
# Evaluating the model on test data
labelsAndPreds_te = test.map(lambda p: (p.label, model.predict(p.features)))
accuracy_te = 1.0 * labelsAndPreds_te.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
print('model accuracy (test): {}'.format(accuracy_te))

model accuracy (test): 0.8732394366197183


**SOLUTIONS**  
For Parts 1-3, compute and show, for the test set: (1) the accuracy and (2) the confusion matrix.  

Enter solution for Part 1 (2 POINTS)

In [54]:
# use all of the fields as features
assembler = VectorAssembler(inputCols=[i for i in df.columns if i[0]=='f'], outputCol="features") 
transformed = assembler.transform(df)

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)

# convert to RDD
dataRdd = scaledData.select("diagnosis","scaledFeatures").rdd.map(tuple)

# map label to binary values, then create LabeledPoints
lp = dataRdd.map(lambda row: LabeledPoint(1 if row[0]=='M' else 0, Vectors.dense(row[1])))

# Split data approximately into training (60%) and test (40%)
training, test = lp.randomSplit([0.6, 0.4], seed=314)

# Build the model
model = LogisticRegressionWithLBFGS.train(training)

# Evaluating the model on test data

# make sure predictions are floats
labelsAndPreds_te = test.map(lambda p: (p.label, float(model.predict(p.features))))
accuracy_te = 1.0 * labelsAndPreds_te.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
print('model accuracy (test): {}'.format(accuracy_te))

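# NOTE: MulticlassMetrics expects (prediction, label) pairs. These pairs are
# (label, prediction), so rows below are predicted classes and columns are actual classes.
metrics = MulticlassMetrics(labelsAndPreds_te)
print("Confusion Matrix:\n{}".format(metrics.confusionMatrix().toArray()))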

model accuracy (test): 0.9624413145539906
Confusion Matrix:
[[135.   4.]
 [  4.  70.]]
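
The two outputs are linked: accuracy is the sum of the confusion matrix's diagonal divided by the total count, here (135 + 70) / 213. The sketch below builds a 2x2 matrix from (actual, predicted) pairs in plain Python and recovers the accuracy from it. The pairs are synthetic, constructed only to reproduce the counts printed above, and `confusion_matrix` and `accuracy_from_matrix` are hypothetical helpers, not Spark APIs.

```python
def confusion_matrix(pairs, num_classes=2):
    """Count (actual, predicted) pairs; rows are actual, columns are predicted."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for actual, predicted in pairs:
        matrix[int(actual)][int(predicted)] += 1
    return matrix

def accuracy_from_matrix(matrix):
    """Correct predictions sit on the diagonal."""
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    total = sum(sum(row) for row in matrix)
    return correct / total

# Synthetic (actual, predicted) pairs chosen to match the counts printed above
pairs = (
    [(0.0, 0.0)] * 135 + [(0.0, 1.0)] * 4
    + [(1.0, 0.0)] * 4 + [(1.0, 1.0)] * 70
)
cm = confusion_matrix(pairs)
print(cm)
print(accuracy_from_matrix(cm))
```

The same check applies to any of the experiments: if the printed accuracy does not equal the diagonal sum over the total, the pairs fed to the two computations differ.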


Enter solution for Part 2  (2 POINTS)

In [55]:
assembler = VectorAssembler(inputCols=[i for i in df.columns if i[0]=='f'], outputCol="features") 
transformed = assembler.transform(df)

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)

dataRdd = scaledData.select("diagnosis","scaledFeatures").rdd.map(tuple)

lp = dataRdd.map(lambda row: LabeledPoint(1 if row[0]=='M' else 0, Vectors.dense(row[1])))

training, test = lp.randomSplit([0.6, 0.4], seed=314)

model = LogisticRegressionWithLBFGS.train(training, intercept=True)

labelsAndPreds_te = test.map(lambda p: (p.label, float(model.predict(p.features))))
accuracy_te = 1.0 * labelsAndPreds_te.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
print('model accuracy (test): {}'.format(accuracy_te))

# pairs are (label, prediction), so rows = predicted class, columns = actual class
metrics = MulticlassMetrics(labelsAndPreds_te)
print("Confusion Matrix:\n{}".format(metrics.confusionMatrix().toArray()))

model accuracy (test): 0.9671361502347418
Confusion Matrix:
[[135.   3.]
 [  4.  71.]]
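
The only change from Part 1 is `intercept=True`, which adds a bias term to the linear score before the sigmoid. Because `StandardScaler` does not center features by default, the scaled features are not zero-mean, which is one plausible reason a bias term could shift the decision boundary here. Below is a minimal sketch of binary logistic prediction, assuming a 0.5 threshold. The weights and feature values are made up for illustration and are not the fitted model's.

```python
import math

def predict(weights, features, intercept=0.0, threshold=0.5):
    """Return 1 if sigmoid(w . x + b) exceeds the threshold, else 0."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    probability = 1.0 / (1.0 + math.exp(-margin))
    return 1 if probability > threshold else 0

weights = [0.8, -0.5]   # hypothetical coefficients
features = [1.0, 1.0]   # hypothetical scaled feature vector

# Without an intercept the margin is positive, so the sigmoid is above 0.5.
print(predict(weights, features))
# A negative intercept pushes the margin below zero and flips the class.
print(predict(weights, features, intercept=-1.0))
```

Without an intercept, the decision boundary is forced through the origin of the feature space. The intercept removes that constraint.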


Enter solution for Part 3 (2 POINTS)

In [56]:
assembler = VectorAssembler(inputCols=[i for i in df.columns if i[0]=='f'], outputCol="features") 
transformed = assembler.transform(df)

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)

dataRdd = scaledData.select("diagnosis","scaledFeatures").rdd.map(tuple)

lp = dataRdd.map(lambda row: LabeledPoint(1 if row[0]=='M' else 0, Vectors.dense(row[1])))

training, test = lp.randomSplit([0.7, 0.3], seed=314)

model = LogisticRegressionWithLBFGS.train(training)

labelsAndPreds_te = test.map(lambda p: (p.label, float(model.predict(p.features))))
accuracy_te = 1.0 * labelsAndPreds_te.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
print('model accuracy (test): {}'.format(accuracy_te))

# pairs are (label, prediction), so rows = predicted class, columns = actual class
metrics = MulticlassMetrics(labelsAndPreds_te)
print("Confusion Matrix:\n{}".format(metrics.confusionMatrix().toArray()))

model accuracy (test): 0.9433962264150944
Confusion Matrix:
[[98.  6.]
 [ 3. 52.]]
