## Importing Libraries

In [1]:
import os
import boto3
from dotenv import load_dotenv
import snowflake.connector
import pandas as pd

## Setting Up Snowflake Connection and AWS S3 Connection

References:
* https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration

### Loading Environmental Variables

In [2]:
# Load environment variables from the .env file
load_dotenv()

# Fetch Snowflake credentials from environment variables
snowflake_user = os.getenv('SNOWFLAKE_USER')
snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
snowflake_database = os.getenv('SNOWFLAKE_DATABASE')
snowflake_schema_bronze = os.getenv('SNOWFLAKE_SCHEMA_BRONZE')
snowflake_schema_silver = os.getenv('SNOWFLAKE_SCHEMA_SILVER')
snowflake_schema_gold = os.getenv('SNOWFLAKE_SCHEMA_GOLD')

# Fetch AWS IAM role ARN and S3 bucket details from environment variables
s3_snowflake_iam_role_arn = os.getenv('S3_SNOWFLAKE_IAM_ROLE_ARN')
s3_iam_role_arn = os.getenv('S3_IAM_ROLE_ARN')
s3_bucket_name = os.getenv('S3_BUCKET_NAME')
s3_file_prefix = ''  # No prefix needed since the bucket name itself is the prefix
s3_access_key_id = os.getenv('S3_ACCESS_KEY_ID')
s3_secret_access_key = os.getenv('S3_SECRET_ACCESS_KEY')
s3_region = os.getenv('S3_REGION')

## Setting Up Snowflake External Tables

### Creating Roles, Granting Priviledges to Roles, Creating Users, Granting Roles to Users

In [3]:
# SQL statements for role creation and privilege assignment
create_role_sql = "CREATE ROLE IF NOT EXISTS storage_admin"

grant_privileges_sql_1 = f"GRANT USAGE ON WAREHOUSE {snowflake_warehouse.upper()} TO ROLE storage_admin;"
grant_privileges_sql_2 = f"GRANT USAGE ON DATABASE {snowflake_database.upper()} TO ROLE storage_admin;"
grant_privileges_sql_3 = f"GRANT USAGE ON SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()} TO ROLE storage_admin;"
grant_privileges_sql_4 = f"GRANT CREATE STAGE ON SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()} TO ROLE storage_admin;"
grant_privileges_sql_5 = f"GRANT USAGE ON INTEGRATION MY_S3_INTEGRATION TO ROLE storage_admin;"

create_user_sql = """
CREATE USER IF NOT EXISTS my_user
    PASSWORD = 'my_password'
    DEFAULT_ROLE = 'storage_admin'
    DEFAULT_WAREHOUSE = 'my_warehouse';
"""

grant_role_to_user_sql = """
GRANT ROLE storage_admin TO USER my_user;
"""

### Creating Storage Integration

In [4]:
# Create Storage Integration
# Reference: https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration
create_storage_integration_sql = f"""
CREATE STORAGE INTEGRATION IF NOT EXISTS MY_S3_INTEGRATION
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = '{s3_snowflake_iam_role_arn}'
    STORAGE_ALLOWED_LOCATIONS = ('s3://{s3_bucket_name}');
"""
describe_storage_integration_sql = "DESC INTEGRATION MY_S3_INTEGRATION"
show_storage_integration_sql = "SHOW STORAGE INTEGRATIONS LIKE 'MY_S3_INTEGRATION'"

### Create External Stage

In [5]:
# Create External Stage
# Reference: https://docs.snowflake.com/en/user-guide/data-load-s3-create-stage

# Set the current database and schema for the user session
current_db_schema_sql_1 = f"USE DATABASE {snowflake_database.upper()};"
current_db_schema_sql_2 = f"USE SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()};"

# Grant USAGE privilege on the Storage Integration
grant_usage_on_integration_to_role = "GRANT USAGE ON INTEGRATION MY_S3_INTEGRATION TO ROLE storage_admin;"

# Grant CREATE STAGE priviledge for the Schema
grant_external_stage_to_role = "GRANT CREATE STAGE ON SCHEMA MY_DBT_DATABASE.BRONZE TO ROLE storage_admin;"

# Grant USAGE privilege on the External Stage
grant_usage_on_external_stage_to_role = "GRANT USAGE ON STAGE MY_DBT_DATABASE.BRONZE.MY_S3_STAGE TO ROLE storage_admin;"

create_external_stage_sql = f"""
CREATE STAGE IF NOT EXISTS MY_S3_STAGE
    STORAGE_INTEGRATION = MY_S3_INTEGRATION
    URL='s3://{s3_bucket_name}';
"""

show_external_stage_sql = "SHOW STAGES LIKE 'MY_S3_STAGE' IN DATABASE"

### Create External Tables

In [6]:
# Grant CREATE TABLE privilege on the schema
grant_create_table_on_schema = f"GRANT CREATE TABLE ON SCHEMA {snowflake_schema_bronze.upper()} TO ROLE storage_admin;"


tables_to_create = [
                    ('customers_ext', '@my_dbt_database.bronze.my_s3_stage/dbt-duckdb-ingestion-s3-parquet/'),
                    ('dates_ext', '@my_dbt_database.bronze.my_s3_stage/dbt-duckdb-ingestion-s3-parquet/'),
                    ('product_usage_ext', '@my_dbt_database.bronze.my_s3_stage/dbt-duckdb-ingestion-s3-parquet/'),
                    ('products_ext', '@my_dbt_database.bronze.my_s3_stage/dbt-duckdb-ingestion-s3-parquet/'),
                    ('subscriptions_ext', '@my_dbt_database.bronze.my_s3_stage/dbt-duckdb-ingestion-s3-parquet/'),
                    ('support_interactions_ext', '@my_dbt_database.bronze.my_s3_stage/dbt-duckdb-ingestion-s3-parquet/')
                    ]

In [7]:
# Function to create external table
# Reference: https://docs.snowflake.com/en/user-guide/tables-external-intro
# Note: you need to add the "AS (value:c1::int)" part for all columns in all external tables, or snowflake will show the following
# error -> "Snowflake ProgrammingError: 091064 (42601): External table column XYZ must have a defining expression."

def create_external_table(conn, table_name, s3_location):
    with conn.cursor() as cursor:
        if table_name == 'customers_ext':
            create_table_sql = f"""
            CREATE OR REPLACE EXTERNAL TABLE bronze.customers (
                "CustomerID" INT AS (value:c1::int),
                "Name" VARCHAR(100) AS (value:c2::varchar),
                "Age" INT AS (value:c3::int),
                "Gender" VARCHAR(10) AS (value:c4::varchar),
                "SignupDate" DATE AS (value:c5::date)
                -- "extracted_at" TIMESTAMP,
                -- "inserted_at" TIMESTAMP
            )
            LOCATION={s3_location}
            FILE_FORMAT=(TYPE=PARQUET)
            """
        elif table_name == 'dates_ext':
            create_table_sql = f"""
            CREATE OR REPLACE EXTERNAL TABLE bronze.dates (
                "DateID" INT AS (value:c1::int),
                "Date" DATE AS (value:c2::date),
                "Week" INT AS (value:c3::int),
                "Month" INT AS (value:c4::int),
                "Quarter" INT AS (value:c5::int),
                "Year" INT AS (value:c6::int)
                -- "extracted_at" TIMESTAMP,
                -- "inserted_at" TIMESTAMP
            )
            LOCATION={s3_location}
            FILE_FORMAT=(TYPE=PARQUET)
            """
        elif table_name == 'product_usage_ext':
            create_table_sql = f"""
            CREATE OR REPLACE EXTERNAL TABLE bronze.product_usage (
                "UsageID" INT AS (value:c1::int),
                "CustomerID" INT AS (value:c2::int),
                "DateID" INT AS (value:c3::int),
                "ProductID" INT AS (value:c4::int),
                "NumLogins" INT AS (value:c5::int),
                "Amount" DECIMAL(10, 2) AS (value:c6::decimal)
                -- "extracted_at" TIMESTAMP,
                -- "inserted_at" TIMESTAMP
            )
            LOCATION={s3_location}
            FILE_FORMAT=(TYPE=PARQUET)
            """
        elif table_name == 'products_ext':
            create_table_sql = f"""
            CREATE OR REPLACE EXTERNAL TABLE bronze.products (
                "ProductID" INT AS (value:c1::int),
                "ProductName" VARCHAR(100) AS (value:c2::varchar),
                "Category" VARCHAR(50) AS (value:c3::varchar),
                "Price" DECIMAL(10, 2) AS (value:c4::decimal)
                -- "extracted_at" TIMESTAMP,
                -- "inserted_at" TIMESTAMP
            )
            LOCATION={s3_location}
            FILE_FORMAT=(TYPE=PARQUET)
            """
        elif table_name == 'subscriptions_ext':
            create_table_sql = f"""
            CREATE OR REPLACE EXTERNAL TABLE bronze.subscriptions (
                "SubscriptionID" INT AS (value:c1::int),
                "CustomerID" INT AS (value:c2::int),
                "StartDate" DATE AS (value:c3::date),
                "EndDate" DATE AS (value:c4::date),
                "Type" VARCHAR(50) AS (value:c5::varchar),
                "Status" VARCHAR(50) AS (value:c6::varchar)
                -- "extracted_at" TIMESTAMP,
                -- "inserted_at" TIMESTAMP
            )
            LOCATION={s3_location}
            FILE_FORMAT=(TYPE=PARQUET)
            """
        elif table_name == 'support_interactions_ext':
            create_table_sql = f"""
            CREATE OR REPLACE EXTERNAL TABLE bronze.support_interactions (
                "InteractionID" INT AS (value:c1::int),
                "CustomerID" INT AS (value:c2::int),
                "DateID" INT AS (value:c3::int),
                "IssueType" VARCHAR(100) AS (value:c4::varchar),
                "ResolutionTime" INT AS (value:c5::int)
                -- "extracted_at" TIMESTAMP,
                -- "inserted_at" TIMESTAMP
            )
            LOCATION={s3_location}
            FILE_FORMAT=(TYPE=PARQUET)
            """
        else:
            raise ValueError(f"Unknown table name: {table_name}")

        cursor.execute(create_table_sql)

### Part 1 of 2: Opening Snowflake Connection - User, Privileges, Roles 

In [8]:
try:
    # Snowflake connection context manager
    with snowflake.connector.connect(
        user=snowflake_user,
        password=snowflake_password,
        account=snowflake_account,
        warehouse=snowflake_warehouse,
        database=snowflake_database,
        schema=snowflake_schema_bronze  # Optionally specify the schema for the session
    ) as conn:
        
        # Execute operations within the context manager
        cursor = conn.cursor()

        try:
            # Create the role
            cursor.execute(create_role_sql)
            print("Role created successfully.")

            # Grant privileges
            cursor.execute(grant_privileges_sql_1)
            cursor.execute(grant_privileges_sql_2)
            cursor.execute(grant_privileges_sql_3)
            cursor.execute(grant_privileges_sql_4)
            cursor.execute(grant_privileges_sql_5)
            print("Privileges granted successfully.")

            # Create the user
            cursor.execute(create_user_sql)
            print("User created successfully.")

            # Grant role to the user
            cursor.execute(grant_role_to_user_sql)
            print("Role assigned to user successfully.")

        except snowflake.connector.errors.ProgrammingError as e:
                print(f"Snowflake ProgrammingError: {e}")

        finally:
            cursor.close()    

except snowflake.connector.errors.DatabaseError as e:
    print(f"Snowflake DatabaseError: {e}")

print("User-Privilege-Role Script execution completed.")

Role created successfully.
Privileges granted successfully.
User created successfully.
Role assigned to user successfully.
User-Privilege-Role Script execution completed.


### Part 2 of 2: Opening Snowflake Connection - Storage Integration, External Stage, External Tables 

In [9]:
try:
    # Snowflake connection context manager
    with snowflake.connector.connect(
        user=snowflake_user,
        password=snowflake_password,
        account=snowflake_account,
        warehouse=snowflake_warehouse,
        database=snowflake_database,
        schema=snowflake_schema_bronze  # Optionally specify the schema for the session
    ) as conn:
        
        # Execute operations within the context manager
        cursor = conn.cursor()

        try:
            # Create storage integration
            cursor.execute(create_storage_integration_sql)
            print("Storage integration created successfully.")

            # Show storage integration details
            cursor.execute(show_storage_integration_sql)
            result = cursor.fetchall()
            print("Storage Integration details:")
            for row in result:
                print(row)
            
            # Describe storage integration details
            cursor.execute(describe_storage_integration_sql)
            result = cursor.fetchall()
            print("Describing Storage Integration details:")
            storage_aws_iam_user_arn = None
            storage_aws_external_id = None
            for row in result:
                print(row)
                if row[0] == 'STORAGE_AWS_IAM_USER_ARN':
                    storage_aws_iam_user_arn = row[2]
                elif row[0] == 'STORAGE_AWS_EXTERNAL_ID':
                    storage_aws_external_id = row[2]
            
            # Print the retrieved values with specified names
            print(f"STORAGE_AWS_IAM_USER_ARN: {storage_aws_iam_user_arn}")
            print(f"STORAGE_AWS_EXTERNAL_ID: {storage_aws_external_id}")
            
            # Create External Stage
            ## Set the current database and schema for the user session
            cursor.execute(current_db_schema_sql_1)
            cursor.execute(current_db_schema_sql_2)

            ## Grant USAGE privilege on the Storage Integration 
            cursor.execute(grant_usage_on_integration_to_role)
            
            ## Grant USAGE privilege on the External Stage
            cursor.execute(grant_usage_on_external_stage_to_role)
            
            ## Grant CREATE STAGE priviledge for the Schema
            cursor.execute(grant_external_stage_to_role)
            
            ## Execute External Stage Creation
            cursor.execute(create_external_stage_sql)
            print("External stage created successfully.")

            # Show external stage details
            cursor.execute(show_external_stage_sql)
            result = cursor.fetchall()
            print("External stage details:")
            for row in result:
                print(row)

            # Create external tables
            ## Grant Create Table on Schema to Role
            cursor.execute(grant_create_table_on_schema)

            ## Execute External Tables creation
            for table_name, s3_location in tables_to_create:
                create_external_table(conn, table_name, s3_location)
                print(f"External table '{table_name.upper()}' created successfully.")

            ## Print External Tables from Snowflake
            # Query to show all external tables in the schema
            show_external_tables_sql = f"SHOW EXTERNAL TABLES IN SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()}"
            cursor.execute(show_external_tables_sql)
            result = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            external_tables_df = pd.DataFrame(result, columns=columns)
            # Select only the specified columns
            columns_to_select = ['name', 'database_name', 'schema_name', 'owner', 'comment', 'stage', 'location']
            external_tables_df = external_tables_df[columns_to_select]
            print("External tables retrieved successfully.")
            print(external_tables_df)

        except snowflake.connector.errors.ProgrammingError as e:
                print(f"Snowflake ProgrammingError: {e}")

        finally:
            cursor.close()    

except snowflake.connector.errors.DatabaseError as e:
    print(f"Snowflake DatabaseError: {e}")

print("Script execution completed.")

Storage integration created successfully.
Storage Integration details:
('MY_S3_INTEGRATION', 'EXTERNAL_STAGE', 'STORAGE', 'true', None, datetime.datetime(2024, 7, 17, 3, 31, 31, 640000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>))
Describing Storage Integration details:
('ENABLED', 'Boolean', 'true', 'false')
('STORAGE_PROVIDER', 'String', 'S3', '')
('STORAGE_ALLOWED_LOCATIONS', 'List', 's3://dbt-duckdb-ingestion-s3-parquet', '[]')
('STORAGE_BLOCKED_LOCATIONS', 'List', '', '[]')
('STORAGE_AWS_IAM_USER_ARN', 'String', 'arn:aws:iam::373459924264:user/8j2m0000-s', '')
('STORAGE_AWS_ROLE_ARN', 'String', 'arn:aws:iam::533267405478:role/mysnowflakerole', '')
('STORAGE_AWS_EXTERNAL_ID', 'String', 'EI06736_SFCRole=2_GZHvwCF2WYu0Z2yJn/m8An755Jg=', '')
('COMMENT', 'String', '', '')
STORAGE_AWS_IAM_USER_ARN: arn:aws:iam::373459924264:user/8j2m0000-s
STORAGE_AWS_EXTERNAL_ID: EI06736_SFCRole=2_GZHvwCF2WYu0Z2yJn/m8An755Jg=
External stage created successfully.
External stage det

### Checking if Privileges are Correctly Granted

In [10]:
# try:
#     # Snowflake connection context manager
#     with snowflake.connector.connect(
#         user=snowflake_user,
#         password=snowflake_password,
#         account=snowflake_account,
#         warehouse=snowflake_warehouse,
#         database=snowflake_database,
#         schema=snowflake_schema_bronze  # Optionally specify the schema for the session
#     ) as conn:
        
#         # Execute operations within the context manager
#         cursor = conn.cursor()

#         try:
#             # Query to retrieve privileges granted to the role
#             query = """
#             SELECT *
#             FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES
#             WHERE GRANTEE = 'STORAGE_ADMIN'
#             ORDER BY OBJECT_TYPE, OBJECT_NAME, PRIVILEGE_TYPE;
#             """
            
#             cursor.execute(query)
#             result = cursor.fetchall()
            
#             # Convert result to a DataFrame
#             df = pd.DataFrame(result, columns=[desc[0] for desc in cursor.description])
            
#             # Print the DataFrame
#             print("Privileges granted to STORAGE_ADMIN:")
#             print(df)

#         except snowflake.connector.errors.ProgrammingError as e:
#             print(f"Snowflake ProgrammingError: {e}")

#         finally:
#             cursor.close()    

# except snowflake.connector.errors.DatabaseError as e:
#     print(f"Snowflake DatabaseError: {e}")

# print("Script execution completed.")