# Zero to Snowflake - Simple Data Pipeline

**Asset:** Zero to Snowflake - Simple Data Pipeline  
**Version:** v1  
**Copyright(c):** 2025 Snowflake Inc. All rights reserved.

## 目次
1. 外部ステージからの取り込み
2. 半構造化データとVARIANTデータ型
3. 動的テーブル
4. 動的テーブルによるシンプルなパイプライン
5. 有向非循環グラフ（DAG）によるパイプライン可視化

In [None]:
ALTER SESSION SET query_tag = '{"origin":"sf_sit-is","name":"tb_101_v2","version":{"major":1, "minor":1},"attributes":{"is_quickstart":0, "source":"tastybytes", "vignette": "data_pipeline"}}';

コンテキストの設定

In [None]:
USE DATABASE tb_101;
USE ROLE tb_data_engineer;
USE WAREHOUSE tb_de_wh;

## 1. 外部ステージからの取り込み

**SQLリファレンス:** https://docs.snowflake.com/en/sql-reference/sql/copy-into-table

Snowflakeでは、ステージはデータファイルが保存される場所を指定する名前付きデータベースオブジェクトで、テーブルへのデータの読み込みとテーブルからのデータの書き出しを可能にします。

In [None]:
CREATE OR REPLACE STAGE raw_pos.menu_stage
COMMENT = 'メニューデータ用ステージ'
URL = 's3://sfquickstarts/frostbyte_tastybytes/raw_pos/menu/'
FILE_FORMAT = public.csv_ff;

In [None]:
CREATE OR REPLACE TABLE raw_pos.menu_staging
(
    menu_id NUMBER(19,0),
    menu_type_id NUMBER(38,0),
    menu_type VARCHAR(16777216),
    truck_brand_name VARCHAR(16777216),
    menu_item_id NUMBER(38,0),
    menu_item_name VARCHAR(16777216),
    item_category VARCHAR(16777216),
    item_subcategory VARCHAR(16777216),
    cost_of_goods_usd NUMBER(38,4),
    sale_price_usd NUMBER(38,4),
    menu_item_health_metrics_obj VARIANT
);

In [None]:
COPY INTO raw_pos.menu_staging
FROM @raw_pos.menu_stage;

In [None]:
SELECT * FROM raw_pos.menu_staging;

## 2. Snowflakeでの半構造化データ

**ユーザーガイド:** https://docs.snowflake.com/en/sql-reference/data-types-semistructured

SnowflakeはVARIANTデータ型を使用してJSONなどの半構造化データの処理に優れています。

In [None]:
SELECT menu_item_health_metrics_obj FROM raw_pos.menu_staging;

VARIANTオブジェクトからデータを抽出する方法:
- コロン演算子（:）はキー名でデータにアクセス
- 角括弧（[]）は数値位置で配列から要素を選択
- キャスト構文: `::` を使用

In [None]:
SELECT
    menu_item_name,
    CAST(menu_item_health_metrics_obj:menu_item_id AS INTEGER) AS menu_item_id,
    menu_item_health_metrics_obj:menu_item_health_metrics[0]:ingredients::ARRAY AS ingredients
FROM raw_pos.menu_staging;

FLATTEN関数を使用して配列を展開

In [None]:
SELECT
    i.value::STRING AS ingredient_name,
    m.menu_item_health_metrics_obj:menu_item_id::INTEGER AS menu_item_id
FROM
    raw_pos.menu_staging m,
    LATERAL FLATTEN(INPUT => m.menu_item_health_metrics_obj:menu_item_health_metrics[0]:ingredients::ARRAY) i;

## 3. 動的テーブル

**ユーザーガイド:** https://docs.snowflake.com/en/user-guide/dynamic-tables-about

動的テーブルは、データ変換パイプラインを簡素化するために設計された強力なツールです。

主な特徴:
- 宣言的構文で作成
- 自動データ更新
- データの新鮮さを管理

In [None]:
CREATE OR REPLACE DYNAMIC TABLE harmonized.ingredient
    LAG = '1 minute'
    WAREHOUSE = 'TB_DE_WH'
AS
    SELECT
    ingredient_name,
    menu_ids
FROM (
    SELECT DISTINCT
        i.value::STRING AS ingredient_name,
        ARRAY_AGG(m.menu_item_id) AS menu_ids
    FROM
        raw_pos.menu_staging m,
        LATERAL FLATTEN(INPUT => menu_item_health_metrics_obj:menu_item_health_metrics[0]:ingredients::ARRAY) i
    GROUP BY i.value::STRING
);

In [None]:
SELECT * FROM harmonized.ingredient;

新しいメニュー項目（バインミーサンドイッチ）を追加して動的テーブルの自動更新を実演

In [None]:
INSERT INTO raw_pos.menu_staging 
SELECT 
    10101,
    15,
    'Sandwiches',
    'Better Off Bread',
    157,
    'Banh Mi',
    'Main',
    'Cold Option',
    9.0,
    12.0,
    PARSE_JSON('{
      "menu_item_health_metrics": [
        {
          "ingredients": [
            "French Baguette",
            "Mayonnaise",
            "Pickled Daikon",
            "Cucumber",
            "Pork Belly"
          ],
          "is_dairy_free_flag": "N",
          "is_gluten_free_flag": "N",
          "is_healthy_flag": "Y",
          "is_nut_free_flag": "Y"
        }
      ],
      "menu_item_id": 157
    }'
);

新しい原材料が追加されたことを確認（動的テーブルが更新されるまで最大1分待機）

In [None]:
SELECT * FROM harmonized.ingredient 
WHERE ingredient_name IN ('French Baguette', 'Pickled Daikon');

## 4. 動的テーブルによるシンプルなパイプライン

次の動的テーブルを作成して、原材料とメニュー項目の関係を追跡します。

In [None]:
CREATE OR REPLACE DYNAMIC TABLE harmonized.ingredient_to_menu_lookup
    LAG = '1 minute'
    WAREHOUSE = 'TB_DE_WH'    
AS
SELECT
    i.ingredient_name,
    m.menu_item_health_metrics_obj:menu_item_id::INTEGER AS menu_item_id
FROM
    raw_pos.menu_staging m,
    LATERAL FLATTEN(INPUT => m.menu_item_health_metrics_obj:menu_item_health_metrics[0]:ingredients) f
JOIN harmonized.ingredient i ON f.value::STRING = i.ingredient_name;

In [None]:
SELECT * 
FROM harmonized.ingredient_to_menu_lookup
ORDER BY menu_item_id;

テスト用の注文データを挿入

In [None]:
INSERT INTO raw_pos.order_header
SELECT 
    459520441,
    15,
    1030,
    101565,
    null,
    200322900,
    TO_TIMESTAMP_NTZ('08:00:00', 'hh:mi:ss'),
    TO_TIMESTAMP_NTZ('14:00:00', 'hh:mi:ss'),
    null,
    TO_TIMESTAMP_NTZ('2022-01-27 08:21:08.000'),
    null,
    'USD',
    14.00,
    null,
    null,
    14.00;

In [None]:
INSERT INTO raw_pos.order_detail
SELECT
    904745311,
    459520441,
    157,
    null,
    0,
    2,
    14.00,
    28.00,
    null;

トラック別の原材料使用量を追跡する動的テーブルを作成

In [None]:
CREATE OR REPLACE DYNAMIC TABLE harmonized.ingredient_usage_by_truck 
    LAG = '2 minute'
    WAREHOUSE = 'TB_DE_WH'  
    AS 
    SELECT
        oh.truck_id,
        EXTRACT(YEAR FROM oh.order_ts) AS order_year,
        MONTH(oh.order_ts) AS order_month,
        i.ingredient_name,
        SUM(od.quantity) AS total_ingredients_used
    FROM
        raw_pos.order_detail od
        JOIN raw_pos.order_header oh ON od.order_id = oh.order_id
        JOIN harmonized.ingredient_to_menu_lookup iml ON od.menu_item_id = iml.menu_item_id
        JOIN harmonized.ingredient i ON iml.ingredient_name = i.ingredient_name
        JOIN raw_pos.location l ON l.location_id = oh.location_id
    WHERE l.country = 'United States'
    GROUP BY
        oh.truck_id,
        order_year,
        order_month,
        i.ingredient_name
    ORDER BY
        oh.truck_id,
        total_ingredients_used DESC;

In [None]:
SELECT
    truck_id,
    ingredient_name,
    SUM(total_ingredients_used) AS total_ingredients_used,
FROM
    harmonized.ingredient_usage_by_truck
WHERE
    order_month = 1
    AND truck_id = 15
GROUP BY truck_id, ingredient_name
ORDER BY total_ingredients_used DESC;

## 5. 有向非循環グラフ（DAG）によるパイプライン可視化

DAGを使用してデータパイプラインを視覚化できます。

DAGにアクセスするには:
- ナビゲーションメニューの「データ」ボタンをクリック
- 「TB_101」を展開
- 「HARMONIZED」を展開し、「動的テーブル」を展開
- 「INGREDIENT」テーブルをクリック

---
## RESET
---

In [None]:
USE ROLE accountadmin;

In [None]:
DROP TABLE IF EXISTS raw_pos.menu_staging;
DROP TABLE IF EXISTS harmonized.ingredient;
DROP TABLE IF EXISTS harmonized.ingredient_to_menu_lookup;
DROP TABLE IF EXISTS harmonized.ingredient_usage_by_truck;

In [None]:
DELETE FROM raw_pos.order_detail
WHERE order_detail_id = 904745311;
DELETE FROM raw_pos.order_header
WHERE order_id = 459520441;

In [None]:
ALTER SESSION UNSET query_tag;

In [None]:
ALTER WAREHOUSE tb_de_wh SUSPEND;