# Snowflake to BigQuery SQL Translation

This notebook translates SQL queries from Snowflake to BigQuery using the Datafold translation service.

In [None]:
# FOR LOCAL DEVELOPMENT, NOT NEEDED ON COLAB/DATABRICKS
# Optionally installs dma-sdk in editable mode from a local checkout whose
# location is given by the LOCAL_DMA_SDK_PATH environment variable.
import os
import pathlib
from dotenv import load_dotenv
# presumably reads a local .env file so LOCAL_DMA_SDK_PATH can be defined there — confirm
load_dotenv()

LOCAL_DMA_SDK_PATH = os.getenv('LOCAL_DMA_SDK_PATH')
# The install only runs when the variable is set AND the path exists;
# otherwise this cell does nothing beyond the imports above.
if LOCAL_DMA_SDK_PATH and pathlib.Path(LOCAL_DMA_SDK_PATH).exists():
    print(f"Installing dma-sdk from {LOCAL_DMA_SDK_PATH}")
    %pip install --editable "{LOCAL_DMA_SDK_PATH}"
    
    # Restart the kernel so the freshly installed package becomes importable.
    # This must stay the last step of the cell: nothing after it will run.
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [None]:
# Install dependencies
# Installs dma-sdk straight from the GitHub repository; --force-reinstall and
# --no-cache-dir make pip fetch and reinstall it even if a copy is already present.
%pip install --force-reinstall --no-cache-dir git+https://github.com/datafold/dma-sdk.git

# Restart to make dependencies available
# NOTE(review): %restart_python looks like a Databricks-specific magic — confirm
# it is available on the other notebook environments this is meant to run on.
%restart_python

In [None]:
import time
from typing import List, Dict, Tuple
from dma_sdk.utils import prepare_api_url, prepare_headers, post_data, get_data
from dma_sdk import (
    create_organization,
    TranslationJobStatus,
    TranslationStatus,
    _get_host,
    _get_current_api_key,
    _set_current_api_key,
    _translation_results_html,
    DEFAULT_HOST,
    DEFAULT_ORG_TOKEN
)

def translate_snowflake_to_bigquery(
    queries: List[str],
    org_token: str | None = None,
    host: str | None = None,
) -> None:
    """
    Translate SQL queries from Snowflake to BigQuery and render results.

    Args:
        queries: List of Snowflake SQL queries to translate
        org_token: Organization token for authentication (defaults to DEFAULT_ORG_TOKEN)
        host: Host URL for Datafold instance (defaults to DEFAULT_HOST)
    """
    # Use default org token if not provided
    if org_token is None:
        org_token = DEFAULT_ORG_TOKEN

    # Get or create API key
    api_key = _get_current_api_key(org_token, host)
    if api_key is None:
        raise ValueError(
            "API key is not set. Please call create_organization or set the API key manually."
        )
    
    # Get host
    host = _get_host(host)
    
    # Create DMA project for Snowflake -> BigQuery
    data_sources = _get_data_sources(api_key, host)
    
    # Find Snowflake source and BigQuery target
    snowflake_ds = next((d for d in data_sources if d['type'] == 'snowflake'), None)
    bigquery_ds = next((d for d in data_sources if d['type'] == 'bigquery'), None)
    
    if snowflake_ds is None:
        raise ValueError("No Snowflake data source found. Please configure a Snowflake data source.")
    if bigquery_ds is None:
        raise ValueError("No BigQuery data source found. Please configure a BigQuery data source.")
    
    source_data_source_id = snowflake_ds['id']
    target_data_source_id = bigquery_ds['id']
    
    project = _create_dma_project(
        api_key, source_data_source_id, target_data_source_id, 'Snowflake to BigQuery Translation', host
    )
    project_id = project['id']
    print(f"✓ Project created with id {project_id}")

    # Upload queries to translate
    _upload_queries(host=host, api_key=api_key, project_id=project_id, queries=queries)
    print(f"✓ Queries uploaded")

    # Start translating queries
    translation_id = _start_translation(api_key, project_id, host)
    print(f"✓ Started translation with id {translation_id}")
    
    # Wait for and display results
    translation_results = _wait_for_translation_results(
        api_key, project_id, translation_id, 5, host
    )
    html = _translation_results_html(translation_results)

    from IPython.display import HTML, display
    display(HTML(html))


def _get_data_sources(api_key: str, host: str) -> List[Dict]:
    """Return the decoded JSON body of a GET to ``api/v1/data_sources``."""
    return get_data(
        prepare_api_url(host, "api/v1/data_sources"),
        headers=prepare_headers(api_key),
    ).json()


def _create_dma_project(
    api_key: str, source_ds_id: int, target_ds_id: int, name: str, host: str
) -> Dict:
    """POST a new DMA project definition and return the ``project`` entry of the reply.

    Args:
        api_key: Key passed to ``prepare_headers`` for authentication.
        source_ds_id: Sent as ``from_data_source_id``.
        target_ds_id: Sent as ``to_data_source_id``.
        name: Project name.
        host: Host used to build the request URL.
    """
    endpoint = prepare_api_url(host, "api/internal/dma/projects")
    request_headers = prepare_headers(api_key)
    request_headers["Content-Type"] = "application/json"

    experimental_flags = {
        "import_sql_files_as_script_objects": True,
        "infer_schema_from_scripts": True,
        "generate_synthetic_data": True,
    }
    project_settings = {
        "error_on_zero_diff": False,
        "transform_group_creation_strategy": "group_individual_operations",
        "experimental": experimental_flags,
    }
    body = {
        "name": name,
        "from_data_source_id": source_ds_id,
        "to_data_source_id": target_ds_id,
        "version": 2,
        "settings": project_settings,
    }

    reply = post_data(endpoint, json_data=body, headers=request_headers)
    return reply.json()['project']


def _upload_queries(
    api_key: str, project_id: int, queries: List[str], host: str
) -> Dict:
    """Send every query to the project's ``files`` endpoint and return the JSON reply.

    Each query becomes one entry named ``query_<n>.sql``, numbered from 1 in
    the order the queries were given.
    """
    endpoint = prepare_api_url(host, f"api/internal/dma/v2/projects/{project_id}/files")
    request_headers = prepare_headers(api_key)
    request_headers["Content-Type"] = "application/json"

    files = []
    for number, sql in enumerate(queries, start=1):
        files.append({"filename": f"query_{number}.sql", "content": sql})

    reply = post_data(endpoint, json_data={'files': files}, headers=request_headers)
    return reply.json()


def _start_translation(api_key: str, project_id: int, host: str) -> str:
    """Start a translation job for the project and return its id.

    Args:
        api_key: Key passed to ``prepare_headers`` for authentication.
        project_id: Project whose uploaded files should be translated.
        host: Host used to build the request URL.

    Returns:
        The ``task_id`` field of the API reply. The sample run recorded in
        this notebook shows a UUID string here, hence the ``str`` annotation.
    """
    endpoint = prepare_api_url(host, f"api/internal/dma/v2/projects/{project_id}/translate/jobs")
    request_headers = prepare_headers(api_key)
    request_headers["Content-Type"] = "application/json"

    reply = post_data(
        endpoint,
        json_data={"project_id": project_id},
        headers=request_headers,
    )
    return reply.json()["task_id"]


def _wait_for_translation_results(
    api_key: str,
    project_id: int,
    translation_id: int | str,
    poll_interval: int,
    host: str,
    timeout: float | None = None,
) -> Dict:
    """Poll the translation job until it reaches a terminal status.

    A spinner is redrawn on the current output line roughly every 0.1 s,
    while the API itself is only queried every ``poll_interval`` seconds
    (the first query happens immediately).

    Args:
        api_key: Key passed to ``prepare_headers`` for authentication.
        project_id: Project the translation job belongs to.
        translation_id: Job id as returned when the translation was started.
        poll_interval: Minimum number of seconds between two status requests.
        host: Host used to build the request URL.
        timeout: Optional upper bound, in seconds, on the total wait. ``None``
            (the default) waits indefinitely, as before.

    Returns:
        The decoded JSON reply of the status request that reported
        ``TranslationJobStatus.DONE`` or ``TranslationJobStatus.FAILED``.

    Raises:
        TimeoutError: If ``timeout`` is given and elapses before the job
            reaches a terminal status.
    """
    import itertools
    import sys

    url = prepare_api_url(
        host, f"api/internal/dma/v2/projects/{project_id}/translate/jobs/{translation_id}"
    )
    headers = prepare_headers(api_key)
    headers["Content-Type"] = "application/json"

    terminal_statuses = (TranslationJobStatus.DONE, TranslationJobStatus.FAILED)
    spinner = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
    spinner_speed = 0.1

    # Monotonic clock: interval arithmetic must not be affected by wall-clock
    # adjustments (NTP sync, manual changes).
    started_at = time.monotonic()
    next_poll_at = started_at  # forces a status request on the first pass

    for frame in itertools.cycle(spinner):
        now = time.monotonic()

        # Check API status at poll_interval
        if now >= next_poll_at:
            result = get_data(url, headers=headers).json()
            status = result["status"]

            if status in terminal_statuses:
                # "\r" returns to column 0 so this line overwrites the spinner.
                print(f"\r✓ Translation completed with status: {status}")
                sys.stdout.flush()
                return result

            next_poll_at = now + poll_interval

        # Checked after polling so at least one status request is always made.
        if timeout is not None and now - started_at >= timeout:
            print()  # leave the spinner line before the traceback is shown
            raise TimeoutError(
                f"Translation {translation_id} did not finish within {timeout} seconds."
            )

        # Update spinner display more frequently than the API is polled
        print(f"\r{frame} Waiting for translation results...", end='')
        sys.stdout.flush()
        time.sleep(spinner_speed)

## Example Usage

Now you can translate Snowflake queries to BigQuery. Here are some example queries:

In [6]:
# Sample Snowflake statements covering a range of dialect-specific features.
# Each entry is uploaded as its own file (query_1.sql, query_2.sql, ...).
queries = [
    # JSON handling in Snowflake
    "SELECT product_id, attributes:color::string as color, attributes:size::string as size FROM products WHERE attributes IS NOT NULL",
    
    # FLATTEN function
    "SELECT f.value:sku::string as sku, f.value:quantity::number as qty FROM orders, LATERAL FLATTEN(input => PARSE_JSON(items)) f",
    
    # Window functions
    """SELECT 
        store_id, 
        product_id, 
        DATE_TRUNC('week', sale_date) as week, 
        SUM(quantity) as units, 
        SUM(revenue) as total_revenue, 
        AVG(SUM(revenue)) OVER (
            PARTITION BY store_id, product_id 
            ORDER BY DATE_TRUNC('week', sale_date) 
            ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
        ) as moving_avg 
    FROM sales 
    GROUP BY 1, 2, 3""",
    
    # OBJECT_CONSTRUCT
    "SELECT customer_id, OBJECT_CONSTRUCT('total_orders', COUNT(*), 'total_spent', SUM(total), 'avg_order', AVG(total)) as customer_stats FROM orders GROUP BY customer_id",
    
# PARSE_JSON path access, DATEADD and QUALIFY on a ROW_NUMBER alias
"""
SELECT 
    o.order_id,
    o.customer_id,
    o.order_date,
    o.total_amount,
    c.customer_name,
    c.customer_segment,
    PARSE_JSON(o.order_metadata):shipping_address::STRING AS shipping_address,
    PARSE_JSON(o.order_metadata):payment_method::STRING AS payment_method,
    ROW_NUMBER() OVER (PARTITION BY o.customer_id ORDER BY o.order_date DESC) AS order_rank
FROM raw_sales.orders o
JOIN raw_customer.customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= DATEADD(month, -6, CURRENT_DATE())
    AND o.status = 'COMPLETED'
QUALIFY order_rank <= 5
ORDER BY o.customer_id, o.order_date DESC
""",

# CREATE TABLE AS SELECT with MEDIAN and DATE_TRUNC aggregation
"""
CREATE TABLE analytics.daily_product_metrics AS
SELECT 
    product_id,
    DATE_TRUNC('day', sale_timestamp) AS sale_date,
    COUNT(DISTINCT order_id) AS order_count,
    SUM(quantity) AS total_quantity_sold,
    SUM(net_revenue) AS total_revenue,
    AVG(unit_price) AS avg_unit_price,
    MEDIAN(unit_price) AS median_unit_price,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM raw_sales.order_line_items
WHERE sale_timestamp >= DATEADD(year, -1, CURRENT_DATE())
GROUP BY product_id, DATE_TRUNC('day', sale_timestamp)
""",

# Chained CTEs with CASE tiering and DATEDIFF
"""
WITH customer_orders AS (
    SELECT 
        customer_id,
        COUNT(DISTINCT order_id) AS total_orders,
        SUM(order_total) AS lifetime_value,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS last_order_date
    FROM raw_sales.orders
    WHERE order_status NOT IN ('CANCELLED', 'REFUNDED')
    GROUP BY customer_id
),
customer_categories AS (
    SELECT 
        co.*,
        CASE 
            WHEN total_orders >= 20 THEN 'VIP'
            WHEN total_orders >= 10 THEN 'FREQUENT'
            WHEN total_orders >= 5 THEN 'REGULAR'
            ELSE 'OCCASIONAL'
        END AS customer_tier,
        DATEDIFF(day, first_order_date, last_order_date) AS customer_lifespan_days
    FROM customer_orders co
)
SELECT 
    customer_tier,
    COUNT(*) AS customer_count,
    AVG(lifetime_value) AS avg_lifetime_value,
    AVG(total_orders) AS avg_orders,
    AVG(customer_lifespan_days) AS avg_lifespan_days
FROM customer_categories
GROUP BY customer_tier
ORDER BY avg_lifetime_value DESC
""",

# CREATE TABLE AS with CTEs: monthly cohort analysis using DATEDIFF(month, ...)
"""
CREATE TABLE analytics.customer_cohort_analysis AS
WITH first_purchase AS (
    SELECT 
        customer_id,
        DATE_TRUNC('month', MIN(order_date)) AS cohort_month
    FROM raw_sales.orders
    WHERE order_status = 'COMPLETED'
    GROUP BY customer_id
),
monthly_activity AS (
    SELECT 
        fp.customer_id,
        fp.cohort_month,
        DATE_TRUNC('month', o.order_date) AS activity_month,
        SUM(o.order_total) AS monthly_revenue
    FROM first_purchase fp
    JOIN raw_sales.orders o ON fp.customer_id = o.customer_id
    WHERE o.order_status = 'COMPLETED'
    GROUP BY fp.customer_id, fp.cohort_month, DATE_TRUNC('month', o.order_date)
)
SELECT 
    cohort_month,
    activity_month,
    DATEDIFF(month, cohort_month, activity_month) AS months_since_first_purchase,
    COUNT(DISTINCT customer_id) AS active_customers,
    SUM(monthly_revenue) AS cohort_revenue
FROM monthly_activity
GROUP BY cohort_month, activity_month
""",

# RATIO_TO_REPORT window function and ORDER BY ... NULLS LAST
"""
WITH product_sales_7day AS (
    SELECT 
        product_id,
        SUM(quantity) AS qty_sold_7d,
        SUM(net_revenue) AS revenue_7d
    FROM raw_sales.order_line_items
    WHERE sale_timestamp >= DATEADD(day, -7, CURRENT_TIMESTAMP())
    GROUP BY product_id
),
product_sales_30day AS (
    SELECT 
        product_id,
        SUM(quantity) AS qty_sold_30d,
        SUM(net_revenue) AS revenue_30d
    FROM raw_sales.order_line_items
    WHERE sale_timestamp >= DATEADD(day, -30, CURRENT_TIMESTAMP())
    GROUP BY product_id
)
SELECT 
    p.product_id,
    p.product_name,
    p.category,
    ps7.qty_sold_7d,
    ps30.qty_sold_30d,
    ps7.revenue_7d,
    ps30.revenue_30d,
    RATIO_TO_REPORT(ps7.revenue_7d) OVER () AS pct_total_revenue_7d
FROM raw_product.products p
LEFT JOIN product_sales_7day ps7 ON p.product_id = ps7.product_id
LEFT JOIN product_sales_30day ps30 ON p.product_id = ps30.product_id
WHERE ps7.qty_sold_7d > 0 OR ps30.qty_sold_30d > 0
ORDER BY ps7.revenue_7d DESC NULLS LAST
""",

# CREATE TABLE AS with LAG over aggregated values and NULLIF-guarded division
"""
CREATE TABLE analytics.store_performance_weekly AS
SELECT 
    s.store_id,
    s.store_name,
    s.region,
    DATE_TRUNC('week', o.order_date) AS week_start_date,
    COUNT(DISTINCT o.order_id) AS weekly_orders,
    COUNT(DISTINCT o.customer_id) AS weekly_customers,
    SUM(o.order_total) AS weekly_revenue,
    AVG(o.order_total) AS avg_order_value,
    LAG(SUM(o.order_total), 1) OVER (PARTITION BY s.store_id ORDER BY DATE_TRUNC('week', o.order_date)) AS prev_week_revenue,
    (SUM(o.order_total) - LAG(SUM(o.order_total), 1) OVER (PARTITION BY s.store_id ORDER BY DATE_TRUNC('week', o.order_date))) 
        / NULLIF(LAG(SUM(o.order_total), 1) OVER (PARTITION BY s.store_id ORDER BY DATE_TRUNC('week', o.order_date)), 0) AS revenue_growth_rate
FROM raw_operations.stores s
JOIN raw_sales.orders o ON s.store_id = o.store_id
WHERE o.order_date >= DATEADD(month, -3, CURRENT_DATE())
GROUP BY s.store_id, s.store_name, s.region, DATE_TRUNC('week', o.order_date)
""",

# PERCENTILE_CONT ... WITHIN GROUP and QUALIFY with an inline ROW_NUMBER
"""
WITH returns_analysis AS (
    SELECT 
        r.return_id,
        r.order_id,
        r.return_date,
        o.order_date,
        DATEDIFF(day, o.order_date, r.return_date) AS days_to_return,
        r.return_reason,
        oli.product_id,
        oli.quantity AS ordered_quantity,
        r.returned_quantity,
        oli.net_revenue AS original_revenue
    FROM raw_sales.returns r
    JOIN raw_sales.orders o ON r.order_id = o.order_id
    JOIN raw_sales.order_line_items oli ON r.order_id = oli.order_id AND r.product_id = oli.product_id
    WHERE r.return_date >= DATEADD(month, -6, CURRENT_DATE())
)
SELECT 
    product_id,
    return_reason,
    COUNT(DISTINCT return_id) AS return_count,
    SUM(returned_quantity) AS total_returned_qty,
    SUM(original_revenue * (returned_quantity / ordered_quantity)) AS revenue_impact,
    AVG(days_to_return) AS avg_days_to_return,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY days_to_return) AS median_days_to_return
FROM returns_analysis
GROUP BY product_id, return_reason
QUALIFY ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY return_count DESC) <= 3
ORDER BY product_id, return_count DESC
"""
]

# NOTE(review): host and org_token are hard-coded for a local instance here —
# replace them (or omit them to use the SDK defaults) when running elsewhere.
translate_snowflake_to_bigquery(queries, host="http://localhost:5000", org_token='token1')

✓ Project created with id 452
✓ Queries uploaded
✓ Started translation with id 4d86a06d-3409-45d6-a7ed-b2665b6eaea7
✓ Translation completed with status: done
