### University of Virginia
### DS 5559: Big Data Analytics

### Classification of Wisconsin Breast Cancer Database
### Last updated: Feb 20, 2020

**Instructions** 

In this project, you will work with the Wisconsin Breast Cancer dataset.  You will train a logistic regression model to predict the diagnosis.  First, you will work through this example.  Then you will make modifications and run the code, collecting results at the bottom of the notebook.

The following experiments should be conducted:
1. The example above uses only three features (f1, f2, f3). Build the model using all features. Before training the model, scale the features with the StandardScaler transformer. Then train the model and compute the accuracy on the test set. Additionally, compute and show the confusion matrix.

2. Repeat step (1), including an intercept
3. Repeat step (1), using randomSplit([0.7, 0.3])
4. Repeat step (2), using randomSplit([0.7, 0.3])
5. Compare and discuss the results of (1) vs (2).  Compare and discuss the results of (3) vs (4).
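
Parts 2 and 4 differ from Parts 1 and 3 only in whether the model fits an intercept. `LogisticRegressionWithLBFGS.train` defaults to `intercept=False`, so the margin is $w \cdot x$ and the decision boundary is forced through the origin of feature space; passing `intercept=True` adds a bias term $b$, giving $w \cdot x + b$. The plain-Python sketch below mirrors the binary prediction rule of an mllib `LogisticRegressionModel` (sigmoid of the margin compared with the default threshold of 0.5). The weights and input are made-up illustration values, not fitted coefficients.

```python
import math

def predict(weights, x, intercept=0.0, threshold=0.5):
    """Return 1 if sigmoid(w.x + b) > threshold, else 0."""
    margin = sum(w * xi for w, xi in zip(weights, x)) + intercept
    # Evaluate the sigmoid in a numerically stable way for large |margin|
    if margin > 0:
        prob = 1.0 / (1.0 + math.exp(-margin))
    else:
        e = math.exp(margin)
        prob = e / (1.0 + e)
    return 1 if prob > threshold else 0

# Hypothetical weights for three features (illustration only)
w = [0.8, -0.5, 0.1]
x = [1.0, 2.0, 0.5]

print(predict(w, x))                 # no intercept: margin = -0.15 → 0
print(predict(w, x, intercept=1.5))  # intercept shifts the margin to 1.35 → 1
```

Without an intercept, the only way the model can move the boundary is by rescaling the weights, which matters when the features are not centered.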

**Total Possible Points: 10**

In [1]:
import os
import pyspark.sql.types as typ
import pyspark.sql.functions as F

In [2]:
# load modules
from pyspark.sql import SparkSession
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors
from pyspark.sql.functions import col 
from pyspark.mllib.evaluation import MulticlassMetrics

import os

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("data preprocessing") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext

In [4]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.port', '39481'),
 ('spark.driver.host', 'udc-ba26-10'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'data preprocessing'),
 ('spark.executor.cores', '4'),
 ('spark.cores.max', '4'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1591652769640'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.memory', '8g'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [5]:
# read in data
data_file = "wisc_breast_cancer_w_fields.csv"
df = spark.read.csv(data_file, inferSchema=True, header = True)

In [6]:
# use some of the fields as features
assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features") 
transformed = assembler.transform(df)

In [7]:
# convert to RDD
dataRdd_s = transformed.select(col("diagnosis"), col("features")).rdd.map(tuple)

In [8]:
dataRdd_s.take(2)

[('M', DenseVector([17.99, 10.38, 122.8])),
 ('M', DenseVector([20.57, 17.77, 132.9]))]

In [9]:
# convert to RDD
dataRdd = transformed.select(col("diagnosis"), col("features")).rdd.map(tuple)

In [10]:
# look at some data
dataRdd.take(2)

[('M', DenseVector([17.99, 10.38, 122.8])),
 ('M', DenseVector([20.57, 17.77, 132.9]))]

In [11]:
# map label to binary values, then convert to LabeledPoint
lp = dataRdd.map(lambda row:(1 if row[0]=='M' else 0, Vectors.dense(row[1])))    \
                    .map(lambda row: LabeledPoint(row[0], row[1]))

In [12]:
# look at some data
lp.take(2)

[LabeledPoint(1.0, [17.99,10.38,122.8]),
 LabeledPoint(1.0, [20.57,17.77,132.9])]

In [13]:
# Split data approximately into training (60%) and test (40%)
training, test = lp.randomSplit([0.6, 0.4], seed=314)

In [14]:
# count records in datasets
(training.count(), test.count(), lp.count())

(356, 213, 569)

In [15]:
(training.count()/lp.count(), test.count()/lp.count(), lp.count()/lp.count())

(0.6256590509666081, 0.37434094903339193, 1.0)
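
The split comes out as roughly 62.6% / 37.4% rather than exactly 60% / 40% because `randomSplit` normalizes the weights and then assigns each record independently by a uniform random draw, so the group sizes only vary around the requested fractions. The plain-Python sketch below imitates that per-record assignment; it illustrates the idea and is not Spark's implementation (which samples within each partition from the given seed), so it will not reproduce the 356/213 counts.

```python
import random
from itertools import accumulate

def random_split(records, weights, seed=None):
    """Assign each record to one split by comparing a uniform draw
    with the cumulative normalized weights."""
    total = float(sum(weights))
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.6, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for r in records:
        u = rng.random()  # uniform in [0, 1)
        for i, ub in enumerate(bounds):
            # The last split also catches any floating-point rounding at 1.0
            if u < ub or i == len(bounds) - 1:
                splits[i].append(r)
                break
    return splits

train, test = random_split(range(569), [0.6, 0.4], seed=314)
print(len(train), len(test))  # sizes vary around 341 / 228 depending on the seed
```

Because the weights are normalized, `[0.6, 0.4]` and `[6, 4]` describe the same split; the same reasoning explains why `randomSplit([0.7, 0.3])` in Parts 3 and 4 will also only approximate 70% / 30%.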

In [16]:
# Build the model
model = LogisticRegressionWithLBFGS.train(training)

In [17]:
# Evaluating the model on test data
labelsAndPreds_te = test.map(lambda p: (p.label, model.predict(p.features)))
accuracy_te = 1.0 * labelsAndPreds_te.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
print('model accuracy (test): {}'.format(accuracy_te))

model accuracy (test): 0.8732394366197183


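In [18]:
# Note: this rebinds `Vectors` to pyspark.ml.linalg, shadowing the
# pyspark.mllib.linalg.Vectors imported in cell [2]. Re-running cell [11]
# after this would build ml vectors, which mllib's LabeledPoint does not accept.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler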

**SOLUTIONS**  
For parts 1-4, compute and show the following for the test set: (1) the accuracy and (2) the confusion matrix.  
Each part is worth 2 points.
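
Both quantities can be computed from the `(label, prediction)` pairs that cell [17] builds as `labelsAndPreds_te`. In Spark, `MulticlassMetrics` (imported in cell [2]) does this, but note that it expects an RDD of `(prediction, label)` pairs as floats, the reverse order of `labelsAndPreds_te`. The plain-Python sketch below spells out the same arithmetic on a few made-up pairs, using the layout of `MulticlassMetrics.confusionMatrix()`: rows are actual labels, columns are predicted labels, both in ascending order (0 = benign, 1 = malignant).

```python
def confusion_matrix(pairs, labels=(0.0, 1.0)):
    """Rows = actual label, columns = predicted label, both in ascending order."""
    index = {lab: i for i, lab in enumerate(sorted(labels))}
    matrix = [[0] * len(labels) for _ in labels]
    for actual, predicted in pairs:
        matrix[index[actual]][index[predicted]] += 1
    return matrix

def accuracy(pairs):
    """Fraction of (actual, predicted) pairs that agree."""
    pairs = list(pairs)
    return sum(1 for a, p in pairs if a == p) / len(pairs)

# Made-up (label, prediction) pairs, in the same order as labelsAndPreds_te
pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0)]
print(confusion_matrix(pairs))  # → [[2, 1], [1, 2]]
print(accuracy(pairs))          # → 0.6666666666666666
```

Accuracy is the diagonal of the confusion matrix divided by the total count, so the matrix carries strictly more information: the off-diagonal cells separate false positives (benign predicted malignant) from false negatives (malignant predicted benign).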

In [19]:
df.describe().select("Summary", "f1", "f2", "f3").show()

+-------+------------------+-----------------+-----------------+
|Summary|                f1|               f2|               f3|
+-------+------------------+-----------------+-----------------+
|  count|               569|              569|              569|
|   mean|14.127291739894563|19.28964850615117|91.96903339191566|
| stddev|3.5240488262120793|4.301035768166948| 24.2989810387549|
|    min|             6.981|             9.71|            43.79|
|    max|             28.11|            39.28|            188.5|
+-------+------------------+-----------------+-----------------+



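In [21]:
# Use Spark ML's StandardScaler, which works on DataFrame vector columns;
# scikit-learn's StandardScaler operates on in-memory arrays and is not needed here.
from pyspark.ml.feature import StandardScaler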

In [31]:
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features") 
# Now let us use the transform method to transform our dataset
df=assembler.transform(df)
df.select("features").show()

+-------------------+
|           features|
+-------------------+
|[17.99,10.38,122.8]|
|[20.57,17.77,132.9]|
|[19.69,21.25,130.0]|
|[11.42,20.38,77.58]|
|[20.29,14.34,135.1]|
| [12.45,15.7,82.57]|
|[18.25,19.98,119.6]|
| [13.71,20.83,90.2]|
|  [13.0,21.82,87.5]|
|[12.46,24.04,83.97]|
|[16.02,23.24,102.7]|
|[15.78,17.89,103.6]|
| [19.17,24.8,132.4]|
|[15.85,23.95,103.7]|
| [13.73,22.61,93.6]|
|[14.54,27.54,96.73]|
|[14.68,20.13,94.74]|
|[16.13,20.68,108.1]|
|[19.81,22.15,130.0]|
|[13.54,14.36,87.46]|
+-------------------+
only showing top 20 rows



In [34]:
from pyspark.ml.feature import StandardScaler

# Defaults: withMean=False, withStd=True (divide by std dev, no centering)
standardscaler = StandardScaler(inputCol="features", outputCol="Scaled_features")
df = standardscaler.fit(df).transform(df)
df.select("features", "Scaled_features").show(5)

+-------------------+--------------------+
|           features|     Scaled_features|
+-------------------+--------------------+
|[17.99,10.38,122.8]|[5.10492359418783...|
|[20.57,17.77,132.9]|[5.83703603849048...|
|[19.69,21.25,130.0]|[5.58732326679035...|
|[11.42,20.38,77.58]|[3.24059074183574...|
|[20.29,14.34,135.1]|[5.75758197476771...|
+-------------------+--------------------+
only showing top 5 rows
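
With its default settings, Spark's `StandardScaler` divides each feature by its sample standard deviation but does not subtract the mean. That is why the first scaled value is $17.99 / 3.5240 \approx 5.1049$ (using the `f1` standard deviation from the `describe()` table) rather than a z-score near zero. The plain-Python sketch below reproduces both variants for a single value; the `with_mean` and `with_std` flags are meant to mirror `withMean` and `withStd`, and the sketch assumes a nonzero standard deviation.

```python
def standardize(x, mean, std, with_mean=False, with_std=True):
    """Optionally subtract the mean, then optionally divide by the std dev."""
    if with_mean:
        x = x - mean
    if with_std:
        x = x / std  # assumes std > 0
    return x

# Statistics for f1 copied from the describe() output above
F1_MEAN = 14.127291739894563
F1_STD = 3.5240488262120793  # sample standard deviation

x = 17.99  # f1 of the first record
print(standardize(x, F1_MEAN, F1_STD))                  # ≈ 5.1049, the default
print(standardize(x, F1_MEAN, F1_STD, with_mean=True))  # ≈ 1.096, centered z-score
```

Centering (`withMean=True`) changes where the origin sits, which interacts with the intercept comparison in Part 5: a model without an intercept can only draw boundaries through the origin.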



In [37]:
# convert the scaled features to an RDD of (diagnosis, vector) tuples
dataRdd_s = df.select(col("diagnosis"), col("Scaled_features")).rdd.map(tuple)

In [46]:
## Enter solution for Part 2



In [None]:
## Enter solution for Part 3



In [None]:
## Enter solution for Part 4



In [2]:
## Enter solution for Part 5

