# Connecting Saturn Cloud to Snowflake


This notebook shows how to connect to a Snowflake database and run data manipulations that are too large for a single machine by distributing them with Dask. We will use the NYC Taxi dataset hosted in a Saturn Cloud Snowflake database.

First, we import the necessary libraries:

In [43]:
import os
import snowflake.connector
import pandas as pd

Next, we connect to the Snowflake database using the Snowflake connector module. Here the credentials are loaded from environment variables set through the Saturn Cloud credential manager, but you could load them in other ways too. Be sure not to save them as plaintext in unsafe places!

In [41]:
conn_info = {
    "account": os.environ["EXAMPLE_SNOWFLAKE_ACCOUNT"],
    "user": os.environ["EXAMPLE_SNOWFLAKE_USER"],
    "password": os.environ["EXAMPLE_SNOWFLAKE_PASSWORD"],
    "database": os.environ["TAXI_DATABASE"],
}

conn = snowflake.connector.connect(**conn_info)
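
A missing environment variable surfaces above as a bare `KeyError`. As a minimal sketch, the lookup could be wrapped so that all missing names are reported at once. The helper `build_conn_info` and the `REQUIRED_VARS` mapping are hypothetical names introduced for illustration; the variable names themselves are the ones used above.

```python
import os

# Maps each connection setting to the environment variable the notebook reads.
REQUIRED_VARS = {
    "account": "EXAMPLE_SNOWFLAKE_ACCOUNT",
    "user": "EXAMPLE_SNOWFLAKE_USER",
    "password": "EXAMPLE_SNOWFLAKE_PASSWORD",
    "database": "TAXI_DATABASE",
}


def build_conn_info(env=None):
    """Hypothetical helper: collect connection settings or fail with a clear message."""
    env = os.environ if env is None else env
    # Treat unset and empty variables the same way.
    missing = [name for name in REQUIRED_VARS.values() if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {key: env[name] for key, name in REQUIRED_VARS.items()}
```

The returned dictionary has the same keys as `conn_info`, so it could be passed to `snowflake.connector.connect(**...)` in the same way.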

We can run queries directly on a Snowflake database and load the results into a Pandas DataFrame. In the following chunk we get the days in the queried date range that had at least one taxi ride.

In [None]:
dates_query = """
SELECT
    DISTINCT(DATE(pickup_datetime)) as date
FROM taxi_yellow
WHERE
    pickup_datetime BETWEEN '2019-01-01' and '2019-01-31'
"""
dates = pd.read_sql(dates_query, conn)["DATE"].tolist()

print(dates[0:5])
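
One detail worth checking in the query above: if `pickup_datetime` is a timestamp column, the upper bound `'2019-01-31'` is most likely interpreted as midnight at the start of that day, so rides later on January 31 may be left out. A common alternative is a half-open range (`>=` the first day, `<` the first day of the next month). The sketch below only builds such a query string; `month_bounds` and `dates_query_half_open` are hypothetical names, and whether this matches the intended range is an assumption.

```python
from datetime import date, timedelta


def month_bounds(year, month):
    """Return (first_day, first_day_of_next_month) for a half-open range."""
    start = date(year, month, 1)
    # Day 28 exists in every month; adding 4 days always lands in the next month.
    next_month = (start.replace(day=28) + timedelta(days=4)).replace(day=1)
    return start, next_month


start, end = month_bounds(2019, 1)

# Same query shape as above, with >= start and < end instead of BETWEEN.
dates_query_half_open = f"""
SELECT
    DISTINCT(DATE(pickup_datetime)) as date
FROM taxi_yellow
WHERE
    pickup_datetime >= '{start.isoformat()}'
    AND pickup_datetime < '{end.isoformat()}'
"""
```

The bounds here come from `datetime.date` objects created in the code, not from user input, so interpolating them into the string is safe in this sketch.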

<hr>

## Querying Snowflake and running complex computations with Dask

Sometimes Snowflake queries return so much data that the result wouldn't fit on a single machine. In these cases Dask can be used to store and manipulate data in a distributed way.

To use Dask with Snowflake, first we import the required modules and connect to the Dask cluster on Saturn Cloud. _For this code to run you need to start the Dask cluster from the project page of Saturn Cloud._

In [45]:
import dask.dataframe as dd
import dask
from dask.distributed import Client
from dask_saturn import SaturnCluster

cluster = SaturnCluster()
client = Client(cluster)

[2021-02-25 21:45:46] INFO - dask-saturn | Cluster is ready
[2021-02-25 21:45:46] INFO - dask-saturn | Registering default plugins
[2021-02-25 21:45:46] INFO - dask-saturn | {'tcp://10.0.13.86:36207': {'status': 'repeat'}, 'tcp://10.0.5.136:45745': {'status': 'repeat'}, 'tcp://10.0.6.177:39553': {'status': 'repeat'}}


Next, we define a function that queries a small part of the data, in this case a single day of taxi rides. We will run this query separately for each day, using all the Dask workers to run the queries and store the results. The `@dask.delayed` decorator makes each call to the function lazy: calling it returns a task that is run on the Dask cluster later, instead of running the query immediately.

In [26]:
@dask.delayed
def load_from_snowflake(day):
    with snowflake.connector.connect(**conn_info) as conn:
        query = """
        SELECT *
        FROM taxi_yellow
        WHERE
            date(pickup_datetime) = '{day}'
        """
        df = pd.read_sql(query.format(day=day), conn)
        # some days have no values for congestion_surcharge, this line ensures
        # that the missing data is properly filled
        df.CONGESTION_SURCHARGE = df.CONGESTION_SURCHARGE.astype("float64")
        return df
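
To see what `@dask.delayed` does without touching Snowflake, here is a minimal sketch. `load_day` and the `calls` list are hypothetical stand-ins for the real query function, and the synchronous scheduler is used only so that the example runs in a single process without a cluster.

```python
import dask

calls = []  # records each time the function body actually runs


@dask.delayed
def load_day(day):
    """Stand-in for a real query: record the call and return a small value."""
    calls.append(day)
    return len(day)


# Calling the decorated function only builds tasks; nothing runs yet.
tasks = [load_day(d) for d in ["2019-01-01", "2019-01-02"]]
ran_before_compute = len(calls)

# Execution happens here, in this process, with no cluster required.
results = dask.compute(*tasks, scheduler="synchronous")
```

In this sketch `ran_before_compute` stays at zero, and the function bodies run only once `dask.compute` is called.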

We create a list with one delayed result per day by calling this function for every day. As you can see from the `delayed_obs[:5]` call, the items returned are not Pandas DataFrames but Dask `Delayed` objects. The queries haven't actually been run yet: Dask is lazy, so they won't run until the results are needed. The list of delayed objects can be turned into a single Dask DataFrame using `dd.from_delayed()`. A Dask DataFrame behaves much like a Pandas DataFrame, with similar function calls and syntax, but it is actually a collection of Pandas DataFrames distributed across a Dask cluster.

In [46]:
delayed_obs = [load_from_snowflake(day) for day in dates]
delayed_obs[:5]

dask_data = dd.from_delayed(delayed_obs)


We can see the contents of the DataFrame by using the same `.head()` call on the Dask DataFrame as we would on the Pandas DataFrame.

In [35]:
dask_data.head()

| | CSV_FILENAME | VENDORID | PICKUP_DATETIME | DROPOFF_DATETIME | PASSENGER_COUNT | PICKUP_LONGITUDE | PICKUP_LATITUDE | TRIP_DISTANCE | RATECODEID | STORE_AND_FWD_FLAG | ... | DROPOFF_TAXIZONE_ID | PAYMENT_TYPE | FARE_AMOUNT | EXTRA | MTA_TAX | TIP_AMOUNT | TOLLS_AMOUNT | IMPROVEMENT_SURCHARGE | TOTAL_AMOUNT | CONGESTION_SURCHARGE |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | trip data/yellow_tripdata_2019-01.csv | 2 | 2019-01-21 00:01:16 | 2019-01-21 00:05:15 | 3 | | | 0.82 | 1 | N | ... | 229 | 1 | 5.5 | 0.5 | 0.5 | 1.36 | 0.0 | 0.3 | 8.16 | |
| 1 | trip data/yellow_tripdata_2019-01.csv | 2 | 2019-01-21 00:03:49 | 2019-01-21 00:05:07 | 1 | | | 0.21 | 1 | N | ... | 239 | 2 | 3.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 4.3 | |
| 2 | trip data/yellow_tripdata_2019-01.csv | 2 | 2019-01-21 00:00:30 | 2019-01-21 00:24:07 | 1 | | | 5.13 | 1 | N | ... | 37 | 2 | 20.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 21.3 | |
| 3 | trip data/yellow_tripdata_2019-01.csv | 2 | 2019-01-21 00:00:29 | 2019-01-21 00:05:11 | 1 | | | 1.36 | 1 | N | ... | 137 | 1 | 6.0 | 0.5 | 0.5 | 1.46 | 0.0 | 0.3 | 10.71 | |
| 4 | trip data/yellow_tripdata_2019-01.csv | 2 | 2019-01-21 00:01:08 | 2019-01-21 00:05:28 | 1 | | | 1.09 | 1 | N | ... | 263 | 1 | 5.5 | 0.5 | 0.5 | 1.18 | 0.0 | 0.3 | 7.98 | |


Operations on a Dask DataFrame are lazy. In Pandas, finding the sum of a column returns the value right away; in Dask, we need to end the expression with `.compute()` to make the computation actually run.

In [40]:
dask_data['TOLLS_AMOUNT'].sum().compute()

2430011.1799999997

<hr>

As you can see, by using Dask we are able to store data across multiple machines and run distributed calculations, all while using nearly the same syntax as Pandas. Depending on the size and type of data you are working with, it may make more sense to use Pandas directly or to use Dask instead!