# Pythonic Redshift API

The ``simple_aws_redshift`` library provides a Pythonic interface for working with AWS Redshift resources. It simplifies the original boto3 API by providing intuitive data models, property-based access patterns, and comprehensive type hints.


Quick Start
------------------------------------------------------------------------------
Import the library and access all public APIs:

In [1]:
import simple_aws_redshift.api as rs

In [2]:
import boto3

# Create boto3 clients
redshift_client = boto3.client("redshift")
redshift_serverless_client = boto3.client("redshift-serverless")
redshift_data_client = boto3.client("redshift-data")

## Working with Redshift Clusters

### List All Clusters

In [3]:
clusters = rs.redshift.list_redshift_clusters(redshift_client)

for cluster in clusters:
    print(f"{cluster.cluster_identifier = }")
    print(f"{cluster.endpoint_address = }")
    print(f"{cluster.endpoint_port = }")

### Get Specific Cluster


In [4]:
cluster = rs.redshift.get_redshift_cluster(
    redshift_client,
    cluster_identifier="my-cluster",
)
print(f"{cluster.cluster_identifier = }")
print(f"{cluster.endpoint_address = }")
print(f"{cluster.endpoint_port = }")
print(f"{cluster.is_available = }")
print(f"{cluster.is_creating = }")
print(f"{cluster.is_deleting = }")
print(f"{cluster.is_modifying = }")
print(f"{cluster.is_paused = }")
print(f"{cluster.is_rebooting = }")
print(f"{cluster.is_resizing = }")

## Working with Redshift Serverless

### List All Namespaces

In [None]:
# List all Redshift Serverless namespaces
namespaces = rs.redshift_serverless.list_namespaces(redshift_serverless_client)

# Iterate through namespaces
for namespace in namespaces:
    print(f"{namespace.namespace_name = }")
    print(f"{namespace.status = }")
    print(f"{namespace.is_available = }")

### Get Specific Namespace

In [None]:
namespace = rs.redshift_serverless.get_namespace(
    redshift_serverless_client,
    namespace_name="simple-aws-redshift-dev",
)
print(f"{namespace.namespace_name = }")
print(f"{namespace.db_name = }")
print(f"{namespace.status = }")
print(f"{namespace.is_available = }")

### Delete Namespace

In [None]:
deleted_namespace = rs.redshift_serverless.delete_namespace(
    redshift_serverless_client,
    namespace_name="invalid-namespace",
)

### List All Workgroups

In [None]:
# List all Redshift Serverless workgroups
workgroups = rs.redshift_serverless.list_workgroups(redshift_serverless_client)

# Iterate through workgroups
for workgroup in workgroups:
    print(f"{workgroup.workgroup_name = }")
    print(f"{workgroup.status = }")
    print(f"{workgroup.namespace_name = }")
    print(f"{workgroup.is_available = }")

### Get Specific Workgroup

In [None]:
workgroup = rs.redshift_serverless.get_workgroup(
    redshift_serverless_client,
    workgroup_name="simple-aws-redshift-dev",
)
print(f"{workgroup.workgroup_name = }")
print(f"{workgroup.status = }")
print(f"{workgroup.namespace_name = }")
print(f"{workgroup.is_available = }")

### Delete Workgroup

In [None]:
deleted_workgroup = rs.redshift_serverless.delete_workgroup(
    redshift_serverless_client,
    workgroup_name="invalid-workgroup"
)

## Working with Redshift Database Connection

### Create Redshift Serverless Connection Parameters

In [None]:
connection_params = rs.RedshiftServerlessConnectionParams.new(
    redshift_serverless_client=redshift_serverless_client,
    namespace_name="simple-aws-redshift-dev",
    workgroup_name="simple-aws-redshift-dev",
)

### Use Redshift Connector

The [Redshift connector](https://github.com/aws/amazon-redshift-python-driver) is an AWS-maintained [DB API 2.0 driver](https://peps.python.org/pep-0249/) specifically designed for AWS Redshift databases. While it provides a reliable and official connection interface, it lacks advanced features like ORM
  capabilities, relationship mapping, and query builders. This means developers must write most database operations using raw SQL commands, which can be verbose and require more manual effort for complex database
  interactions.

In [None]:
redshift_connection = connection_params.get_connection(timeout=5)
cursor = redshift_connection.cursor()
sql = "SELECT 1;"
cursor.execute(sql)
rows = cursor.fetchall()
print(rows)
print("Redshift connection is working!")
redshift_connection.close()

### Create Sqlalchemy Engine

[SQLAlchemy](https://www.sqlalchemy.org/) is the de facto standard for operating relational databases in Python, providing powerful ORM capabilities and database abstraction layers. Since AWS Redshift is built on top of PostgreSQL, it maintains compatibility with most PostgreSQL APIs and can leverage existing PostgreSQL tooling. However, the official [sqlalchemy-redshift](https://github.com/sqlalchemy-redshift/sqlalchemy-redshift) dialect is no longer maintained since April 2023 after its core AWS developer left, and its older versions force the use of SQLAlchemy < 2.0.0, which conflicts with the modern SQLAlchemy 2.0.0 standard - therefore, we built our own thin dialect layer to make SQLAlchemy work seamlessly with Redshift using the latest standards.

**Limitations**:

- **Table Creation Limitation**: Since we have not implemented a full Redshift dialect, it’s not possible to use SQLAlchemy to create tables. Redshift tables have special attributes such as Distribution Keys and Sort Keys, which SQLAlchemy does not understand. Therefore, you cannot use SQLAlchemy’s metadata-based create_all() functionality. You must write raw ``CREATE TABLE`` statements manually. However, once the tables are created, you can still use SQLAlchemy’s ORM and Table objects to write queries, which remains an elegant experience.
- **Metadata Reflection Issue**: Because SQLAlchemy assumes the backend is standard PostgreSQL, using the MetaData.reflect() function to introspect and reconstruct database DDL objects in memory doesn’t work. Redshift’s PostgreSQL-compatible schema differs from the official PostgreSQL standard, and the reflection functionality cannot accurately interpret those differences.
- **Special Syntax Support**: For Redshift-specific syntax such as the COPY command, COPY FROM S3, and UNLOAD, you must write raw SQL — there is no ORM-level abstraction. However, you can still use SQLAlchemy’s transaction context manager to manage the transactional scope of these operations.
- **Data Type Limitation**: Redshift-specific data types are not directly supported in the ORM. You need to either use generic data types or handle special types through raw SQL statements.

In [None]:
import sqlalchemy as sa

sqlalchemy_engine = connection_params.get_engine()
with sqlalchemy_engine.connect() as conn:
    sql = "SELECT 1;"
    rows = conn.execute(sa.text(sql)).fetchall()
    print(rows)
    print("Sqlalchemy engine is working!")


## Working with Redshift Data API

### Run SQL Queries

The original redshift data API execute_statement method is an async call. We provide an elegant way to run SQL on redshift and get results into Dataframe compatible  format elegantly


In [3]:
# Run SQL query and wait it to finish
sql_cmd = rs.redshift_data_api.SqlCommand(
    redshift_data_api_client=redshift_data_client,
    sql="SELECT 1 as value;",
    workgroup_name="simple-aws-redshift-dev",
    database="dev",
    delay=1,
    timeout=10,
    verbose=True,
    raises_on_error=True,
)
print("")
sql_cmd.run()
sql_cmd.result.vdf.show()


start waiter, polling every 1 seconds, timeout in 10 seconds.
on 1 th attempt, elapsed 1 seconds, remain 9 seconds ...(1, 1)
+---------+
|   value |
|---------|
|       1 |
+---------+


In [6]:
from rich import print as rprint

rprint(sql_cmd.result.vdf.columns)
rprint(sql_cmd.result.vdf.rows)