In [43]:
import copy
import getpass
import gzip
import math
import os
import re
import shutil
from dataclasses import dataclass, field
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas
import psycopg2
import psycopg2.extras
import yaml

In [44]:
# Project root: the parent of the directory the notebook is started from.
BASE_DIR = Path.cwd().parent
config_path = BASE_DIR / "config" / "postgres_to_csv_export.yaml"

# Secrets must not be stored in the notebook source. They are taken from the
# environment; a missing one is requested interactively so the value is never
# written to the file or echoed into a cell output.
# NOTE(review): the literal passwords previously assigned here remain in the
# notebook's history and should be rotated.
for _secret_env in ("SNOWFLAKE_PASSWORD", "POSTGRES_PASSWORD"):
    if not os.environ.get(_secret_env):
        os.environ[_secret_env] = getpass.getpass(f"{_secret_env}: ")


In [3]:
def load_config(config_path: str) -> Dict[str, Any]:
    """
    Load the export configuration from a YAML file.

    Args:
        config_path: path of the YAML file (anything accepted by open()).

    Returns:
        The parsed mapping, or an empty dict when the document is empty.

    Raises:
        FileNotFoundError: the file does not exist.
        ValueError: the file is not valid YAML, or its root is not a mapping.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except FileNotFoundError as e:
        # Chain explicitly so the original OS error stays attached as the cause.
        raise FileNotFoundError(f"Config file not found: {config_path}") from e
    except yaml.YAMLError as e:
        raise ValueError(f"Invalid YAML in config file: {e}") from e

    if not config:
        return {}
    if not isinstance(config, dict):
        # A list/scalar root would only fail later with a confusing error
        # when callers index it like a dict.
        raise ValueError(
            f"Config root must be a mapping, got {type(config).__name__}: {config_path}"
        )
    return config

In [4]:
def validate_config(cfg: Dict[str, Any]) -> None:
    """
    Check that a parsed export configuration has the expected structure.

    Verifies the top-level keys, every connection under 'source', the
    'export' section, and every entry of 'tables'.

    Args:
        cfg: configuration mapping as loaded from YAML.

    Raises:
        KeyError: a required key is missing.
        TypeError: a section has the wrong container type.
        ValueError: 'source' or 'tables' is empty, or a table has an
                    unknown 'mode'.
    """
    # Top-level keys
    required_top_keys = {"version", "source", "export", "tables"}
    missing = required_top_keys - cfg.keys()
    if missing:
        raise KeyError(f"Missing top-level keys: {', '.join(sorted(missing))}")

    # Validate source
    if not isinstance(cfg["source"], dict) or not cfg["source"]:
        raise ValueError("'source' must be a non-empty dictionary")

    for source_name, conn in cfg["source"].items():
        if not isinstance(conn, dict):
            raise TypeError(f"Source '{source_name}' must be a dictionary")
        for key in ("host", "port", "database", "user", "password_env", "schema"):
            if key not in conn:
                raise KeyError(f"Missing '{key}' in source '{source_name}'")

    # Validate export
    export = cfg["export"]
    if not isinstance(export, dict):
        raise TypeError("'export' must be a dictionary")
    for key in ("output_dir", "format"):
        if key not in export:
            raise KeyError(f"Missing '{key}' in 'export'")

    # Validate tables
    tables = cfg["tables"]
    if not isinstance(tables, list) or not tables:
        raise ValueError("'tables' must be a non-empty list")
    for i, table in enumerate(tables):
        if not isinstance(table, dict):
            raise TypeError(f"Table at index {i} must be a dictionary")
        if "name" not in table or "mode" not in table or "order_by" not in table:
            raise KeyError(f"Table {table.get('name', i)} missing required keys")

        mode = table["mode"]
        # Reject unknown modes here instead of letting them through to the
        # query builder, where the failure would surface much later.
        if mode not in ("table", "query"):
            raise ValueError(f"Table {table['name']} has unknown mode '{mode}'")
        # Each mode requires a key of the same name ('table' or 'query').
        if mode not in table:
            raise KeyError(f"Table {table['name']} missing '{mode}' key for mode '{mode}'")


In [5]:
def validate_yaml_file(path: str) -> Dict[str, Any]:
    """
    Parse the YAML file at ``path``, validate its structure and return it.

    Raises:
        FileNotFoundError: the file does not exist.
        ValueError: the YAML is malformed or its root is not a dictionary.
        (plus whatever validate_config raises for structural problems)
    """
    try:
        with open(path, "r", encoding="utf-8") as stream:
            document = yaml.safe_load(stream)
    except FileNotFoundError:
        raise FileNotFoundError(f"YAML file not found: {path}")
    except yaml.YAMLError as err:
        raise ValueError(f"Invalid YAML syntax: {err}")

    # Only a mapping root can be handed to the structural validator.
    if isinstance(document, dict):
        validate_config(document)
        return document

    raise ValueError("YAML root must be a dictionary")

In [6]:
# Parse the export configuration (structure is not validated by load_config).
cfg = load_config(config_path)

In [7]:
def resolve_secrets(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a copy of the config with secrets resolved.

    Every string key ending in '_env' whose value is a string is treated as
    the name of an environment variable: the key is renamed without the
    suffix (e.g. 'password_env' -> 'password') and the value is replaced by
    the variable's content. Nested dicts and lists are processed recursively.
    The input mapping is not modified.

    Args:
        cfg: configuration mapping, typically as loaded from YAML.

    Returns:
        A new structure with the same shape and the secrets filled in.

    Raises:
        KeyError: a referenced environment variable is not set.
    """

    def _resolve(obj: Any) -> Any:
        if isinstance(obj, dict):
            resolved = {}
            for k, v in obj.items():
                # YAML keys are not always strings (e.g. integers), so the
                # suffix test must be guarded to avoid an AttributeError.
                if isinstance(k, str) and k.endswith("_env") and isinstance(v, str):
                    secret_value = os.getenv(v)
                    if secret_value is None:
                        raise KeyError(f"Environment variable '{v}' not set")
                    resolved[k[: -len("_env")]] = secret_value
                else:
                    resolved[k] = _resolve(v)  # recurse for nested dicts/lists
            return resolved
        if isinstance(obj, list):
            return [_resolve(item) for item in obj]
        return obj

    return _resolve(copy.deepcopy(cfg))

In [8]:
import psycopg2
from psycopg2.extensions import connection as Psycopg2Connection
from typing import Dict

def create_pg_connection(conn_cfg: Dict[str, str]) -> Psycopg2Connection:
    """
    Create a PostgreSQL connection using psycopg2.

    Args:
        conn_cfg: connection settings. Requires 'host', 'database', 'user'
                  and 'password'; 'port' defaults to 5432; 'schema' is
                  optional and, when set, becomes the session search_path.

    Returns:
        An open psycopg2 connection.

    Raises:
        KeyError: a required setting is missing.
        psycopg2.Error: connecting or setting the search_path failed.
    """
    conn = psycopg2.connect(
        host=conn_cfg["host"],
        port=conn_cfg.get("port", 5432),
        dbname=conn_cfg["database"],
        user=conn_cfg["user"],
        password=conn_cfg["password"]
    )

    schema = conn_cfg.get("schema")
    if schema:
        try:
            with conn.cursor() as cur:
                # set_config() takes the value as a bound parameter, so the
                # schema name is never spliced into the SQL text. The third
                # argument (is_local=false) makes it a session-level setting,
                # like a plain SET.
                cur.execute("SELECT set_config('search_path', %s, false)", (schema,))
            conn.commit()
        except Exception:
            # Do not hand back (or leak) a half-initialised connection.
            conn.close()
            raise

    return conn

In [9]:
# Copy of the config with every '*_env' entry replaced by its secret value.
cfg_pwd = resolve_secrets(cfg)

In [10]:
# Open the connection described by the 'postgres' entry under 'source'.
conn = create_pg_connection(cfg_pwd["source"]["postgres"])

In [11]:
from typing import List
import psycopg2

def get_table_columns(conn: psycopg2.extensions.connection, schema: str, table: str) -> List[str]:
    """
    Look up the column names of schema.table via information_schema.columns.

    Args:
        conn: psycopg2 connection
        schema: schema name (e.g., 'public')
        table: table name (e.g., 'hotel')

    Returns:
        Column names sorted by ordinal position; empty if nothing matches.
    """
    column_lookup_sql = """
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position;
    """
    with conn.cursor() as cursor:
        cursor.execute(column_lookup_sql, (schema, table))
        records = cursor.fetchall()

    # Each record is a one-element row holding the column name.
    return [record[0] for record in records]

In [12]:
def get_table_pk(conn: psycopg2.extensions.connection, schema: str, table: str) -> Optional[List[str]]:
    """
    Fetch the primary key column names for a given schema.table in PostgreSQL.

    Args:
        conn: psycopg2 connection
        schema: schema name (e.g., 'public')
        table: table name (e.g., 'hotel')

    Returns:
        List of primary key column names in key order, or None if the table
        has no primary key.
    """
    # PostgreSQL does not require constraint names to be unique within a
    # schema, so the join must also match on table_name; otherwise columns of
    # an identically named constraint on another table could be returned.
    query = """
    SELECT kcu.column_name
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
      ON tc.constraint_name = kcu.constraint_name
      AND tc.table_schema = kcu.table_schema
      AND tc.table_name = kcu.table_name
    WHERE tc.table_schema = %s
      AND tc.table_name = %s
      AND tc.constraint_type = 'PRIMARY KEY'
    ORDER BY kcu.ordinal_position;
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema, table))
        pk_columns = [row[0] for row in cur.fetchall()]

    return pk_columns if pk_columns else None

In [13]:
import psycopg2
from typing import Optional

def estimate_rowcount(conn: psycopg2.extensions.connection, sql: str) -> int:
    """
    Count the rows a SQL query returns.

    Despite the name this is an exact count: the query is wrapped in
    ``SELECT COUNT(*) FROM (...)`` and executed.

    Args:
        conn: psycopg2 connection
        sql: SQL query (string); a trailing semicolon is tolerated

    Returns:
        Number of rows (int)

    Raises:
        Any database error from executing the query; the transaction is
        rolled back before the error is re-raised.
    """
    # A trailing ';' inside the parentheses would be a syntax error, so strip
    # it the same way the query-building helpers in this notebook do.
    inner_sql = sql.strip().rstrip(";").rstrip()
    count_sql = f"SELECT COUNT(*) FROM ({inner_sql}) AS subquery"
    try:
        with conn.cursor() as cur:
            cur.execute(count_sql)
            rowcount = cur.fetchone()[0]
    except Exception:
        # Leave the connection usable for the next statement.
        conn.rollback()
        raise
    return rowcount

In [14]:
from typing import Dict, List

def build_base_query(table_cfg: Dict, schema_default: str, columns: List[str] = None) -> str:
    """
    Produce the base SELECT statement for one table entry of the config.

    Args:
        table_cfg: table entry from the YAML; needs 'mode' plus 'table'
                   (mode='table') or 'query' (mode='query')
        schema_default: schema used when the entry has no 'schema' key
        columns: columns to select for mode='table'; all columns if omitted

    Returns:
        SQL text without a trailing semicolon.

    Raises:
        KeyError: the key required by the mode is missing or empty.
        ValueError: the mode is neither 'table' nor 'query'.
    """
    mode = table_cfg.get("mode")
    label = table_cfg.get("name")

    if mode == "query":
        raw_query = table_cfg.get("query")
        if raw_query:
            # Drop trailing whitespace and the statement terminator.
            return raw_query.rstrip().rstrip(";")
        raise KeyError(f"Table config '{label}' missing 'query' key for mode='query'")

    if mode != "table":
        raise ValueError(f"Unknown mode '{mode}' for table '{label}'")

    relation = table_cfg.get("table")
    if not relation:
        raise KeyError(f"Table config '{label}' missing 'table' key for mode='table'")

    namespace = table_cfg.get("schema", schema_default)
    projection = "*" if not columns else ", ".join(columns)
    return f"SELECT {projection} FROM {namespace}.{relation}"


In [15]:
# Column list of public.hotel, in table order.
cols = get_table_columns(conn, schema="public", table="hotel")

In [16]:
# Base SELECT for the first configured table, with explicit columns.
base_query = build_base_query(cfg_pwd["tables"][0], schema_default="public", columns=cols)

In [17]:
from typing import Dict, List

def apply_filters(base_sql: str, filters_cfg: Dict[str, str]) -> str:
    """
    Apply filters to a base SQL query by adding them to its WHERE clause.

    Args:
        base_sql: the base SQL query; a trailing semicolon is removed
        filters_cfg: dictionary of filters, e.g.
                     {"checkin_date__gte": "2025-01-01",
                      "checkin_date__lt": "2026-01-01",
                      "status": "confirmed"}
                     Key suffixes __gte/__lte/__gt/__lt select the operator;
                     a key without suffix is an equality test.

    Returns:
        SQL string with the conditions AND-ed together and appended.

    Note:
        Column names come from the config keys and are inserted verbatim.
        Whether to emit WHERE or AND is decided by searching for the keyword
        WHERE anywhere in the query, so a WHERE that only occurs inside a
        subquery is still treated as an existing clause.
    """
    # Always normalise the tail so nothing is ever appended after a ';'.
    sql = base_sql.rstrip().rstrip(";")
    if not filters_cfg:
        return sql  # no filters

    operators = (("__gte", ">="), ("__lte", "<="), ("__gt", ">"), ("__lt", "<"))
    conditions: List[str] = []

    for key, value in filters_cfg.items():
        for suffix, op in operators:
            if key.endswith(suffix):
                col = key[: -len(suffix)]
                break
        else:
            # exact match
            col, op = key, "="
        # Double embedded single quotes so a value such as O'Brien stays one
        # valid SQL string literal.
        literal = str(value).replace("'", "''")
        conditions.append(f"{col} {op} '{literal}'")

    where_clause = " AND ".join(conditions)

    # Word-boundary match: also finds WHERE preceded/followed by a newline or
    # tab, which a plain " where " substring test misses.
    has_where = re.search(r"\bwhere\b", sql, flags=re.IGNORECASE) is not None
    joiner = "AND" if has_where else "WHERE"
    return f"{sql} {joiner} {where_clause}"


In [18]:
# Keep only rows created in January 2026 (lower bound inclusive, upper exclusive).
filters_cfg = {
    "created_at__gte": "2026-01-01",
    "created_at__lt": "2026-02-01",
}
base_query_filters = apply_filters(base_query, filters_cfg)

In [19]:
from typing import Dict

def apply_partition_clause(base_sql: str, partition_spec: Dict, partition_value: str) -> str:
    """
    Apply a partition filter to a base SQL query.

    The partition is expressed as a half-open range
    ``column >= start AND column < end``.

    Args:
        base_sql: base SQL string; a trailing semicolon is removed
        partition_spec: dictionary describing the partition, e.g.
                        {
                            "type": "date_range",
                            "column": "checkin_date",
                            "granularity": "month"
                        }
                        'granularity' defaults to "day".
        partition_value: "YYYY-MM" for month granularity,
                         "YYYY-MM-DD" for day granularity

    Returns:
        SQL string with the partition condition appended.

    Raises:
        KeyError: partition_spec has no 'column'.
        ValueError: unsupported type or granularity, or partition_value does
                    not parse as a date.
    """
    # Imported locally, as in the original cell: the notebook's first import
    # cell does not provide timedelta.
    from datetime import datetime, timedelta

    column = partition_spec.get("column")
    if not column:
        raise KeyError("partition_spec must have a 'column' key")

    partition_type = partition_spec.get("type")
    if partition_type != "date_range":
        raise ValueError(f"Unsupported partition type '{partition_type}'")

    granularity = partition_spec.get("granularity", "day")
    if granularity == "month":
        start_date = datetime.strptime(f"{partition_value}-01", "%Y-%m-%d")
        # First day of the following month, rolling over the year in December.
        if start_date.month == 12:
            end_date = start_date.replace(year=start_date.year + 1, month=1)
        else:
            end_date = start_date.replace(month=start_date.month + 1)
    elif granularity == "day":
        start_date = datetime.strptime(partition_value, "%Y-%m-%d")
        end_date = start_date + timedelta(days=1)
    else:
        raise ValueError(f"Unsupported granularity '{granularity}'")

    # Both bounds are rendered from the parsed dates, so only validated,
    # zero-padded date text reaches the SQL.
    partition_clause = (
        f"{column} >= '{start_date.strftime('%Y-%m-%d')}'"
        f" AND {column} < '{end_date.strftime('%Y-%m-%d')}'"
    )

    sql = base_sql.rstrip().rstrip(";")
    # Word-boundary match so a WHERE on its own line is recognised too.
    has_where = re.search(r"\bwhere\b", sql, flags=re.IGNORECASE) is not None
    joiner = "AND" if has_where else "WHERE"
    return f"{sql} {joiner} {partition_clause}"


In [20]:
# Monthly date-range partition on created_at, applied for January 2026.
partition_spec = {
    "type": "date_range",
    "column": "created_at",
    "granularity": "month",
}
partition_value = "2026-01"
base_query_filters_p = apply_partition_clause(base_query_filters, partition_spec, partition_value)


In [21]:
from typing import Optional

def apply_ordering(base_sql: str, order_by: Optional[str]) -> str:
    """
    Apply ORDER BY clause to the base SQL for deterministic export.

    Args:
        base_sql: SQL query (string); a trailing semicolon is removed
        order_by: column name(s) to order by, e.g., "id" or "id, name"

    Returns:
        The query without trailing semicolon. ORDER BY is appended only when
        order_by is given and the query contains no ORDER BY yet.

    Note:
        The existing-clause check is a keyword search over the whole query,
        so an ORDER BY inside a subquery or window definition also counts.
    """
    # Normalise first: appending after a ';' would yield invalid SQL.
    sql = base_sql.rstrip().rstrip(";")

    if not order_by:
        return sql

    # Tolerate any whitespace (newlines, tabs, repeated spaces) around and
    # between the two keywords.
    if re.search(r"\border\s+by\b", sql, flags=re.IGNORECASE):
        return sql

    return f"{sql} ORDER BY {order_by}"

In [22]:
# Ordering column(s) configured for the first table.
order_by = cfg_pwd["tables"][0]["order_by"]

In [23]:
# NOTE(review): this orders the filtered query, not the partitioned one
# (base_query_filters_p) - confirm that skipping the partition is intended.
master_query = apply_ordering(base_query_filters, order_by)

In [24]:
from typing import List, Dict
from datetime import datetime, timedelta

def generate_partitions(partition_spec: Dict) -> List[Dict[str, str]]:
    """
    Generate partitions based on a partition specification.

    Args:
        partition_spec: dictionary with keys like:
            - type: "date_range"
            - column: column to partition by (ignored here)
            - granularity: "month" (default) or "day"
            - start: "YYYY-MM-DD" (inclusive)
            - end: "YYYY-MM-DD" (exclusive)

    Returns:
        List of dicts, in chronological order, with keys:
            - suffix: string for file naming (e.g., "2025-01")
            - start_date: partition start date (inclusive)
            - end_date: partition end date (exclusive)
        Monthly partitions end on calendar-month boundaries; only the first
        and last may be partial, when start/end fall inside a month.

    Raises:
        ValueError: unsupported type or granularity, or unparsable dates.
        KeyError: 'start' or 'end' is missing.
    """
    if partition_spec.get("type") != "date_range":
        raise ValueError("Only date_range partitions are supported")

    start_str = partition_spec["start"]
    end_str = partition_spec["end"]
    granularity = partition_spec.get("granularity", "month")

    start_date = datetime.strptime(start_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_str, "%Y-%m-%d")

    partitions = []

    current = start_date
    while current < end_date:
        if granularity == "month":
            # Step from the first of the month: keeping the start day would
            # raise for days 29-31 (e.g. Jan 31 -> "Feb 31") and would shift
            # every later partition off the calendar-month boundary.
            month_start = current.replace(day=1)
            if month_start.month == 12:
                next_partition = month_start.replace(year=month_start.year + 1, month=1)
            else:
                next_partition = month_start.replace(month=month_start.month + 1)
            suffix = current.strftime("%Y-%m")
        elif granularity == "day":
            next_partition = current + timedelta(days=1)
            suffix = current.strftime("%Y-%m-%d")
        else:
            raise ValueError(f"Unsupported granularity '{granularity}'")

        # Ensure end_date of partition does not exceed overall end
        partition_end = min(next_partition, end_date)

        partitions.append({
            "suffix": suffix,
            "start_date": current.strftime("%Y-%m-%d"),
            "end_date": partition_end.strftime("%Y-%m-%d")
        })

        current = next_partition

    return partitions


In [25]:
from typing import List, Dict

def plan_file_splits(rowcount: int, max_rows_per_file: int) -> List[Dict[str, int]]:
    """
    Plan file splits for a given rowcount and max rows per file.

    Args:
        rowcount: total number of rows in the partition
        max_rows_per_file: maximum rows per output file (must be positive)

    Returns:
        List of dicts, one per file, with keys:
            - start_row: 0-based inclusive start
            - end_row: exclusive end
        Empty when rowcount is zero or negative.

    Raises:
        ValueError: max_rows_per_file is not positive while there are rows
                    to split.
    """
    if rowcount <= 0:
        return []

    # A non-positive chunk size can never advance through the rows; the
    # previous while-loop spun forever in that case.
    if max_rows_per_file <= 0:
        raise ValueError("max_rows_per_file must be a positive integer")

    return [
        {"start_row": start, "end_row": min(start + max_rows_per_file, rowcount)}
        for start in range(0, rowcount, max_rows_per_file)
    ]

In [26]:
# Number of rows the ordered, filtered query returns.
rowcount = estimate_rowcount(conn, master_query)

In [27]:
# Show the row count as this cell's output.
rowcount

2

In [28]:
# Split the result into files of at most 50,000 rows each.
chunk = plan_file_splits(rowcount, max_rows_per_file=50_000)

In [29]:
from typing import Dict, Optional

def build_chunk_query(
    sql: str,
    order_by: Optional[str],
    chunk: Dict[str, int]
) -> str:
    """
    Turn a query into the query for one row window of its result.

    Args:
        sql: base SQL (should include WHERE/filters/partition/order)
        order_by: column(s) giving a deterministic order; required
        chunk: dict with 'start_row' (inclusive) and 'end_row' (exclusive)

    Returns:
        The query with ORDER BY (added only if the text does not already
        contain " order by ") followed by OFFSET and LIMIT.

    Raises:
        ValueError: order_by is empty or None.
    """
    if not order_by:
        raise ValueError("order_by must be specified for chunking")

    offset = chunk["start_row"]
    window_size = chunk["end_row"] - offset

    # Paging is only stable when the rows have a defined order.
    if " order by " in sql.lower():
        ordered_sql = sql
    else:
        ordered_sql = f"{sql} ORDER BY {order_by}"

    return f"{ordered_sql} OFFSET {offset} LIMIT {window_size}"

In [30]:
# Query for the first planned chunk.
final_query = build_chunk_query(master_query, order_by, chunk[0])

In [31]:
import tempfile

def export_query_to_temp_csv_gz(conn, sql: str, *, delimiter: str = ",", header: bool = True, null_as: str = "") -> Path:
    """
    Write a query result to a temporary .csv.gz file using Postgres COPY.

    Args:
        conn: psycopg2 connection (its cursor must support copy_expert)
        sql: query to export; a trailing semicolon is removed
        delimiter: CSV field delimiter
        header: include a header row
        null_as: text written for NULL; the empty string keeps COPY's default

    Returns:
        Path of 'extract.csv.gz' inside a freshly created temp directory.
        The caller owns that directory and is responsible for deleting it.

    Raises:
        Any error raised while exporting. In that case the transaction is
        rolled back and the temp directory is removed before re-raising.
    """
    sql = sql.strip().rstrip(";")

    # Double embedded single quotes so the option values stay valid literals.
    delimiter_literal = delimiter.replace("'", "''")
    copy_sql = f"COPY ({sql}) TO STDOUT WITH CSV DELIMITER '{delimiter_literal}'"
    if header:
        copy_sql += " HEADER"
    if null_as != "":
        null_literal = null_as.replace("'", "''")
        copy_sql += f" NULL '{null_literal}'"

    # Temporary file path
    tmp_dir = Path(tempfile.mkdtemp(prefix="pg_to_sf_"))
    out_path = tmp_dir / "extract.csv.gz"

    try:
        with gzip.open(out_path, "wt", encoding="utf-8", newline="") as f_out:
            with conn.cursor() as cur:
                cur.copy_expert(copy_sql, f_out)
    except Exception:
        # The gzip handle is closed by now, so the directory can be removed;
        # rolling back keeps the connection usable after a failed COPY.
        conn.rollback()
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise

    return out_path

In [32]:
# Export the chunk to a temp .csv.gz; the returned path is the cell output.
export_query_to_temp_csv_gz(conn, sql=final_query)

WindowsPath('C:/Users/ALEKYA~1/AppData/Local/Temp/pg_to_sf_z4lcqyc8/extract.csv.gz')

In [33]:
from __future__ import annotations

import os
import shlex
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional

import yaml

In [34]:
@dataclass(frozen=True)
class SnowflakeCreds:
    """Immutable bundle of Snowflake connection settings used by the SnowSQL helpers."""

    account: str
    user: str
    role: Optional[str]
    warehouse: Optional[str]
    database: Optional[str]
    schema: Optional[str]
    # Excluded from repr() so displaying or logging the object (e.g. as a
    # notebook cell output) never reveals the secret. Use password_env in
    # YAML (recommended).
    password: Optional[str] = field(repr=False)


In [35]:
from pathlib import Path
import os
import yaml

def load_snowflake_creds_from_yaml(
    yaml_path: str | Path,
    key_path: str = "snowflake"
) -> SnowflakeCreds:
    """
    Reads Snowflake credentials from a YAML file and returns a SnowflakeCreds object.

    Args:
        yaml_path: path of the YAML file.
        key_path: dot-separated path to the credentials mapping inside the
                  document, e.g. "export.snowflake".

    Returns:
        SnowflakeCreds built from the mapping. 'account' and 'user' are
        required; role/warehouse/database/schema are optional. The password
        is read from the environment variable named by 'password_env' when
        that key is present, otherwise from a literal 'password' entry.

    Raises:
        KeyError: key_path does not exist in the document.
        ValueError: the node at key_path is not a mapping, a required field
                    is missing/invalid, or the password variable is unset.
    """
    yaml_path = Path(yaml_path)
    cfg = yaml.safe_load(yaml_path.read_text(encoding="utf-8"))

    node = cfg
    for key in key_path.split("."):
        # Also covers an empty document (None) and scalars on the way down,
        # which would otherwise fail with an unrelated TypeError.
        if not isinstance(node, dict) or key not in node:
            raise KeyError(f"Missing config path: {key_path}")
        node = node[key]

    if not isinstance(node, dict):
        raise ValueError(f"'{key_path}' must be a mapping in YAML")

    def req(k: str) -> str:
        v = node.get(k)
        if not v or not isinstance(v, str):
            raise ValueError(f"Missing/invalid '{key_path}.{k}' in YAML")
        return v

    account = req("account")
    user = req("user")

    password = None
    password_env = node.get("password_env")
    if password_env:
        password = os.environ.get(password_env)
        if not password:
            raise ValueError(
                f"Env var '{password_env}' is not set (needed for Snowflake password)."
            )
    else:
        # Fallback: a password stored directly in the YAML (may be absent).
        password = node.get("password")

    return SnowflakeCreds(
        account=account,
        user=user,
        password=password,
        role=node.get("role"),
        warehouse=node.get("warehouse"),
        database=node.get("database"),
        schema=node.get("schema"),
    )


In [36]:
# Snowflake credentials live under export.snowflake in the same config file.
creds = load_snowflake_creds_from_yaml(config_path, key_path="export.snowflake")

In [37]:
# -----------------------------
# 3) SnowSQL helpers (PUT + COPY)
# -----------------------------
def _snowsql_base_cmd(creds: SnowflakeCreds, snowsql_path: str = "snowsql") -> list[str]:
    """
    Build the argv prefix shared by every SnowSQL invocation.

    Contains the executable, the account and user flags, and the
    exit_on_error / friendly / quiet options. The password is not part of
    the command line.
    """
    connection_args = ["-a", creds.account, "-u", creds.user]

    option_args = []
    for setting in ("exit_on_error=true", "friendly=false", "quiet=true"):
        option_args += ["-o", setting]

    return [snowsql_path, *connection_args, *option_args]


def _run_snowsql(creds: SnowflakeCreds, sql: str, *, snowsql_path: str = "snowsql", timeout_sec: int = 300) -> str:
    """
    Run SQL through the SnowSQL CLI and return what it printed on stdout.

    The password, when present, is handed to the child process through the
    SNOWSQL_PWD environment variable rather than the command line.

    Raises:
        RuntimeError: SnowSQL exited with a non-zero status; the message
                      carries the command, stdout and stderr.
    """
    child_env = os.environ.copy()
    if creds.password:
        child_env["SNOWSQL_PWD"] = creds.password

    argv = [*_snowsql_base_cmd(creds, snowsql_path=snowsql_path), "-q", sql]

    result = subprocess.run(
        argv,
        env=child_env,
        capture_output=True,
        text=True,
        timeout=timeout_sec,
    )

    if result.returncode == 0:
        return result.stdout

    raise RuntimeError(
        "SnowSQL command failed.\n"
        f"Command: {shlex.join(argv)}\n"
        f"STDOUT:\n{result.stdout}\n"
        f"STDERR:\n{result.stderr}"
    )

In [38]:
def put_file_to_stage(
    creds: SnowflakeCreds,
    local_path: Path,
    stage_fqn: str,
    *,
    overwrite: bool = True,
    auto_compress: bool = False,
    parallel: int = 4,
    snowsql_path: str = "snowsql",
) -> str:
    """
    Upload a local file to a stage with a SnowSQL PUT statement.

    USE ROLE/WAREHOUSE/DATABASE/SCHEMA statements are emitted first for every
    one of those values set on ``creds``.

    Returns:
        The stdout of the SnowSQL run.

    Raises:
        ValueError: stage_fqn does not start with '@'.
    """
    if not stage_fqn.startswith("@"):
        raise ValueError("stage_fqn must start with '@' (e.g. @DB.SCHEMA.LANDING_STAGE)")

    # Session context, in the order the statements are issued.
    use_targets = (
        ("ROLE", creds.role),
        ("WAREHOUSE", creds.warehouse),
        ("DATABASE", creds.database),
        ("SCHEMA", creds.schema),
    )
    statements = [f"USE {keyword} {target}" for keyword, target in use_targets if target]

    overwrite_flag = "TRUE" if overwrite else "FALSE"
    compress_flag = "TRUE" if auto_compress else "FALSE"
    statements.append(
        f"PUT 'file://{local_path.as_posix()}' {stage_fqn} "
        f"OVERWRITE={overwrite_flag} "
        f"AUTO_COMPRESS={compress_flag} "
        f"PARALLEL={int(parallel)}"
    )

    script = "; ".join(statements) + ";"
    return _run_snowsql(creds, script, snowsql_path=snowsql_path)


In [39]:
def copy_stage_to_raw_table(
    creds: SnowflakeCreds,
    stage_fqn: str,
    stage_file_name: str,
    raw_table_fqn: str,
    file_format_fqn: str,
    *,
    on_error: str = "ABORT_STATEMENT",
    force: bool = True,
    snowsql_path: str = "snowsql",
) -> str:
    """
    Load one staged file into a RAW table with COPY INTO.

    stage_file_name is the basename that was PUT
    (e.g. 'booking_2025_01_part_001.csv.gz'). USE statements for the
    role/warehouse/database/schema set on ``creds`` are issued first, and
    the SnowSQL call uses a 600 second timeout.

    Returns:
        The stdout of the SnowSQL run.
    """
    # Session context, in the order the statements are issued.
    use_targets = (
        ("ROLE", creds.role),
        ("WAREHOUSE", creds.warehouse),
        ("DATABASE", creds.database),
        ("SCHEMA", creds.schema),
    )
    statements = [f"USE {keyword} {target}" for keyword, target in use_targets if target]

    force_flag = "TRUE" if force else "FALSE"
    copy_lines = [
        f"COPY INTO {raw_table_fqn}",
        f"FROM {stage_fqn}/{stage_file_name}",
        f"FILE_FORMAT = (FORMAT_NAME = '{file_format_fqn}')",
        f"ON_ERROR = {on_error}",
        f"FORCE = {force_flag};",
    ]
    # Continuation lines carry a four-space indent inside the statement text.
    statements.append("\n    ".join(copy_lines))

    script = "; ".join(statements)
    return _run_snowsql(creds, script, snowsql_path=snowsql_path, timeout_sec=600)


In [40]:
# -----------------------------
# 4) End-to-end: Postgres -> Stage -> RAW (temp files deleted)
# -----------------------------
def postgres_query_to_snowflake_raw(
    pg_conn,
    *,
    sql: str,
    creds: SnowflakeCreds,
    stage_fqn: str,             # e.g. "@HOTEL_ANALYTICS.RAW.LANDING_STAGE"
    raw_table_fqn: str,         # e.g. "HOTEL_ANALYTICS.RAW.BOOKING"
    file_format_fqn: str,       # e.g. "HOTEL_ANALYTICS.RAW.CSV_FMT"
    target_filename: str,       # e.g. "booking_2025_01_part_001.csv.gz"
    delimiter: str = ",",
    header: bool = True,
    null_as: str = "",
    snowsql_path: str = "snowsql",
) -> None:
    """
    Export a Postgres query and load it into a Snowflake RAW table.

    1) Export query to TEMP csv.gz
    2) PUT temp file to stage as target_filename
    3) COPY INTO RAW table
    4) Delete temp file + temp dir (also when a step fails)

    Raises:
        ValueError: target_filename is not a plain file name.
        Any error raised by the export, PUT or COPY step.
    """
    # The name is joined onto the temp directory and used as the staged file
    # name, so it must be a bare basename (no directories, not '.' or '..').
    if target_filename in ("", ".", "..") or Path(target_filename).name != target_filename:
        raise ValueError(f"target_filename must be a plain file name, got {target_filename!r}")

    tmp_dir = None
    try:
        # 1) Export to temp file
        tmp_file = export_query_to_temp_csv_gz(pg_conn, sql, delimiter=delimiter, header=header, null_as=null_as)
        tmp_dir = tmp_file.parent

        # Rename to the desired final file name (so stage has predictable names)
        final_path = tmp_dir / target_filename
        if final_path != tmp_file:
            tmp_file.rename(final_path)

        # 2) PUT to stage
        put_file_to_stage(
            creds,
            local_path=final_path,
            stage_fqn=stage_fqn,
            snowsql_path=snowsql_path,
            overwrite=True,
            auto_compress=False,  # file already gz
        )

        # 3) COPY INTO RAW
        copy_stage_to_raw_table(
            creds,
            stage_fqn=stage_fqn,
            stage_file_name=target_filename,
            raw_table_fqn=raw_table_fqn,
            file_format_fqn=file_format_fqn,
            snowsql_path=snowsql_path,
        )

    finally:
        # 4) Cleanup temp files (the "no local storage" requirement).
        # Best effort: a cleanup problem must not mask an error from the
        # steps above, hence ignore_errors.
        if tmp_dir is not None:
            shutil.rmtree(tmp_dir, ignore_errors=True)


In [42]:
    # Load the chunk into Snowflake. The target RAW table has to exist
    # already: the recorded run below failed because
    # HOTEL_ANALYTICS.RAW.HOTEL did not.
    postgres_query_to_snowflake_raw(
        pg_conn=conn,
        sql=final_query,
        creds=creds,
        snowsql_path="snowsql",  # or r"C:\path\to\snowsql.exe"
        target_filename="extract.csv.gz",
        stage_fqn="@HOTEL_ANALYTICS.RAW.LANDING_STAGE",
        file_format_fqn="HOTEL_ANALYTICS.RAW.CSV_FMT",
        raw_table_fqn="HOTEL_ANALYTICS.RAW.HOTEL",
    )


RuntimeError: SnowSQL command failed.
Command: snowsql -a BAJLLOB-CN38560 -u ALEKYAKASTURY -o exit_on_error=true -o friendly=false -o quiet=true -q 'USE ROLE ACCOUNTADMIN; USE DATABASE HOTEL_ANALYTICS; USE SCHEMA RAW; COPY INTO HOTEL_ANALYTICS.RAW.HOTEL
    FROM @HOTEL_ANALYTICS.RAW.LANDING_STAGE/extract.csv.gz
    FILE_FORMAT = (FORMAT_NAME = '"'"'HOTEL_ANALYTICS.RAW.CSV_FMT'"'"')
    ON_ERROR = ABORT_STATEMENT
    FORCE = TRUE;'
STDOUT:

STDERR:
001757 (42601): SQL compilation error:
Table 'HOTEL_ANALYTICS.RAW.HOTEL' does not exist
