In [4]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas as pd
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=10):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [3]:
# Helper functions

def show_class_balance(data, name="data", labelCol="label"):
    """Helper function to show class balance based on label.
    
    Note that this function does not return anything.

    Args:
        data (pyspark.sql.DataFrame): datafame with label
        name (str): name to print above metrics for readability 
        labelCol (str): label column name
    """

    total = data.count()
    counts = data.groupBy(labelCol).count().toPandas()
    counts["ratio"] = counts["count"] / total

    print(f'Class balance [{name}]')
    print(f'')
    print(f'total:   {total}')
    print(f'counts:')
    print(counts)
    print(f'')

def with_custom_prediction(
    pred,
    threshold,
    probabilityCol="probability",
    customPredictionCol="customPrediction",
):
    """Helper function to select a custom prediction column for a custom classification threshold.
    
    Args:
        pred (pyspark.sql.DataFrame): datafame with column for probability 
        threshold (float): classification threshold
        probabilityCol (str): probability column name
        customPredictionCol (str): new custom prediction column name
    
    Returns:
        pred (pyspark.sql.DataFrame): dataframe with new colum for custom prediction
    """

    classification_udf = F.udf(lambda x: int(x[1] > threshold), IntegerType())
    
    return pred.withColumn(customPredictionCol, classification_udf(F.col(probabilityCol)))


def show_metrics(
    pred,
    name="data",
    threshold=0.5,
    labelCol="label",
    predictionCol="prediction",
    rawPredictionCol="rawPrediction",
    probabilityCol="probability",
):
    """Helper function to evaluate and show metrics based on a custom classification threshold.
    
    Note that this function does not return anything.
    
    Args:
        pred (pyspark.sql.DataFrame): datafame with column for probability 
        name (str): name to print above metrics for readability 
        threshold (float): classification threshold (default: 0.5)
        predictionCol (str): prediction column name
        rawPredictionCol (str): raw prediction column name
        probabilityCol (str): probability column name
    """

    if threshold != 0.5:

        predictionCol = "customPrediction"
        pred = with_custom_prediction(pred, threshold, probabilityCol=probabilityCol, customPredictionCol=predictionCol)

    total = pred.count()

    nP_actual = pred.filter((F.col(labelCol) == 1)).count()
    nN_actual = pred.filter((F.col(labelCol) == 0)).count()

    nP = pred.filter((F.col(predictionCol) == 1)).count()
    nN = pred.filter((F.col(predictionCol) == 0)).count()
    TP = pred.filter((F.col(predictionCol) == 1) & (F.col(labelCol) == 1)).count()
    FP = pred.filter((F.col(predictionCol) == 1) & (F.col(labelCol) == 0)).count()
    FN = pred.filter((F.col(predictionCol) == 0) & (F.col(labelCol) == 1)).count()
    TN = pred.filter((F.col(predictionCol) == 0) & (F.col(labelCol) == 0)).count()

    if TP + FP > 0:
        precision = TP / (TP + FP)
    else:
        precision = 0
        
    recall = TP / (TP + FN)
    accuracy = (TP + TN) / total

    binary_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol=rawPredictionCol,
        labelCol=labelCol,
        metricName='areaUnderROC',
    )
    auroc = binary_evaluator.evaluate(pred)

    print(f'Metrics [{name}]')
    print(f'')
    print(f'threshold: {threshold}')
    print(f'')
    print(f'total:     {total}')
    print(f'')
    print(f'nP actual: {nP_actual}')
    print(f'nN actual: {nN_actual}')
    print(f'')
    print(f'nP:        {nP}')
    print(f'nN:        {nN}')
    print(f'')
    print(f'TP         {TP}')
    print(f'FP         {FP}')
    print(f'FN         {FN}')
    print(f'TN         {TN}')
    print(f'')
    print(f'precision: {precision:.8f}')
    print(f'recall:    {recall:.8f}')
    print(f'accuracy:  {accuracy:.8f}')
    print(f'')
    print(f'auroc:     {auroc:.8f}')

In [7]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=2, master_memory=2)

0,1
spark.app.name,kda115 (jupyter)
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.sql.warehouse.dir,file:/users/home/kda115/Assignment%2002/Assignment_Notebook/Analysis/spark-warehouse
spark.driver.port,36787
spark.master,spark://masternode2:7077
spark.executor.id,driver
spark.driver.memory,1g
spark.driver.host,mathmadslinux2p.canterbury.ac.nz
spark.sql.shuffle.partitions,16


In [8]:
# Write your imports and code here or insert cells below

from pyspark.sql import Row, DataFrame, Window, functions as F
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.classification import OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RankingEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation

In [9]:
# Other imports to be used locally

import datetime

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp
import pandas as pd

%matplotlib inline

np.set_printoptions(edgeitems=5, threshold=100, precision=4)

In [10]:
# Load the songs_with_genre parquet file back into a Spark DataFrame
songs_with_genre = spark.read.parquet('hdfs:///user/kda115/Assignment02/data/songs_with_genre.parquet')

# Show the result
songs_with_genre.printSchema()
show_as_html(songs_with_genre,5)

root
 |-- AMM_std_1: double (nullable = true)
 |-- AMM_std_2: double (nullable = true)
 |-- AMM_std_3: double (nullable = true)
 |-- AMM_std_4: double (nullable = true)
 |-- AMM_std_5: double (nullable = true)
 |-- AMM_std_6: double (nullable = true)
 |-- AMM_std_7: double (nullable = true)
 |-- AMM_std_8: double (nullable = true)
 |-- AMM_std_9: double (nullable = true)
 |-- AMM_std_10: double (nullable = true)
 |-- AMM_avg_1: double (nullable = true)
 |-- AMM_avg_2: double (nullable = true)
 |-- AMM_avg_3: double (nullable = true)
 |-- AMM_avg_4: double (nullable = true)
 |-- AMM_avg_5: double (nullable = true)
 |-- AMM_avg_6: double (nullable = true)
 |-- AMM_avg_7: double (nullable = true)
 |-- AMM_avg_8: double (nullable = true)
 |-- AMM_avg_9: double (nullable = true)
 |-- AMM_avg_10: double (nullable = true)
 |-- msd_track_id: string (nullable = true)
 |-- genre: string (nullable = true)



Unnamed: 0,AMM_std_1,AMM_std_2,AMM_std_3,AMM_std_4,AMM_std_5,AMM_std_6,AMM_std_7,AMM_std_8,AMM_std_9,AMM_std_10,...,AMM_avg_3,AMM_avg_4,AMM_avg_5,AMM_avg_6,AMM_avg_7,AMM_avg_8,AMM_avg_9,AMM_avg_10,msd_track_id,genre
0,1.353,6709.0,34770.0,160500000.0,834600000.0,4339000000.0,7075000000000.0,7494000000.0,38980000000.0,993700000000000.0,...,60010.0,-179000000.0,-927700000.0,-4805000000.0,6227000000000.0,8320000000.0,43120000000.0,868500000000000.0,TROVQKM128F9316E59,Pop_Rock
1,1.428,6712.0,48250.0,160600000.0,1129000000.0,7922000000.0,7085000000000.0,10150000000.0,71230000000.0,2433000000000000.0,...,77860.0,-179200000.0,-1225000000.0,-8383000000.0,6239000000000.0,11010000000.0,75330000000.0,2044000000000000.0,TROVQAK128E0781B79,Pop_Rock
2,1.62,3353.0,30780.0,39940000.0,366600000.0,3363000000.0,878200000000.0,3293000000.0,30220000000.0,677300000000000.0,...,52720.0,-44450000.0,-407000000.0,-3722000000.0,771500000000.0,3648000000.0,33400000000.0,592500000000000.0,TROVQSD128F42283D2,Pop_Rock
3,1.07,3354.0,23280.0,39960000.0,280400000.0,1965000000.0,879000000000.0,2515000000.0,17640000000.0,304200000000000.0,...,40850.0,-44490000.0,-313500000.0,-2207000000.0,772300000000.0,2807000000.0,19780000000.0,268700000000000.0,TROVQBK128F42992E8,Pop_Rock
4,0.5706,6727.0,31660.0,161300000.0,753000000.0,3516000000.0,7132000000000.0,6693000000.0,31270000000.0,716100000000000.0,...,53980.0,-179900000.0,-839200000.0,-3910000000.0,6278000000000.0,7451000000.0,34760000000.0,637700000000000.0,TROVQFG128F92CD6DC,Electronic


## Audio Similarity Binary Classification Using Regularization

### B. Convert the genre column into a new binary label that represent if the track is "Electronic" or some other genre 


In [12]:
# Convert the genre column into binary column 
# feature_genre_binary = feature_genre.withColumn('class', F.when(F.col('genre') == 'Electronic', 1).otherwise(0))
songs_with_genre = songs_with_genre.withColumn(
    'class',
    F.when(F.col('genre') == 'Electronic',1).otherwise(0)
    )

# show the result 
show_as_html(songs_with_genre, 5)

Unnamed: 0,AMM_std_1,AMM_std_2,AMM_std_3,AMM_std_4,AMM_std_5,AMM_std_6,AMM_std_7,AMM_std_8,AMM_std_9,AMM_std_10,...,AMM_avg_4,AMM_avg_5,AMM_avg_6,AMM_avg_7,AMM_avg_8,AMM_avg_9,AMM_avg_10,msd_track_id,genre,class
0,1.353,6709.0,34770.0,160500000.0,834600000.0,4339000000.0,7075000000000.0,7494000000.0,38980000000.0,993700000000000.0,...,-179000000.0,-927700000.0,-4805000000.0,6227000000000.0,8320000000.0,43120000000.0,868500000000000.0,TROVQKM128F9316E59,Pop_Rock,0
1,1.428,6712.0,48250.0,160600000.0,1129000000.0,7922000000.0,7085000000000.0,10150000000.0,71230000000.0,2433000000000000.0,...,-179200000.0,-1225000000.0,-8383000000.0,6239000000000.0,11010000000.0,75330000000.0,2044000000000000.0,TROVQAK128E0781B79,Pop_Rock,0
2,1.62,3353.0,30780.0,39940000.0,366600000.0,3363000000.0,878200000000.0,3293000000.0,30220000000.0,677300000000000.0,...,-44450000.0,-407000000.0,-3722000000.0,771500000000.0,3648000000.0,33400000000.0,592500000000000.0,TROVQSD128F42283D2,Pop_Rock,0
3,1.07,3354.0,23280.0,39960000.0,280400000.0,1965000000.0,879000000000.0,2515000000.0,17640000000.0,304200000000000.0,...,-44490000.0,-313500000.0,-2207000000.0,772300000000.0,2807000000.0,19780000000.0,268700000000000.0,TROVQBK128F42992E8,Pop_Rock,0
4,0.5706,6727.0,31660.0,161300000.0,753000000.0,3516000000.0,7132000000000.0,6693000000.0,31270000000.0,716100000000000.0,...,-179900000.0,-839200000.0,-3910000000.0,6278000000000.0,7451000000.0,34760000000.0,637700000000000.0,TROVQFG128F92CD6DC,Electronic,1


### C. Split the dataset use stratified random sampling and resampling technique 

#### C.1 Stratify Random Split 

In [16]:
# Assemble all the feature columns (those that start with "AMM_") into a single "features" vector column
assembler = VectorAssembler(
    inputCols=[col for col in songs_with_genre.columns if col.startswith("AMM_")],
    outputCol="features"
)

# Apply the assembler to transform the dataset
data = assembler.transform(songs_with_genre)

# Select the relevant columns: label (class), and features
data = data.select(
    F.col('class').alias('label'),
    F.col('features')
)

# Show the transformed data
show_as_html(data)

Unnamed: 0,label,features
0,0,"[1.353, 6709.0, 34770.0, 160500000.0, 83460000..."
1,0,"[1.428, 6712.0, 48250.0, 160600000.0, 11290000..."
2,0,"[1.62, 3353.0, 30780.0, 39940000.0, 366600000...."
3,0,"[1.07, 3354.0, 23280.0, 39960000.0, 280400000...."
4,1,"[0.5706, 6727.0, 31660.0, 161300000.0, 7530000..."
5,0,"[1.267, 6709.0, 55000.0, 160500000.0, 13170000..."
6,0,"[1.117, 3355.0, 11910.0, 40000000.0, 144400000..."
7,0,"[0.8293, 6721.0, 44440.0, 161200000.0, 1067000..."
8,0,"[1.608, 3358.0, 29040.0, 40090000.0, 351100000..."
9,0,"[1.584, 3359.0, 16990.0, 40110000.0, 202400000..."


In [17]:
# Exact stratification using Window (multi-class, counts computed automatically)

temp = (
    data
    .withColumn("id", F.monotonically_increasing_id())
    .withColumn("random", F.rand())  # random number between 0 and 1
    .withColumn(
        "row",
        F.row_number()  # row number in each class partition (0, 1, 2, ...)
        .over(
            Window
            .partitionBy("label")
            .orderBy("random")
        )
    )
)

counts = (
    data
    .groupBy("label")
    .count()
    .toPandas()
    .set_index("label")["count"]
    .to_dict()
)
labels = sorted(counts.keys())

print(counts)

training = temp
for label in labels:
    training = training.where((F.col("label") != label) | (F.col("row") < counts[label] * 0.8))

training.cache()

test = temp.join(training, on="id", how="left_anti")
test.cache()

training = training.drop("id", "random", "row")
test = test.drop("id", "random", "row")

show_class_balance(data, "data")
show_class_balance(training, "training")
show_class_balance(test, "test")

{1: 40662, 0: 379942}
Class balance [data]

total:   420604
counts:
   label   count     ratio
0      1   40662  0.096675
1      0  379942  0.903325

Class balance [training]

total:   336482
counts:
   label   count     ratio
0      1   32529  0.096674
1      0  303953  0.903326

Class balance [test]

total:   84122
counts:
   label  count     ratio
0      1   8133  0.096681
1      0  75989  0.903319



In [18]:
# Ensure datasets are cached

data.cache()
training.cache()
test.cache()

DataFrame[label: int, features: vector]

In [19]:
show_as_html(training)

Unnamed: 0,label,features
0,1,"[1.789, 6742.0, 68110.0, 162100000.0, 16470000..."
1,1,"[0.7236, 6711.0, 47280.0, 160600000.0, 1130000..."
2,1,"[1.049, 6711.0, 40170.0, 160500000.0, 96780000..."
3,1,"[1.032, 6723.0, 43600.0, 161100000.0, 10510000..."
4,1,"[1.324, 6721.0, 38810.0, 161000000.0, 95660000..."
5,1,"[0.6975, 3316.0, 12720.0, 38950000.0, 14970000..."
6,1,"[1.288, 3352.0, 20450.0, 39940000.0, 235900000..."
7,1,"[0.988, 6709.0, 37240.0, 160500000.0, 89530000..."
8,1,"[1.495, 6710.0, 48430.0, 160500000.0, 11700000..."
9,1,"[1.786, 6755.0, 36180.0, 162700000.0, 87020000..."


### C.2 Resampling Method & Scaling

### C.2.1 No Sampling

#### Scaling Original Data

In [20]:
# standard scaling for down sampling
standard_scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

# Fit the StandardScaler model on the training data
scaler_no_sampling = standard_scaler.fit(training)

# Transform the training data
scaled_training_nosampled = scaler_no_sampling.transform(training)

# Transform the test data using the same scaler model
scaled_nosampled_test = scaler_no_sampling.transform(test)

### C.2.1 Down Sampling

In [21]:
# Downsampling

training_downsampled = (
    training
    .withColumn("random", F.rand())
    .where((F.col("label") != 0) | ((F.col("label") == 0) & (F.col("random") < 2 * (40662 / 379942))))
)
training_downsampled.cache()

show_class_balance(training_downsampled, "training (downsampled)")

Class balance [training (downsampled)]

total:   97750
counts:
   label  count     ratio
0      1  32529  0.332777
1      0  65221  0.667223



#### Scaling DownSampling Data

In [22]:
# standard scaling for down sampling
standard_scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

# Fit the StandardScaler model on the training data
scaler_down_sampling = standard_scaler.fit(training_downsampled)

# Transform the training data
scaled_training_downsampled = scaler_down_sampling.transform(training_downsampled)

# Transform the test data using the same scaler model
scaled_downsampled_test = scaler_down_sampling.transform(test)

#### C.2.2 Up Sampling

In [23]:
# Resampling via poisson random sampling

counts = {label: count for label, count in training.groupBy("label").count().collect()}
count_lower_bound = 50000 
#count_upper_bound = 300000

def random_upsample(x, counts, count_lower_bound):

    count = counts[x]

    if count < count_lower_bound:
        return [x] * int(1 + np.random.poisson((count_lower_bound - count) / count))  # randomly upsample to count_lower_bound
    
    return [x]  # do nothing

random_upsample_udf = F.udf(lambda x: random_upsample(x, counts, count_lower_bound), ArrayType(IntegerType()))
training_upsampled = (
    training
    .withColumn("sample", random_upsample_udf(F.col("label")))
    .select(
        F.col("label"),
        F.col("features"),
        F.explode(F.col("sample")).alias("sample")
    )
    .drop("sample")
)

show_class_balance(training_upsampled, "training (upsampled)")

Class balance [training (upsampled)]

total:   353769
counts:
   label   count     ratio
0      1   49739  0.140597
1      0  303953  0.859185



#### Scaling UpSampling Data

In [24]:
# standard scaling for up sampling
standard_scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

# Fit the StandardScaler model on the training data
scaler_upsampling = standard_scaler.fit(training_upsampled)

# Transform the training data
scaled_training_upsampled = scaler_upsampling.transform(training_upsampled)

# Transform the test data using the same scaler model
scaled_upsampled_test = scaler_upsampling.transform(test)

#### C.2.3  ReSampling

In [25]:
# Resampling via poisson random sampling

counts = {label: count for label, count in training.groupBy("label").count().collect()}
count_lower_bound = 50000
count_upper_bound = 200000

def random_resample(x, counts, count_lower_bound, count_upper_bound):

    count = counts[x]

    if count < count_lower_bound:
        return [x] * int(1 + np.random.poisson((count_lower_bound - count) / count))  # randomly upsample to count_lower_bound

    if count > count_upper_bound:
        if np.random.rand() < count_upper_bound / count: # randomly downsample to count_upper_bound
            return [x]
        else:
            return []

    return [x]  # do nothing

random_resample_udf = F.udf(lambda x: random_resample(x, counts, count_lower_bound, count_upper_bound), ArrayType(IntegerType()))
training_resampled = (
    training
    .withColumn("sample", random_resample_udf(F.col("label")))
    .select(
        F.col("label"),
        F.col("features"),
        F.explode(F.col("sample")).alias("sample")
    )
    .drop("sample")
)

show_class_balance(training_resampled, "training (resampled)")

Class balance [training (resampled)]

total:   182256
counts:
   label   count     ratio
0      1   49921  0.273906
1      0  131496  0.721491



#### Scaling ReSampling Data

In [26]:
# standard scaling for re sampling
standard_scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

# Fit the StandardScaler model on the training data
scaler_resampling = standard_scaler.fit(training_resampled)

# Transform the training data
scaled_training_resampled = scaler_resampling.transform(training_resampled)

# Transform the test data using the same scaler model
scaled_resampled_test = scaler_resampling.transform(test)

#### C.2.4 Observation reweighting

In [27]:
# Observation reweighting

training_weighted = training.withColumn(
    "weight",
    F.when(F.col("label") == 0, 1.0) # when the label is 0, give 0.5 weight
     .when(F.col("label") == 1, 5.0) # when label is 1, give 5.0 weight
     .otherwise(1.0)
)

training_weighted.printSchema()
show_as_html(training_weighted)

weights = (
    training_weighted
    .groupBy("label")
    .agg(
        F.collect_set(F.col("weight")).alias("weights")
    )
    .toPandas()
)
print(weights)

root
 |-- label: integer (nullable = false)
 |-- features: vector (nullable = true)
 |-- weight: double (nullable = false)



Unnamed: 0,label,features,weight
0,1,"[1.789, 6742.0, 68110.0, 162100000.0, 16470000...",5.0
1,1,"[0.7236, 6711.0, 47280.0, 160600000.0, 1130000...",5.0
2,1,"[1.049, 6711.0, 40170.0, 160500000.0, 96780000...",5.0
3,1,"[1.032, 6723.0, 43600.0, 161100000.0, 10510000...",5.0
4,1,"[1.324, 6721.0, 38810.0, 161000000.0, 95660000...",5.0
5,1,"[0.6975, 3316.0, 12720.0, 38950000.0, 14970000...",5.0
6,1,"[1.288, 3352.0, 20450.0, 39940000.0, 235900000...",5.0
7,1,"[0.988, 6709.0, 37240.0, 160500000.0, 89530000...",5.0
8,1,"[1.495, 6710.0, 48430.0, 160500000.0, 11700000...",5.0
9,1,"[1.786, 6755.0, 36180.0, 162700000.0, 87020000...",5.0


   label weights
0      1   [5.0]
1      0   [1.0]


#### Scaling Observation Reweighted Data

In [28]:
# standard scaling for reweighted
standard_scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

# Fit the StandardScaler model on the training data
scaler_reweighted = standard_scaler.fit(training_weighted)

# Transform the training data
scaled_training_reweighted = scaler_reweighted.transform(training_weighted)

# Transform the test data using the same scaler model
scaled_reweighted_test = scaler_reweighted.transform(test)

### D. Train and Predict each of the three classification algorithms

#### D.1 Logistic Regression Model

##### D.1.1 No Sampling 

In [30]:
# Training Logistic regression model with No Sampling
lr_no_sampling = LogisticRegression(featuresCol='features', 
                                    labelCol='label',
                                    regParam=0.01, 
                                    elasticNetParam=1.0)

lr_no_sampled_model = lr_no_sampling.fit(scaled_training_nosampled)

# Predict Logistic Regression with No Sampling
lr_no_sampled_pred = lr_no_sampled_model.transform(scaled_nosampled_test)
lr_no_sampled_pred.cache()

show_metrics(lr_no_sampled_pred)

Metrics [data]

threshold: 0.5

total:     84122

nP actual: 8133
nN actual: 75989

nP:        51
nN:        84071

TP         25
FP         26
FN         8108
TN         75963

precision: 0.49019608
recall:    0.00307390
accuracy:  0.90330710

auroc:     0.60922276


##### D.1.2  ReSampling 

In [31]:
# Training Logistic regression model with Re Sampling 0.607
lr_resampled = LogisticRegression(
    featuresCol='features', 
    labelCol='label',
    regParam=0.01, 
    elasticNetParam=1.0
)
lr_model_resampled = lr_resampled.fit(scaled_training_resampled)

# Predict Logistic Regression with No Sampling
lr_resampled_pred = lr_model_resampled.transform(scaled_resampled_test)
lr_resampled_pred.cache()

# Show the result
show_metrics(lr_resampled_pred)

Metrics [data]

threshold: 0.5

total:     84122

nP actual: 8133
nN actual: 75989

nP:        412
nN:        83710

TP         102
FP         310
FN         8031
TN         75679

precision: 0.24757282
recall:    0.01254150
accuracy:  0.90084639

auroc:     0.61172723


##### D.1.2  UpSampling 

In [33]:
# Training Logistic Regression model with Up Sampling
lr_upsampled = LogisticRegression(
    featuresCol='features', 
    labelCol='label',
    regParam=0.01, 
    elasticNetParam=1.0
)
lr_model_upsampled = lr_upsampled.fit(scaled_training_upsampled)

# Predict Logistic Regression with No Sampling
lr_upsampled_pred = lr_model_upsampled.transform(scaled_upsampled_test)
lr_upsampled_pred.cache()

# Show the result
show_metrics(lr_upsampled_pred)

Metrics [data]

threshold: 0.5

total:     84122

nP actual: 8133
nN actual: 75989

nP:        168
nN:        83954

TP         62
FP         106
FN         8071
TN         75883

precision: 0.36904762
recall:    0.00762326
accuracy:  0.90279594

auroc:     0.60776039


##### D.1.3 Down Sampling 

In [56]:
# Training Logistic Regression model with Down Sampling
lr_downsampled = LogisticRegression(
    featuresCol='features', 
    labelCol='label',
    regParam=0.01, 
    elasticNetParam=1.0
)
lr_model_downsampled = lr_downsampled.fit(scaled_training_downsampled)

# Predict Logistic Regression with No Sampling
lr_downsampled_pred = lr_model_downsampled.transform(scaled_downsampled_test)
lr_downsampled_pred.cache()

# Show the result
show_metrics(lr_downsampled_pred)

Metrics [data]

threshold: 0.5

total:     84122

nP actual: 8133
nN actual: 75989

nP:        468
nN:        83654

TP         126
FP         342
FN         8007
TN         75647

precision: 0.26923077
recall:    0.01549244
accuracy:  0.90075129

auroc:     0.61478888


##### D.1.4 Observation Reweighted 

In [32]:
# Training Logistic Regression model with Observation Reweighting 
lr_reweighted = LogisticRegression(
    featuresCol='features', 
    labelCol='label', 
    weightCol="weight",
    regParam=0.01, 
    elasticNetParam=1.0
)
lr_model_reweighted = lr_reweighted.fit(scaled_training_reweighted)

# Predict Logistic Regression with No Sampling
lr_reweighted_pred = lr_model_reweighted.transform(scaled_reweighted_test)
lr_reweighted_pred.cache()

# Show the result
show_metrics(lr_reweighted_pred)

Metrics [data]

threshold: 0.5

total:     84122

nP actual: 8133
nN actual: 75989

nP:        460
nN:        83662

TP         116
FP         344
FN         8017
TN         75645

precision: 0.25217391
recall:    0.01426288
accuracy:  0.90060864

auroc:     0.61470688


In [41]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()