<a href="https://colab.research.google.com/github/harithatavarthy/Spark-Snowflake/blob/master/sparkSession_toSnowflake.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**The purpose of this notebook is to demonstrate connectivity to Snowflake from Spark using the Spark-Snowflake connector. For demonstration purposes, we use Spark 2.3.4 with Hadoop 2.7, Snowflake JDBC driver version 3.6.12, and Spark-Snowflake connector version 2.4.8 built for Scala 2.11 (artifact spark-snowflake_2.11-2.4.8). If you are using a different version of Spark, you will have to download the matching version of the Spark-Snowflake connector. For more information, refer to the Snowflake documentation: https://docs.snowflake.net/manuals/user-guide/spark-connector-install.html**

***You don't have to change any of the cells below except the one that passes the Snowflake credentials and connection attributes.***

**Install OpenJDK 8 and Spark 2.3.4 with Hadoop 2.7**

In [0]:
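# Install a headless Java 8 JDK (Spark 2.3 runs on Java 8)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download and unpack Spark 2.3.4 prebuilt for Hadoop 2.7 (older releases are served from the Apache archive)
!wget -q https://archive.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
# findspark adds pyspark to sys.path at runtime, based on SPARK_HOME
!pip install -q findspark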
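If you switch to a different Spark or Hadoop build, both the tarball name and the SPARK_HOME path set later change. The helper below is a minimal sketch that builds the download URL, assuming the `spark-<spark>/spark-<spark>-bin-hadoop<hadoop>.tgz` layout that archive.apache.org uses for prebuilt Spark 2.x releases; `spark_tarball_url` is a hypothetical name, not part of any library.

```python
# Sketch: build the Apache archive URL for a prebuilt Spark/Hadoop tarball.
# Assumption: the archive keeps the spark-<spark>/spark-<spark>-bin-hadoop<hadoop>.tgz
# naming used for Spark 2.x releases.
ARCHIVE_BASE = "https://archive.apache.org/dist/spark"


def spark_tarball_url(spark_version: str, hadoop_version: str) -> str:
    """Return the download URL of a prebuilt Spark tarball on the Apache archive."""
    name = f"spark-{spark_version}-bin-hadoop{hadoop_version}.tgz"
    return f"{ARCHIVE_BASE}/spark-{spark_version}/{name}"


print(spark_tarball_url("2.3.4", "2.7"))
```

The directory created by `tar xf` has the same name as the tarball without `.tgz`, which is what SPARK_HOME must point to.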


**Get the Snowflake Connector and JDBC Driver**

In [2]:
# Download the Snowflake JDBC driver and the Spark-Snowflake connector (Scala 2.11 build) from Maven Central
!wget -O snowflake-jdbc-3.6.12.jar https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.6.12/snowflake-jdbc-3.6.12.jar
!wget -O spark-snowflake_2.11-2.4.8.jar https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.11/2.4.8/spark-snowflake_2.11-2.4.8.jar

--2020-01-26 01:39:28--  https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.6.12/snowflake-jdbc-3.6.12.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.200.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.200.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15462110 (15M) [application/java-archive]
Saving to: ‘snowflake-jdbc-3.6.12.jar’


2020-01-26 01:39:28 (89.3 MB/s) - ‘snowflake-jdbc-3.6.12.jar’ saved [15462110/15462110]

--2020-01-26 01:39:29--  https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.11/2.4.8/spark-snowflake_2.11-2.4.8.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.200.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.200.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 602469 (588K) [application/java-archive]
Saving to: ‘spark-snowflake_2.11-2.4.8.jar’


2020-01-26 01:39:29 (9.21 MB/s) - ‘spark-snowflake_2.11-2.4.8.jar’ saved [602469/602469]
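Both download URLs follow the standard Maven repository layout, in which the group id `net.snowflake` becomes the path `net/snowflake`. As a sketch (the helper name `maven_jar_url` is hypothetical), the same URLs can be derived from Maven coordinates, which makes it easier to pick matching versions when you change Spark. The `_2.11` suffix on the connector artifact is the Scala version, which should match the Scala version of your Spark build.

```python
# Sketch: build a Maven Central download URL from Maven coordinates.
# Standard repository layout: <group path>/<artifact>/<version>/<artifact>-<version>.jar,
# where the dots in the group id become path separators.
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Return the URL of the main JAR for the given Maven coordinates."""
    group_path = group_id.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


# Coordinates of the two JARs used in this notebook
jdbc_url = maven_jar_url("net.snowflake", "snowflake-jdbc", "3.6.12")
connector_url = maven_jar_url("net.snowflake", "spark-snowflake_2.11", "2.4.8")
print(jdbc_url)
print(connector_url)
```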



**Check that all the required JARs are available**

In [1]:

!ls -ltr /content/*.jar

-rw-r--r-- 1 root root 15462110 Sep 25  2018 /content/snowflake-jdbc-3.6.12.jar
-rw-r--r-- 1 root root   602469 Oct  5  2018 /content/spark-snowflake_2.11-2.4.8.jar
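The same check can be done from Python, which is handy if you want a cell to report problems explicitly instead of relying on reading `ls` output. This is a minimal sketch with a hypothetical `missing_jars` helper; it also treats zero-byte files as missing, on the assumption that a failed `wget -O` download can leave an empty file behind.

```python
from pathlib import Path

# JAR names downloaded earlier in this notebook
EXPECTED_JARS = ["snowflake-jdbc-3.6.12.jar", "spark-snowflake_2.11-2.4.8.jar"]


def missing_jars(directory, names=EXPECTED_JARS):
    """Return the expected JAR names that are absent or empty in `directory`."""
    base = Path(directory)
    # is_file() is checked first, so stat() only runs on files that exist
    return [n for n in names if not (base / n).is_file() or (base / n).stat().st_size == 0]


missing = missing_jars("/content")
if missing:
    print("Missing or empty JARs:", missing)
else:
    print("All JARs present")
```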


**Set JAVA_HOME and SPARK_HOME, and pass the driver class path and the JAR locations to the PySpark application**

In [0]:
import os
# Point to the Java 8 installation and the extracted Spark distribution
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.4-bin-hadoop2.7"
# Put both JARs on the driver class path (':'-separated) and ship them with --jars (','-separated)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-class-path /content/snowflake-jdbc-3.6.12.jar:/content/spark-snowflake_2.11-2.4.8.jar --jars /content/snowflake-jdbc-3.6.12.jar,/content/spark-snowflake_2.11-2.4.8.jar pyspark-shell"
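The two halves of PYSPARK_SUBMIT_ARGS use different separators: `--driver-class-path` takes a class path (`:` on Linux), while `--jars` takes a comma-separated list, and the value ends with `pyspark-shell`. Below is a sketch of a hypothetical helper that builds the string from a list of JAR paths, assuming the paths contain no spaces (they are not quoted).

```python
import os


def pyspark_submit_args(jar_paths):
    """Build a PYSPARK_SUBMIT_ARGS value that puts `jar_paths` on the driver
    class path and ships them with --jars. Assumes no spaces in the paths."""
    # --driver-class-path expects a class path: entries joined by os.pathsep (':' on Linux)
    class_path = os.pathsep.join(jar_paths)
    # --jars expects a comma-separated list
    jars = ",".join(jar_paths)
    # spark-submit options come first; "pyspark-shell" goes last
    return f"--driver-class-path {class_path} --jars {jars} pyspark-shell"


JARS = ["/content/snowflake-jdbc-3.6.12.jar", "/content/spark-snowflake_2.11-2.4.8.jar"]
print(pyspark_submit_args(JARS))
# To use it: os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args(JARS)
```

On Linux this produces exactly the string set in the cell above, and it keeps the two lists in sync if you add or swap a JAR.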


**Create Spark Session**

In [0]:
import findspark
findspark.init()  # uses SPARK_HOME to make pyspark importable
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Run Spark locally, using all available cores
conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("Spar-Snowflake-Connector")
        )
spark = SparkSession.builder.config(conf=conf).getOrCreate()

**Check that the Spark properties/configuration are as expected**

In [4]:

print(spark.sparkContext.getConf().getAll())


[('spark.driver.host', '2df7e04ca0df'), ('spark.driver.extraClassPath', '/content/snowflake-jdbc-3.6.12.jar:/content/spark-snowflake_2.11-2.4.8.jar'), ('spark.repl.local.jars', 'file:///content/snowflake-jdbc-3.6.12.jar,file:///content/spark-snowflake_2.11-2.4.8.jar'), ('spark.jars', 'file:///content/snowflake-jdbc-3.6.12.jar,file:///content/spark-snowflake_2.11-2.4.8.jar'), ('spark.rdd.compress', 'True'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.app.id', 'local-1580004055747'), ('spark.ui.showConsoleProgress', 'true'), ('spark.driver.port', '45187'), ('spark.app.name', 'Spar-Snowflake-Connector')]
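Rather than scanning the printed list by eye, you can check that both JARs were registered under `spark.jars`. The sketch below uses a hypothetical `jars_registered` helper that accepts any list of `(key, value)` pairs, such as the one returned by `SparkConf.getAll()`; it is shown with a trimmed copy of the output above so it runs without Spark.

```python
def jars_registered(conf_items, jar_names):
    """Return True if every name in `jar_names` appears in the spark.jars setting.

    `conf_items` is a list of (key, value) pairs, e.g. from SparkConf.getAll().
    """
    conf = dict(conf_items)
    # spark.jars holds a comma-separated list of URIs such as file:///content/x.jar;
    # keep only the file name after the last '/'
    registered = {uri.rsplit("/", 1)[-1] for uri in conf.get("spark.jars", "").split(",") if uri}
    return all(name in registered for name in jar_names)


# In the notebook you would pass spark.sparkContext.getConf().getAll();
# here a trimmed copy of the printed configuration is used instead.
sample = [
    ("spark.jars", "file:///content/snowflake-jdbc-3.6.12.jar,file:///content/spark-snowflake_2.11-2.4.8.jar"),
    ("spark.master", "local[*]"),
]
print(jars_registered(sample, ["snowflake-jdbc-3.6.12.jar", "spark-snowflake_2.11-2.4.8.jar"]))  # True
```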


**Set the options for the Snowflake connector. Ensure that the role you use has privileges to create a stage in the schema being referred to.**

In [0]:
# Replace the placeholder values with your own account details
sfOptions = {
  "sfURL" : "accountname.snowflakecomputing.com:443",
  "sfUser" : "username",
  "sfPassword" : "password",
  "sfRole" : "role",
  "sfSchema" : "schema",
  "sfDatabase" : "database",
  "sfWarehouse" : "warehouse"
}
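Hardcoding a password in a notebook makes it easy to leak. One option, sketched below, is to read the same options from environment variables; the variable names in `ENV_KEYS` and the helper `sf_options_from_env` are hypothetical, so adjust them to your own setup.

```python
import os

# Hypothetical mapping from connector option names to environment variable names
ENV_KEYS = {
    "sfURL": "SNOWFLAKE_URL",
    "sfUser": "SNOWFLAKE_USER",
    "sfPassword": "SNOWFLAKE_PASSWORD",
    "sfRole": "SNOWFLAKE_ROLE",
    "sfSchema": "SNOWFLAKE_SCHEMA",
    "sfDatabase": "SNOWFLAKE_DATABASE",
    "sfWarehouse": "SNOWFLAKE_WAREHOUSE",
}


def sf_options_from_env(environ=os.environ):
    """Build the connector options dict from environment variables.

    Raises KeyError naming every variable that is unset or empty.
    """
    missing = [env for env in ENV_KEYS.values() if not environ.get(env)]
    if missing:
        raise KeyError(f"Missing Snowflake settings: {', '.join(missing)}")
    return {opt: environ[env] for opt, env in ENV_KEYS.items()}


# Usage in the notebook, once the variables are set:
# sfOptions = sf_options_from_env()
```

Failing early with a list of every missing variable is usually easier to debug than a connection error from the connector later on.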


**Enable query pushdown and run a query against Snowflake**

In [11]:
# Enable query pushdown for this session through the connector's JVM utility class
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Run the query in Snowflake and load its result as a DataFrame
df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select 1 as mynum union select 2").load()
df.show()

+-----+
|MYNUM|
+-----+
|    1|
|    2|
+-----+
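The `query` option is one way to choose what to read; the connector also accepts a `dbtable` option naming a table. The sketch below (hypothetical helper `snowflake_read_options`) merges the connection options with exactly one of the two, assuming you never want to pass both in the same read.

```python
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"


def snowflake_read_options(sf_options, query=None, dbtable=None):
    """Return a copy of `sf_options` plus exactly one of `query` or `dbtable`.

    Sketch assumption: a read specifies either a SQL query or a table name, never both.
    """
    if (query is None) == (dbtable is None):
        raise ValueError("Specify exactly one of query or dbtable")
    opts = dict(sf_options)  # copy so the shared sfOptions dict is not modified
    if query is not None:
        opts["query"] = query
    else:
        opts["dbtable"] = dbtable
    return opts


# Usage in the notebook (requires the SparkSession and sfOptions defined above;
# MY_TABLE is a placeholder table name):
# df = (spark.read.format(SNOWFLAKE_SOURCE_NAME)
#       .options(**snowflake_read_options(sfOptions, dbtable="MY_TABLE"))
#       .load())
```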



In [0]:
spark.stop()