In [None]:
import snowflake.connector
import snowflake.connector as sc
import os
from cryptography.hazmat.primitives import serialization

# Load environment variables via python-dotenv (by default from a .env file, if
# one is found). SnowflakeHelper below reads its SNOWFLAKE_* connection settings
# with os.getenv, so this must run before the helper is instantiated.
from dotenv import load_dotenv
load_dotenv()

#  snowflake_helper

In [11]:
# snowflake_helper.py
import logging
import os
import threading
from contextlib import contextmanager
from queue import Empty, Queue

import pandas as pd
import snowflake.connector

# Configure the root logger at INFO. Because this is the root logger, INFO
# records from third-party libraries (e.g. snowflake.connector, botocore) are
# also shown in cell output.
logging.basicConfig(level=logging.INFO)

class SnowflakeHelper:
    """Pool of Snowflake connections plus a convenience query method.

    ``minconn`` connections are opened eagerly when the helper is created;
    additional connections are opened on demand until ``maxconn`` exist.
    Everything except database/schema is read from ``SNOWFLAKE_*``
    environment variables (see ``_create_connection``).

    The helper can be used as a context manager; leaving the ``with`` block
    closes the idle pooled connections.
    """

    def __init__(self, database, schema, minconn=1, maxconn=5, acquire_timeout=10):
        """
        :param database: Snowflake database passed to every new connection
        :param schema: Snowflake schema passed to every new connection
        :param minconn: number of connections opened immediately
        :param maxconn: upper bound on connections that exist at the same time
        :param acquire_timeout: seconds to wait for a free connection once
            ``maxconn`` connections are already in use
        :raises ValueError: unless ``0 <= minconn <= maxconn`` and ``maxconn >= 1``
        """
        # Without this check, minconn > maxconn would block forever on the
        # bounded queue while filling the pool.
        if maxconn < 1 or minconn < 0 or minconn > maxconn:
            raise ValueError(
                "Require 0 <= minconn <= maxconn and maxconn >= 1 "
                f"(got minconn={minconn}, maxconn={maxconn})"
            )
        self.database = database
        self.schema = schema
        self.minconn = minconn
        self.maxconn = maxconn
        self.acquire_timeout = acquire_timeout
        self.pool = Queue(maxconn)
        self.lock = threading.Lock()
        # Connections currently alive (idle in the pool or checked out);
        # guarded by self.lock.
        self._created = 0
        self._initialize_pool()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _initialize_pool(self):
        """Initialize the connection pool with minconn connections."""
        try:
            for _ in range(self.minconn):
                self.pool.put(self._try_grow())
        except Exception:
            # Do not leak the connections that were opened before the failure.
            self.close()
            raise

    def _create_connection(self):
        """Create a new Snowflake connection using env variables (with key-pair auth)."""
        return snowflake.connector.connect(
            user=os.getenv("SNOWFLAKE_USER"),
            private_key_file=os.getenv("SNOWFLAKE_PRIVATE_KEY"),  # path to private key
            account=os.getenv("SNOWFLAKE_ACCOUNT"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
            authenticator=os.getenv("SNOWFLAKE_AUTHENTICATOR"),
            role=os.getenv("SNOWFLAKE_ROLE"),
            database=self.database,
            schema=self.schema,
        )

    def _try_grow(self):
        """Open one more connection if fewer than maxconn exist, else return None."""
        with self.lock:
            if self._created >= self.maxconn:
                return None
            # Reserve the slot first so the slow connect call runs outside the lock.
            self._created += 1
        try:
            return self._create_connection()
        except Exception:
            with self.lock:
                self._created -= 1
            raise

    def _acquire(self):
        """Return an idle connection, a newly opened one, or wait for a release.

        :raises queue.Empty: if no connection becomes free within
            ``acquire_timeout`` seconds
        """
        try:
            try:
                return self.pool.get_nowait()
            except Empty:
                pass
            conn = self._try_grow()
            if conn is not None:
                return conn
            return self.pool.get(block=True, timeout=self.acquire_timeout)
        except Empty:
            logging.error(
                "Error getting connection: no connection became available within "
                f"{self.acquire_timeout}s (maxconn={self.maxconn})"
            )
            raise
        except Exception as e:
            logging.error(f"Error getting connection: {e}")
            raise

    def _discard(self, conn):
        """Close a connection and stop counting it against maxconn."""
        try:
            conn.close()
        except Exception as e:
            logging.warning(f"Error closing connection: {e}")
        finally:
            with self.lock:
                self._created -= 1

    @contextmanager
    def _get_connection(self):
        """Context manager to fetch & release a connection.

        If the body raises, the connection is closed and dropped instead of
        being returned to the pool; a replacement is opened on a later request.
        """
        conn = self._acquire()
        reusable = False
        try:
            yield conn
            reusable = True
        finally:
            if reusable:
                # Never blocks: at most maxconn connections exist at any time.
                self.pool.put(conn)
            else:
                self._discard(conn)

    def execute_query(self, query, params=None, as_df=False):
        """
        Execute a SQL query using the pool.
        :param query: SQL query string
        :param params: dict for parameterized query
        :param as_df: return results as pandas DataFrame if True
        :return: DataFrame from ``fetch_pandas_all`` if ``as_df`` else the
            result of ``fetchall``
        :raises Exception: any error from acquiring a connection or running the
            query is logged and re-raised
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                try:
                    cursor.execute(query, params or {})
                    if as_df:
                        return cursor.fetch_pandas_all()
                    return cursor.fetchall()
                finally:
                    # Close the cursor on failure as well as on success.
                    cursor.close()
        except Exception as e:
            logging.error(f"Query execution failed: {e}")
            raise

    def close(self):
        """Close all idle pooled connections.

        Connections that are checked out at the time of the call are not
        touched; they go back into the pool when their user releases them.
        """
        while True:
            try:
                conn = self.pool.get_nowait()
            except Empty:
                break
            self._discard(conn)

In [12]:
# Example - database/schema passed at initialization.
# The constructor opens `minconn` (here 2) connections immediately, so the
# SNOWFLAKE_* environment variables must already be set when this cell runs.
sf = SnowflakeHelper(database="ahp", schema="qa", minconn=2, maxconn=10)

INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 3.17.4, Python Version: 3.10.10, Platform: Windows-10-10.0.19045-SP0
INFO:snowflake.connector.connection:Connecting to GLOBAL Snowflake domain
INFO:botocore.credentials:Found credentials in environment variables.
INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 3.17.4, Python Version: 3.10.10, Platform: Windows-10-10.0.19045-SP0
INFO:snowflake.connector.connection:Connecting to GLOBAL Snowflake domain


In [None]:
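# Names to query (cqo_chatbot_case_level is read as a table in the next cell;
# the other two are presumably sibling tables in the same schema - confirm):
# cqo_chatbot_case_level
# cqo_chatbot_product_level
# cqo_chatbot_robotics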

In [16]:
# Pull up to 1000 rows of cqo_chatbot_case_level; as_df=True makes
# execute_query return the result of cursor.fetch_pandas_all().
# There is no ORDER BY, so which rows come back may differ between runs.
df = sf.execute_query("SELECT * from cqo_chatbot_case_level limit 1000", as_df=True)