### Spark notebook ###

This notebook will only work in a Jupyter notebook or Jupyter lab session running on the cluster master node in the cloud.

Follow the instructions on the computing resources page to start a cluster and open this notebook.

**Steps**

1. Connect to the Windows server using Windows App.
2. Connect to Kubernetes.
3. Start Jupyter and open this notebook from Jupyter in order to connect to Spark.

In [None]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Name of the current user with any "@domain" suffix removed; it is used below to
# build the per-user derby directory and the Spark application name

username = re.compile('@.*').sub('', getpass.getuser())


# Functions used below

def dict_to_html(d):
    """Convert a Python dictionary into a two column HTML table for display.

    Keys and values are converted to text and HTML-escaped, so entries that
    contain ``<``, ``>`` or ``&`` are shown literally instead of being
    interpreted as markup.

    Args:
        d (dict): mapping to render, one table row per item in iteration order

    Returns:
        str: the HTML markup of the table
    """

    # Imported locally because the name `html` is reused in this notebook for
    # variables that hold markup, which would shadow a module-level import.
    from html import escape

    rows = [
        f'<tr><td style="text-align:left;">{escape(str(k), quote=False)}</td>'
        f'<td>{escape(str(v), quote=False)}</td></tr>'
        for k, v in d.items()
    ]

    return ''.join([
        '<table width="100%" style="width:100%; font-family: monospace;">',
        *rows,
        '</table>',
    ])


def show_as_html(df, n=20):
    """Preview a spark dataframe as html by converting its first rows to pandas.

    Args:
        df: spark dataframe to preview
        n (int): maximum number of rows to show (default: 20)
    """

    preview = df.limit(n)
    display(preview.toPandas())

    
def display_spark():
    """Show an html summary of the Spark session state in the notebook.

    The session counts as active when both the `spark` and `sc` globals exist; in that
    case the application name, a link to the application UI, and the full Spark config
    are shown. Otherwise a "stopped" message with a link to the Spark UI is shown.
    """

    session_active = 'spark' in globals() and 'sc' in globals()

    if not session_active:
        stopped_parts = (
            '<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username} (notebook)</code> is under the completed applications section in the Spark UI.</p>',
            '<ul>',
            '<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            '</ul>',
        )
        display(HTML(''.join(stopped_parts)))
        return

    name = sc.getConf().get("spark.app.name")

    # Only the port of the UI address is kept; the link always points at localhost
    ui_port = sc.uiWebUrl.split(":")[-1]
    config_table = dict_to_html(dict(sc.getConf().getAll()))

    active_parts = (
        '<p><b>Spark</b></p>',
        f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
        '<ul>',
        f'<li><a href="http://localhost:{ui_port}" target="_blank">Spark Application UI</a></li>',
        '</ul>',
        '<p><b>Config</b></p>',
        config_table,
        '<p><b>Notes</b></p>',
        '<ul>',
        '<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
        f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
        '</ul>',
    )
    display(HTML(''.join(active_parts)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).

    The session is created with `getOrCreate()`, and its status is displayed with
    `display_spark()` once the globals have been set.

    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): memory per executor in gigabytes (default: 1)
        master_memory (float): driver memory in gigabytes (default: 1)
    """

    global spark
    global sc

    # Total cores across all executors; shuffle partitions are set to 4 per core
    cores = executor_instances * executor_cores
    partitions = cores * 4

    spark = (
        SparkSession.builder
        # Per-user derby home directory under /tmp
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{username}/spark/")
        # Fixed-size allocation: exactly the requested executors and cores
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.driver.memory", f'{master_memory}g')
        .config("spark.executor.memory", f'{worker_memory}g')
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.kubernetes.container.image", "madsregistry001.azurecr.io/hadoop-spark:v3.3.5-openjdk-8")
        .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
        .config("spark.kubernetes.memoryOverheadFactor", "0.3")
        .config("spark.memory.fraction", "0.1")
        # display_spark() tells the user to look for this name in the Spark UI
        .config("spark.app.name", f"{username} (notebook)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()

    display_spark()

    
def stop_spark():
    """Stop the active Spark session, if any, and remove the spark and sc globals.

    The session status is displayed afterwards whether or not a session was stopped.
    """

    global spark, sc

    # Only act when both globals exist, i.e. a session was started by start_spark()
    session_defined = all(name in globals() for name in ('spark', 'sc'))

    if session_defined:
        spark.stop()
        del spark, sc

    display_spark()


# Inject css overrides so that wide spark output is readable: no wrapping inside
# <pre> blocks or dataframe cells, and the dataframe index column is hidden

html = ['<style>']
html.extend([
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
])
html.append('</style>')

display(HTML(''.join(html)))

### Movie Lens Explicit Feedback ###

The MovieLens dataset contains a variety of separate datasets, including movie metadata, user-movie ratings, user-movie tag applications, and tag metadata (relevance scores). This allows us to explore a variety of models, including regression, classification, and ranking.

This example uses the user-movie ratings to fit an explicit feedback model that we could use to recommend movies to users based on rating.

**Sections**

- [Data](#Data)
- [Explicit feedback](#Explicit-feedback)

**Key points**

- The `randomSplit` method will return a different random split every time the output is used unless the random seed is specified or the output is cached.
- We must configure the `ALS` class with a keyword argument to enable explicit feedback.
- We are predicting a numeric rating so we can use RMSE to evaluate the performance of our model without actually generating any recommendations.

In [None]:
# Run this cell to start a spark session in this notebook
# Requests 2 executors with 2 cores and 4g of memory each, plus 1g of driver memory

start_spark(executor_instances=2, executor_cores=2, worker_memory=4, master_memory=1)

In [None]:
# Write your imports and code here or insert cells below

from pyspark.sql import Row, DataFrame, Window, functions as F
from pyspark.sql.types import *

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

import numpy as np

In [None]:
# Determine ideal number of partitions: 4 per core, where the total number of cores
# is (number of executors) x (cores per executor) as read from the active Spark config

conf = sc.getConf()

N, M = (
    int(conf.get(key))
    for key in ("spark.executor.instances", "spark.executor.cores")
)
partitions = 4 * N * M

print(f'ideal # partitions = {partitions}')

### Data ###

The user-movie rating data is stored in a sparse format with one rating per row.

**Key points**

- Each row corresponds to an observed rating, and there will be many more user-movie ratings that are unobserved.
- The user and movie identifiers are already increasing integers; otherwise, we would need to use the `StringIndexer` class to convert them into increasing integers, as that is what `ALS` requires.

In [None]:
# Load the ratings from HDFS with an explicit schema (every column nullable), then
# spread the rows over the ideal number of partitions and cache them for reuse

schema = StructType([
    StructField(column, dtype, True)
    for column, dtype in [
        ('user_id', IntegerType()),
        ('movie_id', IntegerType()),
        ('rating', FloatType()),
        ('timestamp', LongType()),
    ]
])

data = spark.read.csv(
    "hdfs:///data/ml/ratings.csv",
    schema=schema,
    header=True,
    inferSchema=False,
)
data = data.repartition(partitions).cache()

data.printSchema()
show_as_html(data)

### Explicit feedback ###

We will use the Alternating Least Squares class `ALS` for explicit feedback.

This is a specific optimisation algorithm that solves the low rank matrix factorization problem underlying the collaborative filtering model.

**Key points**

- The `ALS` class expects the user and item identifiers to be encoded as increasing integers starting at 0.
- We must configure the `ALS` class with a keyword argument to enable explicit feedback.
- We are predicting a numeric rating so we can use RMSE to evaluate the performance of our model without actually generating any recommendations.

In [None]:
# Split into test and training (weights 0.8 for training and 0.2 for test)
# No seed is passed to randomSplit, so the split is not reproducible between runs;
# both outputs are cached, presumably so that later cells keep seeing the same split

training, test = data.randomSplit([0.8, 0.2])
training.cache()
test.cache()

In [None]:
# Alternating least squares (implicitPrefs=False), i.e. explicit feedback on ratings

als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    maxIter=5,
    regParam=0.01,
    implicitPrefs=False,
)
alsModel = als.fit(training)

# Score the held-out ratings and also keep a copy of each prediction rounded to the
# nearest multiple of 0.5 (double, round, then halve)
pred = (
    alsModel.transform(test)
    .withColumn("predictionRounded", F.round(F.col("prediction") * 2) / 2)
    .cache()
)

pred.printSchema()
show_as_html(pred)

In [None]:
# Metrics

# Drop rows whose prediction is NaN before computing RMSE, otherwise the metric itself
# would be NaN. F.isnan is used instead of comparing against np.NaN because that alias
# was removed in NumPy 2.0 and raises AttributeError there.
# NOTE(review): NaN predictions presumably come from users or movies that are missing
# from the training split - confirm.
temp = pred.filter(~F.isnan(F.col('prediction')))

# RMSE of the raw predictions
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(temp)

# RMSE of the predictions rounded to the nearest multiple of 0.5
evaluator_rounded = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="predictionRounded")
rmse_rounded = evaluator_rounded.evaluate(temp)

print(f'metrics for explicit feedback')
print(f'')
print(f'rmse         = {rmse:.5f}')
print(f'rmse_rounded = {rmse_rounded:.5f}')
print(f'')

### Stop Spark ###

In [None]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI
# This stops the session and removes the spark and sc globals defined by start_spark()

stop_spark()