# Load data into RedShift

**Note:** Please set kernel to `Python 3 (Data Science)`

After running this notebook, save it and push the changes to CodeCommit. This triggers the Continuous Integration (CI) pipeline, which deploys the staging endpoint.

Instructions for the Git operations are in [**instructions.md**](instructions.md) and are also shown at the end of this notebook.

---

## Overview of AWS services used in this notebook

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue provides all the capabilities needed for data integration so that you can start analyzing your data and putting it to use in minutes instead of months.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Athena is easy to use. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL. Most results are delivered within seconds. With Athena, there’s no need for complex ETL jobs to prepare your data for analysis. This makes it easy for anyone with SQL skills to quickly analyze large-scale datasets.

Athena is out-of-the-box integrated with AWS Glue Data Catalog, allowing you to create a unified metadata repository across various services, crawl data sources to discover schemas and populate your Catalog with new and modified table and partition definitions, and maintain schema versioning.

Using Amazon Redshift Spectrum, you can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Redshift Spectrum queries employ massive parallelism to execute very fast against large datasets. Much of the processing occurs in the Redshift Spectrum layer, and most of the data remains in Amazon S3. Multiple clusters can concurrently query the same dataset in Amazon S3 without the need to make copies of the data for each cluster.

## Methods to load data into RedShift
There are multiple methods to load data into RedShift:
- Copy command
- Insert command
- Via other AWS services like Glue 

This notebook uses the insert command: the data is stored in S3, exposed through the Glue Data Catalog (the same catalog Athena uses), and loaded into RedShift with an `INSERT ... SELECT` statement. The main purpose is to illustrate RedShift Spectrum.
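
For comparison with the insert approach used here, the copy command from the list above could be assembled along the following lines. This is a minimal sketch and not part of this notebook's flow: the helper name `build_copy_statement` and the bucket, table, and role values are hypothetical placeholders, and it assumes a comma-separated file with one header row (hence the `CSV` and `IGNOREHEADER` options).

```python
def build_copy_statement(schema, table, s3_uri, iam_role_arn, ignore_header=1):
    """Return a Redshift COPY statement for a CSV file in S3 (hypothetical helper)."""
    return (
        "COPY {}.{}\n"
        "FROM '{}'\n"
        "IAM_ROLE '{}'\n"
        "CSV\n"
        "IGNOREHEADER {};"
    ).format(schema, table, s3_uri, iam_role_arn, int(ignore_header))


# Placeholder values for illustration only
copy_statement = build_copy_statement(
    "my_schema",
    "my_table",
    "s3://example-bucket/bankdm/data/bank-additional.csv",
    "arn:aws:iam::123456789012:role/example-redshift-role",
)
print(copy_statement)
```

Unlike the insert approach, a statement like this reads the file directly from S3 and does not need a Glue table or an external schema.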

---

## Introduction

In this notebook, you will create the Glue database and tables, referencing the CSV file that will be uploaded to S3. Thereafter, you will use RedShift Spectrum to query data from Glue while the data continues to reside in S3. At the end, you will insert data into RedShift using Athena. 

Note: The code in this notebook connects to RedShift through a database driver (SQLAlchemy with psycopg2), while the Python code in the Lambda function (lambda_redshift_dl.py) uses the newer Amazon RedShift Data API. Both methods accomplish the task.

From the [blog post](https://aws.amazon.com/blogs/big-data/using-the-amazon-redshift-data-api-to-interact-with-amazon-redshift-clusters/) describing the RedShift Data API:

> As a data engineer or application developer, for some use cases, you want to interact with Amazon Redshift to load or query data with a simple API endpoint without having to manage persistent connections. With Amazon Redshift Data API, you can interact with Amazon Redshift without having to configure JDBC or ODBC. This makes it easier and more secure to work with Amazon Redshift and opens up new use cases.
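
To make the contrast with a driver-based connection concrete, the sketch below shows roughly how a statement could be submitted and awaited through the Data API. It is an illustration under stated assumptions, not the code in `lambda_redshift_dl.py`: the function name `run_statement` and its polling parameters are hypothetical, and `client` is assumed to behave like `boto3.client("redshift-data")`.

```python
import time


def run_statement(client, cluster_identifier, database, secret_arn, sql,
                  poll_seconds=1.0, max_polls=60):
    """Submit SQL through the Redshift Data API and wait for it to finish.

    `client` is assumed to behave like boto3.client("redshift-data").
    """
    statement_id = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        SecretArn=secret_arn,
        Sql=sql,
    )["Id"]

    # The Data API is asynchronous, so poll until the statement completes
    for _ in range(max_polls):
        description = client.describe_statement(Id=statement_id)
        status = description["Status"]
        if status == "FINISHED":
            # Only statements that return rows have a result set to fetch.
            # This returns the first page only; larger results are paginated.
            if description.get("HasResultSet"):
                return client.get_statement_result(Id=statement_id)["Records"]
            return []
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(
                "Statement {}: {}".format(status, description.get("Error"))
            )
        time.sleep(poll_seconds)
    raise TimeoutError("Statement did not finish in time")
```

No connection object is created or kept open here; each call is an independent HTTPS request authenticated with the secret's ARN.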

---

## Data flow in this notebook (simplified)

![data](img/data.png)

---


### Variables
Name of the secret in AWS Secrets Manager. The secret stores the RedShift, Athena, and Glue connection information.

In [None]:
secret_name='bankdm_redshift_login' 

### Install and import libraries
pyathena is used to connect to Athena while sqlalchemy is used to connect to RedShift.

In [None]:
!pip install -q SQLAlchemy==1.3.13
!pip install psycopg2-binary pyathena
!pip install -U pip
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from pyathena import connect
from botocore.exceptions import ClientError
import pandas as pd
import base64  # needed to decode binary secrets
import json
import boto3
import sagemaker

### Create client sessions


In [None]:
# Get region 
session = boto3.session.Session()
region_name = session.region_name

# Get SageMaker session & default S3 bucket
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

s3 = boto3.client('s3')
redshift = boto3.client('redshift')
secretsmanager = boto3.client('secretsmanager')

# Alias for the region determined above
region = region_name

### Get credentials & connection information from Secrets Manager

In [None]:
try:
    get_secret_value_response = secretsmanager.get_secret_value(
        SecretId=secret_name
    )

except ClientError as e:
    # Stop here: the rest of the notebook cannot run without the secret
    print("Error retrieving secret. Error: " + e.response['Error']['Message'])
    raise

else:
    secret_arn = get_secret_value_response['ARN']
    # Depending on whether the secret is a string or binary, one of these fields will be populated.
    if 'SecretString' in get_secret_value_response:
        secret = get_secret_value_response['SecretString']
    else:
        secret = base64.b64decode(get_secret_value_response['SecretBinary'])

# Parse the JSON payload of the secret
secret_json = json.loads(secret)
master_user_name = secret_json['username']
master_user_pw = secret_json['password']
redshift_port = secret_json['port']
redshift_cluster_identifier = secret_json['dbClusterIdentifier']
redshift_endpoint_address = secret_json['host']

database_name_redshift = secret_json['database_name_redshift']
database_name_glue = secret_json['database_name_glue']

schema_redshift = secret_json['schema_redshift']
schema_athena = secret_json['schema_athena']

table_name_glue = secret_json['table_name_glue']
table_name_redshift = secret_json['table_name_redshift']

# print(master_user_name)
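
The lookups above stop with a `KeyError` at the first field that is missing from the secret. A small pre-check could report every missing field at once. The sketch below assumes the secret is a JSON string with the keys read in this notebook; the helper name `missing_secret_keys` and the example payload are hypothetical.

```python
import json

# Keys this notebook reads from the secret
REQUIRED_SECRET_KEYS = (
    "username", "password", "port", "dbClusterIdentifier", "host",
    "database_name_redshift", "database_name_glue",
    "schema_redshift", "schema_athena",
    "table_name_glue", "table_name_redshift",
)


def missing_secret_keys(secret_string, required=REQUIRED_SECRET_KEYS):
    """Return the required keys that are absent from the secret's JSON payload."""
    payload = json.loads(secret_string)
    return [key for key in required if key not in payload]


# Made-up payload for illustration; a real secret would hold all the keys
example_secret = json.dumps(
    {"username": "admin", "password": "not-a-real-password", "port": 5439}
)
print(missing_secret_keys(example_secret))
```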

### Copy data (bank-additional.csv) to S3

In [None]:
s3.upload_file('bank-additional/bank-additional-full.csv', bucket, 'bankdm/data/bank-additional.csv')


## Glue & Athena

### Create Glue database

In [None]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)
conn = connect(region_name=region_name, s3_staging_dir=s3_staging_dir)
statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name_glue)
print(statement)

### Check if the database is created successfully

In [None]:
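# Run the CREATE DATABASE statement prepared in the previous cell
pd.read_sql(statement, conn)

# List the databases to confirm that the new one exists
statement = "SHOW DATABASES"
df_show = pd.read_sql(statement, conn)
df_show.head(5)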

### Create Glue table referencing the data in the CSV

The column name `default` is a reserved SQL keyword and causes an error, so the column is named `defaulted` instead.

In [None]:
s3_bankdm_path = "s3://{}/bankdm/data/".format(bucket)
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         age int,
         job string,
         marital string,
         education string,
         defaulted string,
         housing string,
         loan string,
         contact string,
         month string,
         day_of_week string,
         duration int,
         campaign int,
         pdays int,
         previous int,
         poutcome string,
         emp_var_rate float,
         cons_price_idx float,
         cons_conf_idx float,
         euribor3m float,
         nr_employed int,
         y string
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
LINES TERMINATED BY '\n' 
LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(database_name_glue, table_name_glue, s3_bankdm_path)

print(statement)

### Check if the table is created successfully

In [None]:
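# Run the CREATE EXTERNAL TABLE statement prepared in the previous cell
pd.read_sql(statement, conn)

# List the tables in the Glue database to confirm that the new one exists
statement = "SHOW TABLES in {}".format(database_name_glue)
df_show = pd.read_sql(statement, conn)
df_show.head(5)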

### Test getting data from the table using Athena

In [None]:
statement = """SELECT * FROM {}.{}
""".format(
    database_name_glue, table_name_glue
)

print(statement)

In [None]:
df = pd.read_sql(statement, conn)
df.head(10)

## RedShift

### Connect to RedShift
#### Before connecting, ensure that the cluster is available

In [None]:
import time

response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)
cluster_status = response['Clusters'][0]['ClusterStatus']
print(cluster_status)

while cluster_status != 'available':
    time.sleep(10)
    response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)
    cluster_status = response['Clusters'][0]['ClusterStatus']
    print(cluster_status)
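
The loop above keeps polling indefinitely if the cluster never reaches the `available` state. A bounded variant could look like the sketch below. The function name `wait_until_available` and its `poll_seconds` and `max_polls` parameters are hypothetical, and `client` is assumed to behave like the `redshift` boto3 client created earlier.

```python
import time


def wait_until_available(client, cluster_identifier, poll_seconds=10, max_polls=60):
    """Poll describe_clusters until the cluster is 'available', or give up."""
    status = None
    for _ in range(max_polls):
        response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
        status = response["Clusters"][0]["ClusterStatus"]
        if status == "available":
            return response
        time.sleep(poll_seconds)
    raise TimeoutError(
        "Cluster {} still '{}' after {} checks".format(
            cluster_identifier, status, max_polls
        )
    )
```

boto3 also provides a built-in waiter for this case, `redshift.get_waiter('cluster_available')`, which may be preferable when its default polling settings are acceptable.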

#### Also ensure the `ApplyStatus` is `in-sync`

In [None]:
iam_role = response['Clusters'][0]['IamRoles'][0]['IamRoleArn']

response['Clusters'][0]['IamRoles']
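
The cell above displays the attached roles and leaves the `ApplyStatus` comparison to the reader. The check could also be done in code, as in the sketch below. The helper name `roles_not_in_sync` and the example data are hypothetical; the dictionary is shaped like one entry of `response['Clusters']`.

```python
def roles_not_in_sync(cluster):
    """Return ARNs of attached IAM roles whose ApplyStatus is not 'in-sync'."""
    return [
        role["IamRoleArn"]
        for role in cluster.get("IamRoles", [])
        if role.get("ApplyStatus") != "in-sync"
    ]


# Made-up cluster description for illustration only
example_cluster = {
    "IamRoles": [
        {"IamRoleArn": "arn:aws:iam::123456789012:role/example-a", "ApplyStatus": "in-sync"},
        {"IamRoleArn": "arn:aws:iam::123456789012:role/example-b", "ApplyStatus": "adding"},
    ]
}
print(roles_not_in_sync(example_cluster))
```

An empty list means every attached role has been applied; in this notebook the call would be `roles_not_in_sync(response['Clusters'][0])`.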

In [None]:
# You can check the RedShift endpoint and IAM role by uncommenting the code below
# print('Redshift endpoint: {}'.format(redshift_endpoint_address))
# print('IAM Role: {}'.format(iam_role))

#### Once the checks are done, connect to RedShift

In [None]:
engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(master_user_name, master_user_pw, redshift_endpoint_address, redshift_port, database_name_redshift))
session = sessionmaker()
session.configure(bind=engine)
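
The connection string above embeds the user name and password verbatim, which breaks if the password contains characters such as `@`, `/`, or `:`. One way to guard against that is to URL-encode the credentials first. The helper name `build_redshift_url` and the example values below are hypothetical.

```python
from urllib.parse import quote_plus


def build_redshift_url(user, password, host, port, database):
    """Build a postgresql:// URL with URL-encoded credentials."""
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database
    )


# Placeholder values for illustration only
example_url = build_redshift_url("admin", "p@ss/w:rd", "example-host", 5439, "dev")
print(example_url)
```

The resulting string can be passed to `create_engine` in place of the hand-formatted one.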


### Create RedShift schema

In [None]:
statement = """CREATE SCHEMA IF NOT EXISTS {}""".format(schema_redshift)

s = session()
s.execute(statement)
s.commit()

### Register Glue database with Redshift Spectrum to access the data directly in S3

With just one command, you can query the S3 data lake from Amazon RedShift without moving any data into the data warehouse. This is the power of Redshift Spectrum. 

Note the `FROM DATA CATALOG` clause below. It pulls the table and schema information from the Glue Data Catalog (i.e., a Hive-compatible metastore).

In [None]:
statement = """
CREATE EXTERNAL SCHEMA IF NOT EXISTS {} FROM DATA CATALOG 
    DATABASE '{}' 
    IAM_ROLE '{}'
    REGION '{}'
    CREATE EXTERNAL DATABASE IF NOT EXISTS
""".format(schema_athena, database_name_glue, iam_role, region_name)

print(statement)
s = session()
s.execute(statement)
s.commit()

### Run Sample Query on S3 Data through Redshift Spectrum

In [None]:
statement = """
SELECT *
    FROM {}.{}
""".format(schema_athena, table_name_glue)

print(statement)

In [None]:
df = pd.read_sql_query(statement, engine)
df.head(5)

### Create table in RedShift

In [None]:
statement = """
CREATE TABLE IF NOT EXISTS {}.{}( 
     age integer,
     job text,
     marital text,
     education text,
     defaulted text,
     housing text,
     loan text,
     contact text,
     month text,
     day_of_week text,
     duration integer,
     campaign integer,
     pdays integer,
     previous integer,
     poutcome text,
     emp_var_rate decimal,
     cons_price_idx decimal,
     cons_conf_idx decimal,
     euribor3m decimal,
     nr_employed integer,
     y text
     )
""".format(schema_redshift, table_name_redshift)

print(statement)
s = session()
s.execute(statement)
s.commit()


### Insert data from S3 to RedShift using Athena

In [None]:
statement = """
INSERT INTO {}.{}
    SELECT
        *
    FROM
        {}.{};             

""".format(schema_redshift, table_name_redshift, schema_athena, table_name_glue)
print(statement)
s = session()
s.execute(statement)
s.commit()        
print("Done.")

### Test getting data from RedShift

In [None]:
statement = """
SELECT *
    FROM {}.{}
""".format(schema_redshift, table_name_redshift)

print(statement)

In [None]:
df = pd.read_sql_query(statement, engine)
df.head(10)

---

## Next steps

In this notebook, you created a Glue table that references the CSV data stored in S3. You then inserted the data into RedShift with a SQL statement that selects all the rows. This is one method of loading data into RedShift.

> 🔥Note: After running this notebook, save it, then commit and push the changes to CodeCommit. This triggers the SageMaker Pipeline after a short while. The SageMaker Pipeline must run successfully before you proceed with the next notebook! This takes around 12 minutes. 🔥

Instructions for the Git operations are shown below. They are taken from **instructions.md**.


- On the left side, click on the second icon. Hover your mouse to the right of `Changed` and select the `+` to track all files. Repeat the same for `Untracked`.

![studio](img/studio16.png)

- Your window should now look like this, with no files under `Changed` and `Untracked`. The number of files shown may differ from yours.
- Enter a commit message (commit in the screenshot) and click `Commit`.

![studio](img/studio17.png)

- Enter your name, email and click `OK`.

![studio](img/studio18.png)

- Click on the icon with an up arrow to push the changes. This icon is above the green line in the screenshot.

![studio](img/studio19.png)

- You will get the following message.

![studio](img/studio20.png)

- On the right side of the screen, click on the `Pipelines` tab and double-click the pipeline shown (`BankDM-p-7cj6qm9kexri` in the screenshot).

![studio](img/studio21.png)

- The pipeline should run automatically after a short while because you pushed new code.

![studio](img/studio22-2.png)