Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
**/.DS_Store

# Temp
preparation/
/preparation/

# Binder
.bash_logout
Expand Down Expand Up @@ -152,3 +152,9 @@ dmypy.json
*_spark/
*_pipeline/
.vscode/

# LLMs
copilot-instructions.md
CLAUDE.md
.claude
.mcp.json
48 changes: 48 additions & 0 deletions integration/snowflake/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Data operations for getML Feature Store integration with Snowflake.

When settings are provided to data loading and preparation functions,
infrastructure (warehouse, database) is automatically bootstrapped if needed.

Usage example:
from data import (
SnowflakeSettings,
create_session,
ingestion,
)

settings = SnowflakeSettings.from_env()

with create_session(settings) as session:
ingestion.load_from_gcs(session, settings=settings)
"""

from snowflake.snowpark.exceptions import SnowparkSessionException

from ._bootstrap import (
BootstrapError,
ensure_infrastructure,
)
from ._settings import SnowflakeSettings
from ._snowflake_session import create_session
from ._sql_loader import load_sql
from .ingestion import (
DEFAULT_GCS_BUCKET,
JAFFLE_SHOP_TABLE_NAMES,
DataIngestionError,
load_from_gcs,
load_from_s3,
)

__all__ = [
"DEFAULT_GCS_BUCKET",
"JAFFLE_SHOP_TABLE_NAMES",
"BootstrapError",
"DataIngestionError",
"SnowflakeSettings",
"SnowparkSessionException",
"create_session",
"ensure_infrastructure",
"load_from_gcs",
"load_from_s3",
"load_sql",
]
182 changes: 182 additions & 0 deletions integration/snowflake/data/_bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
"""Bootstrap Snowflake infrastructure for the Jaffle Shop dataset.

Creates the required warehouse and database.
All operations are idempotent using CREATE ... IF NOT EXISTS.

The main entry point is `ensure_infrastructure()` which auto-detects and creates
missing resources. It is called automatically by data loading functions when
settings are provided.

Usage example:
from data import SnowflakeSettings, create_session, ensure_infrastructure

settings = SnowflakeSettings.from_env()

with create_session(settings) as session:
ensure_infrastructure(session, settings)
# Warehouse and database are now guaranteed to exist
"""

# ruff: noqa: G004

from __future__ import annotations

import logging

from snowflake.snowpark import Row, Session
from snowflake.snowpark.exceptions import SnowparkSQLException

from ._settings import SnowflakeSettings
from ._sql_loader import load_sql

logger: logging.Logger = logging.getLogger(__name__)

WAREHOUSE_SIZE = "X-SMALL"
AUTO_SUSPEND_SECONDS = 60


class BootstrapError(Exception):
"""Raised when bootstrap operations fail."""


# =============================================================================
# Internal Helpers
# =============================================================================


def _warehouse_exists(session: Session, warehouse_name: str) -> bool:
"""Check if a warehouse exists.

Args:
session: Active Snowflake Snowpark session.
warehouse_name: Name of the warehouse to check.

Returns:
True if warehouse exists, False otherwise.
"""
try:
rows: list[Row] = session.sql(
f"SHOW WAREHOUSES LIKE '{warehouse_name}'"
).collect()
return len(rows) > 0
except SnowparkSQLException:
return False


def _database_exists(session: Session, database_name: str) -> bool:
"""Check if a database exists.

Args:
session: Active Snowflake Snowpark session.
database_name: Name of the database to check.

Returns:
True if database exists, False otherwise.
"""
try:
rows: list[Row] = session.sql(
f"SHOW DATABASES LIKE '{database_name}'"
).collect()
return len(rows) > 0
except SnowparkSQLException:
return False


# =============================================================================
# Core Bootstrap Functions
# =============================================================================


def ensure_infrastructure(
session: Session,
settings: SnowflakeSettings,
) -> None:
"""Ensure warehouse and database exist, creating if missing.

This is the main entry point for auto-bootstrapping. It checks if the
warehouse and database specified in settings exist, and creates them
if they don't. All operations are idempotent.

Args:
session: Active Snowflake Snowpark session.
settings: Settings containing warehouse and database names.

Raises:
BootstrapError: If required settings are missing or if resource
creation fails (e.g., due to insufficient privileges).

Example:
from data import SnowflakeSettings, create_session, ensure_infrastructure

settings = SnowflakeSettings.from_env()
with create_session(settings) as session:
ensure_infrastructure(session, settings)
# Warehouse and database are now guaranteed to exist
"""
if not settings.warehouse:
raise BootstrapError("warehouse must be specified in settings for bootstrap")
if not settings.database:
raise BootstrapError("database must be specified in settings for bootstrap")

warehouse_ready = _warehouse_exists(session, settings.warehouse)
database_ready = _database_exists(session, settings.database)

if warehouse_ready and database_ready:
logger.debug(
"Infrastructure already exists: warehouse=%s, database=%s",
settings.warehouse,
settings.database,
)
return

try:
if not warehouse_ready:
_create_warehouse(session, warehouse_name=settings.warehouse)
if not database_ready:
_create_database(session, database_name=settings.database)
logger.info("Snowflake infrastructure ready")
except Exception as e:
raise BootstrapError(
"Cannot create Snowflake infrastructure. "
"Your current role may lack CREATE WAREHOUSE or CREATE DATABASE "
"privilege.\n\n"
"Options:\n"
" 1. Ask an administrator to create the warehouse and database\n"
" 2. Switch to a role with CREATE WAREHOUSE/DATABASE privilege "
"(e.g., ACCOUNTADMIN)\n"
" 3. Create resources manually before running the pipeline\n\n"
f"Original error: {e}"
) from e


def _create_warehouse(session: Session, warehouse_name: str) -> None:
"""Create warehouse with X-SMALL size and auto-suspend enabled."""
logger.info(f"Creating warehouse '{warehouse_name}' if not exists...")

sql: str = load_sql(
path="bootstrap/create_warehouse.sql",
warehouse_name=warehouse_name,
warehouse_size=WAREHOUSE_SIZE,
auto_suspend_seconds=str(AUTO_SUSPEND_SECONDS),
)
_ = session.sql(query=sql).collect()

logger.info(f"✓ Warehouse '{warehouse_name}' ready")


def _create_database(session: Session, database_name: str) -> None:
"""Create database if it doesn't exist."""
logger.info(f"Creating database '{database_name}' if not exists...")

sql: str = load_sql(
path="bootstrap/create_database.sql", database_name=database_name
)
_ = session.sql(query=sql).collect()

logger.info(f"✓ Database '{database_name}' ready")


__all__ = [
"BootstrapError",
"ensure_infrastructure",
]
53 changes: 53 additions & 0 deletions integration/snowflake/data/_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Snowflake authentication and connection settings.

Provides typed configuration that automatically loads from environment
variables with the SNOWFLAKE_ prefix.

Usage example:
from data import SnowflakeSettings, create_session

# Auto-loads from SNOWFLAKE_* environment variables
settings = SnowflakeSettings.from_env()

with create_session(settings) as session:
session.sql("SELECT 1").collect()
"""

from typing import ClassVar, Self

from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class SnowflakeSettings(BaseSettings):
"""Settings for Snowflake operations.

Core fields (account, user, password, role) are required for all connections.
Optional fields (warehouse, database, schema_name) can be omitted for
administrative operations like creating warehouses or databases.
"""

model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
env_prefix="SNOWFLAKE_",
case_sensitive=False,
)

account: str
user: str
password: SecretStr
role: str
warehouse: str
database: str
# Can't use field name "schema" (Pydantic reserved);
# validation_alias maps standard SNOWFLAKE_SCHEMA env var to this field
schema_name: str = Field(
validation_alias="SNOWFLAKE_SCHEMA",
)

@classmethod
def from_env(cls) -> Self:
"""Create settings instance from environment variables.

Loads configuration from SNOWFLAKE_* environment variables.
"""
return cls.model_validate({})
54 changes: 54 additions & 0 deletions integration/snowflake/data/_snowflake_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Snowflake session management for getML Feature Store integration.

This module provides a factory function for creating Snowflake Snowpark sessions.
Sessions support the context manager protocol for automatic cleanup.

Usage example:
from data import SnowflakeSettings, create_session

with create_session(SnowflakeSettings.from_env()) as session:
session.sql("SELECT * FROM mytable").collect()
"""

import logging

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import SnowparkSessionException

from ._settings import SnowflakeSettings

logger: logging.Logger = logging.getLogger(__name__)


def create_session(settings: SnowflakeSettings) -> Session:
"""Create a Snowflake Snowpark session.

Connects using the provided settings.

Raises:
SnowparkSessionException: If connection to Snowflake fails.
"""
connection_params: dict[str, str | int] = {
"account": settings.account,
"user": settings.user,
"password": settings.password.get_secret_value(),
"role": settings.role,
"warehouse": settings.warehouse,
"database": settings.database,
"schema": settings.schema_name,
}

return _establish_session(connection_params)


def _establish_session(connection_params: dict[str, str | int]) -> Session:
"""Create a Snowpark session with the given parameters."""
try:
return Session.builder.configs(connection_params).create()
except Exception as e:
raise SnowparkSessionException(
f"Failed to create Snowflake session. "
f"Account: {connection_params.get('account')}, "
f"User: {connection_params.get('user')}. "
f"Error: {e}"
) from e
Loading