# Snowflake Performance Recipes: **Snowflake Notebooks** Edition
**Date:** 2025-11-06
**Author:** Adolfo Orozco
**Credits:** Juan Manuel Díaz
**Tags:** #snowflake #snowpark #python #sql #cost #performance #recipes

This notebook is optimized to run inside **Snowflake Notebooks** using **Snowpark**.
When it runs outside Snowflake Notebooks (for example, in a local Jupyter environment), it *falls back* to a standard connection that uses explicit credentials.


## 1. Prerequisites

- In **Snowflake Notebooks** you do not need credentials: the notebook uses `get_active_session()`.
- If you run locally:
  - Packages: `snowflake-snowpark-python[pandas]`, `python-dotenv` (optional).
  - Snowflake credentials (account, user, password/SSO/TOTP, role, warehouse, database, schema). The local fallback in section 2 only reads a password; SSO or TOTP would need additional connection parameters.


In [None]:
# 1.1 (Optional, local execution) Install dependencies
# %pip install --quiet "snowflake-snowpark-python[pandas]" python-dotenv


## 2. Connecting from Snowflake Notebooks (Snowpark)

- In Snowflake Notebooks, `get_active_session()` returns a session that is ready to use.
- Locally, `Session.builder.configs({...}).create()` creates a session from your credentials.


In [None]:
# 2.1 Create or get the Snowpark session
from __future__ import annotations

import os
import getpass

from snowflake.snowpark import Session

try:
    # Only succeeds inside Snowflake Notebooks, where a session is already active
    from snowflake.snowpark.context import get_active_session
    session = get_active_session()
    IN_SNOWFLAKE_NOTEBOOK = True
except Exception:
    # Local fallback: get_session() creates the session on first use
    session = None
    IN_SNOWFLAKE_NOTEBOOK = False

def get_session() -> Session:
    """Return a Snowpark session. Inside Snowflake Notebooks, reuse the active session.
    When running locally, create a session once from environment variables (or a secure
    password prompt) and reuse it on later calls instead of reconnecting every time."""
    global session
    if session is not None:
        return session

    # Credentials for local execution
    connection_parameters = {
        "account":   os.getenv("SNOWFLAKE_ACCOUNT", "your_account"),
        "user":      os.getenv("SNOWFLAKE_USER", "your_user"),
        "password":  os.getenv("SNOWFLAKE_PASSWORD") or getpass.getpass("Snowflake password: "),
        "role":      os.getenv("SNOWFLAKE_ROLE", "ACCOUNTADMIN"),
        "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
        "database":  os.getenv("SNOWFLAKE_DATABASE", "SNOWFLAKE"),
        "schema":    os.getenv("SNOWFLAKE_SCHEMA", "ACCOUNT_USAGE"),
    }
    session = Session.builder.configs(connection_parameters).create()
    return session

# Test the connection
_sess = get_session()
_sess.sql("SELECT CURRENT_VERSION() AS VERSION").to_pandas()


## 3. Utilities and session context
We include a *helper* that runs SQL through Snowpark and returns a `pandas.DataFrame`,
along with a query that inspects the current session context.


In [None]:
# 3.1 Helper: run SQL and return a pandas DataFrame
import pandas as pd

def run_sql_df(sql: str) -> pd.DataFrame:
    """Execute `sql` in the current Snowpark session and collect the result into pandas."""
    s = get_session()
    return s.sql(sql).to_pandas()

# 3.2 Current context
# DATABASE and SCHEMA are reserved keywords in Snowflake, so the aliases avoid them
ctx_sql = '''
SELECT CURRENT_ROLE()      AS ROLE,
       CURRENT_WAREHOUSE() AS WAREHOUSE,
       CURRENT_DATABASE()  AS DATABASE_NAME,
       CURRENT_SCHEMA()    AS SCHEMA_NAME;
'''
run_sql_df(ctx_sql)


## 4. Load vs. queueing per warehouse (last 30 days)

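**Purpose:** for each day and warehouse, sum the interval averages of running load (`AVG_RUNNING`) and queued load (`AVG_QUEUED_LOAD`). These are load averages, not query counts.
Days with heavy queueing may suggest enabling **Multi-Cluster** warehouses or adjusting warehouse sizes.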
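
As a sketch of how the result could be triaged, the hypothetical helper `flag_queued_days` below takes rows shaped like the query output (`DATE`, `WAREHOUSE_NAME`, `SUM_RUNNING_QUERIES`, `SUM_QUEUED_QUERIES`) and flags days where queued load is a large share of the total load. The 10% threshold and the sample values are assumptions for illustration, not a Snowflake recommendation.

```python
from datetime import date

# Hypothetical threshold: share of queued load over total load (running + queued).
QUEUE_SHARE_THRESHOLD = 0.10


def queue_share(running: float, queued: float) -> float:
    """Fraction of the total load that was queued; 0.0 when there was no load at all."""
    total = running + queued
    return queued / total if total > 0 else 0.0


def flag_queued_days(rows: list[dict], threshold: float = QUEUE_SHARE_THRESHOLD) -> list[tuple]:
    """Return (date, warehouse, share) for rows whose queued share exceeds the threshold,
    sorted from the highest share to the lowest."""
    flagged = []
    for row in rows:
        share = queue_share(row["SUM_RUNNING_QUERIES"], row["SUM_QUEUED_QUERIES"])
        if share > threshold:
            flagged.append((row["DATE"], row["WAREHOUSE_NAME"], round(share, 3)))
    return sorted(flagged, key=lambda item: item[2], reverse=True)


# Made-up rows, not real query output.
sample_rows = [
    {"DATE": date(2025, 11, 5), "WAREHOUSE_NAME": "ANALYTICS_WH",
     "SUM_RUNNING_QUERIES": 40.0, "SUM_QUEUED_QUERIES": 12.0},
    {"DATE": date(2025, 11, 5), "WAREHOUSE_NAME": "ETL_WH",
     "SUM_RUNNING_QUERIES": 90.0, "SUM_QUEUED_QUERIES": 1.0},
]
print(flag_queued_days(sample_rows))
# [(datetime.date(2025, 11, 5), 'ANALYTICS_WH', 0.231)]
```

With real data, the rows could come from `run_sql_df(sql_load_vs_queue).to_dict("records")`.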


In [None]:
# 4.1 Daily load vs. queueing (last 30 days)
sql_load_vs_queue = '''
SELECT
    CAST(START_TIME AS DATE) AS DATE,
    WAREHOUSE_NAME,
    SUM(AVG_RUNNING)      AS SUM_RUNNING_QUERIES,
    SUM(AVG_QUEUED_LOAD)  AS SUM_QUEUED_QUERIES
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_LOAD_HISTORY
WHERE START_TIME >= DATEADD(DAY, -30, CURRENT_DATE())
GROUP BY 1, 2
HAVING SUM(AVG_QUEUED_LOAD) > 0
ORDER BY DATE DESC, WAREHOUSE_NAME;
'''
run_sql_df(sql_load_vs_queue)


## 5. Queries with severe **spilling** (ratio > 5×)

**Recommendation:** if `BYTES_SPILLED_TO_REMOTE_STORAGE` is more than 5× `BYTES_SCANNED`, the query may benefit from tuning or from a larger warehouse.
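
The 5× rule and the ratio can be checked by hand with a small sketch that mirrors the SQL: `spill_ratio` reproduces `BYTES_SPILLED_TO_REMOTE_STORAGE / NULLIF(BYTES_SCANNED, 0)` and `is_severe_spill` reproduces the `WHERE` clause. Both function names and the byte values are hypothetical.

```python
from typing import Optional

SEVERE_SPILL_FACTOR = 5  # the "more than 5x" rule from the text
GIB = 1024 ** 3


def spill_ratio(bytes_spilled_remote: int, bytes_scanned: int) -> Optional[float]:
    """Mirror of spilled / NULLIF(scanned, 0): None (SQL NULL) when nothing was scanned."""
    if bytes_scanned == 0:
        return None
    return bytes_spilled_remote / bytes_scanned


def is_severe_spill(bytes_spilled_remote: int, bytes_scanned: int) -> bool:
    """Mirror of the WHERE clause: spilled > scanned * 5."""
    return bytes_spilled_remote > bytes_scanned * SEVERE_SPILL_FACTOR


# Made-up values: 2 GiB scanned and 15 GiB spilled to remote storage.
print(spill_ratio(15 * GIB, 2 * GIB))      # 7.5
print(is_severe_spill(15 * GIB, 2 * GIB))  # True
print(is_severe_spill(8 * GIB, 2 * GIB))   # False (ratio 4.0)
print(spill_ratio(100, 0), is_severe_spill(100, 0))  # None True
```

The last line shows an edge case: a query that spills but scans 0 bytes passes the filter with a NULL ratio. With Snowflake's default NULL ordering, NULLs sort first under `ORDER BY ... DESC`, so such rows can appear at the top unless `NULLS LAST` is specified.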


In [None]:
# 5.1 Top queries by remote-spilling ratio
sql_spill_ratio = '''
SELECT
    QUERY_ID,
    USER_NAME,
    WAREHOUSE_NAME,
    WAREHOUSE_SIZE,
    BYTES_SCANNED,
    BYTES_SPILLED_TO_REMOTE_STORAGE,
    BYTES_SPILLED_TO_REMOTE_STORAGE / NULLIF(BYTES_SCANNED, 0) AS SPILLING_READ_RATIO
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE BYTES_SPILLED_TO_REMOTE_STORAGE > BYTES_SCANNED * 5
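-- NULLS LAST: queries with 0 bytes scanned have a NULL ratio and would otherwise sort first
ORDER BY SPILLING_READ_RATIO DESC NULLS LAST;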
'''
run_sql_df(sql_spill_ratio)


## 6. Top 10 queries with the most **remote spilling** (last 45 days)

Includes the total elapsed time, so we can focus on the most expensive cases.
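
To read the output more easily, a pair of hypothetical formatting helpers can turn `BYTES_SPILLED_TO_REMOTE_STORAGE` into binary units and `TOTAL_ELAPSED_TIME` (reported in milliseconds, which is why the query divides by 1000) into seconds. This is only a presentation aid; the names are illustrative.

```python
def humanize_bytes(num_bytes: float) -> str:
    """Format a byte count with binary units (B, KiB, MiB, GiB, TiB, PiB)."""
    units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
    value = float(num_bytes)
    for unit in units:
        # Stop at the first unit where the value is below 1024 (or at the largest unit).
        if value < 1024 or unit == units[-1]:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} {units[-1]}"  # not reached; keeps the return type explicit


def ms_to_seconds(total_elapsed_ms: int) -> float:
    """Convert TOTAL_ELAPSED_TIME (milliseconds) to seconds."""
    return total_elapsed_ms / 1000


print(humanize_bytes(123 * 1024 ** 3))  # 123.0 GiB
print(ms_to_seconds(754_000))            # 754.0
```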


In [None]:
# 6.1 Top 10 by remote spilling, last 45 days
sql_top_spill_45d = '''
SELECT
    QUERY_ID,
    SUBSTR(QUERY_TEXT, 1, 80) AS PARTIAL_QUERY_TEXT,
    USER_NAME,
    WAREHOUSE_NAME,
    WAREHOUSE_SIZE,
    BYTES_SPILLED_TO_REMOTE_STORAGE,
    START_TIME,
    END_TIME,
    TOTAL_ELAPSED_TIME / 1000 AS TOTAL_ELAPSED_TIME_SEC
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE BYTES_SPILLED_TO_REMOTE_STORAGE > 0
  AND CAST(START_TIME AS DATE) > DATEADD(DAY, -45, CURRENT_DATE())
ORDER BY BYTES_SPILLED_TO_REMOTE_STORAGE DESC
LIMIT 10;
'''
run_sql_df(sql_top_spill_45d)


## 7. Per-minute activity of two warehouses (last hour)

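Replace `WH_LARGE` and `WH_SMALL` with your actual warehouse names. For each minute of the last hour, the query returns NULL when the warehouse was not running and otherwise the number of queries running at that minute. `ACCOUNT_USAGE` views are populated with some latency (up to 45 minutes for `QUERY_HISTORY`), so the most recent minutes may be incomplete.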


In [None]:
# 7.1 Example parameters
WH_LARGE = "WH_LARGE"
WH_SMALL = "WH_SMALL"
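
Because these names are interpolated straight into the SQL with an f-string, a small guard can catch typos or stray quotes before the query runs. The sketch below is a hypothetical helper, assuming the warehouses use plain unquoted identifiers (a letter or underscore, then letters, digits, underscores, or `$`), which Snowflake stores in uppercase; quoted, mixed-case names are out of scope.

```python
import re

# Snowflake unquoted identifiers: start with a letter or underscore, then letters,
# digits, underscores or dollar signs. Unquoted names are stored in uppercase.
UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def validate_warehouse_name(name: str) -> str:
    """Hypothetical guard: return the uppercased name, or raise if it is not a plain identifier."""
    if not UNQUOTED_IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unexpected warehouse name: {name!r}")
    return name.upper()


print(validate_warehouse_name("wh_large"))  # WH_LARGE
try:
    validate_warehouse_name("WH_X') OR 1=1 --")
except ValueError as exc:
    print(exc)
```

In the notebook this could wrap the parameters, e.g. `WH_LARGE = validate_warehouse_name("WH_LARGE")`, so that the uppercase value also matches how `ACCOUNT_USAGE` reports `WAREHOUSE_NAME`.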


In [None]:
# 7.2 Per-minute activity (last hour) for both warehouses
sql_minute_activity = f'''
WITH warehouses AS (
    SELECT DISTINCT warehouse_name
    FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
    WHERE START_TIME <= CURRENT_TIMESTAMP()
      AND END_TIME   >= DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
      AND warehouse_name IN ('{WH_LARGE}', '{WH_SMALL}')
),
wh_up AS (
    SELECT warehouse_name, start_time, end_time
    FROM (
        SELECT
            warehouse_name,
            timestamp AS start_time,
            event_action,
            CASE
                WHEN event_action = 'RESUME_CLUSTER' THEN
                    LEAD(timestamp) OVER (PARTITION BY warehouse_name ORDER BY timestamp)  -- the next WAREHOUSE_CONSISTENT event closes the up period
                ELSE NULL
            END AS end_time
        FROM (
            SELECT
                eh.warehouse_name,
                eh.timestamp,
                eh.event_name,
                LEAD(eh.event_name)  OVER (PARTITION BY eh.warehouse_name ORDER BY eh.timestamp) AS event_action
            FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_EVENTS_HISTORY eh
            JOIN warehouses w ON w.warehouse_name = eh.warehouse_name
            WHERE eh.event_name IN ('WAREHOUSE_CONSISTENT','RESUME_CLUSTER','SUSPEND_CLUSTER')
        )
        WHERE event_name = 'WAREHOUSE_CONSISTENT'
    )
    WHERE event_action = 'RESUME_CLUSTER'
      AND start_time <= CURRENT_TIMESTAMP()
),
queries AS (
    SELECT q.warehouse_name, q.start_time, q.end_time, q.query_id
    FROM warehouses w
    JOIN SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q
      ON q.warehouse_name = w.warehouse_name
     AND q.cluster_number IS NOT NULL
     AND q.start_time <= CURRENT_TIMESTAMP()
     AND q.end_time   >= DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
),
minutes AS (
    -- ROW_NUMBER() yields a gap-free 0..59 sequence (SEQ4() alone is not guaranteed gap-free)
    SELECT DATEADD(MINUTE, ROW_NUMBER() OVER (ORDER BY SEQ4()) - 1,
                   DATEADD(HOUR, -1, CURRENT_TIMESTAMP())) AS minute_ts
    FROM TABLE(GENERATOR(ROWCOUNT => 60))
),
wh_minutes AS (
    SELECT
        w.warehouse_name,
        m.minute_ts,
        MAX(CASE WHEN wu.warehouse_name IS NULL THEN 0 ELSE 1 END) AS is_wh_up
    FROM minutes m
    CROSS JOIN warehouses w
    LEFT JOIN wh_up wu
      ON wu.warehouse_name = w.warehouse_name
     AND m.minute_ts BETWEEN wu.start_time AND COALESCE(wu.end_time, CURRENT_TIMESTAMP())
    GROUP BY w.warehouse_name, m.minute_ts
)
SELECT *
FROM (
    SELECT
        w.warehouse_name,
        w.minute_ts AS time,
        CASE WHEN w.is_wh_up = 0 THEN NULL ELSE COUNT(q.query_id) END AS query_count
    FROM wh_minutes w
    LEFT JOIN queries q
      ON q.warehouse_name = w.warehouse_name
     AND w.minute_ts BETWEEN q.start_time AND q.end_time
    GROUP BY w.warehouse_name, w.minute_ts, w.is_wh_up
)
PIVOT (SUM(query_count) FOR warehouse_name IN ('{WH_LARGE}', '{WH_SMALL}'))
ORDER BY 1;
'''
run_sql_df(sql_minute_activity)
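
Stripped of the SQL plumbing, the per-minute logic can be sketched in plain Python to check its semantics on small, made-up intervals: a minute mark is `None` when no up period covers it; otherwise it counts the queries whose `[start, end]` interval contains the mark, inclusive as with `BETWEEN`. The function `per_minute_counts` and the sample timestamps are hypothetical, and the window is shortened to 5 minutes for readability.

```python
from datetime import datetime, timedelta
from typing import Optional


def per_minute_counts(
    window_start: datetime,
    minutes: int,
    up_periods: list[tuple[datetime, Optional[datetime]]],
    queries: list[tuple[datetime, datetime]],
    now: datetime,
) -> list[tuple[datetime, Optional[int]]]:
    """For each minute mark: None if the warehouse was not up, otherwise the number of
    queries whose [start, end] interval contains the mark (inclusive, like SQL BETWEEN)."""
    result: list[tuple[datetime, Optional[int]]] = []
    for i in range(minutes):
        mark = window_start + timedelta(minutes=i)
        # An up period without an end is treated as still running at `now`,
        # mirroring COALESCE(end_time, CURRENT_TIMESTAMP()).
        is_up = any(
            start <= mark <= (end if end is not None else now)
            for start, end in up_periods
        )
        if not is_up:
            result.append((mark, None))
            continue
        running = sum(1 for q_start, q_end in queries if q_start <= mark <= q_end)
        result.append((mark, running))
    return result


# Made-up sample data.
now = datetime(2025, 11, 6, 10, 0)
window_start = now - timedelta(minutes=5)
up_periods = [(datetime(2025, 11, 6, 9, 56), None)]  # resumed at 09:56, still up
queries = [
    (datetime(2025, 11, 6, 9, 56), datetime(2025, 11, 6, 9, 57, 10)),
    (datetime(2025, 11, 6, 9, 57), datetime(2025, 11, 6, 9, 58)),
]
counts = per_minute_counts(window_start, 5, up_periods, queries, now)
for mark, count in counts:
    print(mark.strftime("%H:%M"), count)
# 09:55 None / 09:56 1 / 09:57 2 / 09:58 1 / 09:59 0
```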


## 8. Least efficient queries by **remote spilling**

Sorts by the `BYTES_SPILLED_TO_REMOTE_STORAGE / BYTES_SCANNED` ratio to prioritize *tuning*.
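
This query computes the ratio with `DIV0`, which returns 0 when the divisor is 0, instead of the `NULLIF` pattern, which yields NULL. The sketch below mimics both in Python (hypothetical helper names, made-up rows) to show the practical effect: with `DIV0`, queries that spill but scan 0 bytes fall to the bottom of a descending sort rather than carrying a NULL ratio.

```python
from typing import Optional


def div0(numerator: float, denominator: float) -> float:
    """Like Snowflake DIV0: returns 0 instead of failing when the divisor is 0."""
    return 0.0 if denominator == 0 else numerator / denominator


def nullif_ratio(numerator: float, denominator: float) -> Optional[float]:
    """Like numerator / NULLIF(denominator, 0): None (SQL NULL) when the divisor is 0."""
    return None if denominator == 0 else numerator / denominator


# (query_id, bytes_spilled_remote, bytes_scanned); made-up values.
rows = [("q1", 500, 100), ("q2", 300, 0), ("q3", 50, 100)]
ranked = sorted(rows, key=lambda r: div0(r[1], r[2]), reverse=True)
print([r[0] for r in ranked])  # ['q1', 'q3', 'q2']
print(nullif_ratio(300, 0))    # None
```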


In [None]:
# 8.1 Sort by remote-spilling ratio
sql_spill_remote_ratio = '''
SELECT
    QUERY_ID,
    USER_NAME,
    WAREHOUSE_NAME,
    WAREHOUSE_SIZE,
    BYTES_SCANNED,
    BYTES_SPILLED_TO_REMOTE_STORAGE,
    DIV0(BYTES_SPILLED_TO_REMOTE_STORAGE, BYTES_SCANNED) AS SPILLING_REMOTE_READ_RATIO
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE BYTES_SPILLED_TO_REMOTE_STORAGE > 0
ORDER BY SPILLING_REMOTE_READ_RATIO DESC;
'''
run_sql_df(sql_spill_remote_ratio)


## 9. Potentially oversized warehouses (last 3 months)

Filters for warehouses with substantial activity (more than 100 queries), no *local spill*, and a `WAREHOUSE_SIZE` other than `X-Small`.
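
The same filter, plus a one-size-down suggestion, can be sketched in Python. The helper names are hypothetical, and the size list is an assumption about how `QUERY_HISTORY` spells standard warehouse sizes; check it against the values that actually appear in your account.

```python
from typing import Optional

# Assumed spelling of standard warehouse sizes, smallest to largest.
SIZES = ["X-Small", "Small", "Medium", "Large", "X-Large",
         "2X-Large", "3X-Large", "4X-Large", "5X-Large", "6X-Large"]


def is_potentially_oversized(size: str, local_spill_bytes: int, query_count: int,
                             min_queries: int = 100) -> bool:
    """Same conditions as the query: not X-Small, no local spill, more than min_queries queries."""
    return size != "X-Small" and local_spill_bytes == 0 and query_count > min_queries


def next_smaller_size(size: str) -> Optional[str]:
    """Hypothetical helper: one size down, or None for X-Small or an unknown size."""
    if size not in SIZES:
        return None
    idx = SIZES.index(size)
    return SIZES[idx - 1] if idx > 0 else None


print(is_potentially_oversized("Large", 0, 250))  # True
print(next_smaller_size("Large"))                 # Medium
print(next_smaller_size("X-Small"))               # None
```

Each step down halves the hourly credit rate of a standard warehouse, but queries may run longer or start spilling, so a downsizing candidate is worth validating over a trial period.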


In [None]:
# 9.1 Potential oversizing
sql_oversized_wh = '''
SELECT
    WAREHOUSE_NAME,
    WAREHOUSE_SIZE,
    MAX(START_TIME) AS LAST_USED,
    COUNT(1)        AS QUERY_COUNT
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE START_TIME >= DATEADD(MONTH, -3, CURRENT_DATE())
  AND CLUSTER_NUMBER IS NOT NULL
  AND WAREHOUSE_SIZE != 'X-Small'
GROUP BY 1, 2
HAVING SUM(BYTES_SPILLED_TO_LOCAL_STORAGE) = 0
   AND COUNT(1) > 100
ORDER BY LAST_USED DESC;
'''
run_sql_df(sql_oversized_wh)


## 10. Queries eligible for the **Query Acceleration Service (QAS)**

Groups by category (a prefix of the query text), user, and warehouse, and computes scale factors and eligible acceleration times.
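
The aggregation can be sketched in Python over rows carrying `QUERY_TEXT`, `UPPER_LIMIT_SCALE_FACTOR` and `ELIGIBLE_QUERY_ACCELERATION_TIME`. To keep it short, this hypothetical `summarize_qas` groups by the 25-character prefix only (not by user and warehouse), and the output key names and sample rows are illustrative; times stay in the unit of the source column.

```python
from collections import defaultdict
from statistics import mean


def summarize_qas(rows: list[dict], prefix_len: int = 25) -> list[dict]:
    """Group rows by the first `prefix_len` characters of the query text and compute
    count, avg/max/min scale factor, and avg/total eligible acceleration time."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        groups[row["QUERY_TEXT"][:prefix_len]].append(row)

    summary = []
    for category, items in groups.items():
        factors = [r["UPPER_LIMIT_SCALE_FACTOR"] for r in items]
        times = [r["ELIGIBLE_QUERY_ACCELERATION_TIME"] for r in items]
        summary.append({
            "QUERY_CAT": category,
            "QUERY_COUNT": len(items),
            "AVG_SCALE_FACTOR": mean(factors),
            "MAX_SCALE_FACTOR": max(factors),
            "MIN_SCALE_FACTOR": min(factors),
            "AVG_ELIGIBLE_TIME": mean(times),
            "TOTAL_ELIGIBLE_TIME": sum(times),
        })
    # Largest total eligible time first, like ORDER BY ... DESC.
    return sorted(summary, key=lambda s: s["TOTAL_ELIGIBLE_TIME"], reverse=True)


# Made-up rows; the first two share the same 25-character prefix.
sample = [
    {"QUERY_TEXT": "SELECT * FROM sales WHERE region = 'EU'",
     "UPPER_LIMIT_SCALE_FACTOR": 8, "ELIGIBLE_QUERY_ACCELERATION_TIME": 300.0},
    {"QUERY_TEXT": "SELECT * FROM sales WHERE region = 'US'",
     "UPPER_LIMIT_SCALE_FACTOR": 4, "ELIGIBLE_QUERY_ACCELERATION_TIME": 200.0},
    {"QUERY_TEXT": "SELECT COUNT(*) FROM logs",
     "UPPER_LIMIT_SCALE_FACTOR": 2, "ELIGIBLE_QUERY_ACCELERATION_TIME": 150.0},
]
for entry in summarize_qas(sample):
    print(entry["QUERY_CAT"], entry["QUERY_COUNT"], entry["TOTAL_ELIGIBLE_TIME"])
```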


In [None]:
# 10.1 QAS candidates over the last week
QAS_USER_FILTER = ""   # e.g., 'DATA_SCIENTIST'
QAS_WH_FILTER   = ""   # e.g., 'ANALYTICS_WH'

# An empty filter compares the column with itself, which keeps every non-NULL row.
# ELIGIBLE_QUERY_ACCELERATION_TIME is expressed in seconds, so "> 120" means more than 2 minutes.
sql_qas = f'''
SELECT
    LEFT(qh.QUERY_TEXT, 25) AS QUERY_CAT,
    qh.USER_NAME,
    qae.WAREHOUSE_NAME,
    COUNT(*) AS QUERY_COUNT,
    AVG(qae.UPPER_LIMIT_SCALE_FACTOR)           AS AVG_SCALE_FACTOR,
    AVG(qae.ELIGIBLE_QUERY_ACCELERATION_TIME)   AS AVG_TIME_SAVINGS_SEC,
    MAX(qae.UPPER_LIMIT_SCALE_FACTOR)           AS MAX_SCALE_FACTOR,
    MIN(qae.UPPER_LIMIT_SCALE_FACTOR)           AS MIN_SCALE_FACTOR,
    SUM(qae.ELIGIBLE_QUERY_ACCELERATION_TIME)   AS TOTAL_ACCELERATION_TIME_SEC
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE qae
JOIN SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
  ON qh.QUERY_ID = qae.QUERY_ID
WHERE (qae.WAREHOUSE_NAME = {("'" + QAS_WH_FILTER + "'") if QAS_WH_FILTER else "qae.WAREHOUSE_NAME"})
  AND (qh.USER_NAME      = {("'" + QAS_USER_FILTER + "'") if QAS_USER_FILTER else "qh.USER_NAME"})
  AND qae.ELIGIBLE_QUERY_ACCELERATION_TIME > 120
  AND qae.START_TIME >= CURRENT_DATE() - 7
GROUP BY 1, 2, 3
ORDER BY TOTAL_ACCELERATION_TIME_SEC DESC
LIMIT 1000;
'''
run_sql_df(sql_qas)


## 11. Preventing runaway queries

Set `STATEMENT_TIMEOUT_IN_SECONDS` according to your policy, at the account, user, warehouse, or session level.

```sql
ALTER ACCOUNT SET STATEMENT_TIMEOUT_IN_SECONDS = 7200;  -- 2 hours
```
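
To apply the policy at the other levels mentioned above, a small hypothetical helper can build the matching `ALTER` statement. Account and session take no object name; users and warehouses do. The helper only builds strings; the names passed in are placeholders.

```python
from typing import Optional

# Levels at which STATEMENT_TIMEOUT_IN_SECONDS can be set, per the text above.
LEVELS = {"ACCOUNT", "USER", "WAREHOUSE", "SESSION"}


def statement_timeout_sql(level: str, seconds: int, name: Optional[str] = None) -> str:
    """Build an ALTER ... SET STATEMENT_TIMEOUT_IN_SECONDS statement (hypothetical helper)."""
    level = level.upper()
    if level not in LEVELS:
        raise ValueError(f"Unsupported level: {level}")
    if seconds < 0:
        raise ValueError("seconds must be non-negative")
    needs_name = level in {"USER", "WAREHOUSE"}
    if needs_name and not name:
        raise ValueError(f"{level} requires an object name")
    target = f"{level} {name}" if needs_name else level
    return f"ALTER {target} SET STATEMENT_TIMEOUT_IN_SECONDS = {seconds}"


print(statement_timeout_sql("account", 7200))
# ALTER ACCOUNT SET STATEMENT_TIMEOUT_IN_SECONDS = 7200
print(statement_timeout_sql("warehouse", 3600, "ANALYTICS_WH"))
# ALTER WAREHOUSE ANALYTICS_WH SET STATEMENT_TIMEOUT_IN_SECONDS = 3600
```

Inside the notebook, a generated statement could be executed with `get_session().sql(stmt).collect()`; setting the parameter at account level requires the ACCOUNTADMIN role.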
