Step 1.1 Imports the libraries required to connect to Redshift from within a Jupyter notebook.

In [1]:
import boto3
import json
import sqlalchemy as sa
from sqlalchemy.engine.url import URL

Step 1.2 Defines the parameters for the Redshift Serverless endpoint that was automatically provisioned using the AWS Cloud Development Kit (CDK).

In [2]:
# Identifiers of the CDK-provisioned Redshift Serverless resources.
workgroup_name, namespace_name = 'rs-qs-workgroup', 'rs-qs-namespace'

# Database connection settings.
db_name, port = 'dev', 5439
admin_username = 'admin'

# Name of the Secrets Manager secret that stores the admin password.
secret_name = 'REDSHIFT_PASSWORD'

# Placeholders only: both are overwritten in the next step with the real
# workgroup address and the password read from Secrets Manager.
endpoint = 'endpoint'
admin_password = 'password'

Step 1.3 Sets up the connection parameters: retrieves the connection endpoint of the Redshift Serverless workgroup and reads the admin password from AWS Secrets Manager.

In [3]:
# One client per AWS service used in this step.
redshift_client = boto3.client('redshift-serverless')
secretsmanager_client = boto3.client('secretsmanager')

# Look up the workgroup and keep only the address of its endpoint.
workgroup_response = redshift_client.get_workgroup(workgroupName=workgroup_name)
endpoint = workgroup_response['workgroup']['endpoint']['address']

# The secret is stored as a JSON document; the admin password sits under
# its 'password' key.
secret_response = secretsmanager_client.get_secret_value(SecretId=secret_name)
secret_value = secret_response['SecretString']
admin_password = json.loads(secret_value)['password']

In [4]:
# Assemble the SQLAlchemy connection URL for the redshift_connector driver
# from the endpoint, database and admin credentials resolved above.
redshift_url = URL.create(
    drivername='redshift+redshift_connector',
    username=admin_username,
    password=admin_password,
    host=endpoint,
    port=port,
    database=db_name,
)

Sets up the SQL extension of JupyterLab so that queries can be run against Redshift directly from notebook cells.

In [5]:
# Load (or reload) the sql extension that provides the %sql and %%sql magics.
%reload_ext sql
# Show at most 10 rows when a result set is rendered.
%config SqlMagic.displaylimit = 10
# Do not echo the connection string with each query.
%config SqlMagic.displaycon = False
# Open the connection described by redshift_url.
%sql $redshift_url

Shows how a Redshift result set can easily be converted into a pandas DataFrame.

In [6]:
# Run the query through the line magic and keep the returned result set.
result_set = %sql select current_user
# Convert the result set into a pandas DataFrame.
user_df = result_set.DataFrame()
# Print the type to confirm that the conversion produced a DataFrame.
print(type(user_df))
# Last expression of the cell, so the frame itself is rendered as output.
user_df

Done.
<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,current_user
0,admin


Creates an external schema to establish the connection between Redshift Serverless and the Kinesis data stream. The default IAM role of Redshift has been configured with read permissions on Kinesis data streams.

In [None]:
%%sql
/* Create the external schema that exposes Kinesis data streams to Redshift.
   IF NOT EXISTS keeps the cell re-runnable once the schema is in place.
   IAM_ROLE default uses the default IAM role configured for Redshift. */
CREATE EXTERNAL SCHEMA IF NOT EXISTS kinesis_schema
FROM KINESIS
IAM_ROLE default;

In [None]:
%%sql
/* Materialized view over the Kinesis stream that keeps each record as a
   parsed JSON value. Records that are not valid UTF-8 JSON are skipped. */
CREATE MATERIALIZED VIEW order_stream_json AS
    SELECT
        ApproximateArrivalTimestamp,
        JSON_PARSE(from_varbyte(Data, 'utf-8')) AS order_json
    FROM kinesis_schema.order_stream
    WHERE is_utf8(Data)
      AND is_valid_json(from_varbyte(Data, 'utf-8'));

In [None]:
%%sql
/* Refresh the JSON view so it picks up records from the Kinesis stream. */
REFRESH MATERIALIZED VIEW order_stream_json;

In [None]:
%%sql
/* Preview a few parsed records. The LIMIT keeps the fetch small, since
   SqlMagic.displaylimit only trims what is rendered, not what is fetched. */
SELECT * FROM order_stream_json LIMIT 10;

In [None]:
%%sql
/* Count records per delivery state and origin state pair, most frequent first. */
SELECT
    order_json.delivery_state::VARCHAR,
    order_json.origin_state::VARCHAR,
    count(1)
FROM order_stream_json
GROUP BY order_json.delivery_state, order_json.origin_state
ORDER BY count(1) DESC;

Creates a materialized view that parses the data within the order stream.

In [None]:
%%sql
/* Materialized view that flattens each JSON record of the order stream into
   typed columns. Every field is read with null_if_invalid set to true, and
   records that are not valid UTF-8 JSON are filtered out. */
CREATE MATERIALIZED VIEW order_stream AS
    SELECT
        ApproximateArrivalTimestamp,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'consignmentid', true)::BIGINT           AS consignmentid,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'timestamp', true)::VARCHAR(50)          AS order_timestamp,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_address', true)::VARCHAR(100)  AS delivery_address,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_state', true)::VARCHAR(50)     AS delivery_state,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_address', true)::VARCHAR(100)    AS origin_address,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_state', true)::VARCHAR(50)       AS origin_state,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delay_probability', true)::VARCHAR(10)  AS delay_probability,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'days_to_deliver', true)::INT            AS days_to_deliver,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_distance', true)::FLOAT        AS delivery_distance,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'userid', true)::INT                     AS userid,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'revenue', true)::FLOAT                  AS revenue,
        JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'cost', true)::FLOAT                     AS cost
    FROM kinesis_schema.order_stream
    WHERE is_utf8(Data)
      AND is_valid_json(from_varbyte(Data, 'utf-8'));

Refreshes the data within the materialized view. This is where the actual data ingestion happens: data is loaded from the Kinesis data stream directly into Amazon Redshift without having to be staged in Amazon S3 first.

In [None]:
%%sql
/* Refresh the view so it ingests the records available in the Kinesis stream. */
REFRESH MATERIALIZED VIEW order_stream;

In [14]:
%%sql
/* Number of records ingested into the view so far. An aggregate without
   GROUP BY returns a single row, so no LIMIT is needed. */
SELECT count(1) FROM order_stream;

Done.


count
172808


In [None]:
%%sql
/* Browse the flattened order records. NOTE(review) no LIMIT is visible here,
   so the full view may be fetched even though only 10 rows are displayed. */
select * from order_stream