# Chapter 2 - Load data using Redshift Data API

## Contents
1. [Introduction](#Introduction)
1. [Create Table](#Create-Table)
1. [Load Data](#Load-Data)
1. [Bringing all together](#Bringing-all-together)
    1. [Prepare SQL statements](#Prepare-SQL-statements)
    1. [Create function](#Create-function)
---

# Introduction

This Jupyter notebook demonstrates data ingestion into Amazon Redshift Serverless using the Redshift Data API. Follow the instructions for each cell.



### Parameters used

This notebook uses the following parameters, which are set by running the code below.

1. REDSHIFT_WORKGROUP: Name of your Redshift Serverless workgroup.
1. S3_DATA_FILE: Full S3 path (URI) of the source data file to be loaded.

### Running the commands:
* Select a cell and press Shift+Enter, or click the Run button in the menu.
* A cell has finished running when the `*` in `In [*]` turns into a number.
* In some cells you need to replace `<string>` placeholders with your own values. Make sure you replace the angle brackets `<>` as well.


In [8]:
## Check that your Boto3 version is 1.24.x or later
%pip show boto3 | grep -i version

Version: 1.26.32
Note: you may need to restart the kernel to use updated packages.
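
If you prefer to check the version from Python instead of the shell, the sketch below compares a version string against the 1.24 minimum mentioned above. The helper name `meets_minimum` is hypothetical, and it compares only the major and minor components. In the notebook you would call it with `boto3.__version__`.

```python
def meets_minimum(version_string, minimum=(1, 24)):
    """Return True if a 'major.minor.patch' version string is at least `minimum`."""
    # Compare only the major and minor parts, e.g. "1.26.32" -> (1, 26)
    major, minor = (int(part) for part in version_string.split(".")[:2])
    return (major, minor) >= minimum


# In the notebook, pass boto3.__version__; literals are used here for illustration
print(meets_minimum("1.26.32"))  # True
print(meets_minimum("1.23.10"))  # False
```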


### Import Libraries and setup parameters

In [1]:
import boto3
import time
import pandas as pd
import numpy as np

session = boto3.session.Session()
region = session.region_name


REDSHIFT_WORKGROUP = '<replace with your serverless workgroup name>'
S3_DATA_FILE='s3://packt-serverless-ml-redshift/chapter2/orders.parquet'
print(region)

us-west-2


## Create Table

In [2]:
## Prepare the DROP TABLE and CREATE TABLE statements
table_ddl = """
DROP TABLE IF EXISTS chapter2.orders CASCADE;

CREATE TABLE chapter2.orders
(o_orderkey     bigint NOT NULL,
o_custkey       bigint NOT NULL encode az64,
o_orderstatus   character(1) NOT NULL encode lzo,
o_totalprice    numeric(12,2) NOT NULL encode az64,
o_orderdate     date NOT NULL,
o_orderpriority character(15) NOT NULL encode lzo,
o_clerk         character(15) NOT NULL encode lzo,
o_shippriority  integer NOT NULL encode az64,
o_comment       character varying(79) NOT NULL encode lzo
) 
distkey(o_orderkey) compound sortkey(o_orderkey,o_orderdate);"""


In [6]:
## Create a Redshift Data API client and run table_ddl on your Redshift Serverless workgroup
client = boto3.client("redshift-data")
res = client.execute_statement(Database='dev', Sql=table_ddl,
                               WorkgroupName=REDSHIFT_WORKGROUP)
## Capture the statement ID, used to check the status and fetch results
query_id = res["Id"]
print(query_id)

ebdddfde-74b0-41cb-acd3-c03ed144480f


In [7]:
## Check the status of the statement (rerun this cell until it reports FINISHED)
status_description = client.describe_statement(Id=query_id)
status = status_description["Status"]
print(status)    

FINISHED
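
Because `describe_statement` reports the status only at the moment it is called, a single check can also return `SUBMITTED`, `PICKED`, or `STARTED` while the statement is still running. A minimal polling sketch, assuming `client` is the `boto3.client("redshift-data")` object created above; the helper name `wait_for_statement` and its timeout defaults are illustrative, not part of the Data API.

```python
import time


def wait_for_statement(client, statement_id, poll_seconds=2, timeout_seconds=300):
    """Poll describe_statement until the statement reaches a terminal state.

    `client` is assumed to be a boto3 "redshift-data" client, or any object
    exposing describe_statement(Id=...) with the same response shape.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        desc = client.describe_statement(Id=statement_id)
        status = desc["Status"]
        if status == "FINISHED":
            return desc
        if status in ("FAILED", "ABORTED"):
            # Failed statements carry an "Error" message; fall back to empty text
            raise RuntimeError(f"Statement {statement_id} {status}: {desc.get('Error', '')}")
        if time.monotonic() > deadline:
            raise TimeoutError(f"Statement {statement_id} still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

For example, `desc = wait_for_statement(client, query_id)` blocks until the DDL finishes and returns the final `describe_statement` response.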


## Load Data

##### Prepare the COPY command and print it to check that it is valid

In [8]:
load_data = f"""COPY chapter2.orders 
FROM '{S3_DATA_FILE}'
IAM_ROLE default
FORMAT AS PARQUET;"""
print(load_data)

COPY chapter2.orders 
FROM 's3://chapter2-load/orders.parquet'
IAM_ROLE default
FORMAT AS PARQUET;
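
To reuse the same load for other files, the statement can be generated by a small function. This is a sketch: `build_copy_sql` is a hypothetical helper that only checks that the path starts with `s3://`. It keeps `IAM_ROLE default`, which uses the default IAM role configured for your Redshift Serverless namespace, and quotes the value only when an explicit role ARN is passed.

```python
def build_copy_sql(table, s3_uri, iam_role="default", file_format="PARQUET"):
    """Return a Redshift COPY statement that loads `s3_uri` into `table`."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"Expected an s3:// URI, got {s3_uri!r}")
    # The `default` keyword is written unquoted; an explicit role ARN is quoted
    role = iam_role if iam_role == "default" else f"'{iam_role}'"
    return (
        f"COPY {table}\n"
        f"FROM '{s3_uri}'\n"
        f"IAM_ROLE {role}\n"
        f"FORMAT AS {file_format};"
    )


print(build_copy_sql("chapter2.orders", "s3://packt-serverless-ml-redshift/chapter2/orders.parquet"))
```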


In [9]:
## Run the COPY command and capture the statement ID
res = client.execute_statement(Database='dev', Sql=load_data,
                                WorkgroupName=REDSHIFT_WORKGROUP)
query_id = res["Id"]
print(query_id)

4b1db31e-a956-4f58-a768-9ca3199921d7


In [10]:
## Check the status of the COPY statement (rerun this cell until it reports FINISHED)
status_description = client.describe_statement(Id=query_id)
status = status_description["Status"]
print(status) 

FINISHED


##### After the COPY command completes (status FINISHED), check the number of records loaded

In [11]:
cnt = client.execute_statement(Database='dev', Sql='Select count(1) from chapter2.orders;',
                                WorkgroupName=REDSHIFT_WORKGROUP)
query_id = cnt["Id"]


In [12]:
##Print the count query output
results = client.get_statement_result(Id=query_id)
print(results.get('Records'))

[[{'longValue': 1500000}]]
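
Each value in `Records` is a dictionary with a single key that names its type, such as `longValue`, `doubleValue`, `stringValue`, `booleanValue`, or `isNull`. A minimal sketch for unwrapping one field into a plain Python value; `field_value` is a hypothetical helper, and the `records` literal simply mirrors the output above.

```python
def field_value(field):
    """Unwrap one Redshift Data API field dict into a plain Python value."""
    # A NULL column comes back as {"isNull": True}
    if field.get("isNull"):
        return None
    for key in ("longValue", "doubleValue", "stringValue", "booleanValue", "blobValue"):
        if key in field:
            return field[key]
    raise ValueError(f"Unrecognized field: {field!r}")


# Same shape as results.get('Records') from the count query
records = [[{"longValue": 1500000}]]
row_count = field_value(records[0][0])
print(row_count)  # 1500000
```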


# Bringing all together

### Prepare SQL statements

In [13]:
prep_sql = f"""
DROP TABLE IF EXISTS chapter2.orders CASCADE;

CREATE TABLE chapter2.orders
(o_orderkey     bigint NOT NULL,
o_custkey       bigint NOT NULL encode az64,
o_orderstatus   character(1) NOT NULL encode lzo,
o_totalprice    numeric(12,2) NOT NULL encode az64,
o_orderdate     date NOT NULL,
o_orderpriority character(15) NOT NULL encode lzo,
o_clerk         character(15) NOT NULL encode lzo,
o_shippriority  integer NOT NULL encode az64,
o_comment       character varying(79) NOT NULL encode lzo
) 
distkey(o_orderkey) compound sortkey(o_orderkey,o_orderdate);

COPY chapter2.orders 
FROM '{S3_DATA_FILE}'
IAM_ROLE default
FORMAT AS PARQUET;
"""

print(prep_sql)


DROP TABLE IF EXISTS chapter2.orders CASCADE;

CREATE TABLE chapter2.orders
(o_orderkey     bigint NOT NULL,
o_custkey       bigint NOT NULL encode az64,
o_orderstatus   character(1) NOT NULL encode lzo,
o_totalprice    numeric(12,2) NOT NULL encode az64,
o_orderdate     date NOT NULL,
o_orderpriority character(15) NOT NULL encode lzo,
o_clerk         character(15) NOT NULL encode lzo,
o_shippriority  integer NOT NULL encode az64,
o_comment       character varying(79) NOT NULL encode lzo
) 
distkey(o_orderkey) compound sortkey(o_orderkey,o_orderdate);

COPY chapter2.orders 
FROM 's3://chapter2-load/orders.parquet'
IAM_ROLE default
FORMAT AS PARQUET;
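
Sending the DROP, CREATE, and COPY statements as one string works in this notebook. The Data API also provides `batch_execute_statement`, which takes the statements as a list (`Sqls`) and runs them one after another in the order given. A sketch, assuming the same `dev` database; `run_batch` is a hypothetical wrapper, and the client is passed in so that any `redshift-data` client can be used.

```python
def run_batch(client, sql_statements, workgroup, database="dev"):
    """Submit several SQL statements with the Data API's batch_execute_statement."""
    # Skip blank entries so no empty statement is submitted
    sqls = [s.strip() for s in sql_statements if s.strip()]
    response = client.batch_execute_statement(
        Sqls=sqls, Database=database, WorkgroupName=workgroup
    )
    # The returned Id can be checked with describe_statement like any other statement
    return response["Id"]
```

For example, `run_batch(boto3.client("redshift-data"), [drop_sql, create_sql, load_data], REDSHIFT_WORKGROUP)`, where `drop_sql` and `create_sql` are hypothetical strings holding the DROP TABLE and CREATE TABLE statements.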



### Create function

In [14]:
def call_sql(prep_sql):
    """Run SQL through the Redshift Data API. Return a DataFrame if the
    statement produces a result set, otherwise return the statement ID."""
    client = boto3.client("redshift-data")
    qry_resp = client.execute_statement(Database='dev', Sql=prep_sql,
                                        WorkgroupName=REDSHIFT_WORKGROUP)
    stmnt_id = qry_resp["Id"]
    # Poll until the statement reaches a terminal state
    while True:
        time.sleep(3)
        desc_response = client.describe_statement(Id=stmnt_id)
        status = desc_response["Status"]
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError('SQL query ' + status.lower() + ': ' + stmnt_id + ": "
                               + desc_response.get("Error", ""))
        if status == "FINISHED":
            break
    # DDL and COPY statements have no result set; return the statement ID
    if not desc_response.get('HasResultSet'):
        return stmnt_id
    # Only the first page of results is read
    get_results = client.get_statement_result(Id=stmnt_id)
    column_labels = [col['label'] for col in get_results["ColumnMetadata"]]
    # Map Redshift types to pandas dtypes: int4 and numeric as float, everything else as str
    metadata = {}
    for col in get_results["ColumnMetadata"]:
        metadata[col['label']] = 'float' if col['typeName'] in ('int4', 'numeric') else 'str'
    # Each field is a one-key dict such as {'longValue': 1}; NULLs arrive as {'isNull': True}
    records = []
    for record in get_results.get('Records', []):
        records.append([None if rec.get('isNull') else list(rec.values())[0] for rec in record])
    df = pd.DataFrame(records, columns=column_labels)
    df = df.astype(metadata)
    return df
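
`get_statement_result` returns results in pages; when more rows remain, the response includes a `NextToken`. `call_sql` reads only the first page, which is enough for the 10-row sample below. For larger result sets, a sketch that keeps requesting pages; `fetch_all_records` is a hypothetical helper and `client` is assumed to be a `redshift-data` client.

```python
def fetch_all_records(client, statement_id):
    """Collect ColumnMetadata and every page of Records for a finished statement."""
    response = client.get_statement_result(Id=statement_id)
    columns = response["ColumnMetadata"]
    records = list(response["Records"])
    # Keep requesting pages while the service returns a NextToken
    while response.get("NextToken"):
        response = client.get_statement_result(Id=statement_id, NextToken=response["NextToken"])
        records.extend(response["Records"])
    return columns, records
```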

#### Call the function to run the SQL statements

In [15]:
execute_statement = call_sql(prep_sql)

In [16]:
## Preview the data loaded into the chapter2.orders table
get_result="""Select * from chapter2.orders limit 10;"""

In [17]:
execute_statement = call_sql(get_result)

In [18]:
execute_statement.head(10)

| | o_orderkey | o_custkey | o_orderstatus | o_totalprice | o_orderdate | o_orderpriority | o_clerk | o_shippriority | o_comment |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 740 | 43417 | O | 96016.31 | 1995-07-16 | 3-MEDIUM | Clerk#000000583 | 0.0 | courts haggle furiously across the final, regul |
| 1 | 1924 | 74677 | O | 258213.13 | 1996-09-07 | 4-NOT SPECIFIED | Clerk#000000823 | 0.0 | of the ironic accounts. instructions near the... |
| 2 | 2115 | 104101 | O | 160667.9 | 1998-05-23 | 4-NOT SPECIFIED | Clerk#000000101 | 0.0 | odolites boost. carefully regular excuses cajo... |
| 3 | 2151 | 57902 | O | 216383.85 | 1996-11-11 | 3-MEDIUM | Clerk#000000996 | 0.0 | c requests. ironic platelets cajole across the... |
| 4 | 2241 | 101276 | F | 241266.34 | 1993-05-11 | 1-URGENT | Clerk#000000081 | 0.0 | y about the silent excuses. furiously ironic i... |
| 5 | 2309 | 99505 | O | 219187.0 | 1995-09-04 | 5-LOW | Clerk#000000803 | 0.0 | he carefully pending packages. fluffily stealt... |
| 6 | 3234 | 13891 | O | 204288.87 | 1996-04-05 | 4-NOT SPECIFIED | Clerk#000000367 | 0.0 | ents according to the dependencies will sleep ... |
| 7 | 4294 | 48623 | F | 357833.85 | 1992-08-15 | 3-MEDIUM | Clerk#000000407 | 0.0 | ng pinto beans breach. slyly express requests bo |
| 8 | 4384 | 24016 | F | 90816.05 | 1992-07-13 | 1-URGENT | Clerk#000000192 | 0.0 | onic platelets. furiously regular asymptotes a... |
| 9 | 5059 | 41249 | F | 99783.76 | 1993-11-10 | 2-HIGH | Clerk#000000058 | 0.0 | latelets. final, regular accounts cajole furio... |
