### Lab: Tuning and Topic Modeling

### University of Virginia
### DS 7200: Distributed Computing
### Last Updated: August 20, 2023

---  

**INSTRUCTIONS**  
In this assignment, you will do three things:
1) Tune a logistic regression model  
2) Label-balance a dataset  
3) Run the Topic Modeling notebook, making small tweaks and capturing results  

**TOTAL POINTS: 10**

In [1]:
import os

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("data preprocessing") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext

### PARAMETERS

In [3]:
# update to match your path
directory_path = '/home/brc4cb/distributed_computing/07_tuning_and_nlp_task/'
full_path_to_file = os.path.join(directory_path, 'breast_cancer_wisconsin.csv')
path_to_data = full_path_to_file

In [4]:
# class = 2 for benign (negative class), 4 for malignant (positive class)
target = 'class'
positive_label = 4
negative_label = 2

SEED = 314

### READ IN DATA

In [5]:
brca = spark.read.csv(path_to_data, header=True, inferSchema=True)

In [6]:
brca.printSchema()

root
 |-- id: integer (nullable = true)
 |-- clump_thickness: integer (nullable = true)
 |-- uniformity_cell_size: integer (nullable = true)
 |-- uniformity_cell_shape: integer (nullable = true)
 |-- marginal_adhesion: integer (nullable = true)
 |-- single_epithelial_cell_size: integer (nullable = true)
 |-- bare_nuclei: string (nullable = true)
 |-- bland_chromatin: integer (nullable = true)
 |-- normal_nucleoli: integer (nullable = true)
 |-- mitoses: integer (nullable = true)
 |-- class: integer (nullable = true)



In [7]:
brca.count()

699

In [8]:
# compute distribution of target variable
brca.groupBy(target).count().show()

+-----+-----+
|class|count|
+-----+-----+
|    4|  241|
|    2|  458|
+-----+-----+



### Task 1:  Cross Validate a Logistic Regression Model
i) (**4 PTS**) This task has the following requirements:
- import necessary modules
- use these features as predictors: `clump_thickness`,`uniformity_cell_size`,`uniformity_cell_shape`,`marginal_adhesion`  
- `class` is the response variable. apply recoding as needed. hint: save it as a new variable.
- use 3 folds in the cross validator object
- use BinaryClassificationEvaluator
- logistic regression model with `maxIter`=10  
- tuning grid with `regParam` values of 0.1 and 0.01
- finally, print the average metrics based on each `regParam` value. the attribute `avgMetrics` in the cv model will hold these. 

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import when

# recode the response into a new 0/1 column: malignant (4) -> 1, benign (2) -> 0
brca = brca.withColumn("label", when(brca['class'] == 4, 1) \
      .when(brca['class'] == 2, 0))

# inputCols takes a list of column names
# outputCol is an arbitrary name for the new column; generally called features
assembler = VectorAssembler(inputCols=["clump_thickness", "uniformity_cell_size", "uniformity_cell_shape", "marginal_adhesion"], outputCol="features")

# standardize the assembled features before fitting
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

lr = LogisticRegression(maxIter=10, labelCol='label', featuresCol='scaledFeatures')

pipeline = Pipeline(stages=[assembler, scaler, lr])

# Build the tuning grid over regParam
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

# Run cross-validation, and choose the best set of parameters
cvModel = crossval.setParallelism(4).fit(brca) # train 4 models in parallel
print(cvModel.avgMetrics)

[0.9903113414409844, 0.9901615314117947]
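
The two values in `avgMetrics` should line up with the grid in the order it was built, so the first belongs to `regParam=0.1` and the second to `regParam=0.01`. `BinaryClassificationEvaluator` defaults to area under the ROC curve, so each value is a mean AUC across the 3 folds. Below is a minimal plain-Python sketch that pairs the printed values with their grid entries. The two lists are copied by hand from the cells above; with the fitted model you could instead zip `cvModel.getEstimatorParamMaps()` with `cvModel.avgMetrics`.

```python
# Values copied by hand from the tuning grid and the printed avgMetrics above.
reg_params = [0.1, 0.01]
avg_metrics = [0.9903113414409844, 0.9901615314117947]

# Pair each regParam with its mean cross-validated metric.
results = dict(zip(reg_params, avg_metrics))
for reg_param, auc in results.items():
    print(f"regParam={reg_param}: mean AUC over 3 folds = {auc:.4f}")

# The cross validator keeps the setting with the highest mean metric.
best_reg_param = max(results, key=results.get)
print("best regParam:", best_reg_param)
```

The gap between the two settings is in the fourth decimal place, so either value performs about equally well on these four predictors.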


### Task 2:  Balancing a DataFrame with Downsampling  
i) (**2 PTS**) Write a function to implement downsampling.  Enter code into the cell containing the `downsample` function.  

INPUTS  
* df               - Spark dataframe  
* target           - string, target variable  
* positive_label   - integer, value of positive label  
* negative_label   - integer, value of negative label  

OUTPUT  
balanced spark dataframe  

Downsampling = sample from larger class to match smaller class  

**Example:**  

INITIAL STATE  
Smaller class has 100 records  
Larger class has 400 records

ACTION  
Sample 100 records from larger class, without replacement  
Retain all records from smaller class

END STATE    
This produces a balanced dataset containing 100 records from each class
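
The 100/400 example can be worked through in plain Python before moving to Spark. This is only a sketch on a list of dictionaries: the helper name `downsample_rows` and the row layout are made up for illustration, and it is not the Spark `downsample` function the task asks for.

```python
import random
from collections import Counter

def downsample_rows(rows, target, seed=314):
    """Sample every larger class down to the size of the smallest class."""
    # Group the rows by their label value.
    by_label = {}
    for row in rows:
        by_label.setdefault(row[target], []).append(row)

    smallest = min(len(group) for group in by_label.values())
    rng = random.Random(seed)

    balanced = []
    for group in by_label.values():
        if len(group) == smallest:
            balanced.extend(group)  # retain all records from the smaller class
        else:
            # sample without replacement from the larger class
            balanced.extend(rng.sample(group, smallest))
    return balanced

# 400 records in the larger class (label 1), 100 in the smaller class (label 0)
rows = [{"id": i, "label": 1 if i < 400 else 0} for i in range(500)]
balanced = downsample_rows(rows, "label")
counts = Counter(row["label"] for row in balanced)
print(counts)
```

Because `random.sample` draws without replacement, no record from the larger class appears twice in the balanced result.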

In [6]:
from pyspark.sql.functions import lit
import random
# Create an RDD with 500 rows and a unique ID for each row
data = [(i,) for i in range(500)]
rdd = spark.sparkContext.parallelize(data)

# Define the schema for the DataFrame
schema = ["id"]

# Create a DataFrame by mapping the RDD and adding the "id" column
df_test = rdd.toDF(schema)

# Add a "label" column with a value of 1 for rows with id < 400, and 0 for id >= 400
df_test = df_test.withColumn("label", lit(1)).filter(df_test["id"] < 400).union(
    df_test.withColumn("label", lit(0)).filter(df_test["id"] >= 400)
)

In [7]:
def dynamic_sample_spark(df, target, positive_label, negative_label):
    positives = df.filter(df[target] == positive_label)
    negatives = df.filter(df[target] == negative_label)
    positive_count = positives.count()
    negative_count = negatives.count()
    if positive_count < negative_count:  # negatives are the larger class: downsample them
        sampled = negatives.sample(fraction=positive_count / negative_count, withReplacement=False)
        return positives.union(sampled)
    if positive_count > negative_count:  # positives are the larger class: downsample them
        sampled = positives.sample(fraction=negative_count / positive_count, withReplacement=False)
        return negatives.union(sampled)
    return df  # classes are already balanced
        
df = dynamic_sample_spark(df_test, 'label', 1, 0)

In [8]:
def dynamic_sample_exact(df, target, positive_label, negative_label):
    positives = df.filter(df[target] == positive_label)
    negatives = df.filter(df[target] == negative_label)
    positive_count = positives.count()
    negative_count = negatives.count()
    if positive_count == negative_count:  # classes are already balanced
        return df
    # collect the positive rows once on the driver (only suitable for small data)
    positive_rows = positives.collect()
    if positive_count < negative_count:  # upsample: keep all positives, add draws with replacement
        data = positive_rows + random.choices(positive_rows, k=negative_count - positive_count)
    else:  # downsample: draw exactly negative_count positives without replacement
        data = random.sample(positive_rows, negative_count)
    df2 = spark.createDataFrame(data, schema=df.schema)
    return negatives.union(df2)
        
df2 = dynamic_sample_exact(df_test, 'label', 1, 0)

ii) **(1 PT)** Print the target distribution from this balanced dataset, to show the label counts nearly match.

#### IMPORTANT NOTE:
Sampling won't produce the exact fraction you request. To sample efficiently, Spark uses Bernoulli sampling: each row is independently included with the requested probability. If you request a 10% sample, each row has a 10% chance of being included, which does not guarantee an exact 10% sample (though it should be close).
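
To see how far the sample size can drift, here is a small plain-Python simulation of Bernoulli sampling. It is a sketch of the idea only and does not call Spark's sampler; `bernoulli_sample` is a made-up helper. Drawing from 400 rows at a fraction of 0.25 gives 100 rows on average, with a standard deviation of sqrt(400 × 0.25 × 0.75) ≈ 8.7, so a result such as 108 is well within the normal range.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(400))  # stands in for the 400 rows of the larger class
fraction = 100 / 400     # smaller class size / larger class size

# Repeat the draw with 1000 different seeds and record each sample size.
sizes = [len(bernoulli_sample(rows, fraction, seed)) for seed in range(1000)]
mean_size = sum(sizes) / len(sizes)

print("smallest sample:", min(sizes))
print("largest sample: ", max(sizes))
print("mean sample size:", round(mean_size, 1))
```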

In [9]:
df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  100|
|    1|  108|
+-----+-----+



In [10]:
df2.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  100|
|    1|  100|
+-----+-----+



I wrote two functions, one using Spark functionality and one using Python's `random` package. The Spark version downsamples whichever class is larger, and it returns a dataframe where the class counts are close but not exact. The `random` version upsamples or downsamples the positive class to match the negative class, and it returns a dataframe where the counts of both classes are perfectly balanced. The trade-off is that it collects rows to the driver, so it only suits small datasets. I know this is extra work, but I had fun thinking through it.

### Task 3:  Topic Modeling

In this exercise, you will run the `topic_modeling.ipynb` notebook and answer the questions below.

i) **(1 PT)** For the first headline in the dataset, the code processes it and extracts tokens. Provide a list of the tokens.

+------------+--------------------------------------------------+  
|publish_date|headline_text                                     |  
+------------+--------------------------------------------------+  
|20030219    |aba decides against community broadcasting licence|  

[aba, decid, commun, broadcast, licenc] 

ii) **(1 PT)** The code created a count vectorizer and extracted features. 

`cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)`

The first document had six tokens and the feature vector looked like this:

(500, [118, 498], [1.0, 1.0])   

Explain why there are only two non-zero elements in this feature vector.

This is because the vocabulary is restricted in two ways. Setting `minDF=3.0` means a token must appear in at least 3 different documents to be eligible, and setting `vocabSize=500` keeps only the 500 most frequent eligible tokens across the corpus. Only two of this document's tokens made it into that vocabulary (indices 118 and 498), each occurring once in the headline. The remaining tokens either appeared in fewer than 3 documents or were not among the top 500 terms, so they have no position in the feature vector.
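
The two filters can be illustrated with a toy corpus in plain Python. This sketch mimics the `minDF` and `vocabSize` rules but is not Spark's `CountVectorizer`: the helpers `build_vocab` and `to_sparse`, the six toy documents, and the alphabetical tie-breaking are all assumptions made for the illustration.

```python
from collections import Counter

def build_vocab(docs, vocab_size, min_df):
    """Keep tokens seen in at least `min_df` documents, then the top `vocab_size` by count."""
    doc_freq = Counter(tok for doc in docs for tok in set(doc))
    term_freq = Counter(tok for doc in docs for tok in doc)
    eligible = [tok for tok in term_freq if doc_freq[tok] >= min_df]
    # Most frequent first; ties broken alphabetically (an assumption of this sketch).
    eligible.sort(key=lambda tok: (-term_freq[tok], tok))
    return {tok: idx for idx, tok in enumerate(eligible[:vocab_size])}

def to_sparse(doc, vocab):
    """Return (vocabulary size, indices, counts) for the tokens that are in the vocabulary."""
    counts = Counter(vocab[tok] for tok in doc if tok in vocab)
    indices = sorted(counts)
    return (len(vocab), indices, [float(counts[i]) for i in indices])

docs = [
    ["aba", "decid", "commun", "broadcast", "licenc"],
    ["polic", "investig", "fire"],
    ["polic", "charg", "man"],
    ["man", "charg", "fire"],
    ["polic", "man", "fire", "licenc"],
    ["council", "plan", "licenc"],
]

vocab = build_vocab(docs, vocab_size=3, min_df=3)
vector = to_sparse(docs[0], vocab)
print(vocab)
print(vector)
```

In this toy corpus `charg` is dropped by the document-frequency rule (it occurs in only 2 documents), while `polic` passes that rule but is cut by the vocabulary-size cap. The first document keeps a single non-zero entry even though it has five tokens.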

iii) **(1 PT)** Change the number of topics to 2, rerun LDA, and visualize the topics by showing the topic words.