## Importing Libraries

In [1]:
import os
import boto3
from dotenv import load_dotenv
import snowflake.connector
import pandas as pd

## Setting Up Snowflake Connection and AWS S3 Connection

References:
* https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration

### Loading Environment Variables

In [2]:
# Load environment variables from the .env file
load_dotenv()

# Fetch Snowflake credentials from environment variables
snowflake_user = os.getenv('SNOWFLAKE_USER')
snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
snowflake_database = os.getenv('SNOWFLAKE_DATABASE')
# Each schema gets its own variable; previously the PUBLIC value was stored in
# snowflake_schema_bronze and immediately overwritten by the BRONZE value.
snowflake_schema_public = os.getenv('SNOWFLAKE_SCHEMA_PUBLIC')
snowflake_schema_bronze = os.getenv('SNOWFLAKE_SCHEMA_BRONZE')
snowflake_schema_silver = os.getenv('SNOWFLAKE_SCHEMA_SILVER')
snowflake_schema_gold = os.getenv('SNOWFLAKE_SCHEMA_GOLD')

# Fetch AWS IAM role ARN and S3 bucket details from environment variables
s3_snowflake_iam_role_arn = os.getenv('S3_SNOWFLAKE_IAM_ROLE_ARN')
s3_snowflake_storage_integration = os.getenv('S3_SNOWFLAKE_STORAGE_INTEGRATION')
s3_snowflake_stage = os.getenv('S3_SNOWFLAKE_STAGE')
s3_iam_role_arn = os.getenv('S3_IAM_ROLE_ARN')
s3_bucket_name = os.getenv('S3_BUCKET_NAME')
s3_file_prefix = ''  # No prefix needed since the bucket name itself is the prefix
s3_access_key_id = os.getenv('S3_ACCESS_KEY_ID')
s3_secret_access_key = os.getenv('S3_SECRET_ACCESS_KEY')
s3_region = os.getenv('S3_REGION')

# Fail fast: the cells below call .upper() on these values while building SQL,
# so a missing variable would otherwise surface later as an opaque
# "'NoneType' object has no attribute 'upper'" error.
_required_settings = {
    'SNOWFLAKE_WAREHOUSE': snowflake_warehouse,
    'SNOWFLAKE_DATABASE': snowflake_database,
    'SNOWFLAKE_SCHEMA_BRONZE': snowflake_schema_bronze,
    'S3_SNOWFLAKE_STORAGE_INTEGRATION': s3_snowflake_storage_integration,
    'S3_SNOWFLAKE_STAGE': s3_snowflake_stage,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise EnvironmentError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

## Setting Up Snowflake External Tables

### Creating Warehouse, Database, and Schema in Snowflake

In [3]:
# Create warehouse.
# It is created suspended; AUTO_RESUME starts it on the first query and
# AUTO_SUSPEND = 300 stops it again after that idle period.
create_snowflake_warehouse_sql = f"""
        CREATE WAREHOUSE IF NOT EXISTS {snowflake_warehouse.upper()}
          WITH WAREHOUSE_SIZE = 'XSMALL'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE
          INITIALLY_SUSPENDED = TRUE;
"""

# Resume Warehouse.
# IF SUSPENDED makes the statement safe to re-run: a plain RESUME raises an
# error when the warehouse is already running.
use_snowflake_warehouse = f"ALTER WAREHOUSE {snowflake_warehouse.upper()} RESUME IF SUSPENDED;"

# Create Database
create_snowflake_database_sql = f"CREATE DATABASE IF NOT EXISTS {snowflake_database.upper()};"

# Use Database (sets the session's current database)
use_snowflake_database = f"USE DATABASE {snowflake_database.upper()};"

# Create Bronze Schema (fully qualified so it does not depend on the session's current database)
create_snowflake_bronze_schema = f"CREATE SCHEMA IF NOT EXISTS {snowflake_database.upper()}.{snowflake_schema_bronze.upper()};"

# Use Bronze Schema (sets the session's current schema)
use_snowflake_bronze_schema = f"USE SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()};"

### Creating Roles, Granting Privileges to Roles, Creating Users, Granting Roles to Users

In [4]:
# SQL statements for role creation and privilege assignment
create_role_sql = "CREATE ROLE IF NOT EXISTS storage_admin;"

# Every schema-level grant uses the fully qualified DATABASE.SCHEMA name so the
# statements do not depend on the session's current database.
priv_usage_on_warehouse = f"GRANT USAGE ON WAREHOUSE {snowflake_warehouse.upper()} TO ROLE storage_admin;"
priv_usage_on_database  = f"GRANT USAGE ON DATABASE {snowflake_database.upper()} TO ROLE storage_admin;"
priv_usage_on_schema    = f"GRANT USAGE ON SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()} TO ROLE storage_admin;"
priv_create_file_fmt_on_schema = f"GRANT CREATE FILE FORMAT ON SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()} TO ROLE storage_admin;"
priv_create_interagration      = "GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE storage_admin;"
priv_usage_on_integration      = f"GRANT USAGE ON INTEGRATION {s3_snowflake_storage_integration.upper()} TO ROLE storage_admin;"
priv_create_stage_on_schema    = f"GRANT CREATE STAGE ON SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()} TO ROLE storage_admin;"
priv_create_table_on_schema    = f"GRANT CREATE TABLE ON SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()} TO ROLE storage_admin;"

# Password for the demo user. Set SNOWFLAKE_MY_USER_PASSWORD in the .env file to
# keep the credential out of the notebook; the literal fallback only preserves
# the previous behaviour and should not be used outside a sandbox account.
# Single quotes are doubled so the value cannot terminate the SQL string literal.
my_user_password = os.getenv('SNOWFLAKE_MY_USER_PASSWORD', 'my_password').replace("'", "''")

create_user_sql = f"""
CREATE USER IF NOT EXISTS my_user
    PASSWORD = '{my_user_password}'
    DEFAULT_ROLE = 'storage_admin'
    DEFAULT_WAREHOUSE = '{snowflake_warehouse}';
"""

grant_role_to_myuser = "GRANT ROLE storage_admin TO USER my_user;"
grant_role_to_cvconsulting = "GRANT ROLE storage_admin TO USER cvconsulting;"
using_role_storage_admin = "USE ROLE storage_admin;"
current_user_in_snf_session = "SELECT CURRENT_USER();"
current_role_in_snf_session = "SELECT CURRENT_ROLE();"
current_role_in_snf_session = "SELECT CURRENT_ROLE();"

### Creating Storage Integration

In [5]:
# Storage integration statements (create / show / describe).
# Reference: https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration
_integration_name = s3_snowflake_storage_integration.upper()

# The leading and trailing empty entries reproduce the newline that opens and
# closes the statement text.
create_storage_integration_sql = "\n".join([
    "",
    "CREATE STORAGE INTEGRATION IF NOT EXISTS {}".format(_integration_name),
    "    TYPE = EXTERNAL_STAGE",
    "    STORAGE_PROVIDER = 'S3'",
    "    ENABLED = TRUE",
    "    STORAGE_AWS_ROLE_ARN = '{}'".format(s3_snowflake_iam_role_arn),
    "    STORAGE_ALLOWED_LOCATIONS = ('s3://{}');".format(s3_bucket_name),
    "",
])

# Lookup statements used to print the integration's details after creation.
show_storage_integration_sql = "SHOW STORAGE INTEGRATIONS LIKE '{}'".format(_integration_name)
desc_storage_integration_sql = "DESC INTEGRATION {}".format(_integration_name)

### Create 'named file format' for External Stage

In [6]:
# Named Parquet file format used by the external stage.
# Reference: https://docs.snowflake.com/en/sql-reference/sql/create-file-format

file_format = "my_parquet_format"

# The leading and trailing empty entries reproduce the newline that opens and
# closes the statement text.
create_named_file_format_sql = "\n".join([
    "",
    "CREATE FILE FORMAT IF NOT EXISTS {}".format(file_format),
    "    TYPE = 'PARQUET'",
    "    COMPRESSION = 'SNAPPY';",
    "",
])

# Let the storage_admin role use the file format.
grant_usage_on_file_format = "GRANT USAGE ON FILE FORMAT {} TO ROLE storage_admin;".format(
    file_format.upper()
)

### Create External Stage

In [7]:
# External stage statements (create / grant / show).
# Reference: https://docs.snowflake.com/en/user-guide/data-load-s3-create-stage

# Session context: current database, then the fully qualified bronze schema.
use_snowflake_database = "USE DATABASE {};".format(snowflake_database.upper())
use_snowflake_bronze_schema = "USE SCHEMA {}.{};".format(
    snowflake_database.upper(), snowflake_schema_bronze.upper()
)

# The stage reads from the bucket through the storage integration and uses the
# named file format. The empty first/last entries reproduce the newline that
# opens and closes the statement text.
create_external_stage_sql = "\n".join([
    "",
    "CREATE STAGE IF NOT EXISTS {}".format(s3_snowflake_stage.upper()),
    "    STORAGE_INTEGRATION = {}".format(s3_snowflake_storage_integration.upper()),
    "    URL='s3://{}'".format(s3_bucket_name),
    "    FILE_FORMAT = {};".format(file_format),
    "",
])

# Let the storage_admin role use the stage (DATABASE.SCHEMA.STAGE).
grant_usage_on_external_stage = "GRANT USAGE ON STAGE {}.{}.{} TO ROLE storage_admin;".format(
    snowflake_database.upper(),
    snowflake_schema_bronze.upper(),
    s3_snowflake_stage.upper(),
)

# Lookup statement used to print the stage's details after creation.
show_external_stage_sql = "SHOW STAGES LIKE '{}' IN DATABASE".format(s3_snowflake_stage.upper())

### Create External Tables

In [8]:
# Stage reference ("@<stage>") shared by every external table as its LOCATION.
stage_name = "@{}".format(s3_snowflake_stage)

# (table name, stage location) pairs consumed by the table-creation loop.
tables_to_create = [
    (table, stage_name)
    for table in (
        'customers',
        'dates',
        'product_usage',
        'products',
        'subscriptions',
        'support_interactions',
    )
]

In [9]:
# Function to create external table
# Reference: https://docs.snowflake.com/en/user-guide/tables-external-intro
# Note: every external table column needs a defining expression ("AS (...)"), or Snowflake raises
# "ProgrammingError: 091064 (42601): External table column XYZ must have a defining expression."
# For Parquet files the VALUE variant is keyed by the Parquet field name (value:c1, value:c2, ...
# is the CSV convention), so each column is read as value:"<FieldName>".

# Column layout of each bronze external table: table name -> [(column name, Snowflake type)].
# Assumes the Parquet field names match these column names exactly (case-sensitive) -- TODO confirm
# against the files in the bucket.
_EXTERNAL_TABLE_COLUMNS = {
    'customers': [
        ('CustomerID', 'INT'),
        ('Name', 'VARCHAR(100)'),
        ('Age', 'INT'),
        ('Gender', 'VARCHAR(10)'),
        ('SignupDate', 'DATE'),
    ],
    'dates': [
        ('DateID', 'INT'),
        ('Date', 'DATE'),
        ('Week', 'INT'),
        ('Month', 'INT'),
        ('Quarter', 'INT'),
        ('Year', 'INT'),
    ],
    'product_usage': [
        ('UsageID', 'INT'),
        ('CustomerID', 'INT'),
        ('DateID', 'INT'),
        ('ProductID', 'INT'),
        ('NumLogins', 'INT'),
        ('Amount', 'DECIMAL(10, 2)'),
    ],
    'products': [
        ('ProductID', 'INT'),
        ('ProductName', 'VARCHAR(100)'),
        ('Category', 'VARCHAR(50)'),
        ('Price', 'DECIMAL(10, 2)'),
    ],
    'subscriptions': [
        ('SubscriptionID', 'INT'),
        ('CustomerID', 'INT'),
        ('StartDate', 'DATE'),
        ('EndDate', 'DATE'),
        ('Type', 'VARCHAR(50)'),
        ('Status', 'VARCHAR(50)'),
    ],
    'support_interactions': [
        ('InteractionID', 'INT'),
        ('CustomerID', 'INT'),
        ('DateID', 'INT'),
        ('IssueType', 'VARCHAR(100)'),
        ('ResolutionTime', 'INT'),
    ],
}


def _build_external_table_sql(table_name, s3_location, schema):
    """Return the CREATE OR REPLACE EXTERNAL TABLE statement for one known table."""
    if table_name not in _EXTERNAL_TABLE_COLUMNS:
        raise ValueError(f"Unknown table name: {table_name}")

    # Cast to the column's full declared type: a bare ::decimal is NUMBER(38, 0)
    # and would drop the fractional part of DECIMAL(10, 2) values.
    column_defs = ",\n".join(
        f'    "{column}" {sql_type} AS (value:"{column}"::{sql_type})'
        for column, sql_type in _EXTERNAL_TABLE_COLUMNS[table_name]
    )
    return (
        f"CREATE OR REPLACE EXTERNAL TABLE {schema}.{table_name} (\n"
        f"{column_defs}\n"
        ")\n"
        f"LOCATION={s3_location}\n"
        "FILE_FORMAT=(TYPE=PARQUET)"
    )


def create_external_table(conn, table_name, s3_location, schema='bronze'):
    """Create (or replace) one of the known external tables over Parquet files.

    Args:
        conn: Open Snowflake connection; a cursor is taken from it for the statement.
        table_name: One of the keys of ``_EXTERNAL_TABLE_COLUMNS``.
        s3_location: Stage reference used as the table's LOCATION (e.g. "@MY_STAGE").
        schema: Schema the table is created in; defaults to "bronze".

    Raises:
        ValueError: If ``table_name`` is not a known table. Raised before any
            cursor is opened, so nothing is sent to Snowflake.

    NOTE(review): callers in this notebook pass the same stage root for every
    table, so each table would scan every file under the stage. If the bucket
    holds one file per table, a per-table path or PATTERN is probably needed --
    confirm against the bucket layout.
    """
    create_table_sql = _build_external_table_sql(table_name, s3_location, schema)
    with conn.cursor() as cursor:
        cursor.execute(create_table_sql)

### (Ingestion) Copying Data from S3 to External Tables

In [10]:
# Function to copy data from the stage into a table
def copy_into_table(conn, table_name):
    """Run ``COPY INTO <table_name> FROM <stage_name>`` on a cursor from ``conn``.

    Reads the notebook-level ``stage_name`` and ``file_format`` variables; the
    FILE_FORMAT clause is emitted as a SQL comment, so the stage's own format applies.

    NOTE(review): COPY INTO normally loads regular tables, while external tables
    are read in place -- confirm this helper is meant for the external tables
    created in this notebook.
    """
    with conn.cursor() as cur:
        # Assembled piece by piece; the text (including its indentation) is
        # exactly what is sent to Snowflake.
        statement = (
            "\n"
            f"        COPY INTO {table_name}\n"
            f"        FROM {stage_name}\n"
            f"        -- FILE_FORMAT = (FORMAT_NAME = {file_format});\n"
            "        "
        )
        cur.execute(statement)

### Connecting to Snowflake - Storage Integration, External Stage, External Tables / Role, User, Privileges 

In [11]:
# Provision everything in one Snowflake session: warehouse, database, schema,
# role/user/grants, storage integration, file format, stage and external tables.
# Statements run in dependency order; a ProgrammingError stops the remaining
# statements and is printed rather than re-raised.
try:
    # Snowflake connection context manager (closes the connection on exit)
    with snowflake.connector.connect(
        user=snowflake_user,
        password=snowflake_password,
        account=snowflake_account,
        warehouse=snowflake_warehouse,
        database=snowflake_database,
        schema=snowflake_schema_bronze  # Optionally specify the schema for the session
    ) as conn:

        try:
            # The cursor context manager closes the cursor even when a statement fails.
            with conn.cursor() as cursor:
                cursor.execute(create_snowflake_warehouse_sql) # CREATE WAREHOUSE
                print("Warehouse created successfully.")

                cursor.execute(create_snowflake_database_sql)  # CREATE DATABASE
                print("Database created successfully.")
                cursor.execute(use_snowflake_database)         # Use Database (Set it for the user session)

                cursor.execute(create_snowflake_bronze_schema) # CREATE SCHEMA
                print("Bronze Schema created successfully.")
                cursor.execute(use_snowflake_bronze_schema)    # Use Bronze Schema (Set it for the user session)

                cursor.execute(create_role_sql)                # CREATE ROLE
                print("Role created successfully.")
                cursor.execute(priv_usage_on_warehouse)        # Grant Usage privilege on Warehouse TO ROLE storage_admin
                cursor.execute(priv_usage_on_database)         # Grant Usage privilege on Database TO ROLE storage_admin
                cursor.execute(priv_usage_on_schema)           # Grant Usage privilege on Schema TO ROLE storage_admin
                cursor.execute(priv_create_table_on_schema)    # Grant Create Table on Schema TO ROLE storage_admin
                cursor.execute(priv_create_file_fmt_on_schema) # Grant Create File Format privilege on Schema TO ROLE storage_admin
                cursor.execute(priv_create_stage_on_schema)    # Grant Create Stage privilege on Schema TO ROLE storage_admin

                cursor.execute(create_user_sql)                # CREATE USER
                print("User created successfully.")
                # Each message now names the user whose grant was just executed
                # (the two messages were previously swapped).
                cursor.execute(grant_role_to_myuser)           # Grant Role to the 'my_user' user
                print("Role assigned to my_user.")
                cursor.execute(grant_role_to_cvconsulting)     # Grant Role to the 'cvconsulting' user
                print("Role assigned to cvconsulting.")

                cursor.execute(priv_create_interagration)      # Grant CREATE INTEGRATION TO ROLE storage_admin
                cursor.execute(create_storage_integration_sql) # CREATE STORAGE INTEGRATION
                print("Storage Integration successful.")
                cursor.execute(priv_usage_on_integration)      # Grant Usage privilege on Integration TO ROLE storage_admin
                cursor.execute(show_storage_integration_sql)   # Storage Integration details
                result = cursor.fetchall()
                print("Storage Integration details:")
                for row in result:
                    print(row)
                cursor.execute(desc_storage_integration_sql)   # Storage Integration details for AWS
                result = cursor.fetchall()
                print("Storage Integration details for AWS:")
                # DESC INTEGRATION rows are read as (property name, ..., property value);
                # only the two values printed below are picked out.
                storage_aws_iam_user_arn = None
                storage_aws_external_id = None
                for row in result:
                    if row[0] == 'STORAGE_AWS_IAM_USER_ARN':
                        storage_aws_iam_user_arn = row[2]
                        print(f"STORAGE_AWS_IAM_USER_ARN: {storage_aws_iam_user_arn}")
                    elif row[0] == 'STORAGE_AWS_EXTERNAL_ID':
                        storage_aws_external_id = row[2]
                        print(f"STORAGE_AWS_EXTERNAL_ID: {storage_aws_external_id}")

                cursor.execute(create_named_file_format_sql)  # CREATE NAMED FILE FORMAT FOR EXTERNAL STAGE
                print("Named File Format created.")
                cursor.execute(grant_usage_on_file_format)    # Grant Usage on File Format TO ROLE storage_admin

                cursor.execute(create_external_stage_sql)     # CREATE EXTERNAL STAGE
                print("External Stage created.")
                cursor.execute(grant_usage_on_external_stage) # Grant USAGE privilege on the External Stage TO ROLE storage_admin
                cursor.execute(show_external_stage_sql)       # Show External Stage Details
                result = cursor.fetchall()
                print("External stage details:")
                for row in result:
                    print(row)

                for table_name, s3_location in tables_to_create:          # CREATE EXTERNAL TABLES / COPY INTO TABLE
                    create_external_table(conn, table_name, s3_location)
                    print(f"External table '{table_name.upper()}' created successfully.")
                    # copy_into_table(conn, table_name)
                    # print(f"Data copied into {table_name} successfully.")

                ## Print External Tables from Snowflake
                # Query to show all external tables in the schema
                # show_external_tables_sql = f"SHOW EXTERNAL TABLES IN SCHEMA {snowflake_database.upper()}.{snowflake_schema_bronze.upper()}"
                # cursor.execute(show_external_tables_sql)
                # result = cursor.fetchall()
                # columns = [desc[0] for desc in cursor.description]
                # external_tables_df = pd.DataFrame(result, columns=columns)
                # # Select only the specified columns
                # columns_to_select = ['name', 'database_name', 'schema_name', 'owner', 'comment', 'stage', 'location']
                # external_tables_df = external_tables_df[columns_to_select]
                # print("External tables retrieved successfully.")
                # print(external_tables_df)

        except snowflake.connector.errors.ProgrammingError as e:
            print(f"Snowflake ProgrammingError: {e}")

except snowflake.connector.errors.DatabaseError as e:
    print(f"Snowflake DatabaseError: {e}")

print("Script execution completed.")

Warehouse created successfully.
Database created successfully.
Bronze Schema created successfully.
Role created successfully.
User created successfully.
Role assigned to my cvconsulting.
Role assigned to my myuser.
Storage Integration successful.
Storage Integration details:
('MY_S3_INTEGRATION', 'EXTERNAL_STAGE', 'STORAGE', 'true', None, datetime.datetime(2024, 7, 24, 8, 7, 55, 605000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>))
Storage Integration details for AWS:
STORAGE_AWS_IAM_USER_ARN: arn:aws:iam::373459924264:user/8j2m0000-s
STORAGE_AWS_EXTERNAL_ID: EI06736_SFCRole=2_oYnCijEks8pRZyHUo7q+AuXB6Y0=
Named File Format created.
External Stage created.
External stage details:
(datetime.datetime(2024, 7, 24, 8, 7, 56, 529000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'MY_S3_STAGE', 'MY_DBT_DATABASE', 'BRONZE', 's3://dbt-duckdb-ingestion-s3-parquet', 'N', 'N', 'ACCOUNTADMIN', '', 'eu-north-1', 'EXTERNAL', 'AWS', None, 'MY_S3_INTEGRATION', 

### Checking if Privileges are Correctly Granted

In [12]:
# try:
#     # Snowflake connection context manager
#     with snowflake.connector.connect(
#         user=snowflake_user,
#         password=snowflake_password,
#         account=snowflake_account,
#         warehouse=snowflake_warehouse,
#         database=snowflake_database,
#         schema=snowflake_schema_bronze  # Optionally specify the schema for the session
#     ) as conn:
        
#         # Execute operations within the context manager
#         cursor = conn.cursor()

#         try:
#             # Query to retrieve privileges granted to the role
#             query = """
#             SELECT *
#             FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES
#             WHERE GRANTEE = 'STORAGE_ADMIN'
#             ORDER BY OBJECT_TYPE, OBJECT_NAME, PRIVILEGE_TYPE;
#             """
            
#             cursor.execute(query)
#             result = cursor.fetchall()
            
#             # Convert result to a DataFrame
#             df = pd.DataFrame(result, columns=[desc[0] for desc in cursor.description])
            
#             # Print the DataFrame
#             print("Privileges granted to STORAGE_ADMIN:")
#             print(df)

#         except snowflake.connector.errors.ProgrammingError as e:
#             print(f"Snowflake ProgrammingError: {e}")

#         finally:
#             cursor.close()    

# except snowflake.connector.errors.DatabaseError as e:
#     print(f"Snowflake DatabaseError: {e}")

# print("Script execution completed.")