# Initialize Environment for Distributed Processing

In [None]:
# Some of the features in this HOL are in preview. Pin the version of snowflake-ml-python for reproducibility. 
# NOTE - you do not need to restart the kernel since we're running this before importing snowflake-ml-python.
!pip install snowflake-ml-python==1.8.3

In [None]:
!pip freeze | grep snowflake

In [None]:
import ray
import logging
# Keep notebook output readable: only warnings and above pass the root logger.
logging.getLogger().setLevel(logging.WARNING)


# Switch off Ray Data progress reporting on the current data context.
context = ray.data.DataContext.get_current()
context.execution_options.verbose_progress = False
context.enable_operator_progress_bars = False
context.enable_progress_bars = False

In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()


In [None]:
from snowflake.ml.runtime_cluster import scale_cluster

# Scale out the notebook to have multiple nodes available for execution.
# SCALE_FACTOR is reused later as the concurrency of the prediction step.
SCALE_FACTOR = 2
scale_cluster(SCALE_FACTOR)

# Process Review Text Data
- Load reviews with `SFStageTextDataSource`
- Parse review text with Ray data

In [None]:
# Read the raw review text files (*.txt) from the REVIEWS stage into a Ray dataset
from snowflake.ml.ray.datasource import SFStageTextDataSource

file_name = "*.txt"
stage_name = "REVIEWS"

text_source = SFStageTextDataSource(
    stage_location=stage_name,
    file_pattern=file_name
)

text_dataset = ray.data.read_datasource(text_source)


In [None]:
def parse_reviews(batch):
    """
    Parse one review record into its UUID and review text.

    Despite the parameter name, this function is applied with ``Dataset.map``,
    so it is called with a single record. The record's 'text' value is expected
    to look like ``"<uuid>","<review text>"``.

    Args:
        batch: Dictionary with a 'text' key holding the raw line.

    Returns:
        Dictionary with 'UUID' and 'REVIEW_TEXT' keys.

    Raises:
        ValueError: If the line does not contain the '","' separator.
    """
    value = batch["text"]

    # Split on the first '","' only, so review text containing that sequence stays intact
    uuid_part, separator, review_part = value.partition('","')
    if not separator:
        raise ValueError(f"Review record is missing the '\",\"' separator: {value!r}")

    return {
        # Remove the quotes surrounding the UUID
        'UUID': uuid_part.strip('"'),
        # Remove the closing quote of the review text
        'REVIEW_TEXT': review_part.rstrip('"'),
    }

# Apply the parsing function to the dataset
parsed_dataset = text_dataset.map(parse_reviews)

# Predict Review Quality
- Predict the quality with zero-shot classification via a Hugging Face pipeline

In [None]:
from transformers import pipeline
import numpy as np


class ModelPredictor:
    """Callable that labels review quality with a zero-shot classification pipeline.

    The model is loaded once in ``__init__`` and reused for every batch passed
    to ``__call__``.
    """

    # Labels the classifier chooses between for every review
    CANDIDATE_LABELS = [
        'detailed with specific information and experience',
        'basic accurate information',
        'generic brief with no details',
    ]

    def __init__(self):
        # Load model
        self.classifier = pipeline("zero-shot-classification",
                      model="facebook/bart-large-mnli")

    def __call__(self, batch):
        """Add a REVIEW_QUALITY entry to the batch.

        Args:
            batch: Mapping whose "REVIEW_TEXT" entry is an array of review strings.

        Returns:
            The same batch with "REVIEW_QUALITY" set to the highest-scoring
            candidate label for each review.
        """
        resp = self.classifier(batch["REVIEW_TEXT"].tolist(), list(self.CANDIDATE_LABELS))

        # A single-input call can come back as one dict instead of a list
        # (seen with some transformers versions, e.g. for a final one-row batch).
        # Normalize it rather than failing the whole job.
        if isinstance(resp, dict):
            resp = [resp]

        # Pick the label with the highest score for each review
        batch["REVIEW_QUALITY"] = np.array(
            [result["labels"][np.argmax(result["scores"])] for result in resp]
        )

        return batch

# Apply batch operations to your dataset. The HF pipeline is itself a batch operation, so Ray Data is
# used only to scale across nodes: concurrency is set to the number of nodes we have started.
dataset = parsed_dataset.map_batches(ModelPredictor, concurrency=SCALE_FACTOR, batch_size=10, num_cpus=25)

# Store Processed Data in Snowflake

In [None]:
from snowflake.ml.ray.datasink.table_data_sink import SnowflakeTableDatasink

# Write the scored reviews to the REVIEWS table.
# NOTE(review): with override=False a re-run presumably appends to the existing table,
# which would duplicate rows — confirm against the datasink documentation.
datasink = SnowflakeTableDatasink(
    table_name="REVIEWS",
    auto_create_table=True,
    override=False,
    )
dataset.write_datasink(datasink)

In [None]:
show tables;

# Execute Sentiment Analysis

In [None]:
-- Add the REVIEW_SENTIMENT column
ALTER TABLE REVIEWS
  ADD COLUMN IF NOT EXISTS REVIEW_SENTIMENT FLOAT;

-- Update the table with sentiment analysis
UPDATE REVIEWS
  SET REVIEW_SENTIMENT = SNOWFLAKE.CORTEX.SENTIMENT(REVIEW_TEXT);

In [None]:
select * from reviews limit 10;

# Prepare Data for Model Training

In [None]:
# Combine the tabular features with the scored reviews; only UUIDs present in both tables are kept
tabular_data = session.table("TABULAR_DATA")
review_data = session.table("REVIEWS")

train_dataframe = tabular_data.join(
    review_data,
    review_data['UUID'] == tabular_data['UUID'],
    'inner'
)

In [None]:
train_dataframe.count()

In [None]:
train_dataframe.columns

In [None]:
# Label-encode the review quality and product layout columns
from snowflake.ml.modeling.preprocessing import LabelEncoder

# Select the columns to encode
columns_to_encode = ["REVIEW_QUALITY", "PRODUCT_LAYOUT"]

# One LabelEncoder per column; each writes its result to "<column>_OUT"
encoders = [LabelEncoder(input_cols=[col], output_cols=[f"{col}_OUT"]) for col in columns_to_encode]
for encoder in encoders:
    # Fit on the current frame, then carry the transformed frame into the next encoder
    train_dataframe = encoder.fit(train_dataframe).transform(train_dataframe)


# Train an XGBoost Model
- Trains an XGBoost model over two nodes using Snowflake distributed `XGBEstimator`

In [None]:
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector

# Feature columns (including the encoded "*_OUT" columns) and the target column
INPUT_COLS = ["REVIEW_QUALITY_OUT", "PRODUCT_LAYOUT_OUT", "PAGE_LOAD_TIME", "REVIEW_SENTIMENT", "PRODUCT_RATING"]
LABEL_COL = 'PURCHASE_DECISION'

# XGBoost hyperparameters
params = {
    "eta": 0.1,
    "max_depth": 8,
    "min_child_weight": 100,
    "tree_method": "hist",
}

# CPU-only training
scaling_config = XGBScalingConfig(
    use_gpu=False
)

# NOTE(review): "reg:squarederror" is a regression objective; if PURCHASE_DECISION is a
# binary label, a classification objective may be what is intended — confirm.
estimator = XGBEstimator(
    n_estimators=50,
    objective="reg:squarederror",
    params=params,
    scaling_config=scaling_config,
)


# Wrap the Snowpark DataFrame in a DataConnector and fit the estimator on it
dc = DataConnector.from_dataframe(train_dataframe)
xgb_model = estimator.fit(
    dc, input_cols=INPUT_COLS, label_col=LABEL_COL
)

# Register and Deploy the Model
- Register model to Snowflake Model Registry
- Deploy code outside of notebook using ML Jobs

In [None]:
from snowflake.ml.registry import registry
reg = registry.Registry(session=session)

# Log the model in Snowflake Model Registry.
# model_ref is used again at the end of the notebook to run the "explain" function.
model_ref = reg.log_model(
    model_name="deployed_xgb",
    model=xgb_model,
    conda_dependencies=["scikit-learn","xgboost"],
    sample_input_data=train_dataframe.select(INPUT_COLS),
    comment="XGBoost model for forecasting customer demand",
    options= {"enable_explainability": True},
    target_platforms = ["WAREHOUSE"]
)


In [None]:
# Now that we're done processing data, scale back down
scale_cluster(1, is_async=True)

In [None]:
from snowflake.ml.jobs import remote
@remote(compute_pool="HOL_COMPUTE_POOL_HIGHMEM", stage_name="payload_stage", external_access_integrations=["ALLOW_ALL_ACCESS_INTEGRATION"])
def update_reviews():
    """Run the review scoring pipeline as a remote ML Job.

    Reads the ``*.txt`` files from the REVIEWS stage, parses each record with
    ``parse_reviews``, labels review quality with ``ModelPredictor`` and writes
    the result to the REVIEWS table.
    """
    import ray
    from snowflake.ml.ray.datasink.table_data_sink import SnowflakeTableDatasink
    from snowflake.ml.ray.datasource import SFStageTextDataSource

    # Source: every text file on the REVIEWS stage
    review_files = SFStageTextDataSource(stage_location="REVIEWS", file_pattern="*.txt")
    raw_reviews = ray.data.read_datasource(review_files)

    # Parse each record, then score in batches of 10 with concurrency 1
    scored_reviews = raw_reviews.map(parse_reviews).map_batches(
        ModelPredictor, concurrency=1, batch_size=10, num_cpus=24
    )

    # Sink: the REVIEWS table
    review_table = SnowflakeTableDatasink(
        table_name="REVIEWS", auto_create_table=True, override=False
    )
    scored_reviews.write_datasink(review_table)


In [None]:
# Create a training job
@remote(compute_pool="HOL_COMPUTE_POOL_HIGHMEM", stage_name="payload_stage", external_access_integrations=["ALLOW_ALL_ACCESS_INTEGRATION"])
def retrain():
    """Retrain the XGBoost model as a remote ML Job and log it to the Model Registry.

    Joins TABULAR_DATA with REVIEWS on UUID, label-encodes REVIEW_QUALITY and
    PRODUCT_LAYOUT, fits the estimator once, and logs the result under a
    timestamped version name.
    """
    import datetime
    from snowflake.snowpark.context import get_active_session
    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
    from snowflake.ml.data.data_connector import DataConnector
    from snowflake.ml.modeling.preprocessing import LabelEncoder
    from snowflake.ml.registry import registry

    session = get_active_session()

    # Fully qualified names: the job does not rely on the session's current database/schema
    tabular_data = session.table("HOL_DB.HOL_SCHEMA.TABULAR_DATA")
    review_data = session.table("HOL_DB.HOL_SCHEMA.REVIEWS")

    INPUT_COLS = ["REVIEW_QUALITY_OUT", "PRODUCT_LAYOUT_OUT", "PAGE_LOAD_TIME", "REVIEW_SENTIMENT", "PRODUCT_RATING"]
    LABEL_COL = 'PURCHASE_DECISION'

    train_dataframe = tabular_data.join(
        review_data,
        review_data['UUID'] == tabular_data['UUID'],
        'inner'
    )

    # Label-encode the categorical columns; each encoder writes to "<column>_OUT"
    columns_to_encode = ["REVIEW_QUALITY", "PRODUCT_LAYOUT"]
    encoders = [LabelEncoder(input_cols=[col], output_cols=[f"{col}_OUT"]) for col in columns_to_encode]
    for encoder in encoders:
        train_dataframe = encoder.fit(train_dataframe).transform(train_dataframe)

    params = {
        "eta": 0.1,
        "max_depth": 8,
        "min_child_weight": 100,
        "tree_method": "hist",
    }

    scaling_config = XGBScalingConfig(
        use_gpu=False
    )

    estimator = XGBEstimator(
        n_estimators=50,
        objective="reg:squarederror",
        params=params,
        scaling_config=scaling_config,
    )

    # Fit exactly once (the model was previously trained twice back to back)
    dc = DataConnector.from_dataframe(train_dataframe)
    xgb_model = estimator.fit(
        dc, input_cols=INPUT_COLS, label_col=LABEL_COL
    )

    reg = registry.Registry(session=session)

    # Log the model in Snowflake Model Registry under a timestamped version.
    # NOTE(review): the model name is spelled "CONVERSTION"; left unchanged because
    # other consumers may already reference it — confirm before renaming.
    _ = reg.log_model(
        model_name="CONVERSTION_CLASSIFIER",
        model=xgb_model,
        version_name=f"retrain_{datetime.datetime.now().strftime('v%Y%m%d_%H%M%S')}",
        conda_dependencies=["scikit-learn","xgboost"],
        sample_input_data=train_dataframe.select(INPUT_COLS),
        comment="XGBoost model for forecasting customer demand",
        options= {"enable_explainability": True},
        target_platforms = ["WAREHOUSE"]
    )



In [None]:
# You can run the job manually, and get the status and logs of the job
train_job = retrain()

In [None]:
# `time` is not imported anywhere else in the notebook, so import it here
import time

# Poll once per second while the job is still pending
while train_job.status == "PENDING":
    time.sleep(1)

# Once job starts running, we can view the logs
train_job.get_logs()

In [None]:
# we can also see all the jobs, and manage them with the job manager
from snowflake.ml import jobs

all_jobs = jobs.list_jobs().to_pandas()

job_manager = jobs.manager

# Keep only the jobs whose status contains "FAILED"
mask = all_jobs['status'].str.contains("FAILED")
filtered_df = all_jobs[mask]

# Delete each failed job. The loop variable is job_id rather than id so the
# builtin id() is not shadowed for the rest of the notebook.
job_ids = filtered_df["id"]
for job_id in job_ids:
    job_manager.delete_job(job_id)

# Create Automated ML Pipeline
- Automate the deployment of the pipeline using Snowflake Tasks
- After DAG creation, navigate to Monitoring -> Task History to view execution

In [None]:
from snowflake.core.task.dagv1 import DAG, DAGTask
from snowflake.core.task.context import TaskContext
from datetime import timedelta
from snowflake.snowpark import Session
import snowflake.ml.jobs.manager as manager
import datetime
import json

WAREHOUSE = session.get_current_warehouse()


def refresh_reviews(session: Session) -> None:
    """DAG task: run the ``update_reviews`` job and fail the task if the job fails.

    Args:
        session: Session passed to the task (not used directly here).

    Raises:
        RuntimeError: If the job's final status is "FAILED"; the message
            includes the job logs.
    """
    job = update_reviews()
    # Wait for the job and capture its final status
    final_status = job.wait()

    if final_status == "FAILED":
        # Attach the logs the message promises, so the failure is diagnosable from the task history
        raise RuntimeError(f"Job {job} failed with logs {job.get_logs()}")

def update_sentiment(session: Session) -> None:
    """DAG task: recompute REVIEW_SENTIMENT for the REVIEWS table.

    Args:
        session: Session used to execute the UPDATE statement.
    """
    session.sql("""
        UPDATE
          REVIEWS
        SET
          REVIEW_SENTIMENT = SNOWFLAKE.CORTEX.SENTIMENT (REVIEW_TEXT);
    """).collect()

def retrain_model(session: Session) -> None:
    """DAG task: run the ``retrain`` job and fail the task if the job fails.

    Args:
        session: Session passed to the task (not used directly here).

    Raises:
        RuntimeError: If the job's final status is "FAILED"; the message
            includes the job logs.
    """
    job = retrain()
    # Wait for the job and capture its final status
    final_status = job.wait()

    if final_status == "FAILED":
        # Attach the logs the message promises, so the failure is diagnosable from the task history
        raise RuntimeError(f"Job {job} failed with logs {job.get_logs()}")

def setup(session: Session) -> str:
    """DAG task: build the run metadata as a JSON string.

    Args:
        session: Session passed to the task (unused).

    Returns:
        JSON object with a single "run_id" key, formatted as vYYYYMMDD_HHMMSS
        from the current local time.
    """
    run_id = datetime.datetime.now().strftime("v%Y%m%d_%H%M%S")
    return json.dumps({"run_id": run_id})

def create_dag() -> DAG:
    """Build the daily DAG: setup -> refresh_reviews -> update_sentiment -> retrain_model.

    Returns:
        The DAG object, ready to be deployed.
    """
    with DAG(
        "review_model_dag",
        warehouse=WAREHOUSE,
        schedule=timedelta(days=1),
        stage_location="payload_stage",
        # NOTE(review): the notebook installs snowflake-ml-python==1.8.3 at the top, but the
        # tasks request 1.8.2 here — confirm the mismatch is intentional.
        packages=["snowflake-snowpark-python", "snowflake-ml-python==1.8.2", "transformers"]
    ) as dag:
        # Need to wrap first function in a DAGTask to make >> operator work properly
        setup_task = DAGTask("setup", definition=setup)

        # Build the DAG
        setup_task >> refresh_reviews >> update_sentiment >> retrain_model

    return dag

from snowflake.core import CreateMode, Root
from snowflake.core.task.dagv1 import DAGOperation
api_root = Root(session)

# Operate on DAGs in the session's current database and schema
dag_op = DAGOperation(
    schema=api_root.databases[session.get_current_database()].schemas[session.get_current_schema()]
)

# Deploy with or_replace mode, then trigger a run right away
dag = create_dag()
dag_op.deploy(dag, mode=CreateMode.or_replace)
dag_op.run(dag)

# Print the id and state of each current run of this DAG
current_runs = dag_op.get_current_dag_runs(dag)
for r in current_runs:
    print(f"RunId={r.run_id} State={r.state}")

# Assess Feature Importance with Explainability

In [None]:
show models;

In [None]:
explanations = model_ref.run(train_dataframe.select(INPUT_COLS), function_name="explain")

In [None]:
explanations