### University of Virginia 
### DS 5559: Big Data Analytics
### Assignment: Tools for Supervised Learning
### Last Updated: Oct 20, 2019
---  

**Instructions**  
In this assignment, you will code functions to support supervised learning tasks.  The outline is provided below.  The value *None* is used as a placeholder.

TOTAL POINTS: 10

### MODULES

In [1]:
import os

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("data preprocessing") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext

### PARAMETERS

In [3]:
directory_path = '/home/jovyan/work/data/brca/'
# build the full path to the CSV file
path_to_data = os.path.join(directory_path, 'breast_cancer_wisconsin.csv')

In [4]:
# class = 2 for benign (negative class), 4 for malignant (positive class)
target = 'class'
positive_label = 4
negative_label = 2

SEED = 314

### READ IN DATA

In [5]:
brca = spark.read.csv(path_to_data, header=True, inferSchema=True)

In [6]:
brca.printSchema()

root
 |-- sample_code_number: integer (nullable = true)
 |-- clump_thickness: integer (nullable = true)
 |-- uniformity_cell_size: integer (nullable = true)
 |-- uniformity_cell_shape: integer (nullable = true)
 |-- marginal_adhesion: integer (nullable = true)
 |-- single_epithelial_cell_size: integer (nullable = true)
 |-- bare_nuclei: string (nullable = true)
 |-- bland_chromatin: integer (nullable = true)
 |-- normal_nucleoli: integer (nullable = true)
 |-- mitosis: integer (nullable = true)
 |-- class: integer (nullable = true)



In [7]:
brca.count()

699

In [8]:
# compute distribution of target variable
brca.groupBy(target).count().show()

+-----+-----+
|class|count|
+-----+-----+
|    4|  241|
|    2|  458|
+-----+-----+



### Task 1:  Balancing a DataFrame with Downsampling  
i) **(4 PTS)** Write a function to implement downsampling.  Enter code into the cell containing the `downsample` function.  

INPUTS  
* df               - Spark dataframe  
* target           - string, target variable  
* positive_label   - integer, value of positive label  
* negative_label   - integer, value of negative label  

OUTPUT  
balanced spark dataframe  

Downsampling = sample from larger class to match smaller class  

In [9]:
def downsample(df, target, positive_label, negative_label):
    """
    df              spark dataframe
    target          str, target variable
    positive_label  int, value of positive label
    negative_label  int, value of negative label
    
    """
    positives = df.filter(df[target] == positive_label)
    negatives = df.filter(df[target] == negative_label)
    num_positives = positives.count()
    num_negatives = negatives.count()
    
    if num_positives > num_negatives:    # downsample positives
        sampled_df = positives.sample(withReplacement=False, fraction=num_negatives/num_positives, seed=SEED)
        df_b = sampled_df.union(negatives)
    elif num_positives < num_negatives:  # downsample negatives
        sampled_df = negatives.sample(withReplacement=False, fraction=num_positives/num_negatives, seed=SEED)
        df_b = sampled_df.union(positives)
    else:                                # classes already balanced, nothing to sample
        df_b = positives.union(negatives)
    
    return df_b

ii) **(1 PT)** Print the target distribution from this balanced dataset, to show the label counts nearly match.

#### IMPORTANT NOTE:
Sampling won't produce the exact fraction you request. To sample efficiently, Spark uses Bernoulli sampling: each row is included independently with the requested probability.  
If you request a 10% sample, each row individually has a 10% chance of being included, but this does not guarantee an exact 10% sample (it should be close, however).
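
To see why the counts only nearly match, the Bernoulli scheme described above can be simulated in plain Python. This is a minimal sketch, not Spark's implementation: `bernoulli_sample` is a hypothetical helper, and 458 and 241 are the class counts printed earlier in this notebook.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

# Class counts taken from the target distribution printed earlier.
num_negatives, num_positives = 458, 241
fraction = num_positives / num_negatives  # about 0.526

negatives = list(range(num_negatives))
sizes = [len(bernoulli_sample(negatives, fraction, seed)) for seed in range(5)]

# The expected size is fraction * num_negatives (about 241), but each draw varies.
print("expected:", fraction * num_negatives)
print("sampled sizes:", sizes)
```

Each seed gives a slightly different sample size scattered around the expected value, which is the same effect seen in the balanced counts below.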

In [10]:
brca_b = downsample(brca, target, positive_label, negative_label)

In [11]:
brca_b.groupBy(target).count().show()

+-----+-----+
|class|count|
+-----+-----+
|    4|  241|
|    2|  255|
+-----+-----+



### Task 2:  Univariate AUC Measurement  

In this exercise, you will measure (in a particular sense) the individual predictive power of the following variables:  
* clump_thickness
* uniformity_cell_size
* uniformity_cell_shape
* marginal_adhesion
* single_epithelial_cell_size

**(5 PTS)** Complete the `compute_univariate_aucs` function below, and print the resulting dataframe in the cell at bottom.
The function does the following in this order:  
* Split the dataset into training and testing sets (*60% / 40%*, respectively)  
* For each variable v:  
    * train a logistic regression classifier with an intercept, using variable *v* as the only predictor
    * classify each record in the test set  
    * measure the area under the ROC curve  
* Return a pandas dataframe containing each variable, its model weight (coefficient), and its Univariate AUC, sorted by Univariate AUC in descending order.  There is a cell at the bottom for placing these results.
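
The area under the ROC curve used in the steps above has a useful interpretation: it equals the probability that a randomly chosen positive record receives a higher score than a randomly chosen negative one, with ties counted as one half. The sketch below computes it that way in plain Python on made-up labels and scores. `pairwise_auc` is a hypothetical helper for illustration and is not part of Spark.

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Made-up example: labels are 0/1, scores are hypothetical model outputs.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(pairwise_auc(labels, scores))  # 3 of 4 pairs are ordered correctly: 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to perfect separation of the two classes by that single variable.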

INPUTS  
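* df                 - Spark dataframe  
* target             - string, target variable  
* training_fraction  - float, fraction of records used for training  
* max_iterations     - integer, maximum number of training iterations  
* seed               - integer, random seed for the train/test split  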

OUTPUTS  
dataframe containing three columns: variable name, weight, AUROC  

#### IMPORTANT NOTES:   
1) LabeledPoint requires that positive label = 1, negative label = 0  
2) Do NOT use the downsampling function

In [7]:
# load modules
import pandas as pd
import pyspark.sql.functions as F
import pyspark.mllib.regression as reg
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [8]:
# parameters
training_fraction = 0.6
ITERS = 10

In [9]:
# narrow the list of features for modeling

vars_to_keep = [
 'clump_thickness',
 'uniformity_cell_size',
 'uniformity_cell_shape',
 'marginal_adhesion',
 'single_epithelial_cell_size'
]

In [10]:
brca_f = brca.select([target]+vars_to_keep)

In [11]:
brca_f.show(3)

+-----+---------------+--------------------+---------------------+-----------------+---------------------------+
|class|clump_thickness|uniformity_cell_size|uniformity_cell_shape|marginal_adhesion|single_epithelial_cell_size|
+-----+---------------+--------------------+---------------------+-----------------+---------------------------+
|    2|              5|                   1|                    1|                1|                          2|
|    2|              5|                   4|                    4|                5|                          7|
|    2|              3|                   1|                    1|                1|                          2|
+-----+---------------+--------------------+---------------------+-----------------+---------------------------+
only showing top 3 rows



In [12]:
# map target labels to 0/1
brca_f = brca_f.withColumn(target,F.when(brca_f[target] == positive_label, 1).otherwise(0))

In [13]:
brca_f.groupBy([target]).count().show()

+-----+-----+
|class|count|
+-----+-----+
|    1|  241|
|    0|  458|
+-----+-----+



In [36]:
def compute_univariate_aucs(df, target, training_fraction, iters, seed):

    data_train, data_test = df.randomSplit([training_fraction, 1 - training_fraction], seed=seed)
    
    # list of predictor variables
    vars = [v for v in df.columns if v != target]
    
    # results storage
    df_auc = pd.DataFrame(index=vars, columns=['weight','auroc'])    

    for v in vars:    
        print('=== analysis of variable: {}'.format(v))

        # select columns
        datai_tr = data_train.select([target, v])
        datai_te = data_test.select([target, v])

        # cast to LabeledPoint
        # train
        datai_tr_lp = datai_tr \
                     .rdd \
                     .map(lambda row: reg.LabeledPoint(row[0], row[1:]))
        
        # test
        datai_te_lp = datai_te \
                     .rdd \
                     .map(lambda row: reg.LabeledPoint(row[0], row[1:]))

        LR_Model = LogisticRegressionWithLBFGS.train(datai_tr_lp, iterations=iters, intercept=True)

        # from test set, zip labels with predicted labels and cast to float
        act_pred_test_set = datai_te_lp.map(lambda p: (p.label, LR_Model.predict(p.features))) \
                                            .map(lambda row: (row[0], row[1] * 1.0))

        metrics = BinaryClassificationMetrics(act_pred_test_set)

        # assign with a single .loc call to avoid pandas chained assignment
        df_auc.loc[v, 'weight'] = LR_Model.weights[0]
        df_auc.loc[v, 'auroc'] = metrics.areaUnderROC
        print('=== completed analysis of variable: {}'.format(v))
        
    df_auc.sort_values(by='auroc', ascending=False, inplace=True)
    
    return df_auc
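
One detail of the function above is worth illustrating. By default, `predict` on a `pyspark.mllib` logistic regression model returns 0/1 labels (the model's `clearThreshold()` method switches it to raw scores), so the ROC curve here is built from hard labels. With hard labels the curve has a single interior point, and the trapezoidal area reduces to (TPR + TNR) / 2. The plain-Python sketch below checks that identity on made-up data. `hard_label_auc` is a hypothetical helper, not a Spark function.

```python
def hard_label_auc(labels, predictions):
    """Trapezoidal area under the ROC curve (0,0) -> (FPR,TPR) -> (1,1)."""
    tp = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 1)
    fn = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 0)
    fp = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 1)
    tn = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 0)
    tpr = tp / (tp + fn)
    fpr = fp / (fp + tn)
    # Two trapezoids: from x=0 to x=FPR, then from x=FPR to x=1.
    area = 0.5 * fpr * tpr + 0.5 * (1.0 - fpr) * (tpr + 1.0)
    return area, tpr, 1.0 - fpr

# Made-up labels and thresholded predictions.
labels      = [1, 1, 1, 1, 0, 0, 0, 0]
predictions = [1, 1, 1, 0, 0, 0, 0, 1]
area, tpr, tnr = hard_label_auc(labels, predictions)
print(area, (tpr + tnr) / 2)  # both equal 0.75
```

Under this reading, the `auroc` column reports the average of sensitivity and specificity at the default threshold for each single-variable model.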

Call the `compute_univariate_aucs` function and print the results from the dataframe.  Remember not to downsample.

In [37]:
compute_univariate_aucs(brca_f, target, training_fraction, ITERS, SEED)

=== analysis of variable: clump_thickness
=== completed analysis of variable: clump_thickness
=== analysis of variable: uniformity_cell_size
=== completed analysis of variable: uniformity_cell_size
=== analysis of variable: uniformity_cell_shape
=== completed analysis of variable: uniformity_cell_shape
=== analysis of variable: marginal_adhesion
=== completed analysis of variable: marginal_adhesion
=== analysis of variable: single_epithelial_cell_size
=== completed analysis of variable: single_epithelial_cell_size


| | weight | auroc |
|---|---|---|
| uniformity_cell_size | 1.65358 | 0.914467 |
| uniformity_cell_shape | 1.65846 | 0.884134 |
| single_epithelial_cell_size | 1.29378 | 0.878502 |
| marginal_adhesion | 1.0952 | 0.86831 |
| clump_thickness | 0.987542 | 0.839804 |
