# ML を動かしてみよう

##  お題
TPC-H にある注文データを使用し、3か月間の取引金額予測を行います。

※ 本ハンズオンでは、時間に限りがあるため、特徴量選択やモデルのパラメータチューニング等は省略させていただきますが、
実際にモデル構築を行う際は、目的に応じてご検討いただくのが良いかと思います。


## 準備

Notebook のセクションと同じく、認証情報を入力してセッションを作成します。

In [None]:
from snowflake.snowpark.session import Session
from config import connection_parameters

# config.py の内容：
# connection_parameters = {
#     'account': '<org_name>-<account_name>',  # お使いの Snowflake アカウントの識別子
#     'user': '<your_username>',  # Snowflake アカウントにサインインするユーザー名
#     'password': '<your_password>',  # ユーザーのパスワード
#     'role': 'SYSADMIN',  # ハンズオンで使用するロール。変更は不要です
#     'database': 'DEGEEKS_HO_DB',  # ハンズオンで使用するデータベース。存在しなければ新規作成します（「gegeeks_ho_notebook.ipynb」をご参照ください）
#     'schema': 'PUBLIC',  # ハンズオンで使用するスキーマ
#     'warehouse': 'DEGEEKS_HO_WH'  # ハンズオンで使用するウェアハウス。存在しなければ新規作成します（「gegeeks_ho_notebook.ipynb」をご参照ください）
# }

session = Session.builder.configs(connection_parameters).create()
print(session)

In [None]:
session.sql(f"use database {connection_parameters['database']}").collect()
session.sql(f"use schema {connection_parameters['schema']}").collect()
session.sql(f"use warehouse {connection_parameters['warehouse']}").collect()
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

モデル保存用のステージを作成します。


In [None]:
model_stage_name = 'model_stage'
session.sql(f'create or replace stage "{model_stage_name}";').collect()

## 前処理

### データセットを作成する
日次で注文に関する金額・個数情報をサマリしたデータを作成します。
今回は、TPC-H のサンプルクエリとほぼ同じ項目を使用します。
https://docs.snowflake.com/ja/user-guide/sample-data-tpch.html

なお、実際に予測モデルを作成する際は、特徴量の選択やデータクレンジングなどをしっかり行うことをお勧めします。



In [None]:
from snowflake.snowpark.window import Window
import snowflake.snowpark.functions as F


table_name = 'TPCH_SF1_ORDER_SUMMARY'

# 移動平均をとるためにウインドウ関数の条件を作っておく
# rows_between() で発行されるクエリが ROW BETWEEN <start> FOLLOWING  AND <end> なので合わせる
window_3days = Window.order_by(F.col('ORDER_DATE').desc()).rows_between(Window.CURRENT_ROW, 2)
window_1weeks = Window.order_by(F.col('ORDER_DATE').desc()).rows_between(Window.CURRENT_ROW, 6)

df_orders = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS")
df_lineitem = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM")
df = df_orders.join(
    df_lineitem,
    df_orders.col('O_ORDERKEY') == df_lineitem.col('L_ORDERKEY'),
    how='inner'
).with_column_renamed(
    F.col('O_ORDERDATE'), 'ORDER_DATE'
).group_by(
    F.col('ORDER_DATE')
).agg(
    F.sum(F.col('L_EXTENDEDPRICE')).name('SUM_BASE_PRICE'),  # 合計価格
    F.avg(F.col('SUM_BASE_PRICE')).over(window_3days).name('MOVING_3DAY_SUM_BASE_PRICE_AVG'),  # 合計金額の3日間移動平均
    F.avg(F.col('SUM_BASE_PRICE')).over(window_1weeks).name('MOVING_1WEEK_SUM_BASE_PRICE_AVG'),  # 合計金額の1週間移動平均
    F.sum(F.col('L_EXTENDEDPRICE') * (1-F.col('L_DISCOUNT'))).name('SUM_DISC_PRICE'),  # 割引合計価格
    F.sum(F.col('L_EXTENDEDPRICE') * (1-F.col('L_DISCOUNT')) * (1+F.col('L_TAX'))).name('SUM_CHARGE'),  # 割引合計価格+税金
    F.sum(F.col('L_QUANTITY')).name('SUM_QTY'),  # 合計数量
    F.avg(F.col('SUM_QTY')).over(window_3days).name('MOVING_3DAY_SUM_QTY_AVG'),  # 合計数量の3日間移動平均
    F.avg(F.col('SUM_QTY')).over(window_1weeks).name('MOVING_1WEEK_SUM_QTY_AVG'),  # 合計数量の1週間移動平均
    F.avg(F.col('L_QUANTITY')).name('AVG_QTY'),  # 平均数量
    F.avg(F.col('L_EXTENDEDPRICE')).name('AVG_PRICE'),  # 平均価格
    F.avg(F.col('L_DISCOUNT')).name('AVG_DISC'),  # 平均割引
    F.count(F.col('L_ORDERKEY')).name('COUNT_ORDER')  # 注文件数
).order_by(
    F.col('ORDER_DATE').asc()
)


SQLで書くと、このような処理：

<details>
<summary>クエリ</summary>

```sql

-- 注文件数か金額を日別で並べる
-- TPC-Hのサンプルクエリと同じ数値を日別集計しただけ
-- 本当なら、注文ステータスや優先度なども見るべきじゃないかしら
-- https://docs.snowflake.com/ja/user-guide/sample-data-tpch.html
create or replace temporary table TPCH_SF1_ORDER_SUMMARY as
    select -- top 1000000
        orders.o_orderdate as order_date,
        sum(l_extendedprice) as sum_base_price,  -- 合計価格
        avg(sum_base_price) over (
            order by order_date asc 
            rows between 2 preceding and current row) as moving_3day_sum_base_price_avg, -- 合計金額の3日間移動平均
        avg(sum_base_price) over (
            order by order_date asc 
            rows between 6 preceding and current row) as moving_week_sum_base_price_avg, -- 合計金額の7日間移動平均
        avg(sum_base_price) over (
            order by order_date asc 
            rows between 13 preceding and current row) as moving_2week_sum_base_price_avg, -- 合計金額の14日間移動平均
        sum(l_extendedprice * (1-l_discount)) as sum_disc_price,  -- 割引合計価格
        sum(l_extendedprice * (1-l_discount) * (1+l_tax)) as sum_charge, -- 割引合計価格+税金
        sum(l_quantity) as sum_qty, -- 合計数量
        avg(sum_qty) over (
            order by order_date asc 
            rows between 2 preceding and current row) as moving_3day_sum_qty_avg, -- 合計数量の3日間移動平均
        avg(sum_qty) over (
            order by order_date asc 
            rows between 6 preceding and current row) as moving_week_sum_qty_avg, -- 合計数量の7日間移動平均
        avg(sum_qty) over (
            order by order_date asc 
            rows between 13 preceding and current row) as moving_2week_sum_qty_avg, -- 合計数量の14日間移動平均
        avg(l_quantity) as avg_qty,  -- 平均数量
        avg(l_extendedprice) as avg_price,  -- 平均合計価格
        avg(l_discount) as avg_disc,  -- 平均割引の合計
        count(distinct l_orderkey) as count_order  -- 注文件数
        -- count(*) as count_lineitem -- 注文ごとのラインアイテム数合計
    from
        SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS as orders
    inner join SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM as lineitem
        on orders.o_orderkey = lineitem.l_orderkey
    group by
        order_date;
```

</details>

In [None]:
df.limit(10).to_pandas()


In [None]:
df.describe().to_pandas()


In [None]:
import snowflake.snowpark.functions as F
from matplotlib import pyplot


df.select(F.col('ORDER_DATE'), F.col('COUNT_ORDER')).to_pandas().set_index('ORDER_DATE').plot(figsize=(15,5))

事前にデータセットをトレーニング用/評価用に分けます。
TPC-H データには 1992-01-01 から 1998-08-02 のデータが含まれています(2023年2月10日現在)。
今回は、

- トレーニング用： 1992-01-01 ～ 1997-12-31
- 評価用：1998-01-01～1998-03-31

と分割します。

In [None]:
train_table_name = 'TPCH_SF1_ORDER_SUMMARY_TRAIN'
test_table_name = 'TPCH_SF1_ORDER_SUMMARY_TEST'

In [None]:
import snowflake.snowpark.types as T


df_train = df.where(F.col('ORDER_DATE') <= '1997-12-31')
df_test = df.where(
    (F.col('ORDER_DATE') >= '1998-01-01')
    & (F.col('ORDER_DATE') < '1998-03-31')
)

df_train.write.mode("overwrite").save_as_table(train_table_name, table_type="temporary")
df_test.write.mode("overwrite").save_as_table(test_table_name, table_type="temporary")

時間のある方は、データを見ておきましょう。

In [None]:
df_train.select(F.col('ORDER_DATE'), F.col('COUNT_ORDER')).to_pandas().set_index('ORDER_DATE').plot(figsize=(15,5))

In [None]:
df_test.select(F.col('ORDER_DATE'), F.col('COUNT_ORDER')).to_pandas().set_index('ORDER_DATE').plot(figsize=(15,5))

## 学習
学習を実行してみましょう。
今回は、ストアドプロシージャで学習の実行と学習済みモデルの保存を行います。
<!-- 先日の Snowflake Summit 2023 にて発表された[Snowpark ML](https://docs.snowflake.com/developer-guide/snowpark-ml/index)を早速使用してみましょう！Snowpark MLは、MLモデル構築・デプロイのためのツールセットです。主要なコンポーネントとして、ML モデルを効率的に開発するための Snowpark ML Development（`snowflake.ml.modeling`）とモデル管理機能をもつ Snowpark ML Ops（プライベートプレビュー中）が含まれます。  
今回のハンズオンでは、Snowpark ML Development を使い、データの前処理、特徴量エンジニアリング、トレーニングを行ってみましょう -->


学習を実行する際、集計や変換などの処理より大きなメモリが必要な場合があります。その場合は Snowpark 最適化インスタンスを使うとよいでしょう。

In [None]:
import snowflake.snowpark
import snowflake.snowpark.types as T
from snowflake.snowpark import FileOperation
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.session import Session
# from snowflake.ml.modeling.xgboost import XGBRegressor
from xgboost import XGBRegressor

import io
import dill

import pandas as pd


In [None]:
def save_file(
            session: snowflake.snowpark.Session, 
            model, 
            path: str
        ) -> str:
    ''' model を session で接続している環境の path に保存する'''
    input_stream = io.BytesIO()
    dill.dump(model, input_stream)
    try:
        session._conn._cursor.upload_stream(input_stream, path)
        msg = "successfully created file: " + path
    except Exception as e:
        msg = f'upload stream no exists. ({e})'
    
    return msg



def train_xgboost_model(
        session: Session, 
        training_table: str,
        feature_cols: list,
        target_col: str,
        model_name: str) -> T.Variant:
    # 対象データを取得
    local_training_data = session.table(training_table).to_pandas()
    X = local_training_data[feature_cols]
    y = local_training_data[target_col]

    # 学習
    xgbmodel = XGBRegressor(random_state=123)
    xgbmodel.fit(X,y)
 
    # 特徴量重要度を取得
    feat_importance = pd.DataFrame(xgbmodel.feature_importances_,feature_cols,columns=['FeatImportance']).to_dict()

    # モデルを内部ステージに保存
    print(save_file(session, xgbmodel, f'@"{model_stage_name}"/{model_name}'))

    return feat_importance


ストアドプロシージャを登録し、実行しましょう。
ここでは `register()` を使用ししますが、デコレータ `@register()` を使う方法もあります。


In [None]:
train_xgboost_model_sproc = session.sproc.register(
    train_xgboost_model,
    stage_location=f'@"{model_stage_name}"',
    packages=['snowflake-snowpark-python', 'xgboost', 'pandas', 'dill'],
    replace=True
)

In [None]:
training_table = 'TPCH_SF1_ORDER_SUMMARY_TRAIN'
model_name = 'xgboost_model.sav'

feature_cols = df_train.columns
# target_col = 'SUM_BASE_PRICE'  # 当たりすぎる
target_col = 'COUNT_ORDER'
feature_cols.remove(target_col)
feature_cols.remove('ORDER_DATE')

feat_importance = train_xgboost_model_sproc(
    training_table, 
    feature_cols,
    target_col,
    model_name, 
    session=session
)

print(feat_importance)

## 予測
予測を実行します。
UDF を作成して、Snowflake にあるテーブルに対して予測を実行し、結果を Snowflake テーブルに保存します。


In [None]:
import sys
import cachetools
import os
import dill
from snowflake.snowpark.functions import udf

In [None]:
%%time


@cachetools.cached(cache={})
def load_from_snowflake_import_dir(filename):
    '''指定したモデルファイルを読み込む。
    対象ファイルがインポートされているか、対象ファイルが存在するステージが stage_location に指定されていることが必要'''
    import_dir = sys._xoptions.get('snowflake_import_directory')
    with open(os.path.join(import_dir, filename), 'rb') as file:
        m = dill.load(file)
        return m

def predict(args: list) -> float:
    model = load_from_snowflake_import_dir(model_name)  
    row = pd.DataFrame(
        [args],
        columns=feature_cols
    )
    return model.predict(row)


predict_xgboost_regression_udf = session.udf.register(
    func=predict, 
    name='predict_xgboost_regression_udf', 
    stage_location=f'@"{model_stage_name}"',
    return_type = T.FloatType(),
    replace=True, 
    is_permanent=True, 
    imports=[f'@"{model_stage_name}"/{model_name}'],
    packages=['pandas', 'xgboost', 'cachetools', 'dill'], 
    session=session
)

予測を実行します。

In [None]:
pred_table_name = 'TPCH_SF1_ORDER_SUMMARY_PRED'

In [None]:
%%time
df_test.select(
      F.col('ORDER_DATE'),
      F.col(target_col),
      F.call_udf("predict_xgboost_regression_udf", F.array_construct(*feature_cols)).alias(f'PREDICTED_{target_col}')
).write.mode('overwrite').saveAsTable(pred_table_name)


予測結果を確認してみましょう。

なお、ここでは流れで Notebook 上でグラフを確認しますが、Snowsight で見ることも可能です。モデルを運用していく場合、Snowsight 上で精度確認できるようなダッシュボードを作っておくとよいかもしれません。

現在プライベートプレビュー中の Streamlit in Snowflake が公開されたら、精度可視化アプリケーションを作成していつでも精度確認できるようにするのも良さそうですね。

In [None]:
pdf_score = session.table(pred_table_name).toPandas()

In [None]:
pdf_score.set_index('ORDER_DATE').plot(figsize=(15,5))

In [None]:
pdf_score