In [1]:
import json
import pandas as pd
import matplotlib
import numpy as np
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt

from sklearn.preprocessing import  StandardScaler
from sklearn.feature_selection import SelectKBest

from sklearn import linear_model
from sklearn.neighbors import KNeighborsRegressor
from sklearn.ensemble import  RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import  GradientBoostingRegressor

import botocore.session as s
import boto3
from botocore.waiter import WaiterModel
from botocore.waiter import create_waiter_with_client
from botocore.exceptions import WaiterError
import operator

In [2]:
# https://aws.amazon.com/blogs/big-data/using-the-amazon-redshift-data-api-to-interact-from-an-amazon-sagemaker-jupyter-notebook/

session = boto3.session.Session()
region = session.region_name

bc_session = s.get_session()

session = boto3.Session(
        botocore_session=bc_session,
        region_name=region,
    )

client_redshift = session.client("redshift-data")
print("Data API client successfully loaded")

Data API client successfully loaded


In [3]:
waiter_name = 'DataAPIExecution'

delay=2
max_attempts=3

#Configure the waiter settings
waiter_config = {
  'version': 2,
  'waiters': {
    'DataAPIExecution': {
      'operation': 'DescribeStatement',
      'delay': delay,
      'maxAttempts': max_attempts,
      'acceptors': [
        {
          "matcher": "path",
          "expected": "FINISHED",
          "argument": "Status",
          "state": "success"
        },
        {
          "matcher": "pathAny",
          "expected": ["PICKED","STARTED","SUBMITTED"],
          "argument": "Status",
          "state": "retry"
        },
        {
          "matcher": "pathAny",
          "expected": ["FAILED","ABORTED"],
          "argument": "Status",
          "state": "failure"
        }
      ],
    },
  },
}

waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

In [18]:
query = 'select * from punk_db.public.tbl_punk limit 8;'

res = client_redshift.execute_statement(Database= 'punk_db', 
                                        DbUser='punk_user',
                                        Sql= query, 
                                        ClusterIdentifier= 'punk-redshift-cluster')
print("Redshift Data API execution  started ...")

id = res["Id"]
# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)

output=client_redshift.get_statement_result(Id=id)
nrows=output["TotalNumRows"]
ncols=len(output["ColumnMetadata"])
resultrows=output["Records"]

col_labels=[]
for i in range(ncols): col_labels.append(output["ColumnMetadata"][i]['label'])
                                              
records=[]
for i in range(nrows): records.append(resultrows[i])

df = pd.DataFrame(np.array(resultrows), columns=col_labels)

Redshift Data API execution  started ...
Done waiting to finish Data API.


In [20]:
new_df = df.copy()
def get_value(_dict):
    if 'stringValue' in _dict:
        return _dict['stringValue']
    elif 'longValue' in _dict:
        return _dict['longValue']
    return 'nan'

for i in range(len(new_df.columns)):
    new_df[col_labels[i]]=new_df[col_labels[i]].apply(get_value)

new_df.dtypes

id            int64
name         object
abv          object
ibu          object
target_fg    object
target_og    object
ebc          object
srm          object
ph           object
dtype: object