# Simulated Data Pipeline Playbook

This notebook demonstrates how to simulate end-to-end pipelines that move data from an on-premises environment into a data lake and onward to Snowflake.
We will focus on showing the **techniques** that practitioners would use, including how to reconcile schema differences before merging data into a final analytics database.

## 0. Imports & Configuration

We rely on pandas to stand in for the compute engines (Spark, Snowflake) and use Python dictionaries to emulate configuration artifacts.

In [1]:
import pandas as pd
from pathlib import Path

BASE_PATH = Path('simulated_storage')
BASE_PATH.mkdir(exist_ok=True)
print('Working directory prepared:', BASE_PATH)

Working directory prepared: simulated_storage


## 1. Scenario Setup

To keep a consistent business story, we will follow a fictional retail company consolidating online orders. Data originates from multiple source systems:

* **AVS (On-Prem / VM)**: Legacy ERP extracts that use snake_case headers.
* **Data Lake Landing**: Semi-structured JSON exports with mixed-case headers (the mock data below uses `OrderID`, `CustomerCode`, `GrossAmount`); real exports often carry nested attributes as well.
* **Snowflake**: Target warehouse expecting business-friendly Pascal Case headers.

In real life we would connect to each platform with vendor-specific connectors. Here, we simulate those reads with local files while emphasizing best practices such as configuration-driven ingestion and schema validation.

### 1.1 Sample Source Files

We create three mock datasets with intentionally mismatched headers to illustrate transformation needs:

In [2]:
avs_orders = pd.DataFrame({
    'order_id': [101, 102, 103],
    'customer_id': ['C001', 'C002', 'C003'],
    'order_total': [250.0, 190.5, 330.25],
    'order_ts': ['2024-03-01 10:15:00', '2024-03-01 12:30:00', '2024-03-02 09:45:00']
})

raw_lake_orders = pd.DataFrame({
    'OrderID': ['A-900', 'A-901'],
    'CustomerCode': ['C002', 'C004'],
    'GrossAmount': [210.0, 480.0],
    'UpdatedAt': ['2024-03-02T14:00:00Z', '2024-03-02T16:20:00Z']
})

partner_feed = pd.DataFrame({
    'ORDER NUMBER': [5001, 5002],
    'CUSTOMER': ['C001', 'C005'],
    'TOTAL $': [125.5, 275.75],
    'MODIFIED': ['2024/03/03 08:30:00', '2024/03/03 09:10:00']
})

avs_orders, raw_lake_orders, partner_feed

(   order_id customer_id  order_total             order_ts
 0       101        C001       250.00  2024-03-01 10:15:00
 1       102        C002       190.50  2024-03-01 12:30:00
 2       103        C003       330.25  2024-03-02 09:45:00,
   OrderID CustomerCode  GrossAmount             UpdatedAt
 0   A-900         C002        210.0  2024-03-02T14:00:00Z
 1   A-901         C004        480.0  2024-03-02T16:20:00Z,
    ORDER NUMBER CUSTOMER  TOTAL $             MODIFIED
 0          5001     C001   125.50  2024/03/03 08:30:00
 1          5002     C005   275.75  2024/03/03 09:10:00)

## 2. Ingestion Techniques

### 2.1 Configuration-Driven Connectors

Real pipelines rely on configuration maps to determine connection parameters and mapping logic. Below we define metadata that describes each source, its storage layer, and how headers should be normalized.

In [3]:
source_config = {
    'avs_orders': {
        'layer': 'AVS',
        'read_format': 'csv',
        'primary_key': 'order_id',
        'expected_headers': ['order_id', 'customer_id', 'order_total', 'order_ts'],
        'rename_map': {
            'order_id': 'OrderID',
            'customer_id': 'CustomerID',
            'order_total': 'Amount',
            'order_ts': 'UpdatedAt'
        }
    },
    'raw_lake_orders': {
        'layer': 'DataLake',
        'read_format': 'json',
        'primary_key': 'OrderID',
        'expected_headers': ['OrderID', 'CustomerCode', 'GrossAmount', 'UpdatedAt'],
        'rename_map': {
            'OrderID': 'OrderID',
            'CustomerCode': 'CustomerID',
            'GrossAmount': 'Amount',
            'UpdatedAt': 'UpdatedAt'
        }
    },
    'partner_feed': {
        'layer': 'PartnerFTP',
        'read_format': 'xlsx',
        'primary_key': 'ORDER NUMBER',
        'expected_headers': ['ORDER NUMBER', 'CUSTOMER', 'TOTAL $', 'MODIFIED'],
        'rename_map': {
            'ORDER NUMBER': 'OrderID',
            'CUSTOMER': 'CustomerID',
            'TOTAL $': 'Amount',
            'MODIFIED': 'UpdatedAt'
        }
    }
}
source_config

{'avs_orders': {'layer': 'AVS',
  'read_format': 'csv',
  'primary_key': 'order_id',
  'expected_headers': ['order_id', 'customer_id', 'order_total', 'order_ts'],
  'rename_map': {'order_id': 'OrderID',
   'customer_id': 'CustomerID',
   'order_total': 'Amount',
   'order_ts': 'UpdatedAt'}},
 'raw_lake_orders': {'layer': 'DataLake',
  'read_format': 'json',
  'primary_key': 'OrderID',
  'expected_headers': ['OrderID', 'CustomerCode', 'GrossAmount', 'UpdatedAt'],
  'rename_map': {'OrderID': 'OrderID',
   'CustomerCode': 'CustomerID',
   'GrossAmount': 'Amount',
   'UpdatedAt': 'UpdatedAt'}},
 'partner_feed': {'layer': 'PartnerFTP',
  'read_format': 'xlsx',
  'primary_key': 'ORDER NUMBER',
  'expected_headers': ['ORDER NUMBER', 'CUSTOMER', 'TOTAL $', 'MODIFIED'],
  'rename_map': {'ORDER NUMBER': 'OrderID',
   'CUSTOMER': 'CustomerID',
   'TOTAL $': 'Amount',
   'MODIFIED': 'UpdatedAt'}}}
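
The `read_format` field in each config entry can drive reader selection. The following is a minimal sketch rather than the notebook's own loader: `READERS` and `read_source` are hypothetical names, and the temporary files only stand in for real landing paths. The `xlsx` format is left out because `pd.read_excel` needs an additional engine such as openpyxl.

```python
import json
import tempfile
from pathlib import Path

import pandas as pd

# Hypothetical dispatch table: maps a config 'read_format' to a pandas reader.
READERS = {
    'csv': pd.read_csv,
    'json': pd.read_json,
}

def read_source(path: Path, cfg: dict) -> pd.DataFrame:
    """Pick the reader named by cfg['read_format'] and load the file."""
    fmt = cfg['read_format']
    if fmt not in READERS:
        raise ValueError(f"unsupported read_format: {fmt!r}")
    return READERS[fmt](path)

with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / 'avs_orders.csv'
    csv_path.write_text('order_id,customer_id\n101,C001\n102,C002\n')

    json_path = Path(tmp) / 'raw_lake_orders.json'
    json_path.write_text(json.dumps([{'OrderID': 'A-900', 'CustomerCode': 'C002'}]))

    avs_df = read_source(csv_path, {'read_format': 'csv'})
    lake_df = read_source(json_path, {'read_format': 'json'})

print(list(avs_df.columns), list(lake_df.columns))
```

Keeping the format-to-reader mapping in one place means a new source only needs a config entry, not a new code path.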

### 2.2 Schema Validation Utility

A lightweight validation function checks whether the incoming headers match expectations. In production, this step would raise alerts or move files to quarantine when mismatches occur.

In [4]:
def validate_headers(df: pd.DataFrame, config: dict, *, source: str) -> None:
    actual = list(df.columns)
    expected = config[source]['expected_headers']
    missing = set(expected) - set(actual)
    extra = set(actual) - set(expected)
    if missing or extra:
        print(f"[WARN] {source}: header mismatch detected")
        if missing:
            print('  Missing columns:', ', '.join(sorted(missing)))
        if extra:
            print('  Unexpected columns:', ', '.join(sorted(extra)))
    else:
        print(f"[OK] {source}: headers aligned")

validate_headers(avs_orders, source_config, source='avs_orders')
validate_headers(raw_lake_orders, source_config, source='raw_lake_orders')
validate_headers(partner_feed, source_config, source='partner_feed')

[OK] avs_orders: headers aligned
[OK] raw_lake_orders: headers aligned
[OK] partner_feed: headers aligned
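
The validator above only prints a warning. A stricter variant that a pipeline could use to stop a load or route a file to quarantine might look like the sketch below. `HeaderReport`, `HeaderMismatchError`, `check_headers`, and `require_headers` are hypothetical names introduced for illustration, and the functions take plain header lists so the example stays dependency-free.

```python
from dataclasses import dataclass, field

class HeaderMismatchError(ValueError):
    """Raised when a source's headers differ from the configured expectation."""

@dataclass
class HeaderReport:
    source: str
    missing: list[str] = field(default_factory=list)
    extra: list[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return not (self.missing or self.extra)

def check_headers(actual: list[str], expected: list[str], *, source: str) -> HeaderReport:
    """Compare header sets and return a structured report instead of printing."""
    return HeaderReport(
        source=source,
        missing=sorted(set(expected) - set(actual)),
        extra=sorted(set(actual) - set(expected)),
    )

def require_headers(actual: list[str], expected: list[str], *, source: str) -> None:
    """Raise so the caller can quarantine the file or fail the task."""
    report = check_headers(actual, expected, source=source)
    if not report.ok:
        raise HeaderMismatchError(
            f"{source}: missing={report.missing} extra={report.extra}"
        )

# A drifted extract: 'customer_id' arrived as 'cust_id'.
report = check_headers(
    ['order_id', 'cust_id'],
    ['order_id', 'customer_id'],
    source='avs_orders',
)
print(report)
```

Returning a report object keeps the decision (warn, quarantine, or fail) with the caller, while `require_headers` gives a fail-fast option.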


## 3. Data Lake Landing (Bronze/Silver)

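We mimic a bronze-to-silver process. Bronze keeps the data as-is but tracks metadata. Silver standardizes header names, harmonizes datatypes, and tags each record with its source system in an `IngestedFrom` column. A unified surrogate key would typically be derived at this stage as well; the cell below stops short of building one.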

In [5]:
def normalize_headers(df: pd.DataFrame, rename_map: dict) -> pd.DataFrame:
    return df.rename(columns=rename_map)

bronze_tables = {
    'avs_orders_bronze': avs_orders.copy(),
    'raw_lake_orders_bronze': raw_lake_orders.copy(),
    'partner_feed_bronze': partner_feed.copy()
}

silver_tables = {}
for source, bronze_df in bronze_tables.items():
    cfg_key = source.replace('_bronze', '')
    renamed = normalize_headers(bronze_df, source_config[cfg_key]['rename_map'])
    renamed['IngestedFrom'] = source_config[cfg_key]['layer']
    silver_tables[source.replace('_bronze', '_silver')] = renamed

silver_tables

{'avs_orders_silver':    OrderID CustomerID  Amount            UpdatedAt IngestedFrom
 0      101       C001  250.00  2024-03-01 10:15:00          AVS
 1      102       C002  190.50  2024-03-01 12:30:00          AVS
 2      103       C003  330.25  2024-03-02 09:45:00          AVS,
 'raw_lake_orders_silver':   OrderID CustomerID  Amount             UpdatedAt IngestedFrom
 0   A-900       C002   210.0  2024-03-02T14:00:00Z     DataLake
 1   A-901       C004   480.0  2024-03-02T16:20:00Z     DataLake,
 'partner_feed_silver':    OrderID CustomerID  Amount            UpdatedAt IngestedFrom
 0     5001       C001  125.50  2024/03/03 08:30:00   PartnerFTP
 1     5002       C005  275.75  2024/03/03 09:10:00   PartnerFTP}
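
The silver cell above records provenance but does not derive a surrogate key. One possible approach, sketched here under assumptions, is to hash the source system together with the source-native order ID. The function name `add_surrogate_key`, the `OrderKey` column, and the 16-character truncation are illustrative choices, not part of the original pipeline.

```python
import hashlib

import pandas as pd

def add_surrogate_key(df: pd.DataFrame, *, source_col: str = 'IngestedFrom',
                      key_col: str = 'OrderID') -> pd.DataFrame:
    """Add a deterministic key built from source system + native order ID."""
    out = df.copy()
    composite = out[source_col].astype(str) + '|' + out[key_col].astype(str)
    out['OrderKey'] = composite.map(
        lambda value: hashlib.sha256(value.encode('utf-8')).hexdigest()[:16]
    )
    return out

# The same native ID (101) arriving from two systems gets two distinct keys.
demo = pd.DataFrame({
    'OrderID': [101, 101, 'A-900'],
    'IngestedFrom': ['AVS', 'PartnerFTP', 'DataLake'],
})
keyed = add_surrogate_key(demo)
print(keyed)
```

Because the key is a pure function of its inputs, re-running the silver step produces the same keys, which keeps downstream merges idempotent.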

### 3.1 Type Harmonization

Notice that timestamps and numeric fields follow different patterns. We centralize casting rules to avoid repeated transformation logic.

In [6]:
dtype_rules = {
    'OrderID': 'string',
    'CustomerID': 'string',
    'Amount': 'float',
    'UpdatedAt': 'datetime64[ns]'
}

for table_name, df in silver_tables.items():
    typed_df = df.astype({k: v for k, v in dtype_rules.items() if k in df.columns}, errors='ignore')
    typed_df['UpdatedAt'] = pd.to_datetime(typed_df['UpdatedAt'], errors='coerce')
    silver_tables[table_name] = typed_df

silver_tables['avs_orders_silver'].dtypes

OrderID         string[python]
CustomerID      string[python]
Amount                 float64
UpdatedAt       datetime64[ns]
IngestedFrom            object
dtype: object
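
Casting with `errors='ignore'` or `errors='coerce'` hides values that fail to convert. A small sketch of how such losses could be surfaced is shown below; `to_float_with_report` is a hypothetical helper, and the sample values are invented for illustration.

```python
import pandas as pd

def to_float_with_report(series: pd.Series) -> tuple[pd.Series, list]:
    """Coerce to numeric and list the original values that became null."""
    converted = pd.to_numeric(series, errors='coerce')
    # A value is "lost" when it was present before coercion but is null after.
    lost = converted.isna() & series.notna()
    return converted, series[lost].tolist()

amounts = pd.Series(['250.0', '190.5', 'N/A', None])
converted, rejected = to_float_with_report(amounts)
print('rejected values:', rejected)
```

Distinguishing values that were already missing from values that failed to parse makes it easier to decide whether a file should be quarantined.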

## 4. Snowflake Loading & Transformation

In production we would use Snowflake stages and `COPY INTO` commands. Here, we consolidate the silver tables into a single **gold** table while applying business rules:

* Deduplicate on `OrderID` preferring the most recent `UpdatedAt`.
* Create human-readable headers expected by analytics teams.
* Split the output into two presentation formats: a wide executive report and a transactional view.

In [8]:
combined = pd.concat(silver_tables.values(), ignore_index=True)
combined['UpdatedAt'] = pd.to_datetime(combined['UpdatedAt'], utc=True, errors='coerce')
combined = combined.sort_values('UpdatedAt', na_position='first').drop_duplicates('OrderID', keep='last')
combined['Amount'] = pd.to_numeric(combined['Amount'], errors='coerce').round(2)
combined

   OrderID  CustomerID  Amount                  UpdatedAt  IngestedFrom
0      101        C001   250.0  2024-03-01 10:15:00+00:00           AVS
1      102        C002   190.5  2024-03-01 12:30:00+00:00           AVS
2      103        C003  330.25  2024-03-02 09:45:00+00:00           AVS
3    A-900        C002   210.0  2024-03-02 14:00:00+00:00      DataLake
4    A-901        C004   480.0  2024-03-02 16:20:00+00:00      DataLake
5     5001        C001   125.5  2024-03-03 08:30:00+00:00    PartnerFTP
6     5002        C005  275.75  2024-03-03 09:10:00+00:00    PartnerFTP
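
The sample data contains no repeated `OrderID`, so the deduplication rule is never exercised in the output above. The sketch below applies the same sort-then-drop pattern to an invented frame in which order `101` arrives twice.

```python
import pandas as pd

# Invented sample: order 101 appears twice with different timestamps.
orders = pd.DataFrame({
    'OrderID': ['101', '101', '102'],
    'Amount': [250.0, 260.0, 190.5],
    'UpdatedAt': pd.to_datetime(
        ['2024-03-01 10:15:00', '2024-03-02 08:00:00', '2024-03-01 12:30:00'],
        utc=True,
    ),
})

# Sort oldest-first (unparsed timestamps first), then keep the last row per key.
latest = (
    orders.sort_values('UpdatedAt', na_position='first')
          .drop_duplicates('OrderID', keep='last')
          .reset_index(drop=True)
)
print(latest)
```

Placing missing timestamps first means a row with a valid `UpdatedAt` always wins over one that failed to parse.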


### 4.1 Final Warehouse Schema

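Analytics teams often ask for Pascal Case column names in the warehouse. Keep in mind that Snowflake stores unquoted identifiers in uppercase, so preserving this casing in an actual table requires double-quoted identifiers. The rename below produces the Pascal Case layout; Section 4.2 derives a variant header layout from it.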

In [9]:
warehouse_view = combined.rename(columns={
    'OrderID': 'OrderId',
    'CustomerID': 'CustomerId',
    'Amount': 'NetAmount',
    'UpdatedAt': 'UpdatedAtUtc',
    'IngestedFrom': 'SourceSystem'
})
warehouse_view

   OrderId  CustomerId  NetAmount               UpdatedAtUtc  SourceSystem
0      101        C001      250.0  2024-03-01 10:15:00+00:00           AVS
1      102        C002      190.5  2024-03-01 12:30:00+00:00           AVS
2      103        C003     330.25  2024-03-02 09:45:00+00:00           AVS
3    A-900        C002      210.0  2024-03-02 14:00:00+00:00      DataLake
4    A-901        C004      480.0  2024-03-02 16:20:00+00:00      DataLake
5     5001        C001      125.5  2024-03-03 08:30:00+00:00    PartnerFTP
6     5002        C005     275.75  2024-03-03 09:10:00+00:00    PartnerFTP


### 4.2 Executive Summary Layout

Business stakeholders sometimes request multi-level headers that distinguish metrics from metadata. A pandas `MultiIndex` on the columns lets us mimic the resulting presentation layer.

In [10]:
executive_summary = warehouse_view.copy()
executive_summary.columns = pd.MultiIndex.from_tuples([
    ('Identity', 'OrderId'),
    ('Identity', 'CustomerId'),
    ('Financials', 'NetAmount'),
    ('Operational', 'UpdatedAtUtc'),
    ('Operational', 'SourceSystem')
])
executive_summary

   Identity              Financials                Operational
    OrderId  CustomerId   NetAmount               UpdatedAtUtc  SourceSystem
0       101        C001       250.0  2024-03-01 10:15:00+00:00           AVS
1       102        C002       190.5  2024-03-01 12:30:00+00:00           AVS
2       103        C003      330.25  2024-03-02 09:45:00+00:00           AVS
3     A-900        C002       210.0  2024-03-02 14:00:00+00:00      DataLake
4     A-901        C004       480.0  2024-03-02 16:20:00+00:00      DataLake
5      5001        C001       125.5  2024-03-03 08:30:00+00:00    PartnerFTP
6      5002        C005      275.75  2024-03-03 09:10:00+00:00    PartnerFTP


## 5. Orchestration & Monitoring Patterns

Below is a pseudo-code Airflow DAG highlighting how these transformations would be orchestrated in a production setting.

In [11]:
from textwrap import dedent

airflow_dag = dedent('''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id="retail_orders_pipeline",
    start_date=datetime(2024, 3, 1),
    schedule_interval="0 * * * *",
    catchup=False,
) as dag:

    extract_avs = PythonOperator(
        task_id="extract_avs",
        python_callable=extract_from_avs,
    )

    load_bronze = PythonOperator(
        task_id="load_bronze",
        python_callable=write_to_bronze,
    )

    transform_silver = PythonOperator(
        task_id="transform_silver",
        python_callable=standardize_headers,
    )

    publish_gold = PythonOperator(
        task_id="publish_gold",
        python_callable=publish_to_snowflake,
    )

    extract_avs >> load_bronze >> transform_silver >> publish_gold
''')
print(airflow_dag)


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id="retail_orders_pipeline",
    start_date=datetime(2024, 3, 1),
    schedule_interval="0 * * * *",
    catchup=False,
) as dag:

    extract_avs = PythonOperator(
        task_id="extract_avs",
        python_callable=extract_from_avs,
    )

    load_bronze = PythonOperator(
        task_id="load_bronze",
        python_callable=write_to_bronze,
    )

    transform_silver = PythonOperator(
        task_id="transform_silver",
        python_callable=standardize_headers,
    )

    publish_gold = PythonOperator(
        task_id="publish_gold",
        python_callable=publish_to_snowflake,
    )

    extract_avs >> load_bronze >> transform_silver >> publish_gold



## 6. End-to-End Demonstration

Finally, we package the transformations into reusable functions to demonstrate how a single orchestration call could drive the pipeline.

> **Tip:** In a real deployment, each function would live in its own module with logging, error handling, retry logic, and parameterization for environment-specific values.
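
As one illustration of the retry logic mentioned in the tip, the sketch below wraps a pipeline step in a small retry loop with logging. `run_with_retry` and `flaky_extract` are hypothetical names; an orchestrator such as Airflow would normally provide retries through task settings instead.

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar('T')
log = logging.getLogger('pipeline')

def run_with_retry(step: Callable[[], T], *, attempts: int = 3,
                   base_delay: float = 0.0) -> T:
    """Call step(); on failure, log and retry with exponential backoff."""
    if attempts < 1:
        raise ValueError('attempts must be >= 1')
    for attempt in range(1, attempts + 1):
        try:
            return step()
        except Exception as exc:
            log.warning('attempt %d/%d failed: %s', attempt, attempts, exc)
            if attempt == attempts:
                raise  # out of retries: surface the last error
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError('unreachable')

# Simulated extract that fails twice before succeeding.
calls = {'n': 0}

def flaky_extract() -> str:
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError('transient failure')
    return 'extracted'

result = run_with_retry(flaky_extract, attempts=3)
print(result, 'after', calls['n'], 'calls')
```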

In [12]:
def standardize_source(df: pd.DataFrame, cfg_key: str) -> pd.DataFrame:
    cfg = source_config[cfg_key]
    validate_headers(df, source_config, source=cfg_key)
    normalized = normalize_headers(df, cfg['rename_map'])
    normalized['IngestedFrom'] = cfg['layer']
    return normalized.astype({
        'OrderID': 'string',
        'CustomerID': 'string',
        'Amount': 'float'
    }, errors='ignore')

def run_pipeline(sources: dict[str, pd.DataFrame]) -> tuple[pd.DataFrame, pd.DataFrame]:
    silver = [standardize_source(df, key) for key, df in sources.items()]
    combined_df = pd.concat(silver, ignore_index=True)
    combined_df['UpdatedAt'] = pd.to_datetime(combined_df['UpdatedAt'], errors='coerce')
    combined_df = combined_df.sort_values('UpdatedAt').drop_duplicates('OrderID', keep='last')
    warehouse_df = combined_df.rename(columns={
        'OrderID': 'OrderId',
        'CustomerID': 'CustomerId',
        'Amount': 'NetAmount',
        'UpdatedAt': 'UpdatedAtUtc',
        'IngestedFrom': 'SourceSystem'
    })
    exec_summary = warehouse_df.copy()
    exec_summary.columns = pd.MultiIndex.from_tuples([
        ('Identity', 'OrderId'),
        ('Identity', 'CustomerId'),
        ('Financials', 'NetAmount'),
        ('Operational', 'UpdatedAtUtc'),
        ('Operational', 'SourceSystem')
    ])
    return warehouse_df, exec_summary

warehouse_df, exec_summary = run_pipeline({
    'avs_orders': avs_orders,
    'raw_lake_orders': raw_lake_orders,
    'partner_feed': partner_feed
})
warehouse_df, exec_summary

[OK] avs_orders: headers aligned
[OK] raw_lake_orders: headers aligned
[OK] partner_feed: headers aligned


(  OrderId CustomerId  NetAmount        UpdatedAtUtc SourceSystem
 0     101       C001     250.00 2024-03-01 10:15:00          AVS
 1     102       C002     190.50 2024-03-01 12:30:00          AVS
 2     103       C003     330.25 2024-03-02 09:45:00          AVS
 3   A-900       C002     210.00                 NaT     DataLake
 4   A-901       C004     480.00                 NaT     DataLake
 5    5001       C001     125.50                 NaT   PartnerFTP
 6    5002       C005     275.75                 NaT   PartnerFTP,
   Identity            Financials         Operational             
    OrderId CustomerId  NetAmount        UpdatedAtUtc SourceSystem
 0      101       C001     250.00 2024-03-01 10:15:00          AVS
 1      102       C002     190.50 2024-03-01 12:30:00          AVS
 2      103       C003     330.25 2024-03-02 09:45:00          AVS
 3    A-900       C002     210.00                 NaT     DataLake
 4    A-901       C004     480.00                 NaT     DataLake
 5
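
The `NaT` values in the DataLake and PartnerFTP rows above are consistent with `run_pipeline` parsing all three timestamp layouts in a single `pd.to_datetime` call after concatenation, whereas Section 3.1 parsed each silver table separately. A hedged sketch of the per-source approach follows; `parse_per_source` is a hypothetical helper, and treating the timezone-naive AVS and partner timestamps as UTC is an assumption.

```python
import pandas as pd

# One sample list per source, using the three layouts from the mock data.
samples = {
    'AVS': ['2024-03-01 10:15:00', '2024-03-02 09:45:00'],
    'DataLake': ['2024-03-02T14:00:00Z', '2024-03-02T16:20:00Z'],
    'PartnerFTP': ['2024/03/03 08:30:00', '2024/03/03 09:10:00'],
}

def parse_per_source(values: list[str]) -> pd.Series:
    """Parse one source's timestamps on their own so a single layout is inferred."""
    # Assumption: naive timestamps are already in UTC.
    return pd.to_datetime(pd.Series(values), utc=True, errors='coerce')

parsed = pd.concat(
    [parse_per_source(values) for values in samples.values()],
    ignore_index=True,
)
print(parsed)
```

Applying the same idea inside `standardize_source`, before the frames are concatenated, would give the deduplication step a fully populated `UpdatedAt` column.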

## 7. Key Takeaways

* **Schema reconciliation** is the heart of multi-source consolidation. The configuration maps give a repeatable way to translate headers, enforce types, and annotate provenance.
* **Layered storage (Bronze → Silver → Gold)** enables auditable transformations and simplifies debugging when data drifts.
* **Presentation models** can diverge dramatically from raw data structures; designing reusable formatting logic avoids one-off reporting hacks.

This notebook can serve as a sandbox for experimenting with additional sources, validation rules, and monitoring hooks before wiring up production pipelines.

## 8. Snowflake Load (Simulated)

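We write the warehouse view to a CSV file and generate the Snowflake DDL/DML you would execute to load or merge the data. In practice you would upload the file to a Snowflake stage (internal, or external on S3/Azure) and have an orchestrator run these commands. Treat the statements as templates: when querying staged files directly, as the MERGE does, Snowflake expects a named file format, and options such as header skipping are defined on that file format.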


In [13]:
staging = BASE_PATH / 'staging'
staging.mkdir(exist_ok=True)
stage_file = staging / 'orders_warehouse.csv'
warehouse_df.to_csv(stage_file, index=False)
print('Staged file:', stage_file.resolve())

pk = 'OrderId'
create_sql = (
"""
create table if not exists ORDERS_WAREHOUSE (
    OrderId string,
    CustomerId string,
    NetAmount number(18,2),
    UpdatedAtUtc timestamp_tz,
    SourceSystem string
);
"""
).strip()

copy_sql = (
f"""
copy into ORDERS_WAREHOUSE
from @~/{stage_file.name}
file_format = (type = csv field_optionally_enclosed_by='"' skip_header = 1)
on_error = 'CONTINUE';
"""
).strip()

merge_sql = (
f"""
merge into ORDERS_WAREHOUSE t
using (select * from @~/{stage_file.name} (file_format => 'CSV', skip_header => 1)) s
on t.{pk} = s.$1
when matched then update set
    CustomerId = s.$2,
    NetAmount = s.$3,
    UpdatedAtUtc = s.$4,
    SourceSystem = s.$5
when not matched then insert (OrderId, CustomerId, NetAmount, UpdatedAtUtc, SourceSystem)
values (s.$1, s.$2, s.$3, s.$4, s.$5);
"""
).strip()

print("\n-- Snowflake DDL")
print(create_sql)
print("\n-- Snowflake COPY")
print(copy_sql)
print("\n-- Snowflake MERGE")
print(merge_sql)


Staged file: C:\Users\pertt\Alleyfoo\pipe-transformation\simulated_storage\staging\orders_warehouse.csv

-- Snowflake DDL
create table if not exists ORDERS_WAREHOUSE (
    OrderId string,
    CustomerId string,
    NetAmount number(18,2),
    UpdatedAtUtc timestamp_tz,
    SourceSystem string
);

-- Snowflake COPY
copy into ORDERS_WAREHOUSE
from @~/orders_warehouse.csv
file_format = (type = csv field_optionally_enclosed_by='"' skip_header = 1)
on_error = 'CONTINUE';

-- Snowflake MERGE
merge into ORDERS_WAREHOUSE t
using (select * from @~/orders_warehouse.csv (file_format => 'CSV', skip_header => 1)) s
on t.OrderId = s.$1
when matched then update set
    CustomerId = s.$2,
    NetAmount = s.$3,
    UpdatedAtUtc = s.$4,
    SourceSystem = s.$5
when not matched then insert (OrderId, CustomerId, NetAmount, UpdatedAtUtc, SourceSystem)
values (s.$1, s.$2, s.$3, s.$4, s.$5);


## 9. Header Variants & Exports

This section produces multiple header conventions from the same canonical model and exports them for downstream systems.


In [14]:
import re

def to_snake(name: str) -> str:
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    return s2.lower()

def to_camel_from_pascal(name: str) -> str:
    return name[:1].lower() + name[1:] if name else name

snake_df = warehouse_df.copy()
snake_df.columns = [to_snake(c) for c in warehouse_df.columns]

camel_df = warehouse_df.copy()
camel_df.columns = [to_camel_from_pascal(c) for c in warehouse_df.columns]

pascal_df = warehouse_df.copy()  # already Pascal Case

exports = BASE_PATH / 'exports'
exports.mkdir(exist_ok=True)
snake_path = exports / 'orders_snake_case.csv'
camel_path = exports / 'orders_camelCase.csv'
pascal_path = exports / 'orders_PascalCase.csv'
snake_df.to_csv(snake_path, index=False)
camel_df.to_csv(camel_path, index=False)
pascal_df.to_csv(pascal_path, index=False)

from IPython.display import display
print('Wrote:')
print(' -', snake_path.resolve())
print(' -', camel_path.resolve())
print(' -', pascal_path.resolve())
display(snake_df.head())
display(camel_df.head())
display(pascal_df.head())


Wrote:
 - C:\Users\pertt\Alleyfoo\pipe-transformation\simulated_storage\exports\orders_snake_case.csv
 - C:\Users\pertt\Alleyfoo\pipe-transformation\simulated_storage\exports\orders_camelCase.csv
 - C:\Users\pertt\Alleyfoo\pipe-transformation\simulated_storage\exports\orders_PascalCase.csv


   order_id  customer_id  net_amount       updated_at_utc  source_system
0       101         C001       250.0  2024-03-01 10:15:00            AVS
1       102         C002       190.5  2024-03-01 12:30:00            AVS
2       103         C003      330.25  2024-03-02 09:45:00            AVS
3     A-900         C002       210.0                  NaT       DataLake
4     A-901         C004       480.0                  NaT       DataLake


   orderId  customerId  netAmount         updatedAtUtc  sourceSystem
0      101        C001      250.0  2024-03-01 10:15:00           AVS
1      102        C002      190.5  2024-03-01 12:30:00           AVS
2      103        C003     330.25  2024-03-02 09:45:00           AVS
3    A-900        C002      210.0                  NaT      DataLake
4    A-901        C004      480.0                  NaT      DataLake


   OrderId  CustomerId  NetAmount         UpdatedAtUtc  SourceSystem
0      101        C001      250.0  2024-03-01 10:15:00           AVS
1      102        C002      190.5  2024-03-01 12:30:00           AVS
2      103        C003     330.25  2024-03-02 09:45:00           AVS
3    A-900        C002      210.0                  NaT      DataLake
4    A-901        C004      480.0                  NaT      DataLake
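
Header conversions are not always reversible. The sketch below reuses the notebook's `to_snake` and adds a hypothetical inverse, `to_pascal_from_snake`, to show which headers survive a round trip and where acronym casing is lost.

```python
import re

def to_snake(name: str) -> str:
    """Same conversion as the notebook cell above."""
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    return s2.lower()

def to_pascal_from_snake(name: str) -> str:
    """Hypothetical inverse: capitalize each underscore-separated part."""
    return ''.join(part[:1].upper() + part[1:] for part in name.split('_') if part)

headers = ['OrderId', 'CustomerId', 'NetAmount', 'UpdatedAtUtc', 'SourceSystem']
round_trip = [to_pascal_from_snake(to_snake(h)) for h in headers]
print(round_trip == headers)

# Acronym casing does not survive: 'OrderID' comes back as 'OrderId'.
lossy = to_pascal_from_snake(to_snake('OrderID'))
print(lossy)
```

If downstream systems depend on exact casing such as `OrderID`, an explicit rename map like the ones in `source_config` is safer than rule-based conversion.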


## 10. Incremental Loads & Idempotent Upserts

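This section uses a simple watermark on the update timestamp to select new or changed rows and simulates an idempotent merge into a target table. In the recorded run no rows pass the filter: the only rows newer than the watermark are the DataLake and PartnerFTP orders, whose timestamps came through as `NaT` in Section 6, and `NaT` never compares as greater than the watermark.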


In [15]:
last_watermark = pd.Timestamp('2024-03-02 12:00:00', tz='UTC')
ts = pd.to_datetime(warehouse_df['UpdatedAtUtc'], errors='coerce', utc=True)
incremental = warehouse_df.loc[ts > last_watermark]
print(f'Records after watermark ({last_watermark}):', len(incremental))

# Simulate an existing target snapshot (e.g., previous load)
target_before = warehouse_df.iloc[[0, 1]].copy()
print('Target before:', len(target_before))

# Idempotent upsert: keep non-matching existing + replace with latest for matching keys
updated = pd.concat([
    target_before[~target_before['OrderId'].isin(incremental['OrderId'])],
    incremental
], ignore_index=True)
print('Target after:', len(updated))
updated.sort_values('OrderId').head()


Records after watermark (2024-03-02 12:00:00+00:00): 0
Target before: 2
Target after: 2


   OrderId  CustomerId  NetAmount         UpdatedAtUtc  SourceSystem
0      101        C001      250.0  2024-03-01 10:15:00           AVS
1      102        C002      190.5  2024-03-01 12:30:00           AVS
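
A watermark is only useful if it advances after each successful load. The following sketch, built on invented sample rows, selects the incremental batch and returns the next watermark. `select_incremental` is a hypothetical helper, and rows with unparseable timestamps are deliberately left out of the batch.

```python
import pandas as pd

def select_incremental(df: pd.DataFrame, ts_col: str, watermark: pd.Timestamp):
    """Return rows newer than the watermark plus the advanced watermark."""
    ts = pd.to_datetime(df[ts_col], utc=True, errors='coerce')
    mask = ts > watermark  # NaT compares as False, so unparsed rows are skipped
    batch = df.loc[mask]
    new_watermark = ts[mask].max() if mask.any() else watermark
    return batch, new_watermark

orders = pd.DataFrame({
    'OrderId': ['101', 'A-900', '5001'],
    'UpdatedAtUtc': ['2024-03-01 10:15:00', '2024-03-02 14:00:00', None],
})

watermark = pd.Timestamp('2024-03-02 12:00:00', tz='UTC')
batch, watermark_after = select_incremental(orders, 'UpdatedAtUtc', watermark)

# Re-running with the advanced watermark selects nothing: the load is idempotent.
rerun_batch, rerun_watermark = select_incremental(orders, 'UpdatedAtUtc', watermark_after)
print(len(batch), watermark_after, len(rerun_batch))
```

In a real pipeline the new watermark would be persisted only after the merge commits, so a failed load is retried from the old value.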


## 11. Connector Techniques (Stubs)

When live credentials or SDKs are unavailable, we can still demonstrate the technique. Below are minimal templates for common connectors:

- Azure Blob Storage (landing/bronze)
- AWS S3 (landing/bronze)
- Snowflake (warehouse)
- SFTP/AVS (on-prem legacy feed)

Install (suggested):

pip install azure-storage-blob boto3 snowflake-connector-python paramiko


In [16]:
azure_blob_template = """
from azure.storage.blob import BlobServiceClient

conn_str = "<AZURE_STORAGE_CONNECTION_STRING>"
container = "landing"
prefix = "orders/"

svc = BlobServiceClient.from_connection_string(conn_str)
container_client = svc.get_container_client(container)
for blob in container_client.list_blobs(name_starts_with=prefix):
    print(blob.name)
"""

s3_template = """
import boto3

s3 = boto3.client('s3')
bucket = '<BUCKET_NAME>'
prefix = 'orders/'

resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp.get('Contents', []):
    print(obj['Key'])
"""

snowflake_template = """
import snowflake.connector as sf

conn = sf.connect(
    user='<USER>', password='<PASSWORD>', account='<ACCOUNT>',
    warehouse='COMPUTE_WH', database='ANALYTICS', schema='PUBLIC'
)
cur = conn.cursor()
cur.execute("create table if not exists ORDERS_WAREHOUSE (OrderId string, ...)")
cur.execute("copy into ORDERS_WAREHOUSE from @~/<staged_file>.csv file_format=(type=csv skip_header=1)")
cur.close(); conn.close()
"""

sftp_template = """
import paramiko

host = '<SFTP_HOST>'
user = '<USER>'
key_path = '<PRIVATE_KEY_PATH>'

key = paramiko.RSAKey.from_private_key_file(key_path)
client = paramiko.SSHClient(); client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=host, username=user, pkey=key)
sftp = client.open_sftp()
for name in sftp.listdir('incoming/avs'): print(name)
sftp.close(); client.close()
"""

print("--- Azure Blob example ---\n", azure_blob_template)
print("\n--- S3 example ---\n", s3_template)
print("\n--- Snowflake example ---\n", snowflake_template)
print("\n--- SFTP/AVS example ---\n", sftp_template)


--- Azure Blob example ---
 
from azure.storage.blob import BlobServiceClient

conn_str = "<AZURE_STORAGE_CONNECTION_STRING>"
container = "landing"
prefix = "orders/"

svc = BlobServiceClient.from_connection_string(conn_str)
container_client = svc.get_container_client(container)
for blob in container_client.list_blobs(name_starts_with=prefix):
    print(blob.name)


--- S3 example ---
 
import boto3

s3 = boto3.client('s3')
bucket = '<BUCKET_NAME>'
prefix = 'orders/'

resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp.get('Contents', []):
    print(obj['Key'])


--- Snowflake example ---
 
import snowflake.connector as sf

conn = sf.connect(
    user='<USER>', password='<PASSWORD>', account='<ACCOUNT>',
    warehouse='COMPUTE_WH', database='ANALYTICS', schema='PUBLIC'
)
cur = conn.cursor()
cur.execute("create table if not exists ORDERS_WAREHOUSE (OrderId string, ...)")
cur.execute("copy into ORDERS_WAREHOUSE from @~/<staged_file>.csv file_format=(type=csv skip_head

## 12. Header Rename Before Merge (Hyphenated)

Example: transform a source header `key_value1` into `key-value` before merging into the main database table. Note that hyphenated identifiers require quoting in SQL (for example Snowflake/ANSI SQL: "key-value").


In [17]:
import re

# Example source feed with a non-canonical header name
kv_feed = pd.DataFrame({
    'key_value1': ['A001', 'B002', 'C003'],
    'amount': [120.0, 220.0, 150.0]
})

# Simulated main database snapshot (already using hyphenated header)
main_db = pd.DataFrame({
    'key-value': ['A001', 'B002'],
    'amount': [100.0, 200.0]
})

# Transform rules: underscores -> hyphens, drop trailing digits
def header_transform(name: str) -> str:
    name = name.replace('_', '-')
    name = re.sub(r'\d+$', '', name)  # remove trailing digits
    return name

kv_renamed = kv_feed.rename(columns=header_transform)

print('Before merge (main):')
print(main_db)
print('\nIncoming (renamed):')
print(kv_renamed)

# Idempotent upsert on the hyphenated key
key = 'key-value'
updated = pd.concat([
    main_db[~main_db[key].isin(kv_renamed[key])],
    kv_renamed
], ignore_index=True)

print('\nAfter merge (upserted):')
print(updated.sort_values(key).reset_index(drop=True))

# Example SQL to mirror this behavior in Snowflake (note quoted identifier)
sql_merge = """
merge into MAIN_DB t
using (select column1 as "key-value", column2 as amount from @~/upsert.csv (file_format => 'CSV', skip_header => 1)) s
on t."key-value" = s."key-value"
when matched then update set amount = s.amount
when not matched then insert ("key-value", amount) values (s."key-value", s.amount);
""".strip()
print('\nMerge template SQL (Snowflake):\n', sql_merge)


Before merge (main):
  key-value  amount
0      A001   100.0
1      B002   200.0

Incoming (renamed):
  key-value  amount
0      A001   120.0
1      B002   220.0
2      C003   150.0

After merge (upserted):
  key-value  amount
0      A001   120.0
1      B002   220.0
2      C003   150.0

Merge template SQL (Snowflake):
 merge into MAIN_DB t
using (select column1 as "key-value", column2 as amount from @~/upsert.csv (file_format => 'CSV', skip_header => 1)) s
on t."key-value" = s."key-value"
when matched then update set amount = s.amount
when not matched then insert ("key-value", amount) values (s."key-value", s.amount);
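
Two practical risks come with rule-based renames like this one: distinct source headers can collapse onto the same target name, and the resulting names may need quoting in SQL. The sketch below illustrates both checks. `find_collisions` and `quote_ident` are hypothetical helpers, and the unquoted-identifier pattern is a simplified assumption (letters, digits, underscore, and `$`, not starting with a digit).

```python
import re
from collections import defaultdict

def header_transform(name: str) -> str:
    """Same rule as the notebook: underscores -> hyphens, drop trailing digits."""
    return re.sub(r'\d+$', '', name.replace('_', '-'))

def find_collisions(headers: list[str]) -> dict[str, list[str]]:
    """Group source headers by transformed name; return groups with >1 member."""
    grouped = defaultdict(list)
    for header in headers:
        grouped[header_transform(header)].append(header)
    return {target: sources for target, sources in grouped.items() if len(sources) > 1}

_UNQUOTED = re.compile(r'[A-Za-z_][A-Za-z0-9_$]*')

def quote_ident(name: str) -> str:
    """Double-quote identifiers that are not plain; double any embedded quotes."""
    if _UNQUOTED.fullmatch(name):
        return name  # left unquoted, so the database applies its own case folding
    return '"' + name.replace('"', '""') + '"'

collisions = find_collisions(['key_value1', 'key_value2', 'amount'])
print(collisions)
print(quote_ident('key-value'), quote_ident('amount'))
```

Running the collision check before the rename avoids silently producing a frame with duplicate column names.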


## 13. Config-Driven Orchestration (Demo)

This section loads a YAML configuration and uses the reusable toolkit functions to normalize headers, perform a hyphenated-key upsert, and produce a Snowflake MERGE statement. It demonstrates the technique even if live connectors are unavailable.


In [18]:
import sys
from pathlib import Path
import yaml

# Make 'src' importable
sys.path.append('src')
from pipeline.toolkit import (
    load_yaml_config,
    header_transform_underscore_to_hyphen_remove_digits as hyph,
    idempotent_upsert_key,
    normalize_headers,
    snowflake_merge_sql,
)

cfg_path = Path('config/pipeline.example.yaml')
cfg = load_yaml_config(str(cfg_path)) if cfg_path.exists() else {}
print('Config loaded:', bool(cfg))
if cfg:
    print('Sources:', list(cfg.get('sources', {}).keys()))
    print('Targets:', list(cfg.get('targets', {}).keys()))

# Demo: header hyphenation before merge
source_df = pd.DataFrame({'key_value1': ['A001','B002'], 'amount':[120.0,210.0]})
target_df = pd.DataFrame({'key-value': ['A001'], 'amount':[100.0]})

renamed_df = source_df.rename(columns=hyph)
merged = idempotent_upsert_key(target_df, renamed_df, 'key-value')
print('After upsert rows:', len(merged))
renamed_df.head(), merged.sort_values('key-value').reset_index(drop=True).head()

print('\nMERGE SQL preview:')
print(snowflake_merge_sql('ORDERS_WAREHOUSE', 'OrderId', 'orders_warehouse.csv'))


Config loaded: True
Sources: ['abfs_orders_url', 's3_orders_url']
Targets: ['snowflake']
After upsert rows: 2

MERGE SQL preview:
merge into ORDERS_WAREHOUSE t
using (select * from @~/orders_warehouse.csv (file_format => 'CSV', skip_header => 1)) s
on t.OrderId = s.$1
when matched then update set
    CustomerId = s.$2,
    NetAmount = s.$3,
    UpdatedAtUtc = s.$4,
    SourceSystem = s.$5
when not matched then insert (OrderId, CustomerId, NetAmount, UpdatedAtUtc, SourceSystem)
values (s.$1, s.$2, s.$3, s.$4, s.$5);
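
The toolkit's `snowflake_merge_sql` implementation is not shown in this notebook. As a rough idea of how such a generator could be written, the sketch below builds a MERGE from a column list. It is not the toolkit's code: `sketch_merge_sql` is a hypothetical name, it aliases the positional columns explicitly instead of using `select *`, and it assumes a named file format (here the invented `ORDERS_CSV`) already exists in Snowflake.

```python
def sketch_merge_sql(table: str, key: str, columns: list[str],
                     staged_file: str, file_format: str) -> str:
    """Build a MERGE statement that upserts a staged CSV into a table."""
    if key not in columns:
        raise ValueError(f'key {key!r} must be one of the columns')
    # Alias positional stage columns ($1, $2, ...) to the target column names.
    select_list = ', '.join(f'${i} as {col}' for i, col in enumerate(columns, start=1))
    set_clause = ',\n    '.join(f'{col} = s.{col}' for col in columns if col != key)
    insert_cols = ', '.join(columns)
    insert_vals = ', '.join(f's.{col}' for col in columns)
    return (
        f"merge into {table} t\n"
        f"using (select {select_list} "
        f"from @~/{staged_file} (file_format => '{file_format}')) s\n"
        f"on t.{key} = s.{key}\n"
        f"when matched then update set\n    {set_clause}\n"
        f"when not matched then insert ({insert_cols})\n"
        f"values ({insert_vals});"
    )

sql = sketch_merge_sql(
    table='ORDERS_WAREHOUSE',
    key='OrderId',
    columns=['OrderId', 'CustomerId', 'NetAmount', 'UpdatedAtUtc', 'SourceSystem'],
    staged_file='orders_warehouse.csv',
    file_format='ORDERS_CSV',  # hypothetical named file format
)
print(sql)
```

The identifiers are interpolated directly, so this sketch is only suitable for trusted, configuration-supplied names.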


## 14. Connection Methods & Pipes (Techniques)

This section summarizes practical ways to connect to common data stores and shows how to define ingestion “pipes.”
Each example is safe-by-default: it will only execute if required libraries and environment variables are present; otherwise it prints the exact template code to use.

Covered below:
- Azure Blob Storage (connection string or SAS)
- AWS S3 (default credential chain)
- SFTP/AVS (key or password)
- Snowflake connector (session) and Snowpipe (CREATE STAGE/PIPE)


In [19]:
import os

import importlib

def _chk(name, mod):
    """Report whether an optional connector library can be imported."""
    try:
        importlib.import_module(mod)
    except ImportError:
        print(f"[MISS] {name}: install package for {mod}")
        return False
    print(f"[OK] {name}: library present ({mod})")
    return True

has_azure = _chk('Azure Blob', 'azure.storage.blob')
has_boto3 = _chk('AWS S3', 'boto3')
has_paramiko = _chk('SFTP', 'paramiko')
has_snowflake = _chk('Snowflake', 'snowflake.connector')
print('\nEnvironment hints:')
for var in ['AZURE_STORAGE_CONNECTION_STRING','AZURE_BLOB_CONTAINER','AZURE_BLOB_PREFIX',
            'AWS_PROFILE','AWS_ACCESS_KEY_ID','S3_BUCKET','S3_PREFIX',
            'SFTP_HOST','SFTP_USER','SFTP_KEY_PATH','SFTP_DIR',
            'SNOWFLAKE_USER','SNOWFLAKE_PASSWORD','SNOWFLAKE_ACCOUNT','SNOWFLAKE_WAREHOUSE','SNOWFLAKE_DATABASE','SNOWFLAKE_SCHEMA']:
    if os.environ.get(var):
        print(' -', var, '= set')


[MISS] Azure Blob: install package for azure.storage.blob
[MISS] AWS S3: install package for boto3
[MISS] SFTP: install package for paramiko
[MISS] Snowflake: install package for snowflake.connector

Environment hints:


In [None]:
# Azure Blob: List blobs under prefix when available; otherwise print template
import os
try:
    from azure.storage.blob import BlobServiceClient  # type: ignore
    _has_lib = True
except Exception:
    _has_lib = False

conn = os.environ.get('AZURE_STORAGE_CONNECTION_STRING')
container = os.environ.get('AZURE_BLOB_CONTAINER')
prefix = os.environ.get('AZURE_BLOB_PREFIX','')

if _has_lib and conn and container:
    try:
        svc = BlobServiceClient.from_connection_string(conn)
        cc = svc.get_container_client(container)
        print(f'Listing azure blobs: container={container} prefix={prefix!r}')
        for i, blob in enumerate(cc.list_blobs(name_starts_with=prefix)):
            print(' -', blob.name)
            if i >= 20:
                print(' ... (truncated)')
                break
    except Exception as e:
        print('[ERR] Azure list failed:', e)
else:
    azure_template = """
import os
from azure.storage.blob import BlobServiceClient
conn_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
container = os.environ.get('AZURE_BLOB_CONTAINER','landing')
prefix = os.environ.get('AZURE_BLOB_PREFIX','orders/')
svc = BlobServiceClient.from_connection_string(conn_str)
cc = svc.get_container_client(container)
for blob in cc.list_blobs(name_starts_with=prefix):
    print(blob.name)
""".strip()
    print('Azure Blob template:\n', azure_template)


In [20]:
# AWS S3: List keys under prefix when available; otherwise print template
import os
try:
    import boto3  # type: ignore
    _has_boto = True
except Exception:
    _has_boto = False

bucket = os.environ.get('S3_BUCKET')
prefix = os.environ.get('S3_PREFIX','')

if _has_boto and bucket:
    try:
        s3 = boto3.client('s3')
        print(f'Listing s3 keys: bucket={bucket} prefix={prefix!r}')
        paginator = s3.get_paginator('list_objects_v2')
        max_keys = 25  # cap the demo listing
        count = 0
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                print(' -', obj['Key'])
                count += 1
                if count >= max_keys:
                    break
            if count >= max_keys:
                print(' ... (truncated)')
                break
    except Exception as e:
        print('[ERR] S3 list failed:', e)
else:
    s3_template = """
import os
import boto3
s3 = boto3.client('s3')
bucket = os.environ.get('S3_BUCKET','my-raw-bucket')
prefix = os.environ.get('S3_PREFIX','orders/')
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        print(obj['Key'])
""".strip()
    print('S3 template:\n', s3_template)


S3 template:
 import os
import boto3
s3 = boto3.client('s3')
bucket = os.environ.get('S3_BUCKET','my-raw-bucket')
prefix = os.environ.get('S3_PREFIX','orders/')
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        print(obj['Key'])


In [None]:
# SFTP (AVS-like): only prints template unless DO_SFTP_DEMO=1 is set
import os
_do = os.environ.get('DO_SFTP_DEMO') == '1'
try:
    import paramiko  # type: ignore
    _has_paramiko = True
except Exception:
    _has_paramiko = False

if _has_paramiko and _do:
    host = os.environ['SFTP_HOST']
    user = os.environ['SFTP_USER']
    key_path = os.environ['SFTP_KEY_PATH']
    remote_dir = os.environ.get('SFTP_DIR','/incoming')
    key = paramiko.RSAKey.from_private_key_file(key_path)  # assumes an RSA private key
    client = paramiko.SSHClient()
    # Demo only: AutoAddPolicy trusts unknown hosts. In production, load known host keys instead.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname=host, username=user, pkey=key)
        sftp = client.open_sftp()
        try:
            for name in sftp.listdir(remote_dir):
                print(' -', name)
        finally:
            sftp.close()
    finally:
        # Close the SSH connection even if connect() or the listing fails.
        client.close()
else:
    sftp_template = """
import os
import paramiko
host = os.environ['SFTP_HOST']
user = os.environ['SFTP_USER']
key_path = os.environ['SFTP_KEY_PATH']
remote_dir = os.environ.get('SFTP_DIR','/incoming')
key = paramiko.RSAKey.from_private_key_file(key_path)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # demo only
try:
    client.connect(hostname=host, username=user, pkey=key)
    sftp = client.open_sftp()
    for name in sftp.listdir(remote_dir):
        print(name)
    sftp.close()
finally:
    client.close()
""".strip()
    print('SFTP template:\n', sftp_template)
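

The overview lists key or password authentication for SFTP, while the cell above shows only the key path. The sketch below shows one way to choose between the two from environment-style settings. The helper name `sftp_connect_kwargs` and the variables `SFTP_PASSWORD` and `SFTP_PORT` are assumptions for illustration; the returned keys (`hostname`, `port`, `username`, `key_filename`, `password`) are keyword arguments accepted by `paramiko.SSHClient.connect`.

```python
import os


def sftp_connect_kwargs(env=None):
    """Build keyword arguments for paramiko.SSHClient.connect().

    Prefers key-based auth (SFTP_KEY_PATH) and falls back to a password
    (SFTP_PASSWORD, a hypothetical variable not used elsewhere in the notebook).
    """
    env = os.environ if env is None else env
    kwargs = {
        "hostname": env["SFTP_HOST"],
        "username": env["SFTP_USER"],
        "port": int(env.get("SFTP_PORT", "22")),
    }
    if env.get("SFTP_KEY_PATH"):
        kwargs["key_filename"] = env["SFTP_KEY_PATH"]
    elif env.get("SFTP_PASSWORD"):
        kwargs["password"] = env["SFTP_PASSWORD"]
    else:
        raise KeyError("set SFTP_KEY_PATH or SFTP_PASSWORD")
    return kwargs


# Usage (not executed here): client.connect(**sftp_connect_kwargs())
```

Passing `key_filename` lets paramiko detect the key type itself, which avoids tying the code to RSA keys.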


### Snowflake Sessions, Stages, and Pipes

The cell below opens a session when credentials are available, then prints templates for defining an external stage over your object store and for creating a Snowpipe (auto-ingest). For batch loads, use COPY or MERGE instead.


In [21]:
import os
sf_user = os.environ.get('SNOWFLAKE_USER')
sf_pwd = os.environ.get('SNOWFLAKE_PASSWORD')
sf_account = os.environ.get('SNOWFLAKE_ACCOUNT')
sf_wh = os.environ.get('SNOWFLAKE_WAREHOUSE','COMPUTE_WH')
sf_db = os.environ.get('SNOWFLAKE_DATABASE','ANALYTICS')
sf_schema = os.environ.get('SNOWFLAKE_SCHEMA','PUBLIC')

try:
    import snowflake.connector as sf
    _has_sf = True
except Exception:
    _has_sf = False

if _has_sf and sf_user and sf_pwd and sf_account:
    try:
        conn = sf.connect(user=sf_user, password=sf_pwd, account=sf_account, warehouse=sf_wh, database=sf_db, schema=sf_schema)
        try:
            cur = conn.cursor()
            cur.execute('select 1')
            print('[OK] Snowflake connected; example query ran.')
            cur.close()
        finally:
            # Release the session even if the test query fails.
            conn.close()
    except Exception as e:
        print('[ERR] Snowflake connect failed:', e)

stage_azure_template = """
-- Azure Blob external stage (SAS or storage integration)
create or replace stage RAW_ORDERS
  url='azure://<account>.blob.core.windows.net/<container>/<prefix>'
  credentials=(azure_sas_token='<SAS_TOKEN>');
""".strip()

stage_s3_template = """
-- S3 external stage
create or replace stage RAW_ORDERS
  url='s3://my-raw-bucket/orders/'
  credentials=(aws_key_id='<AWS_KEY_ID>' aws_secret_key='<AWS_SECRET_KEY>');
""".strip()

pipe_template = """
-- Snowpipe auto-ingest from external stage to table
create or replace pipe LOAD_ORDERS auto_ingest = true as
  copy into ORDERS_WAREHOUSE
  from @RAW_ORDERS
  file_format = (type = csv field_optionally_enclosed_by='"' skip_header = 1)
  on_error = 'CONTINUE';
-- Configure event notifications (S3 SNS / Azure Event Grid) per Snowflake docs; Azure also needs integration = '<NOTIFICATION_INTEGRATION>'.
""".strip()

print('\nExternal stage (Azure) template:\n', stage_azure_template)
print('\nExternal stage (S3) template:\n', stage_s3_template)
print('\nSnowpipe template:\n', pipe_template)



External stage (Azure) template:
 -- Azure Blob external stage (SAS or storage integration)
create or replace stage RAW_ORDERS
  url='azure://<account>.blob.core.windows.net/<container>/<prefix>'
  credentials=(azure_sas_token='<SAS_TOKEN>');

External stage (S3) template:
 -- S3 external stage
create or replace stage RAW_ORDERS
  url='s3://my-raw-bucket/orders/'
  credentials=(aws_key_id='<AWS_KEY_ID>' aws_secret_key='<AWS_SECRET_KEY>');

Snowpipe template:
 -- Snowpipe auto-ingest from external stage to table
create or replace pipe LOAD_ORDERS auto_ingest = true as
  copy into ORDERS_WAREHOUSE
  from @RAW_ORDERS
  file_format = (type = csv field_optionally_enclosed_by='"' skip_header = 1)
  on_error = 'CONTINUE';
-- Configure event notifications (S3 SNS / Azure Event Grid) per Snowflake docs; Azure also needs integration = '<NOTIFICATION_INTEGRATION>'.
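

The templates above hard-code the stage, table, and pipe names. In a configuration-driven pipeline those names would usually come from settings, so the sketch below renders the batch `COPY` and the pipe DDL from parameters. The function names `copy_sql` and `pipe_sql` are hypothetical, and the identifier check is deliberately strict (plain unquoted names only) so that config values cannot inject extra SQL.

```python
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")
CSV_FORMAT = "(type = csv field_optionally_enclosed_by='\"' skip_header = 1)"


def _ident(name):
    """Accept only plain, unquoted identifiers."""
    if not _IDENT.fullmatch(name):
        raise ValueError(f"not a plain identifier: {name!r}")
    return name


def copy_sql(table, stage, file_format=CSV_FORMAT, on_error="CONTINUE"):
    """Render the COPY statement used for batch loads (no trailing semicolon)."""
    return (
        f"copy into {_ident(table)}\n"
        f"  from @{_ident(stage)}\n"
        f"  file_format = {file_format}\n"
        f"  on_error = '{on_error}'"
    )


def pipe_sql(pipe, table, stage, auto_ingest=True):
    """Wrap the same COPY statement in a CREATE PIPE for continuous loading."""
    flag = "true" if auto_ingest else "false"
    return (
        f"create or replace pipe {_ident(pipe)} auto_ingest = {flag} as\n"
        f"{copy_sql(table, stage)};"
    )


print(copy_sql("ORDERS_WAREHOUSE", "RAW_ORDERS") + ";")
print(pipe_sql("LOAD_ORDERS", "ORDERS_WAREHOUSE", "RAW_ORDERS"))
```

Sharing one `copy_sql` between the batch path and the pipe keeps the two load modes from drifting apart.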


## 15. FSSPEC Paths (abfs://, s3://) + Config

Use pandas with fsspec-compatible URLs to read from object stores directly. This cell scans the YAML config for any sources with a `url` and shows how we would read them. For safety, set `FSSPEC_DEMO=1` to actually attempt reads.


In [22]:
import os, sys
from pathlib import Path
import yaml

sys.path.append('src')
from pipeline.toolkit import read_uri

cfg_path = Path('config/pipeline.example.yaml')
if not cfg_path.exists():
    print('No config found at', cfg_path)
else:
    cfg = yaml.safe_load(cfg_path.read_text(encoding='utf-8'))
    with_url = {k:v for k,v in cfg.get('sources',{}).items() if isinstance(v, dict) and 'url' in v}
    if not with_url:
        print('No url-based sources defined in config.')
    else:
        print('URL-based sources:')
        for k, meta in with_url.items():
            print(' -', k, '=>', meta['url'])

    if os.environ.get('FSSPEC_DEMO') == '1':
        for k, meta in with_url.items():
            try:
                so = meta.get('storage_options', {}) if isinstance(meta, dict) else {}
                print(f"Reading head from {k} ...")
                df = read_uri(meta['url'], storage_options=so)
                display(df.head())
            except Exception as e:
                print(f"[WARN] {k}: could not read ->", e)
    else:
        print('\nSet FSSPEC_DEMO=1 to attempt real reads; otherwise only the URLs are listed above.')


URL-based sources:
 - abfs_orders_url => abfs://<container>@<account>.dfs.core.windows.net/orders/orders.csv
 - s3_orders_url => s3://my-raw-bucket/orders/orders.csv

Set FSSPEC_DEMO=1 to attempt real reads; otherwise only the URLs are listed above.
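

The config scan in the cell above can be factored into a small function that also reports each URL's scheme, which is what determines the fsspec backend (and therefore the `storage_options`) a read would need. This is a standard-library sketch: `url_sources` is a hypothetical name, and the sample config mirrors the shape implied by the output above rather than the actual YAML file.

```python
from urllib.parse import urlsplit


def url_sources(cfg):
    """Map source name -> (scheme, url) for every source that declares a 'url'."""
    found = {}
    for name, meta in (cfg.get("sources") or {}).items():
        if isinstance(meta, dict) and "url" in meta:
            # Paths without a scheme are treated as local files.
            scheme = urlsplit(meta["url"]).scheme or "file"
            found[name] = (scheme, meta["url"])
    return found


sample_cfg = {
    "sources": {
        "s3_orders_url": {"url": "s3://my-raw-bucket/orders/orders.csv"},
        "local_orders": {"url": "data/orders.csv"},
        "avs_orders": {"path": "simulated_storage/avs_orders.csv"},  # no url: skipped
    }
}
for name, (scheme, url) in url_sources(sample_cfg).items():
    print(f"{name}: scheme={scheme} url={url}")
```

One caveat: a Windows path such as `C:/data/orders.csv` parses with scheme `c`, so local Windows paths would need separate handling.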


## 16. Snowflake write_pandas (Append and Upsert)

Directly write a pandas DataFrame to Snowflake using `write_pandas`. This is convenient for demos and small/medium loads. For safety, set `DO_SNOWFLAKE_DEMO=1` to actually run; otherwise templates are printed.


In [23]:
import os
try:
    import snowflake.connector as sf
    from snowflake.connector.pandas_tools import write_pandas
    _has_sf = True
except Exception:
    _has_sf = False

do_demo = os.environ.get('DO_SNOWFLAKE_DEMO') == '1'
sf_user = os.environ.get('SNOWFLAKE_USER')
sf_pwd = os.environ.get('SNOWFLAKE_PASSWORD')
sf_account = os.environ.get('SNOWFLAKE_ACCOUNT')
sf_wh = os.environ.get('SNOWFLAKE_WAREHOUSE','COMPUTE_WH')
sf_db = os.environ.get('SNOWFLAKE_DATABASE','ANALYTICS')
sf_schema = os.environ.get('SNOWFLAKE_SCHEMA','PUBLIC')
table = 'ORDERS_WAREHOUSE'

append_template = """
from snowflake.connector import connect
from snowflake.connector.pandas_tools import write_pandas
conn = connect(user=..., password=..., account=..., warehouse='COMPUTE_WH', database='ANALYTICS', schema='PUBLIC')
success, nchunks, nrows, _ = write_pandas(conn, warehouse_df, table_name='ORDERS_WAREHOUSE', quote_identifiers=True, auto_create_table=True)
conn.close()
""".strip()

merge_template = """
-- Upsert pattern using a staging table
merge into ORDERS_WAREHOUSE t
using ORDERS_WAREHOUSE_LOAD s
on t."OrderId" = s."OrderId"
when matched then update set
  "CustomerId" = s."CustomerId",
  "NetAmount" = s."NetAmount",
  "UpdatedAtUtc" = s."UpdatedAtUtc",
  "SourceSystem" = s."SourceSystem"
when not matched then insert ("OrderId","CustomerId","NetAmount","UpdatedAtUtc","SourceSystem")
values (s."OrderId", s."CustomerId", s."NetAmount", s."UpdatedAtUtc", s."SourceSystem");
""".strip()

if not _has_sf or not do_demo or not (sf_user and sf_pwd and sf_account):
    print('write_pandas append template:\n', append_template)
    print('\nMERGE upsert template:\n', merge_template)
else:
    conn = sf.connect(user=sf_user, password=sf_pwd, account=sf_account, warehouse=sf_wh, database=sf_db, schema=sf_schema)
    try:
        ok, nchunks, nrows, _ = write_pandas(conn, warehouse_df, table_name=table, quote_identifiers=True, auto_create_table=True)
        print('Appended rows:', nrows, 'chunks:', nchunks, 'ok:', ok)
        # Optionally demonstrate upsert via staging table
        ok, nchunks, nrows, _ = write_pandas(conn, warehouse_df, table_name=f"{table}_LOAD", quote_identifiers=True, overwrite=True, auto_create_table=True)
        cur = conn.cursor()
        try:
            # Reuse the statement defined in merge_template instead of repeating it inline.
            cur.execute(merge_template)
        finally:
            cur.close()
        print('Upsert (MERGE) applied from staging table')
    finally:
        conn.close()


write_pandas append template:
 from snowflake.connector import connect
from snowflake.connector.pandas_tools import write_pandas
conn = connect(user=..., password=..., account=..., warehouse='COMPUTE_WH', database='ANALYTICS', schema='PUBLIC')
success, nchunks, nrows, _ = write_pandas(conn, warehouse_df, table_name='ORDERS_WAREHOUSE', quote_identifiers=True, auto_create_table=True)
conn.close()

MERGE upsert template:
 -- Upsert pattern using a staging table
merge into ORDERS_WAREHOUSE t
using ORDERS_WAREHOUSE_LOAD s
on t."OrderId" = s."OrderId"
when matched then update set
  "CustomerId" = s."CustomerId",
  "NetAmount" = s."NetAmount",
  "UpdatedAtUtc" = s."UpdatedAtUtc",
  "SourceSystem" = s."SourceSystem"
when not matched then insert ("OrderId","CustomerId","NetAmount","UpdatedAtUtc","SourceSystem")
values (s."OrderId", s."CustomerId", s."NetAmount", s."UpdatedAtUtc", s."SourceSystem");
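

The MERGE statement above repeats every column name several times, which is easy to get wrong when the schema changes. As a sketch, the same statement can be generated from a key and a column list. `build_merge_sql` is a hypothetical helper, not part of the Snowflake connector; it quotes column names because `write_pandas(..., quote_identifiers=True)` creates case-sensitive columns, and it assumes the table names themselves are trusted.

```python
def build_merge_sql(target, staging, key, columns):
    """Render a MERGE that upserts `columns` from `staging` into `target` on `key`."""

    def q(name):
        # Double-quote an identifier, escaping embedded quotes.
        return '"' + name.replace('"', '""') + '"'

    updates = [c for c in columns if c != key]  # the join key is never updated
    set_clause = ",\n  ".join(f"{q(c)} = s.{q(c)}" for c in updates)
    insert_cols = ",".join(q(c) for c in columns)
    insert_vals = ", ".join(f"s.{q(c)}" for c in columns)
    return (
        f"merge into {target} t\n"
        f"using {staging} s\n"
        f"on t.{q(key)} = s.{q(key)}\n"
        f"when matched then update set\n  {set_clause}\n"
        f"when not matched then insert ({insert_cols})\n"
        f"values ({insert_vals});"
    )


print(build_merge_sql(
    "ORDERS_WAREHOUSE",
    "ORDERS_WAREHOUSE_LOAD",
    key="OrderId",
    columns=["OrderId", "CustomerId", "NetAmount", "UpdatedAtUtc", "SourceSystem"],
))
```

With the arguments shown, the output matches the template above apart from its leading comment line.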


## 17. Semi-Structured JSON (VARIANT) Pattern

When structure is unknown or variable, land JSON into a `VARIANT` column and project only the fields you need later.


In [24]:
json_ddl = """
create or replace file format JSON_FF type = json;
create table if not exists RAW_JSON (
  v variant,
  src string,
  loaded_at timestamp_tz default current_timestamp()
);
""".strip()

json_load = """
-- Stage JSON files and load
PUT file://C:/path/to/events/*.json @~/json AUTO_COMPRESS=TRUE OVERWRITE=TRUE;
-- JSON yields a single VARIANT column, so select it explicitly and fill src from file metadata
COPY INTO RAW_JSON (v, src)
FROM (SELECT $1, METADATA$FILENAME FROM @~/json)
FILE_FORMAT=(FORMAT_NAME='JSON_FF') ON_ERROR='CONTINUE';
""".strip()

json_project = """
-- Project to a typed table/view
create or replace table CURATED_JSON as
select
  v:"orderId"::string         as "OrderId",
  v:"customer"::string        as "CustomerId",
  v:"amount"::number(18,2)    as "NetAmount",
  v:"updatedAt"::timestamp_tz as "UpdatedAtUtc"
from RAW_JSON;
""".strip()

print('-- JSON DDL:\n', json_ddl)
print('\n-- JSON LOAD:\n', json_load)
print('\n-- JSON PROJECT:\n', json_project)


-- JSON DDL:
 create or replace file format JSON_FF type = json;
create table if not exists RAW_JSON (
  v variant,
  src string,
  loaded_at timestamp_tz default current_timestamp()
);

-- JSON LOAD:
 -- Stage JSON files and load
PUT file://C:/path/to/events/*.json @~/json AUTO_COMPRESS=TRUE OVERWRITE=TRUE;
-- JSON yields a single VARIANT column, so select it explicitly and fill src from file metadata
COPY INTO RAW_JSON (v, src)
FROM (SELECT $1, METADATA$FILENAME FROM @~/json)
FILE_FORMAT=(FORMAT_NAME='JSON_FF') ON_ERROR='CONTINUE';

-- JSON PROJECT:
 -- Project to a typed table/view
create or replace table CURATED_JSON as
select
  v:"orderId"::string         as "OrderId",
  v:"customer"::string        as "CustomerId",
  v:"amount"::number(18,2)    as "NetAmount",
  v:"updatedAt"::timestamp_tz as "UpdatedAtUtc"
from RAW_JSON;
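

In keeping with the notebook's simulation approach, the land-then-project idea can be tried locally without Snowflake. The sketch below keeps each raw document as an untyped string (the stand-in for the `VARIANT` column) and projects only the four fields used by the SQL above. The function name `project_order` and the sample documents are illustrative; missing fields become `None`, mirroring how a missing path in a `VARIANT` yields NULL.

```python
import json
from datetime import datetime
from decimal import Decimal


def project_order(raw_line):
    """Parse one raw JSON document and project it to typed warehouse columns."""
    v = json.loads(raw_line, parse_float=Decimal)  # keep amounts exact
    order_id, customer = v.get("orderId"), v.get("customer")
    amount, updated = v.get("amount"), v.get("updatedAt")
    return {
        "OrderId": None if order_id is None else str(order_id),
        "CustomerId": None if customer is None else str(customer),
        "NetAmount": None if amount is None else Decimal(str(amount)),
        # fromisoformat() on older Python versions does not accept a trailing 'Z'.
        "UpdatedAtUtc": None if updated is None
        else datetime.fromisoformat(updated.replace("Z", "+00:00")),
    }


raw_landing = [
    '{"orderId": "A-900", "customer": "C002", "amount": 210.0, '
    '"updatedAt": "2024-03-02T14:00:00Z", "extra": {"channel": "web"}}',
    '{"orderId": "A-901", "amount": 480}',  # sparse document: other fields project to None
]
for line in raw_landing:
    print(project_order(line))
```

Extra attributes such as `extra` are ignored at projection time but remain available in the raw landing data if they are needed later.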


## 18. Automation & Scheduling (Safe Templates)

This section shows how to automate daily runs on Windows with Task Scheduler and provides templates for Airflow and GitHub Actions. The examples are safe: the cell only prints the commands and never registers or runs anything.

- Daily run target (with optional Snowflake write):
  - `python scripts/run_pipeline.py --run`
  - `python scripts/run_pipeline.py --run --write-snowflake upsert`
- Environment: use `.env` (see `.env.example`); the CLI auto-loads it.
- Helper script: `scripts/scheduled_run.ps1` wraps activation, execution, and logging.
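
The source of `scripts/run_pipeline.py` is not shown in this notebook. The sketch below is one plausible way to parse the two flags used in the commands above, assuming `--run` is a boolean switch and `--write-snowflake` takes a mode. The `upsert` value appears above; `append` is an assumption based on the append pattern in Section 16.

```python
import argparse


def build_parser():
    """Hypothetical CLI definition matching the commands listed above."""
    parser = argparse.ArgumentParser(
        prog="run_pipeline.py", description="Run the demo pipeline."
    )
    parser.add_argument(
        "--run", action="store_true",
        help="execute the pipeline (without it, nothing is executed)",
    )
    parser.add_argument(
        "--write-snowflake", choices=["append", "upsert"], default=None,
        help="optionally write the result to Snowflake using the given mode",
    )
    return parser


# Parse an explicit argument list so the example runs inside a notebook too.
args = build_parser().parse_args(["--run", "--write-snowflake", "upsert"])
print(args.run, args.write_snowflake)
```

Keeping the Snowflake write behind its own flag means a scheduled job can run the transformation alone when warehouse credentials are unavailable.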


In [25]:
import textwrap
repo = r"C:\Users\pertt\Alleyfoo\pipe-transformation"
ps_cmd = "cd '" + repo + "'; . .\\.venv\\Scripts\\Activate.ps1; python scripts\\run_pipeline.py --run --write-snowflake upsert"
print('Task Scheduler (PowerShell) templates:\n')
print('1) UI setup (Action):\n  Program: powershell.exe\n  Arguments: -NoProfile -ExecutionPolicy Bypass -Command "' + ps_cmd + '"')
print('\n2) Register via PowerShell:')
# Backtick-escaped quotes (`") keep the inner command inside a single PowerShell string.
print(textwrap.dedent(f"""
$action  = New-ScheduledTaskAction -Execute 'powershell.exe' -Argument "-NoProfile -ExecutionPolicy Bypass -Command `"{ps_cmd}`""
$trigger = New-ScheduledTaskTrigger -Daily -At 2:00am
Register-ScheduledTask -Action $action -Trigger $trigger -TaskName 'PipeTransformationDaily' -Description 'Run pipeline demo daily' -RunLevel Highest
"""))
print('\n3) Log rotation tip (inside scheduled_run.ps1): Tee-Object to logs\\run-<timestamp>.log')

print('\nAirflow DAG (high-level): extract -> bronze -> silver -> warehouse')
print('See earlier Section 5 for an example DAG definition (pseudo-code).')

print('\nGitHub Actions (Windows runner) outline:')
print(textwrap.dedent("""
name: nightly-pipeline
on:
  schedule:
    - cron: '0 2 * * *'
jobs:
  run:
    runs-on: windows-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: '3.12' }
      - run: python -m pip install -r requirements.txt
      - run: python scripts/run_pipeline.py --run
        env:
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_WAREHOUSE: COMPUTE_WH
          SNOWFLAKE_DATABASE: ANALYTICS
          SNOWFLAKE_SCHEMA: PUBLIC
"""))


Task Scheduler (PowerShell) templates:

1) UI setup (Action):
  Program: powershell.exe
  Arguments: -NoProfile -ExecutionPolicy Bypass -Command "cd 'C:\Users\pertt\Alleyfoo\pipe-transformation'; . .\.venv\Scripts\Activate.ps1; python scripts\run_pipeline.py --run --write-snowflake upsert"

2) Register via PowerShell:

$action  = New-ScheduledTaskAction -Execute 'powershell.exe' -Argument "-NoProfile -ExecutionPolicy Bypass -Command `"cd 'C:\Users\pertt\Alleyfoo\pipe-transformation'; . .\.venv\Scripts\Activate.ps1; python scripts\run_pipeline.py --run --write-snowflake upsert`""
$trigger = New-ScheduledTaskTrigger -Daily -At 2:00am
Register-ScheduledTask -Action $action -Trigger $trigger -TaskName 'PipeTransformationDaily' -Description 'Run pipeline demo daily' -RunLevel Highest


3) Log rotation tip (inside scheduled_run.ps1): Tee-Object to logs\run-<timestamp>.log

Airflow DAG (high-level): extract -> bronze -> silver -> warehouse
See earlier Section 5 for an example DAG defini