# Work with JSON in Redshift

In [1]:
import json
import textwrap
from datetime import datetime
from pathlib import Path

import boto3
import pandas as pd
import awswrangler as wr

import pylib.api as pylib

In [2]:
dir_here = Path.cwd().absolute()
path_config_serverless = dir_here / "config-serverless.json"
config_serverless = pylib.Config.load(path_config_serverless)

# create boto session
boto_ses = boto3.session.Session(profile_name="awshsh_app_dev_us_east_1")
aws_account_id = boto_ses.client("sts").get_caller_identity()["Account"]
aws_region = boto_ses.region_name

# create redshift connection
conn = pylib.create_connect_for_serverless_using_iam(
    boto_ses=boto_ses,
    workgroup_name=config_serverless.workgroup,
)

pylib.test_connection(conn)


Test connection by running a query: SELECT 64;
Result: 64
Success!


In [3]:
cursor = conn.cursor()

In [4]:
TABLE_NAME = "json_test"

def create_table():
    sql = textwrap.dedent(
        f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME}(
            id VARCHAR(255) NOT NULL,
            create_at TIMESTAMP NOT NULL,
            data SUPER NOT NULL,
            PRIMARY KEY (id, create_at)
        )
        distkey(id)
        sortkey(create_at);
        """
    )
    cursor.execute(sql)
    conn.commit()


def drop_table():
    sql = textwrap.dedent(
        f"""
        DROP TABLE IF EXISTS {TABLE_NAME};
        """
    )
    cursor.execute(sql)
    conn.commit()


def delete_table():
    sql = textwrap.dedent(
        f"""
        DELETE FROM {TABLE_NAME};
        """
    )
    cursor.execute(sql)
    conn.commit()


create_table()
# drop_table()

In [5]:
bucket = f"{aws_account_id}-{aws_region}-data"
s3_dir_uri = f"s3://{bucket}/project/redshift-serverless-poc/"

In [20]:
def get_utc_now():
    return datetime.utcnow()


def load_data():
    df = pd.DataFrame(
        {
            "id": ["id-1"],
            "create_at": [get_utc_now()],
            # just use dictionary to represent JSON object
            "data": [
                {
                    "name": "Alice",
                    "age": 25,
                    "tags": ["cool", "tall", "smart", "beauty"],
                },
            ],
        }
    )
    # awswrangler will dump the data to parquet file, parquet is schema self-contained format
    wr.redshift.copy(
        df=df,
        path=s3_dir_uri,
        con=conn,
        schema="public",
        table=TABLE_NAME,
        mode="append",  # append, overwrite or upsert.
        boto3_session=boto_ses,
        primary_keys=["id"],
        serialize_to_json=True, # <--- add this option if you have json field and need to load to SUPER data type column
    )

delete_table()
load_data()

In [21]:
def test_query(sql):
    cursor.execute(sql)
    for row in cursor:
        print(row)

In [None]:
[Querying semistructured data](https://docs.aws.amazon.com/redshift/latest/dg/query-super.html#unnest)

In [22]:
# 根据 SUPER Type https://docs.aws.amazon.com/redshift/latest/dg/r_SUPER_type.html 的定义
# SUPER 这个 field 里的值根据你存的数据的不同可能是不同的值. 在我们这个例子里, 由于 Parquet 会把数据存为复杂 schema 的对象,
# 所以虽然你看起来是一个 JSON 编码后的字符串, 但是它实际上是一个对象. 在下面的例子中我们可以验证.
test_query("SELECT * FROM json_test;")

['id-1', datetime.datetime(2024, 1, 29, 21, 34, 20, 954125), '{"age":25,"name":"Alice","tags":["cool","tall","smart","beauty"]}']


In [36]:
# data column 是一个对象
test_query("SELECT JSON_TYPEOF(data) FROM json_test;")

['object']


In [23]:
test_query("SELECT id, data.age FROM json_test;")

['id-1', '25']


In [24]:
test_query("SELECT id, data.name FROM json_test;")

['id-1', '"Alice"']


In [25]:
test_query("SELECT id, data.tags FROM json_test;")

['id-1', '["cool","tall","smart","beauty"]']


In [26]:
test_query("SELECT id, data.tags[0] FROM json_test;")

['id-1', '"cool"']


In [27]:
test_query("SELECT id, data.tags[2] FROM json_test;")

['id-1', '"smart"']


In [28]:
test_query("SELECT id, JSON_SERIALIZE(data) FROM json_test;")

['id-1', '{"age":25,"name":"Alice","tags":["cool","tall","smart","beauty"]}']


In [34]:
conn.rollback()
# test_query("SELECT id, JSON_SERIALIZE(data)[0] FROM json_test;")

In [None]:
-- SELECT id, JSON_ARRAY_LENGTH(data.tags) FROM json_test;
SELECT * FROM json_test;
SELECT id, data FROM json_test;
SELECT id, data.age FROM json_test;
SELECT id, data.name FROM json_test;
SELECT id, data.tags FROM json_test;
SELECT id, data.tags[0] FROM json_test;
SELECT id, data.tags[2] FROM json_test;
SELECT id, JSON_SERIALIZE(data) FROM json_test;
SELECT id, JSON_EXTRACT_PATH_TEXT(data, "age") FROM json_test;


In [50]:
import contextlib

@contextlib.contextmanager
def auto_commit(connect):
    try:
        yield connect
        connect.commit()
    except:
        connect.rollback()

with auto_commit(conn):
    test_query("SELECT id, JSON_SERIALIZE(data)[0] FROM json_test;")
# conn.rollback()
# cursor = conn.cursor()
with auto_commit(conn):
    test_query("SELECT id, data.tags[2] FROM json_test;")
# try:
#     test_query("SELECT id, JSON_SERIALIZE(data)[0] FROM json_test;")
# except:
#     conn.rollback()
#     test_query("SELECT id, data.tags[2] FROM json_test;")

['id-1', '"smart"']


  cursor.execute(sql)
