## Setting Up Your Snowflake Environment

Let’s make sure you have a Snowflake environment ready to go. If you’re new to Snowflake, follow these steps to get started.

#### Create a Snowflake account

If you do not have a Snowflake account, Snowflake offers a 30-day free trial that includes (at the time of writing) $400 of free usage. This should be enough for us to test a few sample pipelines!

1. Visit the Snowflake Trial page and sign up for a free account.
2. For our purposes, select the Standard edition rather than the Enterprise edition. Standard credits cost less, which saves money if you continue to use Snowflake after the trial.
3. Choose your preferred cloud provider (AWS, Azure, or GCP). The examples in this tutorial use AWS.
4. Keep the automatically selected region; it should be the one nearest to you geographically and provide the best experience.
5. After your account is created, log in to the Snowflake web UI.

#### Navigating the UI
Once inside, you'll see:
- Worksheet: Where you can run SQL queries.
- Databases: View and manage databases and schemas.
- Warehouses: Monitor and control compute resources.
- History: Track previously run queries and tasks.

If you want more advice on using Snowflake, follow this Snowflake Foundations course.
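Once you open a worksheet, a simple first query is to check which account, region, role, and warehouse your session is using. The functions below are standard Snowflake context functions; the column aliases are just illustrative names:

```sql
-- Show the context the current worksheet session runs in.
-- CURRENT_WAREHOUSE() returns NULL if no warehouse is selected yet.
SELECT CURRENT_ACCOUNT()   AS account_locator,
       CURRENT_REGION()    AS region,
       CURRENT_ROLE()      AS role_name,
       CURRENT_WAREHOUSE() AS warehouse_name;
```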

### Creating necessary resources

This step creates a virtual warehouse, a database, and a schema. Run the following SQL in your worksheet to set up your environment:

```sql
-- Set the user role appropriately
USE ROLE accountadmin;

-- Create a virtual warehouse; AUTO_SUSPEND = 60 suspends it after 60 idle seconds
CREATE WAREHOUSE datacamp_pipeline_wh WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60;

-- Set our active working warehouse
USE WAREHOUSE datacamp_pipeline_wh;

-- Create our Datacamp sample database
CREATE OR REPLACE DATABASE datacamp_sample_db;

-- Create the Datacamp (sample) raw schema
CREATE OR REPLACE SCHEMA datacamp_sample_db.raw_schema;
```
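To double-check that the objects exist and that the session now points at them, you can run a few read-only commands. This is a small sanity check, not a required step:

```sql
-- Confirm the warehouse exists with the expected size and auto-suspend setting
SHOW WAREHOUSES LIKE 'datacamp_pipeline_wh';

-- Confirm RAW_SCHEMA exists in the new database
SHOW SCHEMAS IN DATABASE datacamp_sample_db;

-- CREATE DATABASE and CREATE SCHEMA also set the session's current
-- database and schema, so this should return DATACAMP_SAMPLE_DB and RAW_SCHEMA
SELECT CURRENT_DATABASE(), CURRENT_SCHEMA();
```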

### Loading Data Into Snowflake

As we discuss in our article about Data Ingestion in Snowflake, Snowflake supports both batch (bulk) and continuous loading. Bulk loading uses COPY INTO to load large batches of staged files at once, usually on a schedule.
Continuous loading ingests data shortly after it arrives: Snowpipe loads new files in micro-batches when cloud event notifications announce them, and Snowpipe Streaming writes rows directly through its API without staging files. To ingest files from cloud storage, we first create a storage integration and an external stage that point to our cloud provider, then load the data using COPY INTO.

### Creating S3 stage and integration

Data ingestion is the first step in any pipeline. In this step, we’ll cover the basics of creating a storage integration and a stage. We’ll assume you have already configured AWS access for Snowflake, that is, an IAM role that Snowflake’s IAM user is allowed to assume; its ARN goes into STORAGE_AWS_ROLE_ARN below. If not, refer to the Snowflake AWS configuration guide in the documentation.

```sql
-- Set role context (creating a storage integration requires ACCOUNTADMIN
-- or a role with the CREATE INTEGRATION privilege)
USE ROLE accountadmin;

-- Set warehouse context
USE WAREHOUSE datacamp_pipeline_wh;

-- Create the storage integration.
-- STORAGE_AWS_ROLE_ARN comes from the AWS configuration guide steps.
-- Allow access to all locations, but block a path that holds sensitive data.
CREATE STORAGE INTEGRATION datacamp_aws
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('*')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket/private/path/');

-- Select the schema first so the file format and stage are created inside it
USE SCHEMA datacamp_sample_db.raw_schema;

-- Create a file format; the stage references it as the default parsing settings for your files
CREATE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_OPTIONALLY_ENCLOSED_BY = '"';

-- This stage is what ultimately connects your S3 files to your database schema
CREATE STAGE datacamp_s3_stage
  STORAGE_INTEGRATION = datacamp_aws
  URL = 's3://bucket1/path1/'
  FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
```

You should now have a storage integration with AWS and an external stage that points at your bucket. If you want to review AWS basics, the following article is a great introduction to AWS.
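As described in the Snowflake AWS configuration guide, the IAM role's trust policy has to reference the AWS IAM user and external ID that Snowflake generated for the integration. A quick way to retrieve those values, and then check that the stage can actually reach the bucket, might look like this (s3://bucket1/path1/ is still the placeholder location from the stage definition):

```sql
-- Retrieve the Snowflake-side identity for this integration.
-- Copy STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID into the
-- trust policy of the IAM role used for STORAGE_AWS_ROLE_ARN.
DESC INTEGRATION datacamp_aws;

-- List the files Snowflake can see through the stage;
-- an access error here usually points to the IAM role or its trust policy
LIST @datacamp_sample_db.raw_schema.datacamp_s3_stage;
```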

### Bulk loading with COPY INTO

COPY INTO is the standard command for bulk loading data into a Snowflake table. It can load files from an internal stage (for example, files uploaded from your machine with PUT) or from an external stage that uses our cloud connection. In the following example, we use our S3 stage to load CSV files.
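COPY INTO needs its target table to exist before it can load anything. A minimal sketch, assuming a hypothetical orders CSV with order_id, location, amount, and order_date columns (adjust these to match your files; if they include a header row, the file format would also need SKIP_HEADER = 1). The second table is a hypothetical target for the processed data used later:

```sql
-- Raw landing table; the column layout is a hypothetical example
CREATE TABLE IF NOT EXISTS datacamp_sample_db.raw_schema.aws_first_upload (
  order_id   NUMBER,
  location   VARCHAR,
  amount     NUMBER(10, 2),
  order_date DATE
);

-- Processed table with the same columns as the raw table
CREATE TABLE IF NOT EXISTS datacamp_sample_db.raw_schema.aws_processed_data
  LIKE datacamp_sample_db.raw_schema.aws_first_upload;
```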

```sql
-- Load data using COPY INTO.
-- This attempts to load every file in the stage location;
-- use the FILES or PATTERN options to pick specific files.
COPY INTO datacamp_sample_db.raw_schema.aws_first_upload
FROM @datacamp_s3_stage
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
```
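The PATTERN option narrows which files are loaded, and VALIDATION_MODE lets you dry-run a load. A sketch assuming a hypothetical layout where order files end in .csv under an orders/ prefix: first report parse errors without loading, then load while skipping any file that contains errors.

```sql
-- Dry run: return errors found in matching files without loading any rows
COPY INTO datacamp_sample_db.raw_schema.aws_first_upload
FROM @datacamp_sample_db.raw_schema.datacamp_s3_stage
PATTERN = '.*orders/.*[.]csv'
VALIDATION_MODE = RETURN_ERRORS;

-- Actual load: skip a file that contains errors instead of aborting the statement
COPY INTO datacamp_sample_db.raw_schema.aws_first_upload
FROM @datacamp_sample_db.raw_schema.datacamp_s3_stage
PATTERN = '.*orders/.*[.]csv'
ON_ERROR = SKIP_FILE;
```

COPY INTO keeps load metadata for each table and, by default, skips files it has already loaded, so rerunning the statement does not duplicate rows from unchanged files.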

### Automating Loading with Snowpipe

Running COPY INTO by hand every time a new file arrives becomes inefficient and time-consuming as data velocity increases. Instead, we can define a Snowpipe that runs the COPY INTO statement for us, and configure S3 event notifications so that every new file in the bucket triggers the pipe.

```sql
-- First, create the Snowpipe.
-- AUTO_INGEST = TRUE lets S3 event notifications trigger the pipe.
CREATE PIPE my_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO datacamp_sample_db.raw_schema.aws_first_upload
FROM @datacamp_s3_stage
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
```

Next, configure an event notification on the S3 bucket that sends object-created events to the Amazon SQS queue Snowflake provides for auto-ingest pipes, so the pipe is triggered whenever the bucket receives a file.
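For a pipe created with AUTO_INGEST = TRUE, the SQS queue ARN that the S3 event notification should target appears in the notification_channel column of the pipe's description. A sketch of finding it, checking the pipe's state, and picking up files that were already in the bucket before notifications were configured:

```sql
-- notification_channel holds the SQS queue ARN for the S3 event notification
DESC PIPE datacamp_sample_db.raw_schema.my_pipe;

-- Returns a JSON string with the pipe's execution state and pending file count
SELECT SYSTEM$PIPE_STATUS('datacamp_sample_db.raw_schema.my_pipe');

-- Queue files staged within the last 7 days that arrived
-- before event notifications were set up
ALTER PIPE datacamp_sample_db.raw_schema.my_pipe REFRESH;
```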
Finally, we can monitor our Snowpipe using:

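```sql
-- PIPE_USAGE_HISTORY is a table function, so it is wrapped in TABLE(...)
-- This returns the pipe's load activity over the last day
SELECT *
FROM TABLE(datacamp_sample_db.INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
  DATE_RANGE_START => DATEADD('day', -1, CURRENT_TIMESTAMP()),
  PIPE_NAME => 'datacamp_sample_db.raw_schema.my_pipe'));
```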

Now you don’t have to load data manually every time a file arrives in your S3 bucket! The bucket’s event notifications trigger your Snowpipe, which loads the new files for you.

### Transforming Data with Streams and Tasks

Snowflake’s Streams and Tasks are built-in tools for tracking and transforming changing data. Streams track changes to tables, while Tasks automate the execution of SQL statements.

### Introduction to Streams
A Stream records change data capture (CDC) information for a table: the inserts, updates, and deletes made by Data Manipulation Language (DML) statements since the stream’s current offset. The offset advances only when a DML statement that reads from the stream (for example, an INSERT INTO ... SELECT from the stream) commits; a plain SELECT does not move it. This lets us process only the rows that changed between two points in time.

```sql
-- Create a stream on a table
CREATE OR REPLACE STREAM my_stream ON TABLE datacamp_sample_db.raw_schema.aws_first_upload;
```

```sql
-- You can then query the stream to get only the changes:
SELECT * FROM my_stream WHERE METADATA$ACTION = 'INSERT';
```
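To inspect pending changes before a task consumes them, you can check whether the stream has data and summarize the change types. Because a plain SELECT does not advance the stream's offset, these queries are safe to run repeatedly:

```sql
-- Returns TRUE if the stream has change records that have not been consumed yet
SELECT SYSTEM$STREAM_HAS_DATA('datacamp_sample_db.raw_schema.my_stream');

-- Count pending changes by type. An UPDATE appears as a DELETE/INSERT pair
-- whose METADATA$ISUPDATE value is TRUE.
SELECT METADATA$ACTION,
       METADATA$ISUPDATE,
       COUNT(*) AS change_count
FROM datacamp_sample_db.raw_schema.my_stream
GROUP BY METADATA$ACTION, METADATA$ISUPDATE;
```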

### Introduction to Tasks
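Tasks let you automate SQL execution on a schedule, optionally gated by a condition such as new data arriving in a stream. Each task runs a single SQL statement, which can also call a stored procedure written in SQL, JavaScript, Python, or another supported language, so tasks can automate procedures on our tables and pipelines.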

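```sql
-- Create a task that moves new rows from the stream into a processed table,
-- keeping only changes that occur in the US, for example.
-- It runs every 5 minutes, but only when the stream has new data.
CREATE OR REPLACE TASK process_changes_task
  WAREHOUSE = datacamp_pipeline_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('datacamp_sample_db.raw_schema.my_stream')
AS
  INSERT INTO datacamp_sample_db.raw_schema.aws_processed_data
  -- Drop the stream's metadata columns so the columns match the target table
  SELECT * EXCLUDE (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID)
  FROM datacamp_sample_db.raw_schema.my_stream
  WHERE METADATA$ACTION = 'INSERT'
    AND location = 'USA';

-- Run the task once immediately to test it
EXECUTE TASK process_changes_task;
```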
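A newly created task starts out suspended, so its schedule does nothing until the task is resumed (the role resuming it also needs the account-level EXECUTE TASK privilege). A sketch of resuming the task, checking its recent runs, and suspending it again when you finish testing:

```sql
-- Tasks are created suspended; resume to start the 5-minute schedule
ALTER TASK datacamp_sample_db.raw_schema.process_changes_task RESUME;

-- Recent runs and their outcomes (STATE is e.g. SUCCEEDED, FAILED, or SKIPPED)
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(datacamp_sample_db.INFORMATION_SCHEMA.TASK_HISTORY(
  TASK_NAME => 'PROCESS_CHANGES_TASK'))
ORDER BY scheduled_time DESC;

-- Suspend the task when you are done testing to stop scheduled runs
ALTER TASK datacamp_sample_db.raw_schema.process_changes_task SUSPEND;
```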

### Combining Streams and Tasks
This is the foundation for automated transformation pipelines. As you saw above, Tasks can read from Streams to move new data into a different table. A general process may look like the following:

- Create a Stream in order to monitor changes to a data table
- Create a Task that pulls data from that stream on a scheduled basis
- With each data pull, transform the data to our desired format
- Load that data into our target table

In this way, we have built the core of a data pipeline. By combining Streams and Tasks, we transform only newly loaded data. With Tasks alone, each run would potentially have to reprocess the entire raw table to find new rows, which is a costly and inefficient use of compute.
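As a sketch of the transform step, assuming aws_first_upload has the hypothetical columns order_id, location, amount, and order_date, and using a hypothetical orders_clean target table and clean_orders_task task (none of these names come from the steps above), a task could reshape rows as it moves them out of a stream. It reads from its own stream because consuming a stream advances its offset, so two tasks sharing one stream would each miss rows the other consumed:

```sql
-- Hypothetical target table for cleaned orders
CREATE TABLE IF NOT EXISTS datacamp_sample_db.raw_schema.orders_clean (
  order_id   NUMBER,
  country    VARCHAR,
  amount     NUMBER(10, 2),
  order_date DATE
);

-- Dedicated stream for this consumer
CREATE OR REPLACE STREAM datacamp_sample_db.raw_schema.orders_clean_stream
  ON TABLE datacamp_sample_db.raw_schema.aws_first_upload;

-- Each run consumes only new rows and normalizes the location text
CREATE OR REPLACE TASK datacamp_sample_db.raw_schema.clean_orders_task
  WAREHOUSE = datacamp_pipeline_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('datacamp_sample_db.raw_schema.orders_clean_stream')
AS
  INSERT INTO datacamp_sample_db.raw_schema.orders_clean
  SELECT order_id,
         UPPER(TRIM(location)),
         amount,
         order_date
  FROM datacamp_sample_db.raw_schema.orders_clean_stream
  WHERE METADATA$ACTION = 'INSERT';
```

Like any new task, clean_orders_task stays suspended until you run ALTER TASK ... RESUME on it.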

## Building an End-to-End Data Pipeline

Let’s walk through a simple pipeline use case: ingesting order data, tracking changes, and transforming it for analytics. Since we covered how to execute each step independently, this section covers what the steps might look like together. 

#### Step 1: Create cloud storage integration and stage

First, we have to connect our Snowflake environment to our cloud storage, using AWS as the example. In Snowflake, we use the CREATE STORAGE INTEGRATION and CREATE STAGE commands to build the connection between our AWS account and our database.

```sql
-- Set role context
USE ROLE accountadmin;

-- Set warehouse context
USE WAREHOUSE datacamp_pipeline_wh;

-- Create the storage integration
CREATE STORAGE INTEGRATION datacamp_aws
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('*')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket/private/path/');

-- Select the schema first so the file format and stage are created inside it
USE SCHEMA datacamp_sample_db.raw_schema;

-- Create a file format; the stage references it as the default parsing settings for your files
CREATE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_OPTIONALLY_ENCLOSED_BY = '"';

-- This stage is what ultimately connects your S3 files to your database schema
CREATE STAGE datacamp_s3_stage
  STORAGE_INTEGRATION = datacamp_aws
  URL = 's3://bucket1/path1/'
  FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
```

#### Step 2: Ingest data with Snowpipe

In this second step, we create a Snowpipe that will run the COPY INTO command into our raw data tables. 

```sql
-- First, create the Snowpipe.
-- AUTO_INGEST = TRUE lets S3 event notifications trigger the pipe.
CREATE PIPE my_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO datacamp_sample_db.raw_schema.aws_first_upload
FROM @datacamp_s3_stage
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
```

#### Step 3: Use Streams to capture changes

We then use a stream on this raw data table in order to track changes.

```sql
-- Create a stream on a table
CREATE OR REPLACE STREAM my_stream ON TABLE datacamp_sample_db.raw_schema.aws_first_upload;
```

#### Step 4: Automate transformations with Tasks

Next, we create a Task that transforms the newly loaded rows and loads them into the processed version of the table. The Task checks for new data on its schedule; if there is no new data, the run is skipped.

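```sql
-- Create a task that moves new rows from the stream into a processed table,
-- keeping only changes that occur in the US, for example.
-- It runs every 5 minutes, but only when the stream has new data.
CREATE OR REPLACE TASK process_changes_task
  WAREHOUSE = datacamp_pipeline_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('datacamp_sample_db.raw_schema.my_stream')
AS
  INSERT INTO datacamp_sample_db.raw_schema.aws_processed_data
  -- Drop the stream's metadata columns so the columns match the target table
  SELECT * EXCLUDE (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID)
  FROM datacamp_sample_db.raw_schema.my_stream
  WHERE METADATA$ACTION = 'INSERT'
    AND location = 'USA';

-- Run the task once immediately to test it
EXECUTE TASK process_changes_task;
```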

#### Step 5: Monitor and manage

Finally, we want to make sure our pipelines are working as intended. Use the SHOW PIPES and SHOW TASKS commands to verify that the pipe and task exist and are running; note that a newly created task stays suspended until you resume it with ALTER TASK. Then use INFORMATION_SCHEMA table functions such as COPY_HISTORY, PIPE_USAGE_HISTORY, and TASK_HISTORY to track data loading performance and errors.
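A sketch of these checks, scoped to the tutorial's schema; the one-hour window for load history is an arbitrary choice:

```sql
-- Confirm the pipe and task exist; SHOW TASKS also reports each task's state
SHOW PIPES IN SCHEMA datacamp_sample_db.raw_schema;
SHOW TASKS IN SCHEMA datacamp_sample_db.raw_schema;

-- Files loaded (or rejected) into the raw table over the last hour
SELECT file_name, status, row_count, first_error_message
FROM TABLE(datacamp_sample_db.INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'datacamp_sample_db.raw_schema.aws_first_upload',
  START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())));
```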