# Connect to Snowflake using `Spark (Scala & PySpark)`


#### Topics covered in this example
* Installing Snowflake connector for Spark
* Connecting to Snowflake using `PySpark`
* Connecting to Snowflake using `Spark Scala`

## Table of Contents:

1. [Prerequisites](#Prerequisites)
2. [Introduction](#Introduction)
3. [Install dependency libraries](#Install-dependency-libraries)
4. [Connect to Snowflake using `Spark - PySpark`](#Connect-to-Snowflake-using-Spark---PySpark)
   * [Read Data from Snowflake table](#Read-data-from-Snowflake-table-using-Spark---PySpark)
   * [Write Data to Snowflake table](#Write-data-to-Snowflake-table-using-Spark---PySpark)
5. [Connect to Snowflake using `Spark - Scala`](#Connect-to-Snowflake-using-Spark---Scala)
   * [Read Data from Snowflake table](#Read-data-from-Snowflake-table-using-Spark---Scala)
   * [Write Data to Snowflake table](#Write-data-to-Snowflake-table-using-Spark---Scala)

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE :</b> In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.</div>

* The Amazon EMR cluster attached to this notebook should have the Spark application installed.
* This notebook is tested with Amazon EMR 6.4.0
* This is a multi-language notebook for EMR Studio which is supported from Amazon EMR 6.4.0 and later
* This example downloads the Snowflake connector from Maven, so the EMR cluster attached to this notebook must have internet connectivity.
***

## Introduction
In this example we use `Spark (Scala & PySpark)` to connect to a table in Snowflake using the Snowflake connector for Spark.

The Snowflake Connector for Spark (“Spark connector”) brings Snowflake into the Apache Spark ecosystem, enabling Spark to read data from, and write data to, Snowflake.

Snowflake supports three versions of Spark: Spark 2.4, Spark 3.0, and Spark 3.1. There is a separate version of the Snowflake connector for each version of Spark. Use the correct version of the connector for your version of Spark.

The connector runs as a Spark plugin and is provided as a Spark package (spark-snowflake).
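
To make the version matching concrete, here is a minimal sketch of how the connector jar URL can be assembled from the connector, Spark, and Scala versions. The helper name `connector_jar_url` is hypothetical, and the path layout is simply inferred from the download URL used later in this notebook. Other releases may follow a different naming scheme, and the function does not check that a given combination is actually published.

```python
# Hypothetical helper: the layout below mirrors the Maven Central URL used in this notebook.
MAVEN_BASE = "https://repo1.maven.org/maven2/net/snowflake"


def connector_jar_url(connector_version: str, spark_version: str,
                      scala_version: str = "2.12") -> str:
    """Build the download URL of the spark-snowflake jar for one Spark version."""
    artifact = f"spark-snowflake_{scala_version}"
    # The Spark version is part of the artifact version, e.g. "2.9.2-spark_3.1".
    version = f"{connector_version}-spark_{spark_version}"
    return f"{MAVEN_BASE}/{artifact}/{version}/{artifact}-{version}.jar"


# Connector 2.9.2 built for Spark 3.1 and Scala 2.12, as used in this example.
print(connector_jar_url("2.9.2", "3.1"))
```

Keeping the Spark version as an explicit argument makes it harder to pair a connector build with the wrong Spark release by accident.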

For more information, see the [Snowflake Connector for Spark documentation](https://docs.snowflake.com/en/user-guide/spark-connector.html).
***

## Install dependency libraries

* The Snowflake connector for Spark needs the following three libraries to work:

  * `spark-snowflake.jar`
  * `snowflake-jdbc.jar`
  * `snowflake-ingest-sdk.jar`

* We will download these from [Maven Central Repository](https://search.maven.org/classic/#search%7Cga%7C1%7Cg%3A%22net.snowflake%22). You can also download them manually and place them on a distributed file system such as Amazon S3 or HDFS.

* For this example, we will download these on the Amazon EMR master node and then place these files on HDFS. We will use `hdfs:///tmp/spark-snowflake/lib/` to host these files on HDFS. Feel free to change this location to something which is appropriate for your requirements.

* Execute the cell below, or run these commands manually on the EMR master node, to download the jar libraries and place them on HDFS:

In [None]:
%%sh

# Create directory to download the jar files
mkdir -p /tmp/spark-snowflake-jars/

# Download jar files
wget -O /tmp/spark-snowflake-jars/spark-snowflake.jar https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/2.9.2-spark_3.1/spark-snowflake_2.12-2.9.2-spark_3.1.jar
wget -O /tmp/spark-snowflake-jars/snowflake-jdbc.jar https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.10/snowflake-jdbc-3.13.10.jar
wget -O /tmp/spark-snowflake-jars/snowflake-ingest-sdk.jar https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/0.10.3/snowflake-ingest-sdk-0.10.3.jar

# Place the library jar files on HDFS
export JAVA_HOME='/etc/alternatives/jre'
hdfs dfs -mkdir -p /tmp/spark-snowflake/lib/
hdfs dfs -copyFromLocal -f /tmp/spark-snowflake-jars/*.jar /tmp/spark-snowflake/lib/

## Connect to Snowflake using `Spark - PySpark`

We start by adding the library jars to the Spark session configuration. We do this by running the following cell.

<div class="alert alert-block alert-info">
    <b>NOTE :</b> The cell below is common to <u><i>PySpark</i></u> and <u><i>Scala</i></u>. If you run only one of these sections, make sure you still execute the <code>%%configure</code> cell below.</div>


In [None]:
%%configure -f
{
    "conf": {
        "spark.jars": "hdfs:///tmp/spark-snowflake/lib/spark-snowflake.jar,hdfs:///tmp/spark-snowflake/lib/snowflake-jdbc.jar,hdfs:///tmp/spark-snowflake/lib/snowflake-ingest-sdk.jar"        
    }
}
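
If the jars live somewhere other than `hdfs:///tmp/spark-snowflake/lib/`, the `spark.jars` value has to be rebuilt by hand. The following sketch generates the JSON body for `%%configure` from a list of jar file names. The function name `build_configure_payload` and its parameters are assumptions made for illustration; only the `conf` / `spark.jars` structure comes from the cell above.

```python
import json


def build_configure_payload(jar_names, hdfs_dir="hdfs:///tmp/spark-snowflake/lib"):
    """Return the JSON text for a %%configure cell that sets spark.jars."""
    base = hdfs_dir.rstrip("/")  # tolerate a trailing slash in the directory
    # spark.jars expects a single comma-separated string of jar locations.
    jars = ",".join(f"{base}/{name}" for name in jar_names)
    return json.dumps({"conf": {"spark.jars": jars}}, indent=4)


print(build_configure_payload([
    "spark-snowflake.jar",
    "snowflake-jdbc.jar",
    "snowflake-ingest-sdk.jar",
]))
```

The printed JSON can be pasted under `%%configure -f` in place of the hand-written version.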

### Read data from Snowflake table using `Spark - PySpark`

We start with an example of reading data from a table in Snowflake using `Spark - PySpark`.

In this example, we connect to the Snowflake account with the identifier `abc12345` as the user `SNOWFLAKE_USER` with the password `My_Password`. The table we query is `CUSTOMER`, which is in the schema `TPCH_SF1` of the database `SNOWFLAKE_SAMPLE_DATA`.

Please make sure you replace these values with the ones appropriate for your environment/setup.
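
Hard-coding a password in a notebook is convenient for a demo but easy to leak. As one possible alternative, the sketch below builds the same options dictionary from environment variables. The variable names (`SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_REGION`, `SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`) and the helper `snowflake_options_from_env` are hypothetical, and how those variables reach the Spark driver on your cluster is outside the scope of this example.

```python
import os

# Hypothetical environment variable names; adjust to your own setup.
REQUIRED_VARS = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_REGION",
                 "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")


def snowflake_options_from_env(database, schema, env=os.environ):
    """Build the connector options dict without embedding credentials in code."""
    missing = [name for name in REQUIRED_VARS if name not in env]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        # Same URL shape as the hard-coded example: <account>.<region>.snowflakecomputing.com
        "sfURL": f"https://{env['SNOWFLAKE_ACCOUNT']}.{env['SNOWFLAKE_REGION']}.snowflakecomputing.com",
        "sfUser": env["SNOWFLAKE_USER"],
        "sfPassword": env["SNOWFLAKE_PASSWORD"],
        "sfDatabase": database,
        "sfSchema": schema,
    }


# Demonstration with an explicit mapping instead of the real environment.
demo_env = {
    "SNOWFLAKE_ACCOUNT": "abc12345",
    "SNOWFLAKE_REGION": "us-east-1",
    "SNOWFLAKE_USER": "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD": "My_Password",
}
print(snowflake_options_from_env("SNOWFLAKE_SAMPLE_DATA", "TPCH_SF1", env=demo_env)["sfURL"])
```

The returned dictionary has the same keys as `sfOptions` in the cell below, so it can be passed to `.options(**...)` unchanged.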

In [None]:
sfOptions = {
  "sfURL" : "https://abc12345.us-east-1.snowflakecomputing.com",
  "sfUser" : "SNOWFLAKE_USER",
  "sfPassword" : "My_Password",
  "sfDatabase" : "SNOWFLAKE_SAMPLE_DATA",
  "sfSchema" : "TPCH_SF1"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

dfPySpark = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", "SELECT * FROM CUSTOMER") \
  .load()

dfPySpark.show()
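
The read above uses the `query` option; a whole table can instead be read with `dbtable`. The sketch below wraps both variants in one function and rejects calls that pass neither or both, on the assumption that the connector expects exactly one of the two for a read. The function name `read_snowflake` is hypothetical; the `format`, `options`, `option`, and `load` calls are the same ones used in the cell above.

```python
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"


def read_snowflake(spark, options, table=None, query=None):
    """Load either a whole table (dbtable) or the result of a SELECT (query)."""
    # Assumption: a read names its source with exactly one of the two options.
    if (table is None) == (query is None):
        raise ValueError("pass exactly one of 'table' or 'query'")
    reader = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**options)
    if table is not None:
        reader = reader.option("dbtable", table)
    else:
        reader = reader.option("query", query)
    return reader.load()


# Usage inside the notebook, where `spark` and `sfOptions` already exist:
# df_table = read_snowflake(spark, sfOptions, table="CUSTOMER")
# df_query = read_snowflake(spark, sfOptions, query="SELECT * FROM CUSTOMER")
```

Using `query` lets Snowflake do the filtering before any rows reach Spark, while `dbtable` is the simpler choice when the full table is needed.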

### Write data to Snowflake table using `Spark - PySpark`

This section describes how to write data to a table in Snowflake using `Spark - PySpark`.

We first create a sample data frame `sampleDf`.

In [None]:
sampleDf = spark.createDataFrame([
 ("1", "john jones"),
 ("2", "tracey smith"),
 ("3", "amy sander")
],["id", "name"])

sampleDf.show()

We then write the data frame `sampleDf` to the Snowflake account with the identifier `abc12345` as the user `SNOWFLAKE_USER` with the password `My_Password`. The table we write to is `SAMPLETABLE`, which is in the schema `PUBLIC` of the database `SAMPLEDB`.

Please make sure you replace these values with the ones appropriate for your environment/setup.

Note that a write must name its target table with `dbtable`; the `query` option applies only to reads.

In [None]:
sfOptions = {
  "sfURL" : "https://abc12345.us-east-1.snowflakecomputing.com",
  "sfUser" : "SNOWFLAKE_USER",
  "sfPassword" : "My_Password",
  "sfDatabase" : "SAMPLEDB",
  "sfSchema" : "PUBLIC"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sampleDf.write \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "SAMPLETABLE") \
    .mode("Overwrite") \
    .save()
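
The cell above uses `Overwrite`, which replaces the contents of the target table. As a hedged sketch of a more defensive pattern, the helper below defaults to `append` and validates the mode against Spark's standard save modes before writing. The function name `write_snowflake` and the default mode are choices made for this illustration, not something the connector prescribes.

```python
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Save modes accepted by Spark's DataFrameWriter.mode() (matched case-insensitively).
SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_snowflake(df, options, table, mode="append"):
    """Write a DataFrame to a Snowflake table, defaulting to a non-destructive mode."""
    if mode.lower() not in SAVE_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    (df.write
       .format(SNOWFLAKE_SOURCE_NAME)
       .options(**options)
       .option("dbtable", table)  # target table for the write
       .mode(mode)
       .save())


# Usage inside the notebook, where `sampleDf` and `sfOptions` already exist:
# write_snowflake(sampleDf, sfOptions, "SAMPLETABLE")                    # append rows
# write_snowflake(sampleDf, sfOptions, "SAMPLETABLE", mode="overwrite")  # replace contents
```

Making the destructive mode an explicit argument means a table is only overwritten when the caller asks for it.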

## Connect to Snowflake using `Spark - Scala`

We start by adding the library jars to the Spark session configuration. We do this by running the following cell.

<div class="alert alert-block alert-info">
    <b>NOTE :</b> The cell below is common to <u><i>PySpark</i></u> and <u><i>Scala</i></u>. If you run only one of these sections, make sure you still execute the <code>%%configure</code> cell below.</div>


In [None]:
%%configure -f
{
    "conf": {
        "spark.jars": "hdfs:///tmp/spark-snowflake/lib/spark-snowflake.jar,hdfs:///tmp/spark-snowflake/lib/snowflake-jdbc.jar,hdfs:///tmp/spark-snowflake/lib/snowflake-ingest-sdk.jar"        
    }
}

### Read data from Snowflake table using `Spark - Scala`

We start with an example of reading data from a table in Snowflake using `Spark - Scala`.

In this example, we connect to the Snowflake account with the identifier `abc12345` as the user `SNOWFLAKE_USER` with the password `My_Password`. The table we query is `CUSTOMER`, which is in the schema `TPCH_SF1` of the database `SNOWFLAKE_SAMPLE_DATA`.

Please make sure you replace these values with the ones appropriate for your environment/setup.

Alternatively, you can use `query` instead of `dbtable` to pass a specific SQL statement to Snowflake.

In [None]:
%%scalaspark

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

import org.apache.spark.sql.DataFrame

val sfOptionsScala = Map(
    "sfURL" -> "https://abc12345.us-east-1.snowflakecomputing.com/",
    "sfUser" -> "SNOWFLAKE_USER",
    "sfPassword" -> "My_Password",
    "sfDatabase" -> "SNOWFLAKE_SAMPLE_DATA",
    "sfSchema" -> "TPCH_SF1",
  )

val dfScala = spark.read.
    format(SNOWFLAKE_SOURCE_NAME).
    options(sfOptionsScala).
    option("dbtable", "CUSTOMER").
    load()

dfScala.show()

### Write data to Snowflake table using `Spark - Scala`

This section describes how to write data to a table in Snowflake using `Spark - Scala`.

We first create a sample data frame `sampleDfScala`.

In [None]:
%%scalaspark

import spark.implicits._

val columns = Seq("id", "name")

val data = Seq(("1", "john jones"),
               ("2", "tracey smith"),
               ("3", "amy sanders"))

val sampleDfScala = data.toDF(columns:_*)

sampleDfScala.show()

We then write the data frame `sampleDfScala` to the Snowflake account with the identifier `abc12345` as the user `SNOWFLAKE_USER` with the password `My_Password`. The table we write to is `SAMPLETABLESCALA`, which is in the schema `PUBLIC` of the database `SAMPLEDB`.

Please make sure you replace these values with the ones appropriate for your environment/setup.

In [None]:
%%scalaspark

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

val sfOptionsScala = Map(
    "sfURL" -> "https://abc12345.us-east-1.snowflakecomputing.com/",
    "sfUser" -> "SNOWFLAKE_USER",
    "sfPassword" -> "My_Password",
    "sfDatabase" -> "SAMPLEDB",
    "sfSchema" -> "PUBLIC",
  )

sampleDfScala.write.
    format(SNOWFLAKE_SOURCE_NAME).
    options(sfOptionsScala).
    option("dbtable", "SAMPLETABLESCALA").
    mode("Overwrite").
    save()