### University of Virginia
### DS 5110: Big Data Systems

### Lab: Supervised Learning
### Last updated: March 2, 2023

---

#### Instructions

This project has two parts:
- Part I: Classification - build and apply a logistic regression model on the Wisconsin Breast Cancer dataset.
- Part II: Regression - build and apply a linear regression model on the California Housing dataset.

**Total Possible Points: 10**

---

#### Part I: Classification (5 POINTS)

Here are the specifications and grading breakdown:

- the target variable is `diagnosis`
- use `f1`, `f2` as predictors (1 PT)
- split data into 60% training set, 40% test set 
- standardize the predictors (1 PT)
- use seed=314 whenever a seed is needed
- fit a Logistic Regression model with an intercept (1 PT)
- compute and show the area under the ROC curve for the test set (2 PTS)

In [1]:
from pyspark.sql import SparkSession
import os

In [2]:
DATA_FILEPATH = 'wisc_breast_cancer_w_fields.csv'

spark = SparkSession \
    .builder \
    .appName("Wisc BRCA") \
    .getOrCreate()


In [3]:
# read data into a DataFrame
training = spark.read.csv(DATA_FILEPATH, inferSchema=True, header=True)
training.show(2)


+------+---------+-----+-----+-----+------+-------+-------+------+-------+------+-------+------+------+-----+-----+--------+-------+-------+-------+-------+--------+-----+-----+-----+------+------+------+------+------+------+-------+
|    id|diagnosis|   f1|   f2|   f3|    f4|     f5|     f6|    f7|     f8|    f9|    f10|   f11|   f12|  f13|  f14|     f15|    f16|    f17|    f18|    f19|     f20|  f21|  f22|  f23|   f24|   f25|   f26|   f27|   f28|   f29|    f30|
+------+---------+-----+-----+-----+------+-------+-------+------+-------+------+-------+------+------+-----+-----+--------+-------+-------+-------+-------+--------+-----+-----+-----+------+------+------+------+------+------+-------+
|842302|        M|17.99|10.38|122.8|1001.0| 0.1184| 0.2776|0.3001| 0.1471|0.2419|0.07871| 1.095|0.9053|8.589|153.4|0.006399|0.04904|0.05373|0.01587|0.03003|0.006193|25.38|17.33|184.6|2019.0|0.1622|0.6656|0.7119|0.2654|0.4601| 0.1189|
|842517|        M|20.57|17.77|132.9|1326.0|0.08474|0.07864|0.086

In [4]:
train, test = training.randomSplit([.6,.4], 314)
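`randomSplit` assigns rows to splits at random, so the resulting sizes are only approximately 60/40, and the seed makes the assignment repeatable. The sketch below is a simplified single-machine illustration of that idea, not Spark's implementation; the helper `random_split` and the 1000 placeholder rows are assumptions made for the example.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using a single uniform draw per row."""
    total = sum(weights)
    # Cumulative boundaries of the normalized weights, e.g. [0.6, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point shortfall in bounds
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))  # hypothetical row ids, not the real dataset
train_rows, test_rows = random_split(rows, [0.6, 0.4], seed=314)
print(len(train_rows), len(test_rows))
```

Every row lands in exactly one split, and rerunning with the same seed reproduces the same assignment. The printed sizes will be close to 600 and 400 but generally not exact.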

In [5]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCols = ['diagnosis'], outputCols=['diagnosis_numeric'], handleInvalid = 'skip')
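By default `StringIndexer` orders labels by descending frequency, so the most common label becomes 0.0. The following plain-Python sketch mimics that ordering to show which class ends up as the positive label (1.0). The function `fit_string_index` and the five sample labels are hypothetical, and the alphabetical tie-break is an assumption about recent Spark versions.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; break ties alphabetically (assumed behavior)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

sample_labels = ["M", "B", "B", "M", "B"]  # hypothetical labels, not the real data
mapping = fit_string_index(sample_labels)
print(mapping)
```

If benign cases outnumber malignant ones in the training split, `B` maps to 0.0 and `M` to 1.0, so the model's positive class is malignant. Checking the fitted indexer's `labels` attribute confirms the actual order.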

In [6]:
from pyspark.ml.feature import VectorAssembler

# inputCols takes a list of column names
# outputCol is an arbitrary name for the new vector column; conventionally "features"

assembler = VectorAssembler(inputCols=["f1", "f2"],
                            outputCol="features")

In [7]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
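With its default settings, `StandardScaler` divides each feature by its sample standard deviation but does not subtract the mean (`withStd=True`, `withMean=False`). The sketch below reproduces both variants in plain Python on the five `f1` values visible in the outputs of this notebook; the helper `scale_column` is hypothetical and only illustrates the arithmetic.

```python
import math

def scale_column(values, with_mean=False, with_std=True):
    """Scale one numeric column the way the two scaler flags describe."""
    n = len(values)
    mean = sum(values) / n
    # Sample standard deviation (n - 1 in the denominator)
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))
    scaled = []
    for v in values:
        if with_mean:
            v = v - mean
        if with_std:
            v = v / std
        scaled.append(v)
    return scaled

f1 = [17.99, 20.57, 15.46, 12.89, 14.96]  # f1 values shown in this notebook's outputs

unit_std_only = scale_column(f1)               # default-style scaling
z_scores = scale_column(f1, with_mean=True)    # centered and scaled

print([round(v, 3) for v in unit_std_only])
print([round(v, 3) for v in z_scores])
```

Both results have unit standard deviation, but only the second is centered at zero. If full z-scores are wanted in the pipeline, passing `withMean=True` to `StandardScaler` is one option.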

In [8]:
train.take(5)

[Row(id=8670, diagnosis='M', f1=15.46, f2=19.48, f3=101.7, f4=748.9, f5=0.1092, f6=0.1223, f7=0.1466, f8=0.08087, f9=0.1931, f10=0.05796, f11=0.4743, f12=0.7859, f13=3.094, f14=48.31, f15=0.00624, f16=0.01484, f17=0.02813, f18=0.01093, f19=0.01397, f20=0.002461, f21=19.26, f22=26.0, f23=124.9, f24=1156.0, f25=0.1546, f26=0.2394, f27=0.3791, f28=0.1514, f29=0.2837, f30=0.08019),
 Row(id=8913, diagnosis='B', f1=12.89, f2=13.12, f3=81.89, f4=515.9, f5=0.06955, f6=0.03729, f7=0.0226, f8=0.01171, f9=0.1337, f10=0.05581, f11=0.1532, f12=0.469, f13=1.115, f14=12.68, f15=0.004731, f16=0.01345, f17=0.01652, f18=0.005905, f19=0.01619, f20=0.002081, f21=13.62, f22=15.54, f23=87.4, f24=577.0, f25=0.09616, f26=0.1147, f27=0.1186, f28=0.05366, f29=0.2309, f30=0.06915),
 Row(id=8915, diagnosis='B', f1=14.96, f2=19.1, f3=97.03, f4=687.3, f5=0.08992, f6=0.09823, f7=0.0594, f8=0.04819, f9=0.1879, f10=0.05852, f11=0.2877, f12=0.948, f13=2.171, f14=24.87, f15=0.005332, f16=0.02115, f17=0.01536, f18=0.0118

In [9]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = LogisticRegression(labelCol='diagnosis_numeric',
                        featuresCol='scaledFeatures',
                        maxIter=10, 
                        regParam=0.3, 
                        elasticNetParam=0.8)

pipeline = Pipeline(stages = [stringIndexer, assembler, scaler, lr])
# Fit the model
lrModel = pipeline.fit(train)


In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predDF = lrModel.transform(test)

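# set up evaluator
# AUC needs a continuous score to rank rows, so use the model's
# rawPrediction column rather than the hard 0/1 "prediction" column.
# (The value printed below was computed from the hard 0/1 predictions.)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="diagnosis_numeric",
                                          metricName="areaUnderROC")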

# pass to evaluator the DF with predictions, labels
auc = evaluator.evaluate(predDF)

print("Area under ROC Curve:", auc)

Area under ROC Curve: 0.6959459459459459
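The area under the ROC curve depends on which column the evaluator ranks rows by. As a minimal sketch, the function below computes AUC as the probability that a randomly chosen positive row outscores a randomly chosen negative row, with ties counted as one half. The function `auc`, the six labels, and the scores are all hypothetical and only illustrate the difference between continuous scores and thresholded 0/1 predictions.

```python
def auc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # a tie counts as half a correct ranking
    return wins / (len(pos) * len(neg))

labels = [1, 1, 1, 0, 0, 0]                    # hypothetical true labels
scores = [0.9, 0.6, 0.4, 0.45, 0.3, 0.1]       # hypothetical model scores
hard = [1 if s >= 0.5 else 0 for s in scores]  # thresholded 0/1 predictions

auc_scores = auc(scores, labels)
auc_hard = auc(hard, labels)
print(f"AUC from scores:      {auc_scores:.4f}")
print(f"AUC from hard labels: {auc_hard:.4f}")
```

Thresholding collapses many distinct scores into two values, which creates ties and reduces the ROC curve to a single operating point. In this toy example the AUC from hard labels is lower than the AUC from scores.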


#### Enter code and solution

#### Part II: Regression (5 POINTS)

In this part, you will work with the California Housing dataset to train a regression model and predict median home prices. Here are the specifications and grading breakdown:

- Scale the response variable `median_house_value` by dividing it by 100000 (1 PT)

- Split data into train set (80%), test set (20%) using seed=314 (1 PT)

- Add a new predictor: `rooms_per_household`, computed as `total_rooms` divided by `households`

- In the training set, select all of these features and standardize them: (1 PT)

feats = ["total_bedrooms", 
         "population", 
         "households", 
         "median_income", 
         "rooms_per_household"]

- Fit a linear regression model on the training set with these parameters:

  - maxIter=10
  - regParam=0.3
  - elasticNetParam=0.8  


- Compute the MSE on the test set (2 PTS)
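For reference, the three parameters above control an elastic-net penalty. As a sketch based on the objective described in the Spark ML documentation, with $\lambda$ standing for `regParam` and $\alpha$ for `elasticNetParam` (the symbols $w$, $b$, $x_i$, $y_i$, and $n$ are notation introduced here):

```latex
\min_{w,\,b}\;
\frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2
\;+\;
\lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right),
\qquad \lambda = 0.3,\quad \alpha = 0.8
```

With these values the L1 term has weight $\lambda\alpha = 0.24$ and the squared L2 term has weight $\lambda(1-\alpha)/2 = 0.03$, so the penalty is mostly lasso-like and can drive some coefficients to exactly zero. `maxIter=10` caps the number of optimizer iterations.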

In [12]:
from pyspark.sql import SparkSession

# reuse the existing session if one is already running
spark = SparkSession.builder.getOrCreate()

In [13]:
DATA_FILEPATH2 = 'cal_housing_data_preproc_w_header.txt'

In [17]:
data = spark.read.csv(DATA_FILEPATH2,  inferSchema=True, header = True)
data.show(2)

+------------------+-------------+------------------+-----------+--------------+----------+----------+--------+---------+
|median_house_value|median_income|housing_median_age|total_rooms|total_bedrooms|population|households|latitude|longitude|
+------------------+-------------+------------------+-----------+--------------+----------+----------+--------+---------+
|          452600.0|       8.3252|              41.0|      880.0|         129.0|     322.0|     126.0|   37.88|  -122.23|
|          358500.0|       8.3014|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|   37.86|  -122.22|
+------------------+-------------+------------------+-----------+--------------+----------+----------+--------+---------+
only showing top 2 rows



In [21]:
data = data.withColumn('median_house_value', data['median_house_value']/100000)
data.show(5)

+------------------+-----------------+------------------+-----------+--------------+----------+----------+--------+---------+
|median_house_value|    median_income|housing_median_age|total_rooms|total_bedrooms|population|households|latitude|longitude|
+------------------+-----------------+------------------+-----------+--------------+----------+----------+--------+---------+
|             4.526|           8.3252|              41.0|      880.0|         129.0|     322.0|     126.0|   37.88|  -122.23|
|             3.585|           8.3014|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|   37.86|  -122.22|
|             3.521|7.257399999999999|              52.0|     1467.0|         190.0|     496.0|     177.0|   37.85|  -122.24|
|             3.413|           5.6431|              52.0|     1274.0|         235.0|     558.0|     219.0|   37.85|  -122.25|
|             3.422|           3.8462|              52.0|     1627.0|         280.0|     565.0|     259.0|   37.85|  -

In [22]:
data = data.withColumn('rooms_per_household', data['total_rooms']/data['households'])
data.show(5)

+------------------+-----------------+------------------+-----------+--------------+----------+----------+--------+---------+-------------------+
|median_house_value|    median_income|housing_median_age|total_rooms|total_bedrooms|population|households|latitude|longitude|rooms_per_household|
+------------------+-----------------+------------------+-----------+--------------+----------+----------+--------+---------+-------------------+
|             4.526|           8.3252|              41.0|      880.0|         129.0|     322.0|     126.0|   37.88|  -122.23|  6.984126984126984|
|             3.585|           8.3014|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|   37.86|  -122.22|  6.238137082601054|
|             3.521|7.257399999999999|              52.0|     1467.0|         190.0|     496.0|     177.0|   37.85|  -122.24|  8.288135593220339|
|             3.413|           5.6431|              52.0|     1274.0|         235.0|     558.0|     219.0|   37.85|  -122.25

In [23]:
train, test = data.randomSplit([.8,.2], 314)

In [26]:
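from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression

# combine the five predictors into a single vector column
assembler = VectorAssembler(inputCols=["total_bedrooms", "population", "households", "median_income", "rooms_per_household"],
                            outputCol="features")

# scale each feature to unit standard deviation
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

lr = LinearRegression(labelCol='median_house_value',
                      featuresCol='scaledFeatures',
                      maxIter=10,
                      regParam=0.3,
                      elasticNetParam=0.8)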

pipeline = Pipeline(stages = [assembler, scaler, lr])
# Fit the model
lrModel = pipeline.fit(train)

In [27]:
from pyspark.ml.evaluation import RegressionEvaluator

predDF = lrModel.transform(test)

regressionEvaluator = RegressionEvaluator(
    predictionCol = 'prediction',
    labelCol = 'median_house_value',
    metricName= 'mse')
mse = regressionEvaluator.evaluate(predDF)
print(f"MSE is {mse}")

MSE is 0.7551749809615468
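Because the response was divided by 100000 before fitting, this MSE is in squared units of 100,000. Taking the square root and multiplying by 100000 gives an error on the original scale. A small sketch, assuming the original `median_house_value` is in US dollars:

```python
import math

mse = 0.7551749809615468        # value printed above, in (100,000)^2 units
rmse = math.sqrt(mse)           # back to units of 100,000
rmse_original = rmse * 100_000  # back to the original scale (assumed USD)

print(f"RMSE (scaled units): {rmse:.4f}")
print(f"RMSE (original units): {rmse_original:,.0f}")
```

The RMSE is often easier to interpret than the MSE because it is expressed in the same units as the target.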


#### Enter code and solution