Step 1:  Import the needed packages and get the active session

In [None]:
# Import python packages, added numpy 
import streamlit as st
import pandas as pd
import numpy as np
import random

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()


Step 2:  Generate training data as a pandas DataFrame

In [None]:
def generate_batch_metrics(batch_id, machine_name, base_quality=0.98, is_outlier=False):
    """Generate one synthetic row of sensor metrics for a machine in a batch.

    For each of the three sensors (feed rate, vibration, spindle speed) a
    percentage of abnormal readings and an average reading are drawn at
    random, a tool-wear value is drawn, and a quality yield is derived from
    them. All randomness comes from the global ``random`` module, so results
    are reproducible only if the caller seeds it beforehand.

    Args:
        batch_id: Identifier stored unchanged under ``BATCH_ID``.
        machine_name: Stored under ``MACHINE_NAME``. "Bjorn", "Ragnar",
            "Magnus" and "Harald" receive machine-specific adjustments; any
            other name receives none.
        base_quality: Yield before any penalty is subtracted.
        is_outlier: When true, one randomly chosen sensor gets a 60-80 %
            abnormal share and a larger average deviation, tool wear may be
            extreme (80-95), and with probability 0.3 the yield is drawn
            from 0.92-0.96 instead of being computed.

    Returns:
        dict with keys, in this order: BATCH_ID, MACHINE_NAME,
        <SENSOR>_ABNORMAL_PCT and <SENSOR>_AVG for FEED_RATE, VIBRATION and
        SPINDLE_SPEED, TOOL_WEAR, QUALITY_YIELD (clamped to [0, 1]) and
        QUALITY_CATEGORY ('Excellent', 'Good', 'Acceptable' or 'Poor').
    """
    sensors = ('feed_rate', 'vibration', 'spindle_speed')

    # Static description of each sensor: the nominal band, the full range the
    # sensor can report, and how strongly it weighs on the quality penalty.
    sensor_specs = {
        'feed_rate': {
            'nominal_min': 100, 'nominal_max': 120,
            'total_min': 80, 'total_max': 140,
            'impact_weight': 0.3,
        },
        'vibration': {
            'nominal_min': 0.1, 'nominal_max': 0.3,
            'total_min': 0.05, 'total_max': 0.8,
            'impact_weight': 0.25,
        },
        'spindle_speed': {
            'nominal_min': 2800, 'nominal_max': 3200,
            'total_min': 2500, 'total_max': 3500,
            'impact_weight': 0.25,
        },
    }
    tool_wear_weight = 0.2

    # Share of abnormal readings per sensor, drawn in sensor order.
    abnormal_pct = {name: random.uniform(5, 25) for name in sensors}

    # Machine-specific multipliers on the abnormal share.
    machine_scaling = {
        "Bjorn": {'feed_rate': 1.1},        # 10% more feed rate issues
        "Ragnar": {'vibration': 1.15},      # 15% more vibration issues
        "Magnus": {'spindle_speed': 1.05},  # 5% more spindle speed issues
        # Harald is generally more reliable but wears tools faster
        "Harald": dict.fromkeys(sensors, 0.9),
    }
    for name, factor in machine_scaling.get(machine_name, {}).items():
        abnormal_pct[name] *= factor

    # Outliers override one sensor's abnormal share and may have a badly worn
    # tool; otherwise only Harald deviates from the normal wear range.
    outlier_sensor = None
    if is_outlier:
        outlier_sensor = random.choice(sensors)
        abnormal_pct[outlier_sensor] = random.uniform(60, 80)
        wear_bounds = (80, 95) if random.random() > 0.5 else (30, 70)
    elif machine_name == "Harald":
        wear_bounds = (40, 75)
    else:
        wear_bounds = (30, 70)
    tool_wear = random.uniform(*wear_bounds)

    metrics = {
        'BATCH_ID': batch_id,
        'MACHINE_NAME': machine_name,
    }
    quality_impact = 0

    for name in sensors:
        spec = sensor_specs[name]
        share = abnormal_pct[name]
        column = name.upper()
        metrics[f'{column}_ABNORMAL_PCT'] = share

        nominal_avg = (spec['nominal_min'] + spec['nominal_max']) / 2
        total_range = spec['total_max'] - spec['total_min']

        # The average drifts further from nominal the worse the sensor is.
        if name == outlier_sensor:
            bias = 1 if random.random() > 0.5 else -1
            avg_deviation = total_range * 0.3 * bias
        elif share > 15:
            bias = 1 if random.random() > 0.5 else -1
            avg_deviation = total_range * 0.15 * bias
        else:
            avg_deviation = total_range * 0.05 * random.uniform(-1, 1)

        # Keep the reported average inside the range the sensor can report.
        # Without this a vibration outlier with negative bias yields -0.025,
        # a negative average below total_min. Only the reported value is
        # clamped; the penalty below still uses the drawn deviation so the
        # yield model is unchanged.
        reading = nominal_avg + avg_deviation
        metrics[f'{column}_AVG'] = min(max(reading, spec['total_min']), spec['total_max'])

        severity = (share / 100) * (abs(avg_deviation) / total_range)
        quality_impact += severity * spec['impact_weight']

    metrics['TOOL_WEAR'] = tool_wear

    # Tool wear only hurts quality past 50, and then quadratically.
    if tool_wear > 50:
        quality_impact += ((tool_wear - 50) / 50) ** 2 * tool_wear_weight

    if is_outlier and random.random() < 0.3:
        # Sometimes outliers have unexpectedly good quality
        quality_yield = random.uniform(0.92, 0.96)
    else:
        # Slight machine-specific offsets to the base quality
        machine_base_quality = base_quality
        if machine_name == "Bjorn":
            machine_base_quality -= 0.005
        elif machine_name == "Harald":
            machine_base_quality += 0.005
        quality_yield = max(0, min(1, machine_base_quality - quality_impact))
    metrics['QUALITY_YIELD'] = quality_yield

    # Categorical target: first threshold the yield reaches, else 'Poor'.
    category_floors = ((0.95, 'Excellent'), (0.92, 'Good'), (0.88, 'Acceptable'))
    metrics['QUALITY_CATEGORY'] = next(
        (label for floor, label in category_floors if quality_yield >= floor),
        'Poor',
    )

    return metrics

def generate_training_dataset(n_batches=100, n_outliers=2):
    """Generate the training DataFrame for all Viking machines.

    Every batch contributes one row per machine (four machines, so
    ``n_batches * 4`` rows in total). The last ``n_outliers`` batch IDs each
    have one randomly chosen machine generated with ``is_outlier=True``; all
    other rows are regular.

    Args:
        n_batches: Total number of batch IDs, outlier batches included.
        n_outliers: How many of those batches contain an outlier machine.

    Returns:
        pandas.DataFrame built from the per-row metric dicts returned by
        ``generate_batch_metrics``.

    Raises:
        ValueError: If either count is negative or ``n_outliers`` exceeds
            ``n_batches``. Previously such inputs silently produced a
            dataset with a different number of batches than requested.
    """
    if n_batches < 0 or n_outliers < 0:
        raise ValueError(
            f"n_batches and n_outliers must be non-negative, "
            f"got n_batches={n_batches}, n_outliers={n_outliers}"
        )
    if n_outliers > n_batches:
        raise ValueError(
            f"n_outliers ({n_outliers}) cannot exceed n_batches ({n_batches})"
        )

    machine_names = ["Bjorn", "Ragnar", "Magnus", "Harald"]
    # Batches at or past this index are the outlier batches.
    first_outlier_index = n_batches - n_outliers

    training_data = []
    for i in range(n_batches):
        batch_id = f"BATCH_{i+1:03d}"
        # Only outlier batches pick a machine; the others stay fully regular.
        outlier_machine = (
            random.choice(machine_names) if i >= first_outlier_index else None
        )
        for machine_name in machine_names:
            training_data.append(
                generate_batch_metrics(
                    batch_id,
                    machine_name,
                    is_outlier=(machine_name == outlier_machine),
                )
            )

    return pd.DataFrame(training_data)

# Build the demo dataset: 100 batch IDs, the last 2 of which contain an outlier machine
df = generate_training_dataset(n_batches=100, n_outliers=2)

# Columns to preview: identifiers, per-sensor features, then both targets
# (continuous QUALITY_YIELD and categorical QUALITY_CATEGORY)
display_columns = [
    'BATCH_ID', 'MACHINE_NAME',
    'FEED_RATE_ABNORMAL_PCT', 'FEED_RATE_AVG',
    'VIBRATION_ABNORMAL_PCT', 'VIBRATION_AVG',
    'SPINDLE_SPEED_ABNORMAL_PCT', 'SPINDLE_SPEED_AVG',
    'TOOL_WEAR',
    'QUALITY_YIELD', 'QUALITY_CATEGORY',
]

# First rows of the dataset
print("\nSample of training data:")
print(df.loc[:, display_columns].head())

# How many rows fall into each quality category
print("\nQuality Category Distribution:")
category_counts = df['QUALITY_CATEGORY'].value_counts()
print(category_counts)

# Mean yield per machine
print("\nAverage Quality by Machine:")
yield_by_machine = df.groupby('MACHINE_NAME')['QUALITY_YIELD'].mean()
print(yield_by_machine)

# Rows with any sensor above 50% abnormal readings, or tool wear above 80
print("\nPotential Outliers:")
abnormal_pct_columns = [
    'FEED_RATE_ABNORMAL_PCT',
    'VIBRATION_ABNORMAL_PCT',
    'SPINDLE_SPEED_ABNORMAL_PCT',
]
extreme_sensor = df[abnormal_pct_columns].gt(50).any(axis=1)
extreme_wear = df['TOOL_WEAR'].gt(80)
outliers = df[extreme_sensor | extreme_wear]
print(outliers[display_columns].head())

Step 3:  Convert the pandas DataFrame to a Snowpark DataFrame, then save it to a Snowflake table

In [None]:
# Destination table, given as a three-part dotted name
table_name = "demo_db.streaming.train_table"

# Turn the pandas DataFrame into a Snowpark DataFrame on the active session
snowpark_df = session.create_dataframe(df)

# Persist it, using the writer's "overwrite" mode
table_writer = snowpark_df.write.mode("overwrite")
table_writer.save_as_table(table_name)