In [None]:
import json
import time
from datetime import datetime
from decimal import Decimal

import boto3
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Ask the Redshift control-plane API for its cluster descriptions
redshift = boto3.client('redshift')
clusters = redshift.describe_clusters()

# Report identifier, endpoint (host:port) and VPC security groups per cluster
for cluster in clusters['Clusters']:
    print(f"Cluster Identifier: {cluster['ClusterIdentifier']}")
    endpoint = cluster['Endpoint']
    print(f"Endpoint: {endpoint['Address']}:{endpoint['Port']}")
    print(f"Security Groups: {cluster['VpcSecurityGroups']}")

In [None]:
def get_secret(secret_name, region_name="us-east-2"):
    """Fetch a secret from AWS Secrets Manager and decode it as JSON.

    Args:
        secret_name: Value passed as ``SecretId`` to ``get_secret_value``.
        region_name: Region the Secrets Manager client is created for.

    Returns:
        The parsed JSON content of the response's ``SecretString``.

    Raises:
        Exception: Any error from the lookup or the JSON decoding is printed
            and then re-raised unchanged.
    """
    secrets_client = boto3.session.Session().client(
        service_name='secretsmanager',
        region_name=region_name,
    )

    try:
        payload = secrets_client.get_secret_value(SecretId=secret_name)
        secret_text = payload['SecretString']
        return json.loads(secret_text)
    except Exception as err:
        print(f"Error retrieving secret: {str(err)}")
        raise

In [None]:
def execute_redshift_query(cluster_id, database, db_user, query):
    """Run one SQL statement through the Redshift Data API and wait for it.

    Args:
        cluster_id: Identifier of the cluster the statement is sent to.
        database: Name of the database to run against.
        db_user: Database user the statement runs as.
        query: SQL text to execute.

    Returns:
        The ``get_statement_result`` response when the statement finished and
        its text starts with ``SELECT``; ``True`` for any other finished
        statement; ``False`` when the statement ended as FAILED or ABORTED.

    Raises:
        Exception: Any error raised while submitting or polling is printed
            and then re-raised unchanged.
    """
    redshift_client = boto3.client('redshift-data')

    try:
        response = redshift_client.execute_statement(
            # Send the statement to the cluster the caller asked for.
            ClusterIdentifier=cluster_id,
            Database=database,
            DbUser=db_user,
            Sql=query
        )

        query_id = response['Id']

        # The Data API is asynchronous: poll until a terminal status shows up.
        while True:
            status_response = redshift_client.describe_statement(Id=query_id)
            status = status_response['Status']

            if status == 'FINISHED':
                print(f"Query completed successfully: {query}")
                # Only SELECT statements are asked for a result set.
                if query.strip().upper().startswith('SELECT'):
                    return redshift_client.get_statement_result(Id=query_id)
                return True

            if status == 'FAILED':
                print(f"Query failed: {status_response.get('Error', 'Unknown error')}")
                return False

            if status == 'ABORTED':
                # ABORTED is terminal as well; without this branch the loop
                # would keep polling a cancelled statement forever.
                print(f"Query aborted: {status_response.get('Error', 'Unknown error')}")
                return False

            time.sleep(0.5)

    except Exception as e:
        print(f"Error executing query: {str(e)}")
        raise

In [None]:
# Build the analytics schema, its two tables and a little sample data, then
# count the rows. Each stage runs only if the previous one succeeded, so a
# failed CREATE no longer leads to INSERTs/SELECTs against missing objects.
try:
    # Get credentials from Secrets Manager
    secret_name = "Redshift-redshift-secret"
    credentials = get_secret(secret_name)

    username = credentials.get('username')

    # Skip database creation since it already exists
    print("Database 'analytics' already exists, proceeding with schema creation...")

    # Create schema
    print("Creating schema...")
    create_schema_query = "CREATE SCHEMA IF NOT EXISTS analytics_schema;"
    success = execute_redshift_query('Cluster_ID', 'analytics', username, create_schema_query)
    if not success:
        print("Failed to create schema")

    if success:
        # Create tables
        create_tables_queries = [
            """
            CREATE TABLE IF NOT EXISTS analytics_schema.customers (
                customer_id INTEGER PRIMARY KEY,
                name VARCHAR(100),
                email VARCHAR(100),
                created_date TIMESTAMP
            );
            """,

            """
            CREATE TABLE IF NOT EXISTS analytics_schema.orders (
                order_id INTEGER PRIMARY KEY,
                customer_id INTEGER,
                order_date TIMESTAMP,
                total_amount DECIMAL(10,2),
                FOREIGN KEY (customer_id) REFERENCES analytics_schema.customers(customer_id)
            );
            """
        ]

        print("Creating tables...")
        for query in create_tables_queries:
            success = execute_redshift_query('Cluster_ID', 'analytics', username, query)
            if not success:
                print("Failed to create tables")
                break

    if success:
        # Insert sample data
        # NOTE(review): Redshift treats PRIMARY KEY as informational only, so
        # re-running this cell presumably appends the same rows again - confirm
        # whether that is acceptable for this demo.
        insert_queries = [
            """
            INSERT INTO analytics_schema.customers (customer_id, name, email, created_date)
            VALUES 
                (1, 'John Doe', 'john@example.com', CURRENT_TIMESTAMP),
                (2, 'Jane Smith', 'jane@example.com', CURRENT_TIMESTAMP);
            """,

            """
            INSERT INTO analytics_schema.orders (order_id, customer_id, order_date, total_amount)
            VALUES 
                (1, 1, CURRENT_TIMESTAMP, 100.50),
                (2, 1, CURRENT_TIMESTAMP, 200.75),
                (3, 2, CURRENT_TIMESTAMP, 150.25);
            """
        ]

        print("Inserting sample data...")
        for query in insert_queries:
            success = execute_redshift_query('Cluster_ID', 'analytics', username, query)
            if not success:
                print("Failed to insert sample data")
                break

    if success:
        # Verify the data
        verify_queries = [
            "SELECT COUNT(*) FROM analytics_schema.customers;",
            "SELECT COUNT(*) FROM analytics_schema.orders;"
        ]

        print("Verifying data...")
        for query in verify_queries:
            result = execute_redshift_query('Cluster_ID', 'analytics', username, query)
            # A dict means the SELECT finished and returned a result payload;
            # a failed query comes back as False and is simply not printed.
            if isinstance(result, dict):
                print(f"Query {query} result:", result)

except Exception as e:
    # Best-effort cell: report the problem instead of aborting the notebook run.
    print(f"Error: {str(e)}")

In [None]:
def test_redshift_data_api():
    """Smoke-test the Redshift Data API with a row count on the customers table.

    Reads the database user from the notebook-level ``credentials`` mapping.
    The outcome is printed; errors raised while submitting or polling are
    printed rather than propagated. Returns nothing.
    """
    redshift_data = boto3.client('redshift-data')

    try:
        # Execute a simple query
        response = redshift_data.execute_statement(
            ClusterIdentifier='Cluster_ID',
            Database='analytics',
            DbUser=credentials['username'],
            Sql='SELECT COUNT(*) FROM analytics_schema.customers'
        )

        query_id = response['Id']

        # Wait for query completion
        while True:
            status = redshift_data.describe_statement(Id=query_id)
            state = status['Status']

            if state == 'FINISHED':
                result = redshift_data.get_statement_result(Id=query_id)
                print("Data API connection successful!")
                print(f"Query result: {result}")
                break

            if state in ('FAILED', 'ABORTED'):
                # ABORTED is a terminal status too; treating only FAILED as
                # terminal would leave this loop polling forever.
                print(f"Query {state.lower()}: {status.get('Error')}")
                break

            time.sleep(0.5)

    except Exception as e:
        print(f"Data API error: {str(e)}")

test_redshift_data_api()

In [None]:
def convert_value(value, data_type):
    """Coerce a raw field value to the Python type matching a Spark data type.

    Args:
        value: Raw value for one cell, or ``None``.
        data_type: Spark data type instance of the target column.

    Returns:
        ``None`` when ``value`` is ``None``.
        For ``TimestampType``: a ``datetime`` parsed from
        ``YYYY-MM-DD HH:MM:SS[.ffffff]``, or ``None`` when the text matches
        neither format.
        For ``DecimalType``: a ``decimal.Decimal``.
        For ``LongType``: an ``int``.
        Otherwise ``value`` unchanged.
    """
    if value is None:
        return None

    if isinstance(data_type, TimestampType):
        # Try the fractional-seconds layout first, then whole seconds.
        for fmt in ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S'):
            try:
                return datetime.strptime(value, fmt)
            except ValueError:
                continue
        return None

    if isinstance(data_type, DecimalType):
        # Spark's schema verification accepts only decimal.Decimal for a
        # DecimalType column (a float is rejected); going through str() also
        # keeps the digits exactly as they were supplied.
        return Decimal(str(value))

    if isinstance(data_type, LongType):
        return int(value)

    return value

In [None]:
def read_redshift_with_data_api(query, cluster_id='Cluster_ID', database='analytics', db_user=None):
    """Run a query through the Redshift Data API and return a Spark DataFrame.

    Args:
        query: SQL text to execute.
        cluster_id: Cluster identifier to run against.
        database: Database name.
        db_user: Database user; when ``None`` the notebook-level
            ``credentials['username']`` is used.

    Returns:
        A Spark DataFrame built from all result pages. Column types come from
        the result's column metadata; type names without a mapping fall back
        to ``StringType``.

    Raises:
        Exception: When the statement ends as FAILED or ABORTED, or on any
            error while submitting, polling or converting. The error is
            printed and then re-raised.
    """
    redshift_data = boto3.client('redshift-data')

    try:
        # Execute query
        response = redshift_data.execute_statement(
            ClusterIdentifier=cluster_id,
            Database=database,
            DbUser=db_user if db_user is not None else credentials['username'],
            Sql=query
        )

        query_id = response['Id']

        # Wait for completion
        while True:
            status = redshift_data.describe_statement(Id=query_id)
            state = status['Status']
            if state == 'FINISHED':
                break
            if state in ('FAILED', 'ABORTED'):
                # ABORTED is terminal as well; handling only FAILED would
                # leave this loop polling a cancelled statement forever.
                raise Exception(f"Query {state.lower()}: {status.get('Error')}")
            time.sleep(0.5)

        # Results are paginated: keep fetching while a NextToken is returned.
        result = redshift_data.get_statement_result(Id=query_id)
        column_metadata = result['ColumnMetadata']
        records = list(result['Records'])
        next_token = result.get('NextToken')
        while next_token:
            page = redshift_data.get_statement_result(Id=query_id, NextToken=next_token)
            records.extend(page['Records'])
            next_token = page.get('NextToken')

        # Redshift type name -> Spark type. Integer widths all map to LongType
        # so that INTEGER/SMALLINT columns do not degrade to strings.
        # NOTE(review): DECIMAL columns are presumably reported as 'numeric',
        # which has no entry here and so falls back to StringType - confirm.
        spark_types = {
            'int2': LongType(),
            'int4': LongType(),
            'int8': LongType(),
            'varchar': StringType(),
            'timestamp': TimestampType(),
            'decimal': DecimalType(10, 2),
        }

        # Create schema based on column metadata
        schema = StructType([
            StructField(
                col['name'],
                spark_types.get(col['typeName'].lower(), StringType()),
                True
            ) for col in column_metadata
        ])

        # Convert result to list of rows with proper type conversion
        data = []
        for record in records:
            row = []
            for field, schema_field in zip(record, schema.fields):
                if field.get('isNull'):
                    # SQL NULL is sent as {'isNull': True}; it must become
                    # None, not the boolean True.
                    value = None
                else:
                    # Each field carries a single typed entry (stringValue,
                    # longValue, ...); take the first one that is set.
                    value = next(
                        (v for k, v in field.items() if k != 'isNull' and v is not None),
                        None
                    )
                row.append(convert_value(value, schema_field.dataType))
            data.append(row)

        # Create Spark DataFrame
        spark = SparkSession.builder \
            .appName("Redshift Data API") \
            .getOrCreate()

        return spark.createDataFrame(data, schema)

    except Exception as e:
        print(f"Error: {str(e)}")
        raise

In [None]:
# Exercise the Data API reader end to end: load both tables, show their
# contents and schemas, then run a join and an aggregation on the results.
try:
    # --- customers -------------------------------------------------------
    print("Reading customers table...")
    customers_df = read_redshift_with_data_api("""
        SELECT * FROM analytics_schema.customers
    """)
    print("\nCustomers data:")
    customers_df.show()

    # Schema printout lets the reader check the inferred column types
    print("\nCustomers schema:")
    customers_df.printSchema()

    # --- orders ----------------------------------------------------------
    print("\nReading orders table...")
    orders_df = read_redshift_with_data_api("""
        SELECT * FROM analytics_schema.orders
    """)
    print("\nOrders data:")
    orders_df.show()

    print("\nOrders schema:")
    orders_df.printSchema()

    # --- left join: every customer, with matching orders where present ---
    print("\nJoining customers and orders...")
    same_customer = customers_df.customer_id == orders_df.customer_id
    joined_df = customers_df.join(orders_df, same_customer, "left")
    print("\nJoined data:")
    joined_df.show()

    # --- aggregation: summed order amount per customer -------------------
    print("\nCalculating total orders per customer...")
    agg_df = (
        orders_df
        .groupBy("customer_id")
        .agg({"total_amount": "sum"})
        .withColumnRenamed("sum(total_amount)", "total_spent")
    )
    print("\nAggregated data:")
    agg_df.show()

except Exception as exc:
    # Report the failure with a full traceback instead of stopping the run
    print(f"Error: {str(exc)}")
    import traceback
    traceback.print_exc()