# In [None]:
import snowflake.connector

import os

import logging
 
# Root logging setup: INFO and above, every record prefixed with its
# timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
 
def run_snowflake_queries():
    """Rebuild and load the ``RAW_CO2.Daily_Measurements`` table in Snowflake.

    Steps, in order: connect, switch to the ACCOUNTADMIN role, select the
    configured database, make sure the configured schema exists and select
    it, (re)create the table, grant privileges on it, copy the CSV files from
    ``@RAW_CO2.CO2_EXTERNAL_STAGE`` into it, log the resulting row count,
    (re)create ``RAW_CO2.DAILY_MEASUREMENTS_STREAM`` on the table, and commit.

    Configuration is read from the environment variables ``SNOWFLAKE_USER``,
    ``SNOWFLAKE_PASSWORD``, ``SNOWFLAKE_ACCOUNT``, ``SNOWFLAKE_DATABASE`` and
    ``SNOWFLAKE_SCHEMA`` (all required) and ``SNOWFLAKE_WAREHOUSE`` (passed
    through as-is, may be unset).

    Raises:
        ValueError: If a required environment variable is unset or empty.
        Exception: Any error raised while connecting or running a statement
            is logged and re-raised unchanged.
    """
    # Retrieve credentials from environment variables
    user = os.getenv('SNOWFLAKE_USER')
    password = os.getenv('SNOWFLAKE_PASSWORD')
    account = os.getenv('SNOWFLAKE_ACCOUNT')
    warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
    database = os.getenv('SNOWFLAKE_DATABASE')
    schema = os.getenv('SNOWFLAKE_SCHEMA')

    # Fail fast with a readable message instead of letting None leak into the
    # connection call or into statements such as "USE DATABASE None".
    # The warehouse is deliberately not required here.
    # NOTE(review): presumably the user's default warehouse applies when
    # SNOWFLAKE_WAREHOUSE is unset — confirm.
    required = {
        'SNOWFLAKE_USER': user,
        'SNOWFLAKE_PASSWORD': password,
        'SNOWFLAKE_ACCOUNT': account,
        'SNOWFLAKE_DATABASE': database,
        'SNOWFLAKE_SCHEMA': schema,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise ValueError(
            "Missing required environment variables: " + ", ".join(missing)
        )

    conn = None
    cur = None

    try:
        # Establish a connection to Snowflake
        conn = snowflake.connector.connect(
            user=user,
            password=password,
            account=account,
            warehouse=warehouse,
            database=database,
            schema=schema
        )
        logger.info("Connected to Snowflake successfully")

        # Create a cursor object to interact with the database
        cur = conn.cursor()

        # Set the role to ACCOUNTADMIN
        cur.execute("USE ROLE ACCOUNTADMIN")
        logger.info("Role set to ACCOUNTADMIN")

        # Select the database, then create the schema BEFORE switching to it:
        # "USE SCHEMA" on a schema that does not exist yet fails, which would
        # make the "IF NOT EXISTS" creation unreachable on a fresh database.
        cur.execute(f"USE DATABASE {database}")
        cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
        logger.info("Schema %s created or already exists", schema)
        cur.execute(f"USE SCHEMA {schema}")
        logger.info("Using database: %s and schema: %s", database, schema)

        # Run the queries
        logger.info("Executing queries...")

        # NOTE(review): the table, stage and stream below are addressed via
        # the hard-coded RAW_CO2 schema, while the SHOW checks look in
        # SNOWFLAKE_SCHEMA; the checks are only meaningful when both name the
        # same schema — confirm this is intended.

        # Create the table
        cur.execute("""

        CREATE OR REPLACE TABLE RAW_CO2.Daily_Measurements (

            date STRING,

            co2_ppm FLOAT

        );

        """)
        logger.info("Table created or replaced")

        # Verify table creation
        cur.execute(f"SHOW TABLES LIKE 'Daily_Measurements' IN SCHEMA {schema}")
        if cur.fetchone():
            logger.info("Table 'Daily_Measurements' exists in the schema")
        else:
            logger.warning("Table 'Daily_Measurements' does not exist in the schema")

        # Grant privileges to the ACCOUNTADMIN role
        cur.execute("""

        GRANT ALL PRIVILEGES ON TABLE RAW_CO2.Daily_Measurements TO ROLE ACCOUNTADMIN;

        """)
        logger.info("Privileges granted")

        # Copy data from the stage into the table
        cur.execute("""

        COPY INTO RAW_CO2.Daily_Measurements

            FROM @RAW_CO2.CO2_EXTERNAL_STAGE

            FILE_FORMAT = (

                TYPE = 'CSV' 

                SKIP_HEADER = 1

                FIELD_OPTIONALLY_ENCLOSED_BY = '"'

            )

            ON_ERROR = 'CONTINUE';

        """)
        logger.info("Data copied into the table")

        # Verify data loading
        cur.execute("SELECT COUNT(*) FROM RAW_CO2.Daily_Measurements")
        count = cur.fetchone()[0]
        logger.info("Number of rows in Daily_Measurements: %s", count)

        # Create a stream on the table
        cur.execute("""

        CREATE OR REPLACE STREAM RAW_CO2.DAILY_MEASUREMENTS_STREAM 

        ON TABLE RAW_CO2.Daily_Measurements;

        """)
        logger.info("Stream created or replaced")

        # Verify stream creation
        cur.execute(f"SHOW STREAMS LIKE 'DAILY_MEASUREMENTS_STREAM' IN SCHEMA {schema}")
        if cur.fetchone():
            logger.info("Stream 'DAILY_MEASUREMENTS_STREAM' exists in the schema")
        else:
            logger.warning("Stream 'DAILY_MEASUREMENTS_STREAM' does not exist in the schema")

        # Commit the transaction
        conn.commit()
        logger.info("Transaction committed")

        logger.info("All queries executed successfully!")

    except Exception as e:
        logger.error("Error: %s", e)
        logger.error("Error Type: %s", type(e).__name__)
        raise  # Re-raise the exception to fail the DAG

    finally:
        # Close the cursor first, but never let a failing cursor close leak
        # the connection: the connection is closed in its own finally.
        try:
            if cur is not None:
                cur.close()
        finally:
            if conn is not None:
                conn.close()
                # Only report a close when a connection was actually opened.
                logger.info("Snowflake connection and cursor closed")
 
# Script entry point: run the Snowflake load only when this file is executed
# directly, not when it is imported as a module.

if __name__ == "__main__":

    run_snowflake_queries()

 