### Lab: Tuning and Topic Modeling

### University of Virginia
### DS 7200: Distributed Computing
### Last Updated: August 20, 2023

---  

### Justin Lee

### jgh2xh

---

**INSTRUCTIONS**  
In this assignment, you will do three things:
1) Tune a logistic regression model  
2) Label-balance a dataset  
3) Run the Topic Modeling notebook, making small tweaks and capturing results  

**TOTAL POINTS: 10**

In [1]:
import os

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("data preprocessing") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext



### PARAMETERS

In [3]:
# update to match your path
directory_path = ''
path_to_data = os.path.join(directory_path, 'breast_cancer_wisconsin.csv')

In [4]:
# class = 2 for benign (negative class), 4 for malignant (positive class)
target = 'class'
positive_label = 4
negative_label = 2

SEED = 314

### READ IN DATA

In [5]:
brca = spark.read.csv(path_to_data, header=True, inferSchema=True)

In [6]:
brca.printSchema()

root
 |-- id: integer (nullable = true)
 |-- clump_thickness: integer (nullable = true)
 |-- uniformity_cell_size: integer (nullable = true)
 |-- uniformity_cell_shape: integer (nullable = true)
 |-- marginal_adhesion: integer (nullable = true)
 |-- single_epithelial_cell_size: integer (nullable = true)
 |-- bare_nuclei: string (nullable = true)
 |-- bland_chromatin: integer (nullable = true)
 |-- normal_nucleoli: integer (nullable = true)
 |-- mitoses: integer (nullable = true)
 |-- class: integer (nullable = true)



In [7]:
brca.count()

699

In [8]:
# compute distribution of target variable
brca.groupBy(target).count().show()

+-----+-----+
|class|count|
+-----+-----+
|    4|  241|
|    2|  458|
+-----+-----+



### Task 1:  Cross Validate a Logistic Regression Model
i) (**4 PTS**) This task has the following requirements:
- Import the necessary modules.
- Use these features as predictors: `clump_thickness`, `uniformity_cell_size`, `uniformity_cell_shape`, `marginal_adhesion`.  
- `class` is the response variable. Apply recoding as needed. Hint: save the recoded values as a new variable.
- Use 3 folds in the cross validator object.
- Use `BinaryClassificationEvaluator`.
- Use a logistic regression model with `maxIter`=10.  
- Use a tuning grid with `regParam` values of 0.1 and 0.01.
- Finally, print the average metrics for each `regParam` value. The `avgMetrics` attribute of the cv model holds these.

In [9]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import MaxAbsScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pyspark.sql.functions as F
import time

In [10]:
features = ['clump_thickness', 'uniformity_cell_size', 'uniformity_cell_shape', 'marginal_adhesion']

# relabel class
relabeled_class_col = F.when(brca['class'] == positive_label, 1).when(brca['class'] == negative_label, 0)
labels = 'relabeled_class'
brca = brca.withColumn(labels, relabeled_class_col)

# Quick look at the predictors
brca.select(features + [labels]).show(5)

for feat in features:
    brca.groupBy(feat).count().show()

+---------------+--------------------+---------------------+-----------------+---------------+
|clump_thickness|uniformity_cell_size|uniformity_cell_shape|marginal_adhesion|relabeled_class|
+---------------+--------------------+---------------------+-----------------+---------------+
|              5|                   1|                    1|                1|              0|
|              5|                   4|                    4|                5|              0|
|              3|                   1|                    1|                1|              0|
|              6|                   8|                    8|                1|              0|
|              4|                   1|                    1|                3|              0|
+---------------+--------------------+---------------------+-----------------+---------------+
only showing top 5 rows

+---------------+-----+
|clump_thickness|count|
+---------------+-----+
|              1|  145|
|              6|   34|


In [11]:
# use 3 folds in the cross validator object
# use BinaryClassificationEvaluator
# logistic regression model with maxIter=10
# tuning grid with regParam values of 0.1 and 0.01
# finally, print the average metrics based on each regParam value. the attribute avgMetrics in the cv model will hold these. 

## setup pipeline
# feature transformations: assemble the predictors into one vector column, then scale it
va = VectorAssembler(inputCols=features, outputCol='features')
maxabs = MaxAbsScaler(inputCol='features', outputCol='features_scaled')

# model estimation
maxiter = 10
logreg = LogisticRegression() \
            .setFeaturesCol('features_scaled') \
            .setLabelCol(labels) \
            .setMaxIter(maxiter)

stages = [va, maxabs, logreg]

model_pipeline = Pipeline(stages=stages)

## setup param grid
reg_params = [0.1, 0.01]
param_grid = ParamGridBuilder() \
                .addGrid(logreg.regParam, reg_params) \
                .build()

## setup crossvalidator
k = 3
evaluator = BinaryClassificationEvaluator(labelCol=labels)
cv = CrossValidator() \
        .setEstimator(model_pipeline) \
        .setEstimatorParamMaps(param_grid) \
        .setEvaluator(evaluator) \
        .setNumFolds(k)

In [12]:
## fit model
t0 = time.time()
cv_model = cv.setParallelism(4).fit(brca)
# cv_model = cv.fit(brca)
print("train time:", time.time() - t0)
print('-'*30)

train time: 7.866039991378784
------------------------------


In [13]:
avg_metrics = cv_model.avgMetrics

# avgMetrics follows the order of the param grid, which here matches reg_params
for reg_param, avg_metric in zip(reg_params, avg_metrics):
    print(f'regParam: {reg_param}  --  avg_auroc: {avg_metric}')

regParam: 0.1  --  avg_auroc: 0.9912708515915821
regParam: 0.01  --  avg_auroc: 0.9911033300218205
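
The metric averaged here is the area under the ROC curve, which is the default `metricName` (`areaUnderROC`) of `BinaryClassificationEvaluator`. As a rough intuition aid, AUROC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python. The function name `auroc` and the toy scores are assumptions made for illustration only; this is not how Spark computes the metric internally.

```python
def auroc(scores, labels):
    """Pairwise AUROC: fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in pos
        for n in neg
    )
    return wins / (len(pos) * len(neg))

# Toy data, invented for illustration (not taken from the breast cancer dataset)
scores = [0.1, 0.4, 0.35, 0.8]
labels = [0, 0, 1, 1]
print(auroc(scores, labels))  # 0.75: three of the four positive/negative pairs are ordered correctly
```

A value near 1.0, like the averages printed above, means almost every malignant case is scored above almost every benign case.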


### Task 2:  Balancing a DataFrame with Downsampling  
i) (**2 PTS**) Write a function to implement downsampling.  Enter code into the cell containing the `downsample` function.  

INPUTS  
* df               - Spark dataframe  
* target           - string, target variable  
* positive_label   - integer, value of positive label  
* negative_label   - integer, value of negative label  

OUTPUT  
balanced spark dataframe  

Downsampling = sample from larger class to match smaller class  

**Example:**  

INITIAL STATE  
Smaller class has 100 records  
Larger class has 400 records

ACTION  
Sample 100 records from larger class, without replacement  
Retain all records from smaller class

END STATE    
This produces a balanced dataset containing 100 records from each class
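
The example above can be sketched in plain Python, outside Spark. This is only an illustration of the idea, assuming records are `(id, label)` tuples; the helper name `downsample_exact` is hypothetical and not part of the assignment. It relies on `random.sample`, which draws without replacement and returns exactly the requested number of items.

```python
import random

def downsample_exact(records, label_of, seed=None):
    """Balance two classes by sampling the larger one down to the smaller one's size."""
    rng = random.Random(seed)

    # group records by their label
    groups = {}
    for rec in records:
        groups.setdefault(label_of(rec), []).append(rec)
    if len(groups) != 2:
        raise ValueError("expected exactly two classes")

    # order the two groups by size: smaller first, larger second
    small, large = sorted(groups.values(), key=len)

    # keep all of the smaller class; draw len(small) rows from the larger class
    return small + rng.sample(large, len(small))

# 100 records labeled 1 (smaller class) and 400 labeled 0 (larger class)
records = [(i, 1) for i in range(100)] + [(i, 0) for i in range(100, 500)]
balanced = downsample_exact(records, label_of=lambda rec: rec[1], seed=314)

counts = {label: sum(1 for _, lab in balanced if lab == label) for label in (0, 1)}
print(counts)  # {0: 100, 1: 100}
```

Unlike this in-memory version, a fraction-based sample on a distributed dataframe generally gives only an approximate match between the two class counts.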

In [14]:
def downsample(df, target, positive_label=1, negative_label=0, seed=None):
    """Sample the larger class down to approximately the size of the smaller class."""
    # get counts of pos and neg label
    counts = df.groupBy(target).count()
    pos_count = counts.filter(F.col(target) == positive_label).first()['count']
    neg_count = counts.filter(F.col(target) == negative_label).first()['count']

    # pick larger class and get sampling fraction (smaller count / larger count)
    if pos_count > neg_count:
        larger_label, smaller_label = positive_label, negative_label
        frac = neg_count / pos_count
    else:
        larger_label, smaller_label = negative_label, positive_label
        frac = pos_count / neg_count

    # sample the larger class without replacement; keep every row of the smaller class
    larger_samples = df.filter(F.col(target) == larger_label) \
                       .sample(withReplacement=False, fraction=frac, seed=seed)
    smaller_rows = df.filter(F.col(target) == smaller_label)

    return smaller_rows.union(larger_samples)

ii) **(1 PT)** Print the target distribution from this balanced dataset, to show the label counts nearly match.

#### IMPORTANT NOTE:
Sampling won't produce the exact fraction you request. To sample efficiently, Spark uses Bernoulli sampling: each row is included independently with the requested probability. 
If you request a 10% sample, each row individually has a 10% chance of being included, but this does not guarantee an exact 10% sample   
(it should be close, however).
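
To see why the sample size varies, the note above can be simulated in plain Python. This is a simplified sketch, not Spark's actual sampler: the helper `bernoulli_sample` is hypothetical, and it simply keeps each row when a uniform random draw falls below the requested fraction. The row count (458) and target size (241) are the class counts shown earlier in this notebook.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(458))   # stand-in for the 458 benign rows
fraction = 241 / 458      # aim for roughly 241 rows

# repeat with different seeds: the size hovers around 241 but is not fixed
sizes = [len(bernoulli_sample(rows, fraction, seed=s)) for s in range(5)]
print(sizes)
```

Each run lands near 241 without being pinned to it, which is the behavior to expect from a fraction-based sample.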

In [15]:
down_brca = downsample(brca, target, positive_label=positive_label, negative_label=negative_label, seed=SEED)

down_brca.groupBy(target).count().show()

+-----+-----+
|class|count|
+-----+-----+
|    4|  241|
|    2|  253|
+-----+-----+



### Task 3:  Topic Modeling

In this exercise, you will run the `topic_modeling.ipynb` notebook and answer the questions below.

i) **(1 PT)** For the first headline in the dataset, the code processes it and extracts tokens. Provide a list of the tokens.

+-------------+---------------------------------------------------+  
|publish_date | headline_text                                     |  
+-------------+---------------------------------------------------+  
|20030219     | aba decides against community broadcasting licence|  
+-------------+---------------------------------------------------+

`tokens = [aba, decid, commun, broadcast, licenc]`

ii) **(1 PT)** The code created a count vectorizer and extracted features. 

`cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)`

The first document had six tokens and the feature vector looked like this:

(500, [118, 498], [1.0, 1.0])   

Explain why there are only two non-zero elements in this feature vector.

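Answer: The feature vector is sparse: `500` is its length (the vocabulary size), `[118, 498]` are the indices of the non-zero entries, and `[1.0, 1.0]` are the counts at those indices. The count vectorizer keeps at most the 500 most frequent terms across all documents (`vocabSize=500`), and a term must appear in at least 3 documents to be eligible (`minDF=3.0`). Only two of the tokens extracted from the first document are in that vocabulary, so only two entries are non-zero; the remaining tokens were dropped.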
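
A small plain-Python sketch can make this concrete. It mimics the general idea of a capped vocabulary rather than Spark's `CountVectorizer` implementation. The helper names `fit_vocabulary` and `to_sparse` are hypothetical, and apart from the first document's tokens, the toy corpus is invented for illustration.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size, min_df):
    """Map the most frequent eligible terms to indices 0..vocab_size-1."""
    term_freq = Counter(tok for doc in docs for tok in doc)
    doc_freq = Counter(tok for doc in docs for tok in set(doc))
    # a term must appear in at least min_df documents to be eligible
    eligible = [t for t in term_freq if doc_freq[t] >= min_df]
    # most frequent first; ties broken alphabetically to keep the result deterministic
    eligible.sort(key=lambda t: (-term_freq[t], t))
    return {t: i for i, t in enumerate(eligible[:vocab_size])}

def to_sparse(doc, vocab):
    """Return (size, indices, values); tokens outside the vocabulary are ignored."""
    counts = Counter(vocab[t] for t in doc if t in vocab)
    indices = sorted(counts)
    return (len(vocab), indices, [float(counts[i]) for i in indices])

docs = [
    ["aba", "decid", "commun", "broadcast", "licenc"],  # first headline's tokens
    ["polic", "commun", "plan", "licenc"],              # invented
    ["polic", "commun", "licenc"],                      # invented
    ["polic", "commun", "plan"],                        # invented
    ["polic"],                                          # invented
    ["polic"],                                          # invented
]

vocab = fit_vocabulary(docs, vocab_size=3, min_df=2)
print(vocab)                      # {'polic': 0, 'commun': 1, 'licenc': 2}
print(to_sparse(docs[0], vocab))  # (3, [1, 2], [1.0, 1.0])
```

In this toy run, `aba`, `decid`, and `broadcast` fail the document-frequency threshold and `plan` is cut by the vocabulary cap, so the first document keeps only two of its five tokens.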

iii) **(1 PT)** Change the number of topics to 2, rerun LDA, and visualize the topics by showing the topic words.

```
topic: 0
*************************
u
iraq
polic
war
man
sai
claim
govt
plan
nsw
*************************
topic: 1
*************************
new
win
call
council
qld
urg
world
war
iraqi
rain
*************************
```