# Snowflake to BigQuery SQL Translation

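This notebook translates SQL queries from Snowflake to BigQuery using the Datafold translation service. It requires a Datafold organization with at least one Snowflake and one BigQuery data source configured; the first of each is used as the translation source and target.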

In [None]:
# FOR LOCAL DEVELOPMENT, NOT NEEDED ON DATABRICKS
# Loads variables from a local .env file. If LOCAL_DATABRICKS_NOTEBOOK_PATH points at
# an existing path, installs databricks-notebook from it in editable mode and then
# shuts the kernel down with restart so the freshly installed package is importable.
# NOTE(review): the next cell force-reinstalls databricks-notebook from GitHub; running
# it after this one would replace the editable install made here.
import os
import pathlib
from dotenv import load_dotenv
load_dotenv()

LOCAL_DATABRICKS_NOTEBOOK_PATH = os.getenv('LOCAL_DATABRICKS_NOTEBOOK_PATH')
# Do nothing when the variable is unset/empty or the path does not exist.
if LOCAL_DATABRICKS_NOTEBOOK_PATH and pathlib.Path(LOCAL_DATABRICKS_NOTEBOOK_PATH).exists():
    print(f"Installing databricks-notebook from {LOCAL_DATABRICKS_NOTEBOOK_PATH}")
    # IPython substitutes {LOCAL_DATABRICKS_NOTEBOOK_PATH} in the magic line with the variable's value.
    %pip install --editable "{LOCAL_DATABRICKS_NOTEBOOK_PATH}"
    
    # Restart to make dependencies available
    # do_shutdown(True) requests a kernel shutdown with restart; all in-memory state
    # (including the imports above) is discarded, so run the remaining cells afterwards.
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [None]:
# Install dependencies
# Installs databricks-notebook from the repository's default branch on GitHub.
# NOTE(review): the URL has no @<tag-or-commit>, so re-runs may install a different
# version; consider pinning (git+https://...git@<ref>) for reproducible results.
# NOTE(review): --force-reinstall replaces any editable install made by the
# local-development cell above, and `%restart_python` is a Databricks magic, so this
# cell appears intended for Databricks only. Confirm before running it locally.
%pip install --force-reinstall --no-cache-dir git+https://github.com/datafold/databricks-notebook.git

# Restart to make dependencies available
# (The restart discards every variable and import defined before this point.)
%restart_python

In [3]:
import os
import time
from typing import List, Dict, Tuple
from databricks_notebook.utils import prepare_api_url, prepare_headers, post_data, get_data
from databricks_notebook import (
    create_organization,
    TranslationJobStatus,
    TranslationStatus,
    _get_host,
    _get_current_api_key,
    _set_current_api_key,
    _translation_results_html,
    DEFAULT_HOST,
    DEFAULT_ORG_TOKEN
)

def translate_snowflake_to_bigquery(
    queries: List[str],
    org_token: str | None = None,
    host: str | None = None,
) -> None:
    """
    Translate SQL queries from Snowflake to BigQuery and render results.

    Args:
        queries: List of Snowflake SQL queries to translate
        org_token: Organization token for authentication (defaults to DEFAULT_ORG_TOKEN)
        host: Host URL for Datafold instance (defaults to DEFAULT_HOST)
    """
    # Use default org token if not provided
    if org_token is None:
        org_token = DEFAULT_ORG_TOKEN

    # Get or create API key
    api_key = _get_current_api_key(org_token, host)
    if api_key is None:
        raise ValueError(
            "API key is not set. Please call create_organization or set the API key manually."
        )
    
    # Get host
    host = _get_host(host)
    
    # Create DMA project for Snowflake -> BigQuery
    data_sources = _get_data_sources(api_key, host)
    
    # Find Snowflake source and BigQuery target
    snowflake_ds = next((d for d in data_sources if d['type'] == 'snowflake'), None)
    bigquery_ds = next((d for d in data_sources if d['type'] == 'bigquery'), None)
    
    if snowflake_ds is None:
        raise ValueError("No Snowflake data source found. Please configure a Snowflake data source.")
    if bigquery_ds is None:
        raise ValueError("No BigQuery data source found. Please configure a BigQuery data source.")
    
    source_data_source_id = snowflake_ds['id']
    target_data_source_id = bigquery_ds['id']
    
    project = _create_dma_project(
        api_key, source_data_source_id, target_data_source_id, 'Snowflake to BigQuery Translation', host
    )
    project_id = project['id']
    print(f"✓ Project created with id {project_id}")

    # Upload queries to translate
    _upload_queries(host=host, api_key=api_key, project_id=project_id, queries=queries)
    print(f"✓ Queries uploaded")

    # Start translating queries
    translation_id = _start_translation(api_key, project_id, host)
    print(f"✓ Started translation with id {translation_id}")
    
    # Wait for and display results
    translation_results = _wait_for_translation_results(
        api_key, project_id, translation_id, 5, host
    )
    html = _translation_results_html(translation_results)

    from IPython.display import HTML, display
    display(HTML(html))


def _get_data_sources(api_key: str, host: str) -> List[Dict]:
    """Return every data source the given API key can see on ``host``.

    Issues a GET to ``api/v1/data_sources`` and returns the decoded JSON body,
    which callers treat as a list of dicts with at least ``type`` and ``id`` keys.
    """
    return get_data(
        prepare_api_url(host, "api/v1/data_sources"),
        headers=prepare_headers(api_key),
    ).json()


def _create_dma_project(
    api_key: str, source_ds_id: int, target_ds_id: int, name: str, host: str
) -> Dict:
    """Create a version-2 DMA project between two data sources.

    Args:
        api_key: Datafold API key used for the request headers.
        source_ds_id: Id of the data source queries are translated *from*.
        target_ds_id: Id of the data source queries are translated *to*.
        name: Display name for the new project.
        host: Resolved Datafold host URL.

    Returns:
        The ``project`` object from the JSON response.
    """
    endpoint = prepare_api_url(host, "api/internal/dma/projects")
    request_headers = prepare_headers(api_key)
    request_headers["Content-Type"] = "application/json"

    experimental_flags = {
        "import_sql_files_as_script_objects": True,
        "infer_schema_from_scripts": True,
        "generate_synthetic_data": True,
    }
    project_settings = {
        "error_on_zero_diff": False,
        "transform_group_creation_strategy": "group_individual_operations",
        "experimental": experimental_flags,
    }
    request_body = {
        "name": name,
        "from_data_source_id": source_ds_id,
        "to_data_source_id": target_ds_id,
        "version": 2,
        "settings": project_settings,
    }

    created = post_data(endpoint, json_data=request_body, headers=request_headers).json()
    return created['project']


def _upload_queries(
    api_key: str, project_id: int, queries: List[str], host: str
) -> Dict:
    """Upload each query as its own ``.sql`` file to a DMA project.

    Files are named ``query_1.sql``, ``query_2.sql``, ... in input order.

    Returns:
        The decoded JSON response of the upload request.
    """
    endpoint = prepare_api_url(host, f"api/internal/dma/v2/projects/{project_id}/files")
    request_headers = prepare_headers(api_key)
    request_headers["Content-Type"] = "application/json"

    files = []
    for number, sql in enumerate(queries, start=1):
        files.append({"filename": f"query_{number}.sql", "content": sql})

    return post_data(endpoint, json_data={'files': files}, headers=request_headers).json()


def _start_translation(api_key: str, project_id: int, host: str) -> str:
    """Start a translation job for a DMA project.

    Args:
        api_key: Datafold API key used for the request headers.
        project_id: Id of the DMA project to translate.
        host: Resolved Datafold host URL.

    Returns:
        The job id taken from the response's ``task_id`` field. The notebook's
        sample run shows a UUID string (e.g. ``3d96ae8f-...``), so it is typed as
        ``str``; pass it unchanged to ``_wait_for_translation_results``.
    """
    url = prepare_api_url(host, f"api/internal/dma/v2/projects/{project_id}/translate/jobs")
    headers = prepare_headers(api_key)
    headers["Content-Type"] = "application/json"

    response = post_data(
        url,
        json_data={"project_id": project_id},
        headers=headers,
    )
    translation_id = response.json()["task_id"]
    return translation_id


def _wait_for_translation_results(
    api_key: str,
    project_id: int,
    translation_id: str,
    poll_interval: int,
    host: str,
    timeout: float | None = None,
) -> Dict:
    """Poll a translation job until it reaches DONE or FAILED.

    A console spinner is redrawn every 0.1 s, while the job status is requested
    from the API at most once every ``poll_interval`` seconds. The first request
    is made immediately.

    Args:
        api_key: Datafold API key used for the request headers.
        project_id: DMA project that owns the job.
        translation_id: Job id returned by ``_start_translation``.
        poll_interval: Minimum number of seconds between status requests.
        host: Resolved Datafold host URL.
        timeout: Optional cap, in seconds, on the total wait. ``None`` (the
            default) waits indefinitely.

    Returns:
        The decoded job payload whose ``status`` is DONE or FAILED.

    Raises:
        TimeoutError: If ``timeout`` elapses before the job reaches a final status.
    """
    import sys

    url = prepare_api_url(
        host, f"api/internal/dma/v2/projects/{project_id}/translate/jobs/{translation_id}"
    )
    headers = prepare_headers(api_key)
    headers["Content-Type"] = "application/json"

    spinner = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
    spinner_speed = 0.1

    # monotonic() is unaffected by wall-clock changes (NTP, manual adjustments)
    # that could otherwise stall or burst the polling schedule.
    started_at = time.monotonic()
    last_check_time = None  # None => poll on the very first iteration
    status = None
    i = 0

    while True:
        current_time = time.monotonic()

        # Check API status at poll_interval
        if last_check_time is None or current_time - last_check_time >= poll_interval:
            response = get_data(url, headers=headers)
            result = response.json()
            status = result["status"]

            if status == TranslationJobStatus.DONE:
                print(f"\r✓ Translation completed with status: {status}")
                sys.stdout.flush()
                return result
            if status == TranslationJobStatus.FAILED:
                # Don't show a success check mark for a failed job.
                print(f"\r✗ Translation completed with status: {status}")
                sys.stdout.flush()
                return result

            last_check_time = current_time

        if timeout is not None and current_time - started_at >= timeout:
            print()  # terminate the spinner line before raising
            raise TimeoutError(
                f"Translation {translation_id} did not finish within {timeout} seconds "
                f"(last status: {status})"
            )

        # Update spinner display more frequently
        print(f"\r{spinner[i % len(spinner)]} Waiting for translation results...", end='')
        sys.stdout.flush()
        i += 1
        time.sleep(spinner_speed)

## Example Usage

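Each example below uses a Snowflake-specific construct that must be rewritten for BigQuery: semi-structured `:` path access with `::` casts, `LATERAL FLATTEN`, a window function over an aggregate, `OBJECT_CONSTRUCT`, time travel (`AT`), and `SAMPLE`. Run the cell to translate them: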

In [4]:
queries = [
    # JSON handling in Snowflake
    "SELECT product_id, attributes:color::string as color, attributes:size::string as size FROM products WHERE attributes IS NOT NULL",
    
    # FLATTEN function
    "SELECT f.value:sku::string as sku, f.value:quantity::number as qty FROM orders, LATERAL FLATTEN(input => PARSE_JSON(items)) f",
    
    # Window functions
    """SELECT 
        store_id, 
        product_id, 
        DATE_TRUNC('week', sale_date) as week, 
        SUM(quantity) as units, 
        SUM(revenue) as total_revenue, 
        AVG(SUM(revenue)) OVER (
            PARTITION BY store_id, product_id 
            ORDER BY DATE_TRUNC('week', sale_date) 
            ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
        ) as moving_avg 
    FROM sales 
    GROUP BY 1, 2, 3""",
    
    # OBJECT_CONSTRUCT
    "SELECT customer_id, OBJECT_CONSTRUCT('total_orders', COUNT(*), 'total_spent', SUM(total), 'avg_order', AVG(total)) as customer_stats FROM orders GROUP BY customer_id",
    
    # Time travel
    "SELECT * FROM orders AT(TIMESTAMP => DATEADD(days, -1, CURRENT_TIMESTAMP()))",
    
    # Sampling
    "SELECT * FROM sales SAMPLE (10)",
]

# Connection settings come from the environment instead of being hard-coded, so the
# same notebook runs locally and on Databricks without editing, and no token is
# committed with it. Unset (or empty) variables become None, which makes
# translate_snowflake_to_bigquery fall back to DEFAULT_HOST / DEFAULT_ORG_TOKEN.
# For local development, e.g. set DATAFOLD_HOST=http://localhost:5000 in .env.
translate_snowflake_to_bigquery(
    queries,
    host=os.getenv("DATAFOLD_HOST") or None,
    org_token=os.getenv("DATAFOLD_ORG_TOKEN") or None,
)

✓ Organization created with id 85
✓ Project created with id 450
✓ Queries uploaded
✓ Started translation with id 3d96ae8f-1ca0-416e-bfe0-7f0426895804
✓ Translation completed with status: done
