SYSTEMML-594 Add Jupyter (PySpark) Example
This adds a PySpark Jupyter Example notebook, and associated
documentation of the PySpark MLContext API.

Closes #97.
dusenberrymw committed Mar 25, 2016
1 parent c26a3b0 commit 32962fe
Showing 6 changed files with 394 additions and 5 deletions.
[3 binary image files added under docs/img/spark-mlcontext-programming-guide/ — not displayed]
11 changes: 6 additions & 5 deletions docs/index.md
@@ -48,14 +48,15 @@ machine in R-like and Python-like declarative languages.
a linear regression example in Standalone Mode.
* The [Quick Start Guide](quick-start-guide.html) provides additional examples of algorithm execution
in Standalone Mode.
* **Spark MLContext** - Spark MLContext is a programmatic API for running SystemML from Spark via Scala, Python, or Java.
* See the [Spark MLContext Programming Guide](spark-mlcontext-programming-guide.html) for
[**Spark Shell (Scala)**](spark-mlcontext-programming-guide.html#spark-shell-example),
[Java](spark-mlcontext-programming-guide.html#java-example),
[**Zeppelin Notebook**](spark-mlcontext-programming-guide.html#zeppelin-notebook-example---linear-regression-algorithm),
and [**Jupyter Notebook (PySpark)**](spark-mlcontext-programming-guide.html#jupyter-pyspark-notebook-example---poisson-nonnegative-matrix-factorization)
examples.
* **Spark Batch** - Algorithms are automatically optimized to run across Spark clusters.
* See **Invoking SystemML in Spark Batch Mode** **(Coming soon)**.
* **Hadoop Batch** - Algorithms are automatically optimized when distributed across Hadoop clusters.
* See [Invoking SystemML in Hadoop Batch Mode](hadoop-batch-mode.html) for detailed information.
* **JMLC** - Java Machine Learning Connector.
132 changes: 132 additions & 0 deletions docs/spark-mlcontext-programming-guide.md
@@ -993,4 +993,136 @@ Training time per iter: 0.2334166666666667 seconds
{% endhighlight %}


* * *

# Jupyter (PySpark) Notebook Example - Poisson Nonnegative Matrix Factorization

Here, we'll explore the use of SystemML via PySpark in a [Jupyter notebook](http://jupyter.org/).
This Jupyter notebook example can be viewed in rendered form
[on GitHub](https://github.com/apache/incubator-systemml/blob/master/samples/jupyter-notebooks/SystemML-PySpark-Recommendation-Demo.ipynb),
and can be [downloaded here](https://raw.githubusercontent.com/apache/incubator-systemml/master/samples/jupyter-notebooks/SystemML-PySpark-Recommendation-Demo.ipynb) to a directory of your choice.

From the directory containing the downloaded notebook, start Jupyter with PySpark (this assumes `SPARK_HOME` points to your Spark installation and `SYSTEMML_HOME` to your SystemML project directory):

{% highlight bash %}
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" \
  $SPARK_HOME/bin/pyspark --master local[*] \
  --jars $SYSTEMML_HOME/target/SystemML.jar \
  --py-files $SYSTEMML_HOME/src/main/java/org/apache/sysml/api/python/SystemML.py \
  --driver-class-path $SYSTEMML_HOME/target/SystemML.jar
{% endhighlight %}

This will open Jupyter in a browser:

![Jupyter Notebook](img/spark-mlcontext-programming-guide/jupyter1.png "Jupyter Notebook")

We can then open up the `SystemML-PySpark-Recommendation-Demo` notebook:

![Jupyter Notebook](img/spark-mlcontext-programming-guide/jupyter2.png "Jupyter Notebook")

## Set up the notebook and download the data

{% highlight python %}
# Enable automatic reloading of modified modules and inline matplotlib plots
%load_ext autoreload
%autoreload 2
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (10, 6)  # default figure size
{% endhighlight %}

{% highlight python %}
%%sh
# Download dataset
curl -O http://snap.stanford.edu/data/amazon0601.txt.gz
gunzip amazon0601.txt.gz
{% endhighlight %}
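
If `curl` is not available in the notebook's environment, the same download can be done in pure Python; a minimal sketch (only the dataset URL above is assumed):

{% highlight python %}
# Pure-Python alternative to the shell cell above (no curl required).
import gzip
import shutil
try:
    from urllib.request import urlretrieve  # Python 3
except ImportError:
    from urllib import urlretrieve  # Python 2
urlretrieve("http://snap.stanford.edu/data/amazon0601.txt.gz", "amazon0601.txt.gz")
with gzip.open("amazon0601.txt.gz", "rb") as f_in:
    with open("amazon0601.txt", "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
{% endhighlight %}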

## Use PySpark to load the data in as a Spark DataFrame

{% highlight python %}
# Load data
import pyspark.sql.functions as F
dataPath = "amazon0601.txt"

X_train = (sc.textFile(dataPath)
           .filter(lambda l: not l.startswith("#"))
           .map(lambda l: l.split("\t"))
           .map(lambda prods: (int(prods[0]), int(prods[1]), 1.0))
           .toDF(("prod_i", "prod_j", "x_ij"))
           .filter("prod_i < 500 AND prod_j < 500")  # Filter for memory constraints
           .cache())

max_prod_i = X_train.select(F.max("prod_i")).first()[0]
max_prod_j = X_train.select(F.max("prod_j")).first()[0]
numProducts = max(max_prod_i, max_prod_j) + 1 # 0-based indexing
print("Total number of products: {}".format(numProducts))
{% endhighlight %}
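
Before handing the data to SystemML, it can be worth a quick, optional sanity check using the standard Spark DataFrame API:

{% highlight python %}
# Optional sanity check: peek at a few co-purchase pairs and count rows.
X_train.show(3)
print("Number of co-purchase pairs: {}".format(X_train.count()))
{% endhighlight %}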

## Create a SystemML MLContext object

{% highlight python %}
# Create SystemML MLContext
from SystemML import MLContext
ml = MLContext(sc)
{% endhighlight %}

## Define a kernel for Poisson nonnegative matrix factorization (PNMF) in DML

{% highlight python %}
# Define PNMF kernel in SystemML's DSL using the R-like syntax for PNMF
pnmf = """
# data & args
X = read($X)
X = X+1 # change product IDs to be 1-based, rather than 0-based
V = table(X[,1], X[,2])
size = ifdef($size, -1)
if(size > -1) {
    V = V[1:size,1:size]
}
max_iteration = as.integer($maxiter)
rank = as.integer($rank)

n = nrow(V)
m = ncol(V)
range = 0.01
W = Rand(rows=n, cols=rank, min=0, max=range, pdf="uniform")
H = Rand(rows=rank, cols=m, min=0, max=range, pdf="uniform")
losses = matrix(0, rows=max_iteration, cols=1)

# run PNMF
i = 1
while(i <= max_iteration) {
    # update params
    H = (H * (t(W) %*% (V/(W%*%H))))/t(colSums(W))
    W = (W * ((V/(W%*%H)) %*% t(H)))/t(rowSums(H))

    # compute loss
    losses[i,] = -1 * (sum(V*log(W%*%H)) - as.scalar(colSums(W)%*%rowSums(H)))
    i = i + 1
}

# write outputs
write(losses, $lossout)
write(W, $Wout)
write(H, $Hout)
"""
{% endhighlight %}
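
For reference, these are the standard multiplicative update rules for PNMF, derived to non-increase the negative Poisson log-likelihood of `V` under the low-rank model `W %*% H`. A sketch of the objective being minimized:

$$
\min_{W \geq 0,\, H \geq 0} \;\; -\sum_{i,j} \left( V_{ij} \log\,(WH)_{ij} - (WH)_{ij} \right)
$$

The loss computed in the script relies on the identity that `sum(W %*% H)` equals `as.scalar(colSums(W) %*% rowSums(H))`, which avoids materializing the full product `W %*% H` for the second term of the objective.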

## Execute the algorithm

{% highlight python %}
# Run the PNMF script on SystemML with Spark
ml.reset()
outputs = ml.executeScript(pnmf, {"X": X_train, "maxiter": 100, "rank": 10}, ["W", "H", "losses"])
{% endhighlight %}
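
The dictionary binds the script's `$X`, `$maxiter`, and `$rank` arguments, and the list names the DML variables to register as outputs. As a hypothetical sketch reusing the same call, a lower-rank factorization could be run for comparison:

{% highlight python %}
# Hypothetical second run at a smaller rank, e.g. to compare convergence.
ml.reset()
outputs_rank5 = ml.executeScript(pnmf, {"X": X_train, "maxiter": 100, "rank": 5}, ["W", "H", "losses"])
{% endhighlight %}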

## Retrieve the losses during training and plot them

{% highlight python %}
# Plot training loss over time
losses = outputs.getDF(sqlContext, "losses")
xy = losses.sort(losses.ID).map(lambda r: (r[0], r[1])).collect()
x, y = zip(*xy)
plt.plot(x, y)
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('PNMF Training Loss')
{% endhighlight %}

![Jupyter Loss Graph](img/spark-mlcontext-programming-guide/jupyter_loss_graph.png "Jupyter Loss Graph")
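
The same curve could also be plotted by converting the `losses` output to pandas instead of using RDD operations; a sketch, assuming the collected result fits in driver memory:

{% highlight python %}
# Hypothetical pandas-based variant of the plot above.
losses_pd = outputs.getDF(sqlContext, "losses").sort("ID").toPandas()
plt.plot(losses_pd.iloc[:, 0], losses_pd.iloc[:, 1])
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('PNMF Training Loss')
{% endhighlight %}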
256 changes: 256 additions & 0 deletions samples/jupyter-notebooks/SystemML-PySpark-Recommendation-Demo.ipynb

Large diffs are not rendered by default.
