# Spark and Jupyter integration - available at http://aka.ms/sparkmeetup
by [@aggFTW](https://twitter.com/aggftw) & [@theseoafs](https://twitter.com/theseoafs) & [@kamhawy](https://twitter.com/kamhawy)

![spark](images/spark-logo-trademark.png)

# Spark Basics

<a href="http://spark.apache.org/" target="_blank">Apache Spark</a> is an open-source parallel processing framework that supports in-memory processing to boost the performance of big-data analytic applications.

There are different ways to use Spark, from command line shells to compiling jars that you can run in a Spark cluster.

However, because Spark is so fast, you can also run it interactively from within a "notebook". Jupyter is one of many notebook servers and is widely used in the data science community.

This is our proposition on how to integrate Spark with Jupyter. We hope you like it!

# Architecture

By default, Jupyter Notebook comes with a **Python2** kernel. A kernel is a program that runs and interprets your code. You can install two additional kernels for use with Jupyter Notebook:

1. **PySpark** (for applications written in Python). The PySpark kernel exposes the Spark programming model to Python.
2. **Spark** (for applications written in Scala).

The kernels use a completely Open Source library, [sparkmagic](https://github.com/jupyter-incubator/sparkmagic), to communicate with Spark clusters. Here's the rough architecture:

![arch](images/arch.png)

The Jupyter notebook uses the sparkmagic library to send Spark code to Livy, an open-source REST server for Spark. When you run the first code cell in a PySpark or Spark notebook, the kernel creates a Livy session, and your code runs in that session.
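To make the architecture more concrete, here is a minimal sketch of the kind of REST calls involved, written against Livy's documented endpoints (`POST /sessions`, `POST /sessions/{id}/statements`, `GET /sessions/{id}/statements/{id}`). It is not sparkmagic's implementation: the helper names are hypothetical, `YOURCLUSTER` is a placeholder, and the requests are only built, not sent (a real call would also need the cluster's credentials).

```python
import json
from urllib.request import Request

# Placeholder endpoint; use the Livy URL that %%info reports for your cluster.
LIVY_URL = "https://YOURCLUSTER.azurehdinsight.net/livy"
HEADERS = {"Content-Type": "application/json"}


def create_session_request(kind="pyspark", **config):
    """Hypothetical helper: build a POST /sessions request that starts a Livy session."""
    body = dict(config, kind=kind)
    return Request(LIVY_URL + "/sessions", data=json.dumps(body).encode("utf-8"),
                   headers=HEADERS, method="POST")


def submit_statement_request(session_id, code):
    """Hypothetical helper: build a POST request that runs one cell's code in a session."""
    body = {"code": code}
    return Request("%s/sessions/%d/statements" % (LIVY_URL, session_id),
                   data=json.dumps(body).encode("utf-8"), headers=HEADERS, method="POST")


def poll_statement_request(session_id, statement_id):
    """Hypothetical helper: build a GET request that fetches a statement's state and output."""
    return Request("%s/sessions/%d/statements/%d" % (LIVY_URL, session_id, statement_id),
                   method="GET")


req = submit_statement_request(0, "sc.parallelize(range(10)).count()")
print(req.get_method(), req.full_url)
```

Sending a request would be a call such as `urllib.request.urlopen(req)`; Livy answers with JSON describing the session or statement, and a client polls the statement until its state is `available`.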

You can use the `%%info` magic to display the Livy session information for your notebook.

In [1]:
%%info

Endpoint:
	https://YOURCLUSTER.azurehdinsight.net/livy

Current session ID number:
	None

Session configs:
	{u'executorCores': 2, 'kind': 'spark', u'driverMemory': u'1000M'}

Info for endpoint:
    Sessions:




# Features available with the new kernels

## Notebook setup

When you use Spark and PySpark kernel notebooks, there is no need to create a SparkContext or a HiveContext; both are created for you automatically when you run the first code cell, and the progress is printed in the cell output. The contexts are created with the following variable names:
- SparkContext (sc)
- HiveContext (sqlContext)

To run the cells below, place the cursor in the cell and then press **SHIFT + ENTER**.

## Help magic (%%help)

This magic lists the magics supported by the Spark and PySpark kernels and explains how to use each one.

In [2]:
%%help

| Magic | Example | Explanation |
|---|---|---|
| info | `%%info` | Outputs session information for the current Livy endpoint. |
| cleanup | `%%cleanup -f` | Deletes all sessions for the current Livy endpoint, including this notebook's session. The force flag is mandatory. |
| delete | `%%delete -f -s 0` | Deletes a session by number for the current Livy endpoint. Cannot delete this kernel's session. |
| logs | `%%logs` | Outputs the current session's Livy logs. |
| configure | `%%configure -f {"executorMemory": "1000M", "executorCores": 4}` | Configure the session creation parameters. The force flag is mandatory if a session has already been created and the session will be dropped and recreated. Look at Livy's POST /sessions Request Body for a list of valid parameters. Parameters must be passed in as a JSON string. |
| sql | `%%sql -o tables -q SHOW TABLES` | Executes a SQL query against the sqlContext. Parameters:<br>`-o VAR_NAME`: The result of the query will be available in the %%local Python context as a Pandas dataframe.<br>`-q`: The magic will return None instead of the dataframe (no visualization).<br>`-m METHOD`: Sample method, either take or sample.<br>`-n MAXROWS`: The maximum number of rows of a SQL query that will be pulled from Livy to Jupyter. If this number is negative, then the number of rows will be unlimited.<br>`-r FRACTION`: Fraction used for sampling. |
| local | `%%local a = 1` | All the code in subsequent lines will be executed locally. Code must be valid Python code. |


### Session configuration (%%configure)
 
Use the `%%configure` magic to configure new or existing Livy sessions.
* If a session is already running, you can change its configuration by using the `-f` argument with the `%%configure` magic. This deletes the current session and recreates it with the new configuration. If you don't provide the **-f** argument, an error is displayed and no configuration changes are applied.
* If no session has been started yet, the **-f** argument is optional. Using it in that case does not delete any running sessions.
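The two rules above can be summarized as a small decision function. This is a hypothetical model written for illustration, not sparkmagic's code; `plan_configure` and `ConfigureError` are invented names, and the returned strings simply describe the steps.

```python
class ConfigureError(Exception):
    """Raised when %%configure would replace a running session without -f (hypothetical)."""


def plan_configure(running_session_id, force):
    """Return the steps a %%configure call would take, per the rules described above."""
    if running_session_id is None:
        # No session yet: -f is optional, and no running session is touched.
        # The stored config is used when the first code cell creates the session.
        return ["store config"]
    if not force:
        # A session is running and -f is missing: show an error, change nothing.
        raise ConfigureError("A session is already running; rerun %%configure with -f.")
    # Session running and -f given: drop it and recreate it with the new config.
    return ["delete session %d" % running_session_id, "store config", "create session"]


print(plan_configure(None, force=False))
print(plan_configure(0, force=True))
```

This matches the `%%configure -f` output shown later in this notebook, where no session existed yet and the current session ID stays `None` after the command.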




These are some session attributes that can be used for configuration:
- **"name"**: Name of the application
- **"driverMemory"**: Memory for the driver (e.g. 1000M, 2G)
- **"executorMemory"**: Memory for each executor (e.g. 1000M, 2G)
- **"executorCores"**: Number of cores for each executor (e.g. 4)

For more attributes for session configuration see <a href="https://github.com/cloudera/livy/tree/6fe1e80cfc72327c28107e0de20c818c1f13e027#post-sessions" target="_blank"> POST /sessions request body</a>.
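Because the `%%configure` parameters must be a JSON string, a Python dict literal with single quotes will not work. One way to avoid quoting mistakes, sketched below, is to build the configuration as a dict and let `json.dumps` produce the text; the attribute values here are just the ones used in the next cell.

```python
import json

# Session attributes, using the names listed above.
config = {
    "name": "remotesparkmagics-sample",
    "executorMemory": "4G",
    "executorCores": 4,
}

# json.dumps emits valid JSON (double-quoted keys and strings), unlike str(config).
magic_line = "%%configure -f " + json.dumps(config)
print(magic_line)
```

The printed line can be pasted into a cell as-is; `str(config)` would instead produce single-quoted keys, which are not valid JSON.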

In [3]:
%%configure -f {"name":"remotesparkmagics-sample", "executorMemory": "4G", "executorCores":4}

Endpoint:
	https://YOURCLUSTER.azurehdinsight.net/livy

Current session ID number:
	None

Session configs:
	{u'executorCores': 4, 'kind': 'spark', u'executorMemory': u'4G', u'name': u'remotesparkmagics-sample'}

Info for endpoint:
    Sessions:




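## Scala support

In a **Spark** kernel notebook, cells contain Scala code. Because this is the first code cell run in the notebook, its output also shows the SparkContext and HiveContext being created.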

In [4]:
val fruits = sc.textFile("wasb:///example/data/fruits.txt")
val fruitsReversed = fruits.map((fruit) => fruit.reverse)

Creating SparkContext as 'sc'
Creating HiveContext as 'sqlContext'
fruitsReversed: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:11

## Automatic visualization of queries 

The sparkmagic kernels automatically visualize the output of Spark SQL queries. You can choose between several types of visualizations:
- Table
- Pie
- Line 
- Area
- Bar

>**TIP**: You can take the first N rows or sample a fraction of the rows. See the `-m`, `-n`, and `-r` parameters in the `%%help` output.

### SQL magic (%%sql)

The sparkmagic kernels support inline SQL queries that execute against the `sqlContext`. The `-o [Variable]` argument persists the output of the SQL query as a <a href="http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html" target="_blank">Pandas dataframe</a> on the Jupyter server (e.g. `-o platform_state_df` in the example below), which makes it available in local mode, explained later. The output is visualized automatically after you run the cell below.

In [5]:
%%sql -o platform_state_df -m sample -r 1.0 -n 200
SELECT clientid, querytime, deviceplatform, querydwelltime, state
FROM hivesampletable

![query_result](images/autoviz_bar.png)
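The `-m sample -r 1.0 -n 200` options in the cell above control how many rows travel from the cluster to Jupyter. As a rough mental model only, the sketch below mimics them on a plain Python list: `take` keeps the first rows, `sample` keeps each row independently with probability `-r` (Bernoulli sampling, which is what Spark's `sample` without replacement does), and `-n` then caps the row count, with a negative value meaning no limit. The function `sample_rows` is hypothetical, and sparkmagic's actual implementation may differ.

```python
import random


def sample_rows(rows, method, maxrows, fraction=1.0, seed=None):
    """Hypothetical model of the %%sql -m/-n/-r options, applied to a Python list."""
    if method == "sample":
        rng = random.Random(seed)
        # Keep each row independently with probability `fraction`.
        rows = [row for row in rows if rng.random() < fraction]
    elif method != "take":
        raise ValueError("method must be 'take' or 'sample'")
    # A negative maxrows means "no limit", as the %%help table describes.
    return rows if maxrows < 0 else rows[:maxrows]


rows = list(range(1000))
print(len(sample_rows(rows, "sample", 200, fraction=1.0)))
```

Under this model, `-r 1.0` keeps every row, so in the cell above only `-n 200` limits the result.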

### Running locally (%%local)

You can use the `%%local` magic to run your code locally on the Jupyter server, which is the head node of the HDInsight cluster. Here's a typical use case for this scenario.

By default, any code snippet that you run from a Jupyter notebook is executed in the Livy session on the cluster, and its results stay in that session's context. However, if all the data that you need for a computation is already available on the Jupyter server, you can use the `%%local` magic to run the code snippet there and save a round trip to the cluster for every computation. Typically, you use the `%%local` magic together with the `%%sql` magic and its `-o` parameter: `-o` persists the output of the Spark SQL query locally, and `%%local` then runs the subsequent code against that locally persisted output.

> **TIPS**:
> * Working against locally persisted data is especially useful when you want a visual representation of the results, because it lets you use visualization libraries such as **matplotlib**. If you work directly against the data on the remote worker nodes, the output received through Livy is always text, which limits your visualization options.
> * If your dataset is large, it is considered a best practice to do your computation or number-crunching on the cluster or in the SQL query rather than in local mode.


When you use `%%local`, all subsequent lines in the cell are executed locally, so they never have to go to the Spark cluster. The code in the cell must be valid Python code.

In [6]:
%%local
print(platform_state_df.columns)

Index([u'clientid', u'deviceplatform', u'querydwelltime', u'querytime',
       u'state'],
      dtype='object')


In [7]:
%%local
import pandas as pd
from IPython.display import display
from ipywidgets import Output
from plotly.offline import init_notebook_mode, iplot

# This graph has nothing to do with the query above, but it shows how you could
# visualize data yourself by using the state column, for example.

# Output widget that will hold the rendered map
o = Output()
display(o)

# Electoral college votes by US state; the CSV has two columns (state, votes)
df = pd.read_csv('https://plot.ly/~Dreamshot/5718/electoral-college-votes-by-us-state/.csv')

# Treat every column as text, then give the columns short names
for col in df.columns:
    df[col] = df[col].astype(str)

df.columns = ["state", "votes"]

# Purple sequential color scale: [position in 0..1, color]
scl = [[0.0, 'rgb(242,240,247)'], [0.2, 'rgb(218,218,235)'], [0.4, 'rgb(188,189,220)'],
       [0.6, 'rgb(158,154,200)'], [0.8, 'rgb(117,107,177)'], [1.0, 'rgb(84,39,143)']]

# Hover text shows the state
df['text'] = df['state']

# One choropleth trace; 'USA-states' mode matches locations to two-letter state abbreviations
data = [dict(
    type='choropleth',
    colorscale=scl,
    autocolorscale=False,
    locations=df['state'],
    z=df['votes'].astype(float),
    locationmode='USA-states',
    text=df['text'],
    hoverinfo='location+z',
    marker=dict(
        line=dict(
            color='rgb(255,255,255)',
            width=2
        )
    ),
    colorbar=dict(
        title="Votes"
    )
)]

# Map of the USA only, using the Albers USA projection
layout = dict(
    title='Device per state',
    geo=dict(
        scope='usa',
        projection=dict(type='albers usa'),
        showlakes=True,
        lakecolor='rgb(255, 255, 255)'
    )
)

fig = dict(data=data, layout=layout)

# Render the figure offline inside the output widget
with o:
    init_notebook_mode()
    iplot(fig, validate=False)

![map](images/map_us.png)

### Session logs (%%logs)

You can get the logs of your current Livy session to debug any issues you encounter. From the logs, you can retrieve the YARN application ID and then use the YARN UI to look at the YARN logs for that application. YARN is the cluster resource manager that Spark runs on in HDInsight. The URL for the YARN UI is `https://<clusterdnsname>.azurehdinsight.net/yarnui`. For instructions on how to access the YARN UI for the cluster, see [Access YARN application logs on Linux-based HDInsight](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-access-yarn-app-logs-linux/).
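YARN application IDs follow the pattern `application_<clusterTimestamp>_<sequenceNumber>`, so you can pull them out of the `%%logs` output with a regular expression. A small sketch, assuming the log text is available as a Python string (for example, copied from the cell output); the helper name `find_yarn_app_ids` and the sample log lines are hypothetical.

```python
import re

# YARN application IDs look like application_<clusterTimestamp>_<sequenceNumber>.
APP_ID_PATTERN = re.compile(r"\bapplication_\d+_\d+\b")


def find_yarn_app_ids(log_text):
    """Return the distinct YARN application IDs in log text, in order of first appearance."""
    seen = []
    for match in APP_ID_PATTERN.findall(log_text):
        if match not in seen:
            seen.append(match)
    return seen


# Hypothetical log lines, for illustration only.
sample_log = (
    "INFO YarnClientImpl: Submitted application application_1457564035221_0012\n"
    "INFO Client: Application report for application_1457564035221_0012 (state: ACCEPTED)\n"
)
print(find_yarn_app_ids(sample_log))
```

The resulting ID is what you look up in the YARN UI.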

In [8]:
%%logs

16/03/09 22:59:53 WARN SparkConf: The configuration key 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key 'spark.yarn.am.waitTime' instead.
16/03/09 22:59:54 WARN SparkConf: The configuration key 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key 'spark.yarn.am.waitTime' instead.
16/03/09 22:59:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/09 22:59:55 INFO TimelineClientImpl: Timeline service address: http://hn0-YOURCLUSTER.dx.internal.cloudapp.net:8188/ws/v1/timeline/
16/03/09 22:59:55 INFO ConfiguredRMFailoverProxyProvider: Failing over to rm2
16/03/09 22:59:55 INFO Client: Requesting a new application from cluster with 10 NodeManagers
16/03/09 22:59:56 INFO Client: Verifying our application has not requested more than the ma

### Delete session (%%delete)

Use `%%delete -f -s <session #>` to delete a session by its session ID. Note that you cannot delete the session that belongs to the current notebook's kernel. Deleting a session that another notebook is using may put that notebook into an error state, because each notebook is supposed to manage its own session, and all work in that session will be lost.
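At the REST level, deleting a Livy session is a `DELETE /sessions/{id}` call. The sketch below builds such a request and mirrors the rule that a notebook cannot delete its own session. The helper name `delete_session_request` and the `YOURCLUSTER` endpoint are placeholders, and the request is built but not sent.

```python
from urllib.request import Request

# Placeholder endpoint; use the Livy URL that %%info reports for your cluster.
LIVY_URL = "https://YOURCLUSTER.azurehdinsight.net/livy"


def delete_session_request(session_id, own_session_id):
    """Hypothetical helper: build a DELETE request, refusing to target this notebook's session."""
    if session_id == own_session_id:
        raise ValueError("Cannot delete this kernel's own session with %%delete")
    return Request("%s/sessions/%d" % (LIVY_URL, session_id), method="DELETE")


req = delete_session_request(2, own_session_id=0)
print(req.get_method(), req.full_url)
```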

### Sessions cleanup (%%cleanup)

Use the `%%cleanup -f` magic to delete all of the sessions for this cluster, including this notebook's session and any sessions other notebooks are using.
The force flag `-f` is mandatory.
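Conceptually, cleaning up means listing the endpoint's sessions with Livy's `GET /sessions` (whose JSON body contains a `sessions` array of objects with an `id`) and deleting each one with `DELETE /sessions/{id}`. A sketch of that loop, assuming you already have the response body as a string; the helper name `cleanup_requests`, the `YOURCLUSTER` endpoint, and the sample response are hypothetical, and no requests are sent.

```python
import json
from urllib.request import Request

# Placeholder endpoint; use the Livy URL that %%info reports for your cluster.
LIVY_URL = "https://YOURCLUSTER.azurehdinsight.net/livy"


def cleanup_requests(list_sessions_body):
    """Hypothetical helper: given the JSON body of GET /sessions, build one DELETE per session."""
    sessions = json.loads(list_sessions_body)["sessions"]
    return [Request("%s/sessions/%d" % (LIVY_URL, s["id"]), method="DELETE")
            for s in sessions]


# Hypothetical GET /sessions response with two sessions.
body = '{"from": 0, "total": 2, "sessions": [{"id": 0, "state": "idle"}, {"id": 3, "state": "busy"}]}'
for req in cleanup_requests(body):
    print(req.get_method(), req.full_url)
```

Like `%%cleanup`, this deletes every session on the endpoint, including sessions other notebooks may still be using.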

# We love feedback!

### Useful links:

* Contributions welcome at [sparkmagic repo](https://github.com/jupyter-incubator/sparkmagic)!
* All Microsoft HDInsight Spark clusters come with Jupyter set up to be immediately productive on Spark. [Give it a try!](https://azure.microsoft.com/en-us/pricing/free-trial/)
* [Feature Requests](https://feedback.azure.com/forums/217335-hdinsight)
* [Forum Questions](https://social.msdn.microsoft.com/forums/azure/en-US/home?forum=hdinsight)
* [hdinsight tag at StackOverflow](https://stackoverflow.com/questions/tagged/hdinsight)

# Thanks!