# Initialize Environment for Distributed Processing

In [None]:
# Some of the features in this HOL are in preview. Pin the version of snowflake-ml-python for reproducibility. 
# NOTE - you do not need to restart the kernel since we're running this before importing snowflake-ml-python.
!pip install snowflake-ml-python==1.9.0
# tf-keras is installed alongside the HF transformers pipeline used later.
# TODO(review): tf-keras is unpinned, unlike snowflake-ml-python — pin it for reproducibility.
!pip install tf-keras

In [None]:
# Sanity check: list the installed keras-related packages (tf-keras is installed in the first cell).
!pip freeze | grep keras

In [None]:
# Sanity check: confirm the pinned snowflake-ml-python version (and other snowflake packages) is installed.
!pip freeze | grep snowflake

In [None]:
import ray
import logging
# Raise the root logger threshold to WARNING so INFO/DEBUG messages stay out of cell output.
logging.getLogger().setLevel(logging.WARNING)


# Turn off Ray Data progress reporting for the current DataContext.
context = ray.data.DataContext.get_current()
context.execution_options.verbose_progress = False
context.enable_operator_progress_bars = False
context.enable_progress_bars = False

In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
# The notebook's Snowpark session; every later cell reads and writes Snowflake through `session`.
session = get_active_session()


In [None]:
from snowflake.ml.runtime_cluster import scale_cluster

# Scale out the notebook to have multiple nodes available for execution.
# SCALE_FACTOR is the requested node count (the training section describes running "over two nodes");
# the cluster is scaled back with scale_cluster(1) once data processing is done.
SCALE_FACTOR = 2
scale_cluster(SCALE_FACTOR)

# Sync the python env to the scaled out cluster
# (presumably so the added nodes see the packages installed at the top of the notebook — confirm).
from runtime_env import python_env
python_env.sync_env()

# Process Review Text Data
- Load reviews with `SFStageTextDataSource`
- Parse review text with Ray data

In [None]:
# Load the synthetic data using Snowpark
synth_data = session.table("REVIEWS.PUBLIC.SYNTHDATA")

# Convert to Pandas DataFrame (this pulls the whole table into notebook memory)
synth_df = synth_data.to_pandas()

# Convert to Ray Dataset so the scoring step can distribute it across the cluster
ray_dataset = ray.data.from_pandas(synth_df)


In [None]:
# ❌ DEPRECATED INGESTION STEP — skipped by default
# This block was used to ingest raw .txt files from a Snowflake stage named "REVIEWS".
# It has been replaced by direct table access using REVIEWS.PUBLIC.SYNTHDATA.
# Retained here for reference only. The code is guarded by a flag so that "Run All"
# does not execute it; without the guard it ran on every full re-run despite the
# "do not run" warning.
RUN_DEPRECATED_STAGE_INGEST = False

if RUN_DEPRECATED_STAGE_INGEST:
    # use ray data to process sentiment
    from snowflake.ml.ray.datasource import SFStageTextDataSource

    file_name = "*.txt"
    stage_name = "REVIEWS"

    text_source = SFStageTextDataSource(
        stage_location=stage_name,
        file_pattern=file_name
    )

    text_dataset = ray.data.read_datasource(text_source)


In [None]:
# ✅ INGEST REVIEW TEXT FROM STAGE FILES
# This block loads every file matching '*.txt' (for example 'synthetic_review_data_text.txt')
# from the stage @"HOL_DB"."HOL_SCHEMA"."REVIEWS" into a Ray dataset.
# Each line should follow the format: "UUID","REVIEW_TEXT"

from snowflake.ml.ray.datasource import SFStageTextDataSource

# Session must be scoped to HOL_DB and HOL_SCHEMA so the unqualified stage name resolves there.
# NOTE: this changes the session's current database/schema for every later cell as well
# (e.g. the DAG deployment reads session.get_current_database()/get_current_schema()).
session.use_database("HOL_DB")
session.use_schema("HOL_SCHEMA")

# Use session-scoped stage and wildcard pattern
stage_location = "@REVIEWS"
file_pattern = "*.txt"

text_source = SFStageTextDataSource(
    stage_location=stage_location,
    file_pattern=file_pattern
)

# Rows of this dataset carry the line under "text" (and its source file under "file_name"),
# which parse_reviews consumes.
text_dataset = ray.data.read_datasource(text_source)


In [None]:
import pandas as pd
from snowflake.ml.ray.datasource import SFStageTextDataSource

# Step 1: Ingest raw .txt reviews from stage
# (unqualified stage name, resolved against the session's current database/schema)
text_source = SFStageTextDataSource(
    stage_location="REVIEWS",
    file_pattern="*.txt"
)
text_dataset = ray.data.read_datasource(text_source)

# Step 2: Parse each line into UUID and REVIEW_TEXT
# NOTE: parse_reviews is also defined in a later cell; keep the two definitions in sync.
def parse_reviews(batch):
    """
    Parse one line of the form "UUID","REVIEW_TEXT" into its two fields.

    Despite its name, ``batch`` is a single row: ``Dataset.map`` calls this once per line.

    Args:
        batch: Dictionary containing a 'text' key with the raw line.

    Returns:
        Dictionary with 'UUID' and 'REVIEW_TEXT' keys.

    Raises:
        ValueError: if the line lacks the '","' separator between the two quoted fields.
    """
    # Drop surrounding whitespace / line terminators (e.g. a trailing '\r') so the
    # closing quote really is the last character.
    value = batch["text"].strip()
    # Split on the first '","' only, so the review text may itself contain '","'.
    uuid_part, separator, review_part = value.partition('","')
    if not separator:
        raise ValueError(
            f'Malformed review line, expected "UUID","REVIEW_TEXT": {value!r}'
        )
    # Remove exactly one closing quote; rstrip('"') would also remove quotes that are
    # part of the review itself (e.g. a review ending in a quoted word).
    review_text = review_part[:-1] if review_part.endswith('"') else review_part
    return {'UUID': uuid_part.strip('"'), 'REVIEW_TEXT': review_text}

parsed_dataset = text_dataset.map(parse_reviews)

# Step 3: Convert parsed .txt reviews to Pandas and tag source
parsed_df = parsed_dataset.to_pandas()
parsed_df["SOURCE"] = "TXT"

# Step 4: Load synthetic structured reviews and tag source
synth_df = session.table("REVIEWS.PUBLIC.SYNTHDATA").to_pandas()
synth_df["SOURCE"] = "SYNTH"

# Step 5: Combine both into one Ray dataset
combined_df = pd.concat([parsed_df, synth_df], ignore_index=True)
ray_dataset = ray.data.from_pandas(combined_df)



In [None]:
# ✅ ACTIVE PARSING STEP — required for .txt ingestion
# This function extracts UUID and REVIEW_TEXT from raw lines in the .txt file.
# Do not remove unless .txt ingestion is fully deprecated or replaced with structured formats.

def parse_reviews(batch):
    """
    Parse one line of review text into its UUID and review text.

    Each line is expected to look like ``"UUID","REVIEW_TEXT"``. Despite its name,
    ``batch`` is a single row: ``Dataset.map`` calls this once per line.

    Args:
        batch: Dictionary containing 'text' and 'file_name' keys; only 'text' is read.

    Returns:
        Dictionary with 'UUID' and 'REVIEW_TEXT' keys.

    Raises:
        ValueError: if the line does not contain the '","' separator between the
            two quoted fields.
    """
    # Drop surrounding whitespace / line terminators (e.g. a trailing '\r') so the
    # closing quote really is the last character.
    value = batch["text"].strip()

    # Split on the first '","' only, so the review text may itself contain '","'.
    uuid_part, separator, review_part = value.partition('","')
    if not separator:
        raise ValueError(
            f'Malformed review line, expected "UUID","REVIEW_TEXT": {value!r}'
        )

    # Clean up the UUID (remove leading/trailing quotes)
    uuid = uuid_part.strip('"')

    # Remove exactly one closing quote; rstrip('"') would also remove quotes that are
    # part of the review itself (e.g. a review ending in a quoted word).
    review_text = review_part[:-1] if review_part.endswith('"') else review_part

    return {'UUID': uuid, 'REVIEW_TEXT': review_text}

# Apply the parsing function to the dataset
# (Dataset.map calls parse_reviews once per line of the stage text dataset)
parsed_dataset = text_dataset.map(parse_reviews)

# Predict Review Quality
- Predict review quality with zero-shot classification via a Hugging Face (HF) pipeline

In [None]:
from transformers import pipeline
import numpy as np


class ModelPredictor:
    """Ray Data callable class that labels review quality with zero-shot classification.

    Used with ``Dataset.map_batches``: Ray constructs instances (so the Hugging Face model
    is loaded once per actor in ``__init__``) and calls them with each batch.
    """

    # Candidate labels offered to the zero-shot classifier; the highest-scoring one is
    # written to REVIEW_QUALITY.
    CANDIDATE_LABELS = [
        'detailed with specific information and experience',
        'basic accurate information',
        'generic brief with no details',
    ]

    def __init__(self):
        # Load model
        self.classifier = pipeline("zero-shot-classification",
                                   model="facebook/bart-large-mnli")

    def __call__(self, batch):
        """Add a REVIEW_QUALITY column to ``batch``.

        Args:
            batch: Mapping whose "REVIEW_TEXT" column supports ``tolist()``
                (e.g. a NumPy array in Ray's default batch format).

        Returns:
            The same batch with "REVIEW_QUALITY" set to a NumPy array holding one
            label per review.

        Raises:
            ValueError: if the classifier returns a different number of results than
                there are reviews in the batch.
        """
        texts = batch["REVIEW_TEXT"].tolist()
        if not texts:
            # Nothing to classify; keep the column present so the batch schema is stable.
            batch["REVIEW_QUALITY"] = np.array([], dtype=str)
            return batch

        resp = self.classifier(texts, self.CANDIDATE_LABELS)

        # With some transformers versions the zero-shot pipeline returns a bare dict rather
        # than a one-element list for a single input. A one-row batch (e.g. the final, partial
        # batch) is valid, so normalize it instead of failing.
        if isinstance(resp, dict):
            resp = [resp]
        if len(resp) != len(texts):
            raise ValueError(
                f"Expected {len(texts)} classifier results, got {len(resp)} for batch {batch['REVIEW_TEXT']}"
            )

        # Add results to batch: the label with the highest score for each review
        batch["REVIEW_QUALITY"] = np.array(
            [result["labels"][int(np.argmax(result["scores"]))] for result in resp]
        )
        return batch

# ⛔ DEPRECATED: replaced by ray_dataset version below:
# Apply batch operations to your dataset. HF Pipeline is itself a batch operation, 
#so we use Ray data just to scale across nodes, setting concurrency to number of nodes we have started.
#dataset = parsed_dataset.map_batches(ModelPredictor, concurrency=4)

# ✅ Replaced with 
# Apply model to combined Ray dataset.
# ModelPredictor is passed as a class, so Ray runs up to `concurrency` (4) actor instances,
# each loading the HF model once in __init__.
# NOTE(review): concurrency=4 while SCALE_FACTOR is 2 nodes — confirm the nodes have capacity for 4 model actors.
# Ray Data transforms are lazy, and no later cell in this notebook consumes `dataset`.
dataset = ray_dataset.map_batches(ModelPredictor, concurrency=4)


# Store Processed Data in Snowflake

In [None]:
# ⛔ DEPRECATED: replaced by native write_snowflake method
# from snowflake.ml.ray.datasink.table_data_sink import SnowflakeTableDatasink
# datasink = SnowflakeTableDatasink(
#     table_name="SCORED_REVIEWS",
#     database_name="REVIEWS",
#     schema_name="PUBLIC",
#     auto_create_table=True,
#     override=True
# )


# ⛔ BLOCKED: Ray actor failed due to network restrictions in free-tier environment
# df = dataset.to_pandas()
# session.write_pandas(df, table_name="SCORED_REVIEWS", ...)
# Convert Ray dataset to Pandas
#df = dataset.to_pandas()

# Write to Snowflake using Snowpark
#session.write_pandas(
#    df,
#    table_name="SCORED_REVIEWS",
#    database="REVIEWS",
#    schema="PUBLIC",
#    overwrite=True
#)


# ⛔ ATTEMPT 3 FAILED: Hugging Face model could not be loaded due to lack of internet access
# classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
# parsed_df["REVIEW_QUALITY"] = parsed_df["REVIEW_TEXT"].apply(...)
from transformers import pipeline
import numpy as np

# Load model locally
#classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

# Apply model to parsed_df (Pandas)
#parsed_df["REVIEW_QUALITY"] = parsed_df["REVIEW_TEXT"].apply(
#    lambda text: classifier(text, [
#        'detailed with specific information and experience',
#        'basic accurate information',
#        'generic brief with no details'
#    ])["labels"][0]
#)

# Write to Snowflake using Snowpark
#session.write_pandas(
#    parsed_df,
#    table_name="SCORED_REVIEWS",
#    database="REVIEWS",
#    schema="PUBLIC",
#    overwrite=True
#)


In [None]:
# Drop any previous SCORED_REVIEWS table so the next cell's write_pandas(auto_create_table=True) recreates it.
session.sql("DROP TABLE IF EXISTS REVIEWS.PUBLIC.SCORED_REVIEWS").collect()


In [None]:
import datetime

# Add an ingestion timestamp. It is taken in UTC and stored as an ISO-8601 string with an
# explicit offset, so the value is unambiguous regardless of the notebook host's timezone.
parsed_df["INGESTED_AT"] = datetime.datetime.now(datetime.timezone.utc).isoformat()

# NOTE(review): parsed_df holds the parsed .txt reviews (UUID, REVIEW_TEXT, SOURCE, INGESTED_AT);
# it has not been scored, so this table has no REVIEW_QUALITY column.

# ✅ Write to Snowflake and auto-create the table
session.write_pandas(
    parsed_df,
    table_name="SCORED_REVIEWS",
    database="REVIEWS",
    schema="PUBLIC",
    overwrite=True,            # Replace existing contents so re-running this cell does not append duplicates
    auto_create_table=True     # ✅ This creates the table with correct schema
)


In [None]:
-- ✅ Backfill missing timestamps after ingestion
-- Sets INGESTED_AT to the current timestamp only on rows where it is NULL.
-- NOTE(review): the previous cell fills INGESTED_AT on every row it writes (as an ISO-8601 string,
-- so the column is presumably VARCHAR), so this normally updates nothing — confirm.
UPDATE REVIEWS.PUBLIC.SCORED_REVIEWS
SET INGESTED_AT = CURRENT_TIMESTAMP()
WHERE INGESTED_AT IS NULL;


In [None]:
-- List tables in the session's current database/schema.
show tables;

# Execute Sentiment Analysis

In [None]:
-- ❌ DEPRECATED: Scalar subquery fails to correlate rows
-- This block does not update any rows due to lack of row-level correlation
-- Retained for reference only — do not execute

-- ALTER TABLE REVIEWS
-- ADD COLUMN IF NOT EXISTS REVIEW_SENTIMENT FLOAT;

-- UPDATE REVIEWS
-- SET REVIEW_SENTIMENT = (
--     SELECT CASE 
--         WHEN sentiment_str = 'positive' THEN 1.0
--         WHEN sentiment_str = 'negative' THEN -1.0
--         WHEN sentiment_str = 'neutral' THEN 0.0
--         WHEN sentiment_str = 'mixed' THEN 0.5
--         ELSE 0.0
--     END
--     FROM (
--         SELECT SNOWFLAKE.CORTEX.ENTITY_SENTIMENT(REVIEWS.REVIEW_TEXT):categories[0]:sentiment::STRING AS sentiment_str
--     ) AS sentiment_data
-- );

-- ✅ Add the REVIEW_SENTIMENT column to the table
--ALTER TABLE REVIEWS.PUBLIC.SCORED_REVIEWS
--ADD COLUMN IF NOT EXISTS REVIEW_SENTIMENT FLOAT;

-- ✅ Step 1: Create sentiment scores using Snowflake Cortex
--CREATE OR REPLACE TEMP TABLE SCORED_SENTIMENTS AS
--SELECT
--  UUID,
--  CASE 
--   WHEN sentiment = 'positive' THEN 1.0
--   WHEN sentiment = 'negative' THEN -1.0
--   WHEN sentiment = 'neutral' THEN 0.0
--   WHEN sentiment = 'mixed' THEN 0.5
--   ELSE 0.0
--   END AS REVIEW_SENTIMENT
--   FROM (
--   SELECT
--   UUID,
------    SNOWFLAKE.CORTEX.ENTITY_SENTIMENT(REVIEW_TEXT):categories[0]:sentiment::STRING AS ----sentiment
--  FROM REVIEWS.PUBLIC.SCORED_REVIEWS
--);

-- ✅ Step 2: Update original table with sentiment scores
--UPDATE REVIEWS.PUBLIC.SCORED_REVIEWS AS target
--SET REVIEW_SENTIMENT = scored.REVIEW_SENTIMENT
--FROM SCORED_SENTIMENTS AS scored
--WHERE target.UUID = scored.UUID;



In [None]:
--select * from reviews limit 10;

-- Preview a few rows of the scored reviews table.
SELECT * FROM REVIEWS.PUBLIC.SCORED_REVIEWS LIMIT 10;




# Prepare Data for Model Training

In [None]:
# Original TABULAR_DATA/REVIEWS join, kept for reference; the training set is built from
# REVIEWS.PUBLIC.SYNTHDATA and REVIEWS.PUBLIC.SCORED_REVIEWS further down.
#tabular_data = session.table("TABULAR_DATA")
#review_data = session.table("REVIEWS")
#
#train_dataframe = tabular_data.join(
#    review_data,
#    review_data['UUID'] == tabular_data['UUID'],
#    'inner'
#)

# List the tables available in REVIEWS.PUBLIC.
session.sql("SHOW TABLES IN SCHEMA REVIEWS.PUBLIC").show()


In [None]:
# Inspect available columns in REVIEWS.PUBLIC.SCORED_REVIEWS before building the training set.
review_data = session.table("REVIEWS.PUBLIC.SCORED_REVIEWS")
print("Available columns:", review_data.columns)


In [None]:
from snowflake.snowpark.functions import lit

# Build SCORED_REVIEWS directly from SYNTHDATA. The HF-scored Ray dataset is not written to
# Snowflake in this environment (see the blocked attempts above), so REVIEW_QUALITY is a placeholder.

# Step 1: Load base table
train_dataframe = session.table("REVIEWS.PUBLIC.SYNTHDATA").select(
    "UUID",
    "PRODUCT_TYPE",
    "PRODUCT_LAYOUT",
    "PAGE_LOAD_TIME",
    "PRODUCT_RATING",
    "PURCHASE_DECISION",
    "REVIEW_TEXT"
)

# Step 2: Add placeholder REVIEW_QUALITY
# NOTE(review): every row gets the same label, so the encoded REVIEW_QUALITY_OUT feature is
# constant and carries no signal for the model trained below.
train_dataframe = train_dataframe.with_column("REVIEW_QUALITY", lit("basic accurate information"))

# Step 3: Materialize intermediate table (REVIEWS.PUBLIC.SCORED_REVIEWS_TEMP is not dropped afterwards)
train_dataframe.write.save_as_table("REVIEWS.PUBLIC.SCORED_REVIEWS_TEMP", mode="overwrite")

# Step 4: Add REVIEW_SENTIMENT via Cortex and finalize SCORED_REVIEWS
# CREATE OR REPLACE discards the SCORED_REVIEWS table written earlier in the notebook (including
# its INGESTED_AT column). ENTITY_SENTIMENT is called once per row; the first category's sentiment
# maps to a score: positive=1.0, negative=-1.0, neutral=0.0, mixed=0.5, anything else or NULL=0.0.
session.sql("""
    CREATE OR REPLACE TABLE REVIEWS.PUBLIC.SCORED_REVIEWS AS
    SELECT
      UUID,
      PRODUCT_TYPE,
      PRODUCT_LAYOUT,
      PAGE_LOAD_TIME,
      PRODUCT_RATING,
      PURCHASE_DECISION,
      REVIEW_TEXT,
      REVIEW_QUALITY,
      CASE 
        WHEN sentiment = 'positive' THEN 1.0
        WHEN sentiment = 'negative' THEN -1.0
        WHEN sentiment = 'neutral' THEN 0.0
        WHEN sentiment = 'mixed' THEN 0.5
        ELSE 0.0
      END AS REVIEW_SENTIMENT
    FROM (
      SELECT
        *,
        SNOWFLAKE.CORTEX.ENTITY_SENTIMENT(REVIEW_TEXT):categories[0]:sentiment::STRING AS sentiment
      FROM REVIEWS.PUBLIC.SCORED_REVIEWS_TEMP
    )
""").collect()


In [None]:
#train_dataframe.count()

# Load both tables (aliases "r"/"t" label the two sides of the join)
review_data = session.table("REVIEWS.PUBLIC.SCORED_REVIEWS").alias("r")
tabular_data = session.table("REVIEWS.PUBLIC.SYNTHDATA").alias("t")

# Join and select explicitly named columns: tabular features come from SYNTHDATA, review-derived
# columns from SCORED_REVIEWS, and only one UUID column is kept.
train_dataframe = tabular_data.join(
    review_data,
    tabular_data["UUID"] == review_data["UUID"],
    "inner"
).select(
    tabular_data["UUID"].alias("UUID"),
    tabular_data["PRODUCT_TYPE"],
    tabular_data["PRODUCT_LAYOUT"],
    tabular_data["PAGE_LOAD_TIME"],
    tabular_data["PRODUCT_RATING"],
    tabular_data["PURCHASE_DECISION"],
    review_data["REVIEW_TEXT"],
    review_data["REVIEW_QUALITY"],
    review_data["REVIEW_SENTIMENT"]
)

# ✅ Count rows (runs a query in Snowflake)
row_count = train_dataframe.count()
print(f"Row count: {row_count}")


In [None]:
# Display the column names of the joined training DataFrame.
train_dataframe.columns

In [None]:
# ✅ Rename join-suffixed columns (e.g. PRODUCT_TYPET / REVIEW_TEXTR) back to their base names.
# Snowpark's with_column_renamed raises when the column to rename does not exist, and the explicit
# select in the join cell already yields unsuffixed names. So only rename the suffixed columns that
# are actually present; the cell then works whether or not the join produced suffixes.
SUFFIXED_COLUMN_RENAMES = {
    "PRODUCT_TYPET": "PRODUCT_TYPE",
    "PRODUCT_LAYOUTT": "PRODUCT_LAYOUT",
    "PAGE_LOAD_TIMET": "PAGE_LOAD_TIME",
    "PRODUCT_RATINGT": "PRODUCT_RATING",
    "PURCHASE_DECISIONT": "PURCHASE_DECISION",
    "REVIEW_TEXTR": "REVIEW_TEXT",
    "REVIEW_QUALITYR": "REVIEW_QUALITY",
    "REVIEW_SENTIMENTR": "REVIEW_SENTIMENT",
}

present_columns = set(train_dataframe.columns)
for old_name, new_name in SUFFIXED_COLUMN_RENAMES.items():
    if old_name in present_columns:
        train_dataframe = train_dataframe.with_column_renamed(old_name, new_name)

# Confirm updated column names
print(train_dataframe.columns)


In [None]:
# Encode the categorical features REVIEW_QUALITY and PRODUCT_LAYOUT as integer labels
# (REVIEW_SENTIMENT is already numeric and is not encoded here)
from snowflake.ml.modeling.preprocessing import LabelEncoder

# Select the columns to encode
columns_to_encode = ["REVIEW_QUALITY", "PRODUCT_LAYOUT"]

# Initialize LabelEncoder for each column; each writes its codes to a new "<col>_OUT" column
encoders = [LabelEncoder(input_cols=[col], output_cols=[f"{col}_OUT"]) for col in columns_to_encode]
# Fit each encoder on the current DataFrame and append its output column
for encoder in encoders:
    train_dataframe = encoder.fit(train_dataframe).transform(train_dataframe)


# Train an XGBoost Model
- Trains an XGBoost model over two nodes using Snowflake distributed `XGBEstimator`

In [None]:
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector

# Model features (the two label-encoded columns plus numeric columns) and the target column
INPUT_COLS = ["REVIEW_QUALITY_OUT", "PRODUCT_LAYOUT_OUT", "PAGE_LOAD_TIME", "REVIEW_SENTIMENT", "PRODUCT_RATING"]
LABEL_COL = 'PURCHASE_DECISION'

# XGBoost booster parameters
params = {
    "eta": 0.1,
    "max_depth": 8,
    "min_child_weight": 100,
    "tree_method": "hist",
}

# Train on CPU
scaling_config = XGBScalingConfig(
    use_gpu=False
)

# NOTE(review): a regression objective is used for PURCHASE_DECISION; if that label is binary,
# a classification objective such as "binary:logistic" may be more appropriate — confirm.
estimator = XGBEstimator(
    n_estimators=50,
    objective="reg:squarederror",
    params=params,
    scaling_config=scaling_config,
)


# Feed the Snowpark DataFrame to the distributed estimator through a DataConnector
dc = DataConnector.from_dataframe(train_dataframe)
xgb_model = estimator.fit(
    dc, input_cols=INPUT_COLS, label_col=LABEL_COL
)

# Register and Deploy the Model
- Register model to Snowflake Model Registry
- Deploy code outside of notebook using ML Jobs

In [None]:
from snowflake.snowpark.functions import col

# Use only input features — exclude the label.
# Snowflake auto-converts DecimalType to DOUBLE during model logging, so the features are cast to
# FLOAT and materialized into TEMP_PRECISION_ENFORCED. The table is (re)built from train_dataframe
# here rather than assumed to exist; no other cell creates it, so the old cell failed on a fresh session.
SAMPLE_INPUT_COLS = [
    "REVIEW_QUALITY_OUT",
    "PRODUCT_LAYOUT_OUT",
    "PAGE_LOAD_TIME",
    "REVIEW_SENTIMENT",
    "PRODUCT_RATING",
]

train_dataframe.select(
    [col(name).cast("FLOAT").alias(name) for name in SAMPLE_INPUT_COLS]
).write.save_as_table("TEMP_PRECISION_ENFORCED", mode="overwrite", table_type="temporary")

sample_input = session.table("TEMP_PRECISION_ENFORCED").select(*SAMPLE_INPUT_COLS).limit(100)

In [None]:
from snowflake.ml.registry import registry

# Initialize registry object using active session
# (presumably located in the session's current database/schema — confirm if models land elsewhere)
reg = registry.Registry(session=session)


In [None]:
# Log the trained XGBoost model to the Model Registry as "deployed_xgb", with explainability
# enabled and the warehouse as the target platform; sample_input supplies example feature rows.
# NOTE(review): the comment string says "forecasting customer demand", but the model predicts
# PURCHASE_DECISION — consider rewording it.
model_ref = reg.log_model(
    model_name="deployed_xgb",
    model=xgb_model,
    conda_dependencies=["scikit-learn", "xgboost"],
    sample_input_data=sample_input,
    comment="XGBoost model for forecasting customer demand",
    options={"enable_explainability": True},
    target_platforms=["WAREHOUSE"]
)


# NOTE: Snowflake auto-converts DecimalType to DOUBLE during model logging.
# To mitigate this, we cast key numeric fields and materialize them into a temp table.
# sample_input excludes the label column to avoid feature mismatch errors.
# Warnings persist but do not affect model performance or deployment.

In [None]:
from snowflake.snowpark.functions import col

# Project the five model features as FLOAT columns under their original names.
# NOTE: float_inputs is not referenced by any later cell.
float_inputs = train_dataframe.select(
    [
        col(feature).cast("FLOAT").alias(feature)
        for feature in (
            "REVIEW_QUALITY_OUT",
            "PRODUCT_LAYOUT_OUT",
            "PAGE_LOAD_TIME",
            "REVIEW_SENTIMENT",
            "PRODUCT_RATING",
        )
    ]
)



In [None]:
# Now that we're done processing data, scale back down to a single node
# (is_async=True presumably returns without waiting for the scale-down to complete — confirm).
scale_cluster(1, is_async=True)

# Assess Feature Importance with Explainability

In [None]:
# Step 1: Cast all inputs to FLOAT (Decimal-typed inputs caused a schema mismatch with the logged model)
float_df = train_dataframe.select(
    train_dataframe["REVIEW_QUALITY_OUT"].cast("FLOAT").alias("REVIEW_QUALITY_OUT"),
    train_dataframe["PRODUCT_LAYOUT_OUT"].cast("FLOAT").alias("PRODUCT_LAYOUT_OUT"),
    train_dataframe["PAGE_LOAD_TIME"].cast("FLOAT").alias("PAGE_LOAD_TIME"),
    train_dataframe["REVIEW_SENTIMENT"].cast("FLOAT").alias("REVIEW_SENTIMENT"),
    train_dataframe["PRODUCT_RATING"].cast("FLOAT").alias("PRODUCT_RATING")
)

# Step 2: Drop table if it exists (correct method on session); mode="overwrite" below would also replace it
session.sql("DROP TABLE IF EXISTS TEMP_EXPLAIN_FLOAT_INPUT").collect()

# Step 3: Materialize the casted DataFrame
float_df.write.save_as_table("TEMP_EXPLAIN_FLOAT_INPUT", mode="overwrite")

# Step 4: Run explainability on up to 100 rows (no ORDER BY, so which rows is unspecified)
explanations = model_ref.run(session.table("TEMP_EXPLAIN_FLOAT_INPUT").limit(100), function_name="explain")


In [None]:
from snowflake.snowpark.functions import col

# Step 1: Cast to FLOAT and filter out NULLs, then collect up to 100 fully-populated rows into the notebook
float_rows = train_dataframe.select(
    train_dataframe["REVIEW_QUALITY_OUT"].cast("FLOAT").alias("REVIEW_QUALITY_OUT"),
    train_dataframe["PRODUCT_LAYOUT_OUT"].cast("FLOAT").alias("PRODUCT_LAYOUT_OUT"),
    train_dataframe["PAGE_LOAD_TIME"].cast("FLOAT").alias("PAGE_LOAD_TIME"),
    train_dataframe["REVIEW_SENTIMENT"].cast("FLOAT").alias("REVIEW_SENTIMENT"),
    train_dataframe["PRODUCT_RATING"].cast("FLOAT").alias("PRODUCT_RATING")
).filter(
    (col("REVIEW_QUALITY_OUT").is_not_null()) &
    (col("PRODUCT_LAYOUT_OUT").is_not_null()) &
    (col("PAGE_LOAD_TIME").is_not_null()) &
    (col("REVIEW_SENTIMENT").is_not_null()) &
    (col("PRODUCT_RATING").is_not_null())
).limit(100).collect()

# Step 2: Rebuild clean DataFrame from Python-native rows (column types are inferred from the collected values)
clean_df = session.create_dataframe(
    float_rows,
    schema=["REVIEW_QUALITY_OUT", "PRODUCT_LAYOUT_OUT", "PAGE_LOAD_TIME", "REVIEW_SENTIMENT", "PRODUCT_RATING"]
)

# Step 3: Materialize and run explainability (replaces the table and `explanations` from the previous cell)
session.sql("DROP TABLE IF EXISTS TEMP_EXPLAIN_FLOAT_INPUT").collect()
clean_df.write.save_as_table("TEMP_EXPLAIN_FLOAT_INPUT", mode="overwrite")

explanations = model_ref.run(session.table("TEMP_EXPLAIN_FLOAT_INPUT"), function_name="explain")


In [None]:
# NOTE: Original explainability call failed due to DecimalType mismatch.
# Replaced with a clean FLOAT-casted input table to match model schema.
# TEMP_EXPLAIN_FLOAT_INPUT is filtered for non-null rows and materialized for compatibility.

#explanations = model_ref.run(train_dataframe.select(INPUT_COLS), function_name="explain")

In [None]:
explanations

# Deploy To Production

In [None]:
from snowflake.ml.jobs import remote
@remote(compute_pool="HOL_COMPUTE_POOL", stage_name="payload_stage", external_access_integrations=["ALLOW_ALL_ACCESS_INTEGRATION"])
def update_reviews():
    """ML Job: ingest review text from the REVIEWS stage, score it, and write it to the REVIEWS table.

    Reads every '*.txt' file in the REVIEWS stage, parses each line with the notebook-level
    parse_reviews, labels review quality with the notebook-level ModelPredictor, and writes the
    rows to the REVIEWS table through SnowflakeTableDatasink (auto_create_table=True).

    NOTE(review): parse_reviews and ModelPredictor are not defined inside this function; the job
    presumably relies on @remote capturing them when the function is serialized — confirm.
    NOTE(review): override=False presumably appends, so repeated scheduled runs over the same
    files may insert duplicate UUIDs — confirm.
    """
    import ray
    from snowflake.ml.ray.datasink.table_data_sink import SnowflakeTableDatasink
    from snowflake.ml.ray.datasource import SFStageTextDataSource
    
    file_name = "*.txt"
    stage_name = "REVIEWS"
    
    text_source = SFStageTextDataSource(
        stage_location=stage_name,
        file_pattern=file_name
    )
    
    text_dataset = ray.data.read_datasource(text_source)

    # text_dataset = ray.data.read_datasource(text_source)
    parsed_dataset = text_dataset.map(parse_reviews)
    # One ModelPredictor actor (concurrency=1) reserving 24 CPUs, fed 10 rows per batch
    dataset = parsed_dataset.map_batches(ModelPredictor, concurrency=1, batch_size=10, num_cpus=24)

    datasink = SnowflakeTableDatasink(
        table_name="REVIEWS",
        auto_create_table=True,
        override=False,
        )
    dataset.write_datasink(datasink)


In [None]:
# Create a training job
@remote(compute_pool="HOL_COMPUTE_POOL", stage_name="payload_stage", external_access_integrations=["ALLOW_ALL_ACCESS_INTEGRATION"])
def retrain(session):
    """ML Job: retrain the purchase-decision XGBoost model and log a new registry version.

    Joins HOL_DB.HOL_SCHEMA.TABULAR_DATA with HOL_DB.HOL_SCHEMA.REVIEWS on UUID, label-encodes
    REVIEW_QUALITY and PRODUCT_LAYOUT, fits a distributed XGBEstimator on INPUT_COLS ->
    PURCHASE_DECISION, and logs the model under a timestamped version name.

    Args:
        session: Snowpark session used to read the tables and to access the Model Registry.
    """
    import datetime
    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
    from snowflake.ml.data.data_connector import DataConnector
    from snowflake.ml.modeling.preprocessing import LabelEncoder
    from snowflake.ml.registry import registry

    tabular_data = session.table("HOL_DB.HOL_SCHEMA.TABULAR_DATA")
    review_data = session.table("HOL_DB.HOL_SCHEMA.REVIEWS")

    INPUT_COLS = ["REVIEW_QUALITY_OUT", "PRODUCT_LAYOUT_OUT", "PAGE_LOAD_TIME", "REVIEW_SENTIMENT", "PRODUCT_RATING"]
    LABEL_COL = 'PURCHASE_DECISION'

    train_dataframe = tabular_data.join(
        review_data,
        review_data['UUID'] == tabular_data['UUID'],
        'inner'
    )

    # Encode the categorical features REVIEW_QUALITY and PRODUCT_LAYOUT as integer labels
    # in new "<col>_OUT" columns
    columns_to_encode = ["REVIEW_QUALITY", "PRODUCT_LAYOUT"]
    encoders = [LabelEncoder(input_cols=[col], output_cols=[f"{col}_OUT"]) for col in columns_to_encode]
    for encoder in encoders:
        train_dataframe = encoder.fit(train_dataframe).transform(train_dataframe)

    params = {
        "eta": 0.1,
        "max_depth": 8,
        "min_child_weight": 100,
        "tree_method": "hist",
    }

    scaling_config = XGBScalingConfig(
        use_gpu=False
    )

    estimator = XGBEstimator(
        n_estimators=50,
        objective="reg:squarederror",
        params=params,
        scaling_config=scaling_config,
    )

    # Fit exactly once. Building the DataConnector and refitting a second time on the same data
    # only doubled the training cost and discarded the first model.
    dc = DataConnector.from_dataframe(train_dataframe)
    xgb_model = estimator.fit(
        dc, input_cols=INPUT_COLS, label_col=LABEL_COL
    )

    reg = registry.Registry(session=session)

    # Log the model in Snowflake Model Registry.
    # NOTE(review): "CONVERSTION_CLASSIFIER" looks like a typo of "CONVERSION"; it is kept as-is
    # because renaming would start a new registry model instead of adding versions to this one.
    _ = reg.log_model(
        model_name="CONVERSTION_CLASSIFIER",
        model=xgb_model,
        version_name=f"retrain_{datetime.datetime.now().strftime('v%Y%m%d_%H%M%S')}",
        conda_dependencies=["scikit-learn","xgboost"],
        sample_input_data=train_dataframe.select(INPUT_COLS),
        comment="XGBoost model for forecasting customer demand",
        options= {"enable_explainability": True},
        target_platforms = ["WAREHOUSE"]
    )

In [None]:
-- List compute pools visible to the current role (the ML Jobs in this notebook target HOL_COMPUTE_POOL).
SHOW COMPUTE POOLS;


In [None]:
# Alternative: run the retraining job on SYSTEM_COMPUTE_POOL_CPU instead of HOL_COMPUTE_POOL.
# Do NOT define `retrain` here as an undecorated stub (`def retrain(session): ...`). Running such a
# cell replaces the @remote-decorated retraining job defined above. `retrain(session)` would then
# return None instead of submitting a job, and the DAG's retrain_model task would fail on job.wait().
# To switch pools, copy the full `retrain` body under a new @remote decorator, e.g.:
#
# from snowflake.ml.jobs import remote
# @remote(compute_pool="SYSTEM_COMPUTE_POOL_CPU", stage_name="payload_stage", external_access_integrations=["ALLOW_ALL_ACCESS_INTEGRATION"])
# def retrain(session):
#     <full retraining body>


In [None]:
from snowflake.ml import jobs

all_jobs = jobs.list_jobs()

# Select jobs whose status contains "FAILED". With na=False, a missing status counts as
# "not failed"; otherwise a null status puts NaN into the mask, which pandas refuses to index with.
mask = all_jobs['status'].str.contains("FAILED", na=False)
filtered_df = all_jobs[mask]

# Delete each failed job. The loop variable is `job_name` (not `id`) so the builtin id() is
# not shadowed in the notebook namespace.
job_names = filtered_df["name"]
for job_name in job_names:
    jobs.delete_job(job_name)


In [None]:
# You can run the job manually, and get the status and logs of the job
train_job = retrain(session) 

In [None]:

import time

# Check if the job object is valid. Only the @remote-decorated `retrain` returns a job handle;
# an undecorated stand-in returns None (or Ellipsis).
if train_job is None or train_job is Ellipsis:
    print("Job submitted, but no job object returned. Monitoring skipped.")
else:
    # Monitor job status: wait until it leaves PENDING (starts running or finishes)
    while train_job.status == "PENDING":
        time.sleep(1)

    # View logs. get_logs() may return the log text as one string; iterating over a string
    # would print one character per line, so print strings directly.
    logs = train_job.get_logs()
    if logs is Ellipsis:
        print("Logs not available.")
    elif isinstance(logs, str):
        print(logs)
    else:
        for line in logs:
            print(line)


In [None]:
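# NOTE: If `retrain(session)` returned None/Ellipsis, no job handle came back from @remote. The usual
# cause is an undecorated `def retrain(session): ...` stub that was run after the @remote definition
# and shadows it. The decorated function returns a job handle (.status, .get_logs(), .wait() — the DAG
# tasks below rely on .wait()), so do not assume the job ran when no handle is returned.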

#while train_job.status == "PENDING":
#    time.sleep(1)
# Once job starts running, we can view the logs
#train_job.get_logs()

In [None]:
# we can also see all the jobs, and manage them with the job manager
from snowflake.ml import jobs

all_jobs = jobs.list_jobs()

# Select jobs whose status contains "FAILED". With na=False, a missing status counts as
# "not failed"; otherwise a null status puts NaN into the mask, which pandas refuses to index with.
mask = all_jobs['status'].str.contains("FAILED", na=False)
filtered_df = all_jobs[mask]

# Delete each failed job. The loop variable is `job_name` (not `id`) so the builtin id() is
# not shadowed in the notebook namespace.
job_names = filtered_df["name"]
for job_name in job_names:
    jobs.delete_job(job_name)

# Create Automated ML Pipeline
- Automate the deployment of the pipeline using Snowflake Tasks
- After DAG creation, navigate to Monitoring -> Task History to view execution

In [None]:
from snowflake.core.task.dagv1 import DAG, DAGTask
from snowflake.core.task.context import TaskContext
from datetime import timedelta
from snowflake.snowpark import Session
import snowflake.ml.jobs.manager as manager
import datetime
import json

# Warehouse the DAG's tasks run on: reuse the notebook session's current warehouse.
WAREHOUSE = session.get_current_warehouse()


def refresh_reviews(session: Session) -> None:
    """DAG task: run the `update_reviews` ML Job and fail this task if the job fails.

    Args:
        session: Snowpark session supplied by the task runtime (not used directly).

    Raises:
        RuntimeError: if the job finishes with status "FAILED"; the message includes the job's logs.
    """
    job = update_reviews()
    # Block until the job finishes, then throw an error if it failed
    final_status = job.wait()

    if final_status == "FAILED":
        raise RuntimeError(f"Job {job} failed with logs:\n{job.get_logs()}")

def update_sentiment(session: Session) -> None:
    """DAG task: (re)score REVIEW_SENTIMENT for every row of the REVIEWS table with Cortex.

    REVIEWS is resolved against the session's current database/schema. Each row's first
    entity-sentiment category maps to a score: positive=1.0, negative=-1.0, neutral=0.0,
    mixed=0.5, anything else (including NULL)=0.0.

    Args:
        session: Snowpark session used to run the SQL.
    """
    # The REVIEWS table is auto-created by the review refresh job from parsed + classified rows
    # (UUID, REVIEW_TEXT, REVIEW_QUALITY), so make sure the target column exists first.
    session.sql(
        "ALTER TABLE REVIEWS ADD COLUMN IF NOT EXISTS REVIEW_SENTIMENT FLOAT"
    ).collect()

    # Score each row and join the score back by UUID. An uncorrelated scalar subquery in SET does
    # not tie each score to its own row (the same pattern is marked broken earlier in the notebook).
    sql_text = """
        UPDATE REVIEWS AS target
        SET REVIEW_SENTIMENT = CASE scored.sentiment_str
            WHEN 'positive' THEN 1.0
            WHEN 'negative' THEN -1.0
            WHEN 'neutral' THEN 0.0
            WHEN 'mixed' THEN 0.5
            ELSE 0.0  -- Default for any unexpected values
        END
        FROM (
            SELECT
                UUID,
                SNOWFLAKE.CORTEX.ENTITY_SENTIMENT(REVIEW_TEXT):categories[0]:sentiment::STRING AS sentiment_str
            FROM REVIEWS
        ) AS scored
        WHERE target.UUID = scored.UUID;
    """
    session.sql(sql_text).collect()

def retrain_model(session: Session) -> None:
    """DAG task: run the `retrain` ML Job and fail this task if the job fails.

    Args:
        session: Snowpark session forwarded to `retrain`.

    Raises:
        RuntimeError: if the job finishes with status "FAILED"; the message includes the job's logs.
    """
    job = retrain(session)
    # Block until the job finishes, then throw an error if it failed
    final_status = job.wait()

    if final_status == "FAILED":
        raise RuntimeError(f"Job {job} failed with logs:\n{job.get_logs()}")

def setup(session: Session) -> str:
    info = dict(
        run_id=datetime.datetime.now().strftime("v%Y%m%d_%H%M%S"),
    )
    return json.dumps(info)

def create_dag() -> DAG:
    """Build the review-model DAG: setup >> refresh_reviews >> update_sentiment >> retrain_model.

    Tasks run on WAREHOUSE on a daily schedule (timedelta(days=1)); task code is staged in payload_stage.
    """
    with DAG(
        "review_model_dag",
        warehouse=WAREHOUSE,
        schedule=timedelta(days=1),
        stage_location="payload_stage",
        # NOTE(review): pins snowflake-ml-python==1.8.6 while the first notebook cell installs 1.9.0 —
        # confirm the task runtime is intentionally on a different version.
        packages=["snowflake-snowpark-python", "snowflake-ml-python==1.8.6", "transformers"]
    ) as dag:
        # Need to wrap first function in a DAGTask to make >> operator work properly
        setup_task = DAGTask("setup", definition=setup)

        # Build the DAG (the plain functions after the first DAGTask are presumably wrapped into tasks by >>)
        setup_task >> refresh_reviews >> update_sentiment >> retrain_model

    return dag

from snowflake.core import CreateMode, Root
from snowflake.core.task.dagv1 import DAGOperation
# Snowflake Python API root bound to the notebook session
api_root = Root(session)

# Deploy into the session's current database/schema
dag_op = DAGOperation(
    schema=api_root.databases[session.get_current_database()].schemas[session.get_current_schema()]
)

dag = create_dag()
# Create (or replace) the DAG's tasks, then trigger a run now in addition to the daily schedule
dag_op.deploy(dag, mode=CreateMode.or_replace)
dag_op.run(dag)

# Print the state of this DAG's current runs
current_runs = dag_op.get_current_dag_runs(dag)
for r in current_runs:
    print(f"RunId={r.run_id} State={r.state}")

In [None]:
-- List registered models visible in the current context (e.g. deployed_xgb, CONVERSTION_CLASSIFIER).
show models;

Add a few synthetic prediction records for testing and visualization.

In [None]:
-- (Re)create an empty table for synthetic prediction records: an ID, two input features and a
-- prediction. CREATE OR REPLACE discards any rows from earlier runs.
CREATE OR REPLACE TABLE HOL_DB.HOL_SCHEMA.SYNTH_OUTPUT (
    ID INT,
    FEATURE_1 FLOAT,
    FEATURE_2 FLOAT,
    PREDICTION FLOAT
);


In [None]:
import numpy as np
import pandas as pd

# Generate N_RECORDS randomized synthetic prediction records with IDs starting at FIRST_ID
N_RECORDS = 50
FIRST_ID = 100

np.random.seed(42)  # For reproducibility (legacy global seed; draw order below is unchanged)
ids = np.arange(FIRST_ID, FIRST_ID + N_RECORDS)  # ✅ N_RECORDS unique IDs
feature_1 = np.round(np.random.uniform(0.1, 0.9, size=N_RECORDS), 2)

# Clip values to [0, 1] range. FEATURE_2 is clipped *before* PREDICTION is derived from it,
# so the stored PREDICTION is consistent with the stored FEATURE_2 values.
feature_2 = np.clip(
    np.round(1 - feature_1 + np.random.normal(0, 0.05, size=N_RECORDS), 2), 0, 1
)
prediction = np.clip(
    np.round((feature_1 + feature_2) / 2 + np.random.normal(0, 0.05, size=N_RECORDS), 2),
    0,
    1,
)

df = pd.DataFrame({
    "ID": ids,
    "FEATURE_1": feature_1,
    "FEATURE_2": feature_2,
    "PREDICTION": prediction
})

# Insert into Snowflake. overwrite=True replaces the table's existing rows (without auto_create_table
# the table is truncated, so the explicitly created schema is kept), so re-running this cell does
# not append duplicate IDs.
session.write_pandas(df, table_name="SYNTH_OUTPUT", database="HOL_DB", schema="HOL_SCHEMA", overwrite=True)


Visualize the synthetic test data added above.

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

df = session.table("HOL_DB.HOL_SCHEMA.SYNTH_OUTPUT").to_pandas()

# Scatter plot: synthetic FEATURE_1 against PREDICTION, with FEATURE_2 encoded as color.
# (SYNTH_OUTPUT holds generated demo values, not the trained model's real input features.)
fig, ax = plt.subplots()
sns.scatterplot(data=df, x="FEATURE_1", y="PREDICTION", hue="FEATURE_2", palette="viridis", ax=ax)
ax.set_title("Feature 1 vs Prediction (colored by Feature 2)")
ax.set_xlabel("Feature 1")
ax.set_ylabel("Prediction")
plt.show()

# Histogram of predictions with a KDE overlay
fig, ax = plt.subplots()
sns.histplot(df["PREDICTION"], bins=10, kde=True, color="skyblue", edgecolor="black", ax=ax)
ax.set_title("Distribution of Model Predictions")
ax.set_xlabel("Prediction Score")
ax.set_ylabel("Frequency")
plt.show()


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

df = session.table("HOL_DB.HOL_SCHEMA.SYNTH_OUTPUT").to_pandas()

# Residuals need a ground-truth column. SYNTH_OUTPUT is created with only ID, FEATURE_1,
# FEATURE_2 and PREDICTION, so TRUE_LABEL is not guaranteed to exist; skip the plot with a
# message instead of failing with a KeyError.
if "TRUE_LABEL" not in df.columns:
    print("SYNTH_OUTPUT has no TRUE_LABEL column; add ground-truth labels to plot residuals.")
else:
    df["residual"] = df["PREDICTION"] - df["TRUE_LABEL"]

    # Residual distribution
    sns.histplot(df["residual"], bins=10, kde=True)
    plt.title("Residual Distribution")
    plt.xlabel("Prediction Error")
    plt.ylabel("Frequency")
    plt.show()


Segment consumers by predicted score into the top 5% and bottom 25% (quantile thresholds on PREDICTION).

In [None]:
import pandas as pd

# Step 1: Load predictions from Snowflake
df = session.table("HOL_DB.HOL_SCHEMA.SYNTH_OUTPUT").to_pandas()
predictions = df["PREDICTION"]

# Step 2: Calculate thresholds
high_threshold = predictions.quantile(0.95)  # Top 5%
low_threshold = predictions.quantile(0.25)   # Bottom 25%

# Step 3: Tag segments. Start everyone at MID, then promote the top tail and demote the bottom
# tail; LOW_CONVERT is applied last, so it wins if both thresholds match a row.
segments = pd.Series("MID", index=df.index)
segments = segments.mask(predictions >= high_threshold, "HIGH_CONVERT")
segments = segments.mask(predictions <= low_threshold, "LOW_CONVERT")
df["SEGMENT"] = segments

# Step 4: Write segmented data back to Snowflake (replacing any previous SYNTH_SEGMENTED table)
session.write_pandas(
    df,
    table_name="SYNTH_SEGMENTED",
    database="HOL_DB",
    schema="HOL_SCHEMA",
    overwrite=True,
    auto_create_table=True
)


In [None]:
# Step 1: Apply Cortex sentiment scoring via SQL
# Recomputes REVIEW_SENTIMENT for every row of SCORED_REVIEWS, joining each row's score back by
# UUID: positive=1.0, negative=-1.0, neutral=0.0, mixed=0.5, anything else (or NULL)=0.0.
session.sql("""
    UPDATE REVIEWS.PUBLIC.SCORED_REVIEWS
    SET REVIEW_SENTIMENT = CASE
        WHEN sentiment_str = 'positive' THEN 1.0
        WHEN sentiment_str = 'negative' THEN -1.0
        WHEN sentiment_str = 'neutral' THEN 0.0
        WHEN sentiment_str = 'mixed' THEN 0.5
        ELSE 0.0
    END
    FROM (
        SELECT UUID,
               SNOWFLAKE.CORTEX.ENTITY_SENTIMENT(REVIEW_TEXT):categories[0]:sentiment::STRING AS sentiment_str
        FROM REVIEWS.PUBLIC.SCORED_REVIEWS
    ) AS sentiment_data
    WHERE REVIEWS.PUBLIC.SCORED_REVIEWS.UUID = sentiment_data.UUID;
""").collect()


In [None]:
import pandas as pd

# Step 1: Load both tables
review_df = session.table("REVIEWS.PUBLIC.SCORED_REVIEWS").to_pandas()
pred_df = session.table("HOL_DB.HOL_SCHEMA.SYNTH_OUTPUT").to_pandas()

# Step 2: Inspect column names to confirm join keys
# (reviews are keyed by UUID, synthetic predictions by integer ID — there is no shared key)
print("Review columns:", review_df.columns.tolist())
print("Prediction columns:", pred_df.columns.tolist())


In [None]:
import pandas as pd

# Step 1: Load both tables.
# NOTE: SCORED_REVIEWS (keyed by UUID) and SYNTH_OUTPUT (keyed by integer ID) share no join key,
# so rows are paired purely by position below; that is only meaningful for synthetic demo data.
# Each side is sorted by its key because Snowflake returns rows in no guaranteed order without
# ORDER BY; unsorted, re-running this cell could pair different rows.
review_df = (
    session.table("REVIEWS.PUBLIC.SCORED_REVIEWS")
    .sort("UUID")
    .to_pandas()
    .reset_index(drop=True)
)
pred_df = (
    session.table("HOL_DB.HOL_SCHEMA.SYNTH_OUTPUT")
    .sort("ID")
    .to_pandas()
    .reset_index(drop=True)
)

# Step 2: Merge row-wise. join="inner" keeps only positions present in both frames; the default
# outer join pads the longer frame with NaN rows (e.g. reviews without a PREDICTION), which would
# then be segmented and tagged as if they were real records.
merged_df = pd.concat([review_df, pred_df], axis=1, join="inner")

# Step 3: Segment by prediction score (LOW_CONVERT is applied last, so it wins on overlap)
high_conversion = merged_df["PREDICTION"].quantile(0.95)
low_conversion = merged_df["PREDICTION"].quantile(0.25)

merged_df["SEGMENT"] = "MID"
merged_df.loc[merged_df["PREDICTION"] >= high_conversion, "SEGMENT"] = "HIGH_CONVERT"
merged_df.loc[merged_df["PREDICTION"] <= low_conversion, "SEGMENT"] = "LOW_CONVERT"

# Step 4: Tag sentiment
merged_df["SENTIMENT_TAG"] = merged_df["REVIEW_SENTIMENT"].apply(
    lambda x: "POSITIVE" if x > 0.5 else "NEGATIVE" if x < 0 else "NEUTRAL"
)

# Step 5: Write enriched data back to Snowflake
session.write_pandas(
    merged_df,
    table_name="REVIEW_SEGMENTED_ENRICHED",
    database="REVIEWS",
    schema="PUBLIC",
    overwrite=True,
    auto_create_table=True
)


In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Step 1: Load and align both datasets by row index.
# NOTE: SCORED_REVIEWS (keyed by UUID) and SYNTH_OUTPUT (keyed by integer ID) share no join key,
# so rows are paired purely by position; that is only meaningful for synthetic demo data. Each side
# is sorted by its key so the pairing is deterministic (no ORDER BY means no guaranteed row order),
# and join="inner" drops positions missing from either frame instead of padding them with NaN.
review_df = (
    session.table("REVIEWS.PUBLIC.SCORED_REVIEWS")
    .sort("UUID")
    .to_pandas()
    .reset_index(drop=True)
)
pred_df = (
    session.table("HOL_DB.HOL_SCHEMA.SYNTH_OUTPUT")
    .sort("ID")
    .to_pandas()
    .reset_index(drop=True)
)
merged_df = pd.concat([review_df, pred_df], axis=1, join="inner")

# Step 2: Segment by prediction score (LOW_CONVERT is applied last, so it wins on overlap)
high_conversion = merged_df["PREDICTION"].quantile(0.95)
low_conversion = merged_df["PREDICTION"].quantile(0.25)
merged_df["SEGMENT"] = "MID"
merged_df.loc[merged_df["PREDICTION"] >= high_conversion, "SEGMENT"] = "HIGH_CONVERT"
merged_df.loc[merged_df["PREDICTION"] <= low_conversion, "SEGMENT"] = "LOW_CONVERT"

# Step 3: Tag sentiment
merged_df["SENTIMENT_TAG"] = merged_df["REVIEW_SENTIMENT"].apply(
    lambda x: "POSITIVE" if x > 0.5 else "NEGATIVE" if x < 0 else "NEUTRAL"
)

# Step 4: Visualize conversion likelihood by layout
sns.boxplot(data=merged_df, x="PRODUCT_LAYOUT", y="PREDICTION", hue="SEGMENT")
plt.title("Conversion Likelihood by Product Layout")
plt.ylabel("Model Prediction Score")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Step 5: Write enriched data back to Snowflake
session.write_pandas(
    merged_df,
    table_name="REVIEW_SEGMENTED_ENRICHED",
    database="REVIEWS",
    schema="PUBLIC",
    overwrite=True,
    auto_create_table=True
)
