In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import upper, col, avg
from snowflake.snowpark.types import StringType
from snowflake.snowpark.context import get_active_session

from snowflake.ml.modeling.preprocessing import StandardScaler, OrdinalEncoder
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.modeling.pipeline import Pipeline

In [None]:
# Obtain the Snowpark session via get_active_session(); the cells below use it
# to register the stored procedures.
session = get_active_session()

In [None]:
-- Stage referenced as stage_location="@SPROC_STAGE" when the stored procedures
-- in this notebook are registered.
-- NOTE(review): OR REPLACE recreates the stage, which presumably discards files
-- already uploaded to it; re-run the registration cells afterwards (confirm).
CREATE OR REPLACE STAGE SPROC_STAGE;

In [None]:
# Stored procedure 1: clean the customer data.

def sproc_clean_customer_data(session: Session) -> str:
    """Rebuild the CUSTOMER_CLEAN table from CUSTOMER_DATA_1000.

    Rows are filtered with ``dropna()`` (default arguments), the GENDER column
    is upper-cased, and the result overwrites CUSTOMER_CLEAN.

    Args:
        session: Snowpark session used to read the source table and write the
            cleaned one.

    Returns:
        The literal string "updated".
    """
    raw_customers = session.table("CUSTOMER_DATA_1000")

    # Remove incomplete rows first, then normalise the casing of GENDER.
    complete_rows = raw_customers.dropna()
    cleaned_customers = complete_rows.with_column("GENDER", upper(col("GENDER")))

    table_writer = cleaned_customers.write.mode("overwrite")
    table_writer.save_as_table("CUSTOMER_CLEAN")

    return "updated"
    

In [None]:
# Register the cleaning function as the permanent stored procedure
# SP_CLEAN_CUSTOMER_DATA, replacing any existing definition.
session.sproc.register(
    func=sproc_clean_customer_data,
    name="SP_CLEAN_CUSTOMER_DATA",
    # Signature: no arguments, returns a string.
    input_types=[],
    return_type=StringType(),
    packages=["snowflake-snowpark-python"],
    # Permanent registration, with @SPROC_STAGE given as the stage location.
    is_permanent=True,
    stage_location="@SPROC_STAGE",
    replace=True,
)

In [None]:
# 2) Stored procedure that performs the modeling
def sproc_train_churn_model(session:Session) -> str:
    """Fit a churn pipeline on CUSTOMER_CLEAN and store held-out predictions.

    CUSTOMER_CLEAN is split 80/20 with seed 1. A three-step pipeline is fitted
    on the larger split:

    * "scl": StandardScaler over AGE and ANNUAL_INCOME (output written to the
      same column names),
    * "enc": OrdinalEncoder turning GENDER into GENDER_CODE,
    * "clf": XGBClassifier using AGE, ANNUAL_INCOME and GENDER_CODE to predict
      CHURN into PREDICTED_CHURN.

    The fitted pipeline then scores the smaller split, and the columns ID,
    GENDER_CODE, CHURN and PREDICTED_CHURN overwrite CUSTOMER_PREDICTION.

    Args:
        session: Snowpark session used to read CUSTOMER_CLEAN and write
            CUSTOMER_PREDICTION.

    Returns:
        The literal string "updated".
    """
    numeric_features = ("AGE", "ANNUAL_INCOME")

    customers = session.table("CUSTOMER_CLEAN")
    train_df, test_df = customers.random_split([0.8, 0.2], seed=1)

    # Each estimator gets its own freshly built column lists.
    steps = [
        (
            "scl",
            StandardScaler(
                input_cols=list(numeric_features),
                output_cols=list(numeric_features),
                passthrough_cols=["GENDER", "CHURN"],
                drop_input_cols=True,
            ),
        ),
        (
            "enc",
            OrdinalEncoder(
                input_cols=["GENDER"],
                output_cols=["GENDER_CODE"],
                passthrough_cols=[*numeric_features, "CHURN"],
                drop_input_cols=True,
            ),
        ),
        (
            "clf",
            XGBClassifier(
                input_cols=[*numeric_features, "GENDER_CODE"],
                label_cols=["CHURN"],
                output_cols=["PREDICTED_CHURN"],
            ),
        ),
    ]

    fitted_pipeline = Pipeline(steps=steps).fit(train_df)

    # Score the held-out split and keep only the columns that are persisted.
    scored_df = fitted_pipeline.predict(test_df)
    result_df = scored_df.select("ID", "GENDER_CODE", "CHURN", "PREDICTED_CHURN")

    result_df.write.mode("overwrite").save_as_table("CUSTOMER_PREDICTION")

    return "updated"

In [None]:
# Register the training function as the permanent stored procedure
# SP_TRAIN_CHURN_MODEL, replacing any existing definition. The procedure needs
# snowflake-ml-python in addition to Snowpark because it builds an ML pipeline.
session.sproc.register(
    func = sproc_train_churn_model,
    name = "SP_TRAIN_CHURN_MODEL",
    packages = ["snowflake-snowpark-python", "snowflake-ml-python"],
    input_types = [],
    return_type = StringType(),
    is_permanent=True,
    stage_location="@SPROC_STAGE",
    replace=True,
    # Owner's rights are requested through the documented `execute_as`
    # parameter; `execute_as_owner` is not part of the register() signature.
    execute_as="owner"
)

In [None]:
-- Scheduled task that calls SP_CLEAN_CUSTOMER_DATA on warehouse SAMPLE_WH.
-- Cron fields are: minute, hour, day of month, month, day of week, so
-- "55 11 * * *" means every day at 11:55 in the Asia/Tokyo time zone.
CREATE OR REPLACE TASK TASK1_CLEAN
    WAREHOUSE = 'SAMPLE_WH'
    SCHEDULE = 'USING CRON 55 11 * * * Asia/Tokyo'
AS CALL SP_CLEAN_CUSTOMER_DATA();

In [None]:
-- Task that calls SP_TRAIN_CHURN_MODEL on warehouse SAMPLE_WH. It has no
-- schedule of its own; AFTER makes it run following TASK1_CLEAN.
CREATE OR REPLACE TASK TASK2_MODEL
    WAREHOUSE = 'SAMPLE_WH'
    AFTER TASK1_CLEAN
AS CALL SP_TRAIN_CHURN_MODEL();

In [None]:
-- Suspend the scheduled task TASK1_CLEAN.
-- NOTE(review): presumably done so the task pair can be changed or re-enabled
-- in a controlled order; confirm.
ALTER TASK TASK1_CLEAN SUSPEND;

In [None]:
-- Suspend the dependent task TASK2_MODEL as well.
ALTER TASK TASK2_MODEL SUSPEND;

In [None]:
-- Resume the dependent task TASK2_MODEL; this notebook resumes it before the
-- scheduled task TASK1_CLEAN.
ALTER TASK TASK2_MODEL RESUME;

In [None]:
-- Resume the scheduled task TASK1_CLEAN.
ALTER TASK TASK1_CLEAN RESUME;

In [None]:
-- Inspect the predictions table written by SP_TRAIN_CHURN_MODEL. It is only
-- populated once that procedure has run.
select * from CUSTOMER_PREDICTION;