# Sagemaker Jupyter Notebook Integration with Snowflake via EMR Spark

---
This Notebook shows how to integrate Sagemaker and Snowflake so that you can store data in Snowflake and import it into a Jupyter Notebook via the [Spark connector](https://docs.snowflake.net/manuals/user-guide/spark-connector.html). In this case, Spark runs on an EMR cluster and the Jupyter Notebook runs a PySpark kernel. To connect to Snowflake, the Snowflake Spark Connector must be installed on the EMR cluster.

The easiest way to get started is to configure the Sagemaker environment and the EMR environment in the same VPC.

Building an EMR cluster that can be used with Sagemaker requires a couple of customizations. At a minimum, you will need to make changes in the following sections:

1. Advanced Options
 * Use the Advanced options link to configure all necessary options
1. Software and Steps
 * Pick Hadoop and Spark
 * Optionally Zeppelin and Ganglia
1. Hardware
 * Validate the VPC (Network). The Sagemaker host needs to be created in the same VPC. 
 * Optionally you can change the instance types and whether or not to use spot pricing.
1. General Cluster Settings
 * Set the Cluster name
 * Keep Logging to troubleshoot problems
 * Pick the Bootstrap Action ([see below](#Bootstrap))
1. Security
 * Pick an EC2 key pair (create one if you don't have one yet). Without the key pair you won't be able to access the master node via SSH to finalize the setup
 * Create an additional security group to enable access via SSH and Livy ([see below](#Additional-Security-Groups))
 
 
## Bootstrap

Sagemaker and Snowflake require additional JAR files to be deployed to both the EMR master and the EMR worker nodes. The easiest way to deploy these libraries is via a bootstrap script. The bootstrap script must be stored in an S3 bucket.

## Additional Security Groups

Sagemaker uses the Livy API (hosted on the EMR cluster) to access Spark resources. By default, access to the Livy API is disabled. Best practice is to create a new security group, assigned to the EMR cluster, that allows inbound traffic on port 8998 from the security group assigned to the Sagemaker host. For this to work, the Sagemaker host has to run in a VPC.
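
As a minimal sketch of the Livy rule described above, the following uses boto3 to allow inbound TCP traffic on port 8998 from the Sagemaker host's security group. The helper names and the two security group IDs are hypothetical placeholders, and the call assumes the role running it is allowed to modify EC2 security groups.

```python
LIVY_PORT = 8998  # Livy port named in the text above


def livy_ingress_rule(sagemaker_sg_id):
    """Build an ingress rule that admits Livy traffic from one security group."""
    return {
        "IpProtocol": "tcp",
        "FromPort": LIVY_PORT,
        "ToPort": LIVY_PORT,
        # Reference the Sagemaker security group instead of a CIDR range
        "UserIdGroupPairs": [{"GroupId": sagemaker_sg_id}],
    }


def allow_livy(emr_sg_id, sagemaker_sg_id, region="us-east-1"):
    """Attach the Livy rule to the EMR security group (calls AWS)."""
    import boto3  # imported here so the rule builder works without AWS access

    ec2 = boto3.client("ec2", region_name=region)
    ec2.authorize_security_group_ingress(
        GroupId=emr_sg_id,
        IpPermissions=[livy_ingress_rule(sagemaker_sg_id)],
    )


# Hypothetical IDs; replace them with the groups from your own VPC.
# allow_livy("sg-0emr0000000000000", "sg-0sagemaker000000")
```

Referencing the source security group rather than an IP range keeps the rule valid even if the Sagemaker host's private IP changes.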

The second reason for creating a security group is to enable SSH. SSH is needed at least for the EMR master node, since the spark-defaults configuration file has to be modified there.

## Contents

1. [Notebook Configuration](#Notebook-Configuration)
1. [Credentials](#Credentials)
1. [Data Import](#Data-Import)


## Notebook Configuration
The default configuration for a PySpark Notebook uses localhost as the Spark environment. You have to change the sparkmagic configuration for the Sagemaker kernel to point to the EMR master node instead of `localhost`. Update the variable

`EMR_MASTER_INTERNAL_IP`

with the internal IP of the EMR master node. Be sure to provide the internal IP and not the public IP.
Sagemaker does not dynamically update its runtime parameters when the config file changes, so you need to restart the Sagemaker kernel via

`Kernel->Restart`

Generally speaking, the config file needs to be updated on the initial run and whenever the IP of the EMR master node changes.

In [1]:
%%bash
# Internal (private) DNS name or IP of the EMR master node
EMR_MASTER_INTERNAL_IP=ip-172-31-58-190.ec2.internal
CONF=/home/ec2-user/.sparkmagic/config.json
# Download the sparkmagic example config once and keep it as a template
if [[ ! -e "$CONF.bk" ]]
then
   wget "https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json" \
-O "$CONF.bk" 2>/dev/null
fi
# Point the Livy endpoints at the EMR master node instead of localhost
sed "s/localhost/$EMR_MASTER_INTERNAL_IP/" "$CONF.bk" > "$CONF.new"
# Also report a change when no config exists yet (initial run)
if ! diff -q "$CONF.new" "$CONF" >/dev/null 2>&1
then
   echo "Configuration has changed; Restart Kernel"
fi
cp "$CONF.new" "$CONF"
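
Before restarting the kernel, it can help to confirm that the Sagemaker host can actually reach Livy on the EMR master node. The sketch below queries Livy's `GET /sessions` REST endpoint on port 8998 with the standard library only. The helper names are hypothetical, and the check is meant to run on the Sagemaker host itself (for example in a plain Python kernel), not on the Spark cluster.

```python
import json
import urllib.error
import urllib.request


def livy_sessions_url(host, port=8998):
    """Return the URL of Livy's session listing endpoint."""
    return "http://{}:{}/sessions".format(host, port)


def livy_is_reachable(host, port=8998, timeout=5):
    """Return True if Livy answers with JSON on the given host, else False."""
    try:
        url = livy_sessions_url(host, port)
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            json.load(resp)  # Livy replies with a JSON document
            return True
    except (urllib.error.URLError, OSError, ValueError):
        # Covers refused connections, timeouts, and non-JSON replies
        return False


# Same example host name as in the cell above; use your own master node.
# print(livy_is_reachable("ip-172-31-58-190.ec2.internal"))
```

A `False` result usually points at the security group or VPC setup rather than at the sparkmagic configuration.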

## Credentials
Credentials can be hard-coded, but a much more secure way is to store them in the [Systems Manager Parameter Store](https://docs.aws.amazon.com/systems-manager/latest/userguide/systems-manager-paramstore.html). The following step reads the values for the provided keys from the parameter store. These keys are just an example. You can use the same keys, but you have to create the key/value pairs in the parameter store before you can use them here.
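
One way to create those key/value pairs is boto3's `put_parameter` call, sketched below. This is an illustration under assumptions rather than the notebook's own setup step: the helper names are hypothetical, the values are placeholders, and only the password is stored as an encrypted `SecureString`.

```python
def parameter_request(name, value, secret=False):
    """Build the keyword arguments for one ssm.put_parameter call."""
    return {
        "Name": name,
        "Value": value,
        # SecureString values are encrypted and need WithDecryption when read
        "Type": "SecureString" if secret else "String",
        "Overwrite": True,
    }


def store_parameters(values, secret_names, region="us-east-1"):
    """Write each name/value pair to the Parameter Store (calls AWS)."""
    import boto3  # imported lazily so the builder above runs without AWS access

    ssm = boto3.client("ssm", region_name=region)
    for name, value in values.items():
        ssm.put_parameter(**parameter_request(name, value, name in secret_names))


# Placeholder values only; never commit real credentials to a notebook.
# store_parameters(
#     {"/SNOWFLAKE/USER_ID": "<user>", "/SNOWFLAKE/PASSWORD": "<password>"},
#     secret_names={"/SNOWFLAKE/PASSWORD"},
# )
```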

The PySpark kernel automatically creates a Spark context when the first step runs on the Spark cluster. It therefore creates the Spark context when we run the boto3 script below to get the database credentials.

In [2]:
import boto3

params=['/SNOWFLAKE/URL','/SNOWFLAKE/ACCOUNT_ID'
        ,'/SNOWFLAKE/USER_ID','/SNOWFLAKE/PASSWORD'
        ,'/SNOWFLAKE/DATABASE','/SNOWFLAKE/SCHEMA'
        ,'/SNOWFLAKE/WAREHOUSE','/SNOWFLAKE/BUCKET'
        ,'/SNOWFLAKE/PREFIX']

region='us-east-1'

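def get_credentials(params):
   # Client for the Systems Manager Parameter Store in the chosen region
   ssm = boto3.client('ssm', region_name=region)
   # WithDecryption returns SecureString values in plain text
   response = ssm.get_parameters(
      Names=params,
      WithDecryption=True
   )
   # Build a dict that maps each parameter name to its value
   param_values={k['Name']:k['Value'] for k in response['Parameters']}
   return param_values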

param_values=get_credentials(params)

Starting Spark application


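| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 0 | | pyspark3 | idle | | | ✔ |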


SparkSession available as 'spark'.
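
Note that `get_parameters` does not raise an error for names it cannot find; it reports them in the `InvalidParameters` field of the response, so a missing key only surfaces later as a `KeyError`. A small check like the sketch below catches this early. The helper name `missing_parameters` is hypothetical, and the example values are placeholders.

```python
def missing_parameters(requested, found):
    """Return the requested names that are absent from the fetched values."""
    return sorted(set(requested) - set(found))


# Example with a hypothetical partial result
requested = ["/SNOWFLAKE/URL", "/SNOWFLAKE/USER_ID", "/SNOWFLAKE/PASSWORD"]
found = {"/SNOWFLAKE/URL": "<url>", "/SNOWFLAKE/USER_ID": "<user>"}
print(missing_parameters(requested, found))  # ['/SNOWFLAKE/PASSWORD']
```

In the notebook, `missing_parameters(params, param_values)` should return an empty list before the connection options are built.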


## Data Import
The following step reads weather data from the [Snowflake Sample Weather Data](https://docs.snowflake.net/manuals/user-guide/sample-data-openweathermap.html) database. Notice how easy it is to read and transform JSON data. The result set can be used directly to create a pandas data frame. For more background, check out this [JSON tutorial](https://docs.snowflake.net/manuals/user-guide/json-basics-tutorial.html) on the Snowflake documentation site.

Since we are running a scalable Spark environment, we can now read the whole dataset, about 226 million rows.

In [3]:
sfOptions = {
  "sfURL" : param_values['/SNOWFLAKE/URL'],
  "sfAccount" : param_values['/SNOWFLAKE/ACCOUNT_ID'],
  "sfUser" : param_values['/SNOWFLAKE/USER_ID'],
  "sfPassword" : param_values['/SNOWFLAKE/PASSWORD'],
  "sfDatabase" : param_values['/SNOWFLAKE/DATABASE'],
  "sfSchema" : param_values['/SNOWFLAKE/SCHEMA'],
  "sfWarehouse" : param_values['/SNOWFLAKE/WAREHOUSE'],
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", \
"select (V:main.temp_max - 273.15) * 1.8000 + 32.00 as temp_max_far, " +\
"       (V:main.temp_min - 273.15) * 1.8000 + 32.00 as temp_min_far, " +\
"       cast(V:time as timestamp) time, " +\
"       V:city.coord.lat lat, " +\
"       V:city.coord.lon lon " +\
"from snowflake_sample_data.weather.weather_14_total").load()
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|     TEMP_MAX_FAR|      TEMP_MIN_FAR|               LAT|               LON|
+-------+-----------------+------------------+------------------+------------------+
|  count|        226196451|         226196451|         226196451|         226196451|
|   mean|64.54348749225441| 62.50216523693793| 27.97162737457021|14.959025391770865|
| stddev| 18.8954069539488|19.277845772751554|22.682531505927834| 71.01645033334991|
|    min|         -62.3974|          -62.3974|          -0.03333|          -0.00421|
|    max|            168.8|             138.2|           9.99559|         99.993423|
+-------+-----------------+------------------+------------------+------------------+
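
In the summary above, the `min` and `max` of `LAT` and `LON` look like string ordering rather than numeric ordering (a maximum latitude of 9.99559 next to a mean of about 27.97). A likely cause, stated here as an assumption, is that the uncast VARIANT fields reach Spark as strings. The sketch below builds the same query with the coordinates cast to `double` using Snowflake's `::` cast operator. The helper name `weather_query` is hypothetical.

```python
def weather_query(table="snowflake_sample_data.weather.weather_14_total"):
    """Return the weather query with lat/lon cast to numeric columns."""
    columns = [
        "(V:main.temp_max - 273.15) * 1.8000 + 32.00 as temp_max_far",
        "(V:main.temp_min - 273.15) * 1.8000 + 32.00 as temp_min_far",
        "cast(V:time as timestamp) time",
        # Explicit casts so Spark receives doubles instead of strings
        "V:city.coord.lat::double lat",
        "V:city.coord.lon::double lon",
    ]
    return "select " + ", ".join(columns) + " from " + table


print(weather_query())

# In the notebook (assumes `spark`, `sfOptions`, and SNOWFLAKE_SOURCE_NAME
# from the cell above):
# df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions) \
#     .option("query", weather_query()).load()
```

With numeric columns, `df.describe()` should then compare latitudes and longitudes by value instead of character by character.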