# Test LSTM Food Freshness Model

This notebook loads the trained ONNX model and tests it with sample sensor data.

In [1]:
# Install required packages if needed
!pip install onnxruntime pandas numpy scikit-learn

ERROR: Could not find a version that satisfies the requirement onnxruntime (from versions: none)

[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: C:\Users\Asus\AppData\Local\Programs\Python\Python314\python.exe -m pip install --upgrade pip
ERROR: No matching distribution found for onnxruntime


In [None]:
import numpy as np
import pandas as pd
import onnxruntime as ort
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

In [None]:
# --- Configuration ---
ONNX_MODEL_PATH = "lstm_food_freshness.onnx"
DATA_FILE = "food_freshness_dataset.csv"
SEQUENCE_LENGTH = 10  # Must match training configuration

# Sensor columns (3 analog sensors)
SENSOR_COLUMNS = ['MQ135_Analog', 'MQ3_Analog', 'MiCS5524_Analog']

In [None]:
# --- Load ONNX Model ---
print(f"Loading ONNX model from {ONNX_MODEL_PATH}...")
ort_session = ort.InferenceSession(ONNX_MODEL_PATH)

# Get model input details
input_name = ort_session.get_inputs()[0].name
output_name = ort_session.get_outputs()[0].name

print(f"Model loaded successfully!")
print(f"Input name: {input_name}")
print(f"Output name: {output_name}")
print(f"Expected input shape: (batch_size, {SEQUENCE_LENGTH}, {len(SENSOR_COLUMNS)})")

In [None]:
# --- Function to Create Sequences ---
def create_test_sequence(data, start_idx, seq_length=10):
    """
    Create a single sequence from the data starting at start_idx.
    Returns shape (1, seq_length, num_features) for model input.
    """
    if start_idx + seq_length > len(data):
        raise ValueError(f"Not enough data. Need {seq_length} samples from index {start_idx}")

    sequence = data[start_idx:start_idx + seq_length]
    # Reshape to (1, seq_length, num_features) for batch size of 1
    return sequence.reshape(1, seq_length, -1).astype(np.float32)

# --- Function to Make Prediction (Dual Outputs) ---
def predict(sequence):
    """
    Make prediction using ONNX model.
    Returns classification probability, class label, and RSL hours.
    """
    ort_inputs = {input_name: sequence}
    ort_outputs = ort_session.run(None, ort_inputs)
    
    # Output 1: Classification
    classification_prob = ort_outputs[0][0][0]
    classification_pred = 1 if classification_prob > 0.5 else 0
    
    # Output 2: RSL
    rsl_hours = max(0, ort_outputs[1][0][0])  # Ensure non-negative
    
    return classification_prob, classification_pred, rsl_hours


In [None]:
# --- Prepare Scaler (fit on all data for testing purposes) ---
X = df[SENSOR_COLUMNS].values
y = df['Output'].values

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

print(f"\nData scaled. Shape: {X_scaled.shape}")
print(f"Feature means: {scaler.mean_}")
print(f"Feature std devs: {scaler.scale_}")

In [None]:
# --- Function to Create Sequences ---
def create_test_sequence(data, start_idx, seq_length=10):
    """
    Create a single sequence from the data starting at start_idx.
    Returns shape (1, seq_length, num_features) for model input.
    """
    if start_idx + seq_length > len(data):
        raise ValueError(f"Not enough data. Need {seq_length} samples from index {start_idx}")
    
    sequence = data[start_idx:start_idx + seq_length]
    # Reshape to (1, seq_length, num_features) for batch size of 1
    return sequence.reshape(1, seq_length, -1).astype(np.float32)

# --- Function to Make Prediction ---
def predict(sequence):
    """
    Make prediction using ONNX model.
    Returns probability and class label.
    """
    ort_inputs = {input_name: sequence}
    ort_outputs = ort_session.run(None, ort_inputs)
    probability = ort_outputs[0][0][0]
    prediction = 1 if probability > 0.5 else 0
    return probability, prediction

## Test with Random Samples

In [None]:
# --- TEST 1: Random samples from dataset (Dual Output) ---
print("=" * 80)
print("TEST 1: Testing with random samples from dataset (showing dual outputs)")
print("=" * 80)

num_tests = 5
random_indices = np.random.choice(len(df) - sequence_length, num_tests, replace=False)

for i, idx in enumerate(random_indices, 1):
    # Get the actual label for this sequence (label at the end of sequence)
    actual_class = df.iloc[idx + sequence_length - 1]['Output']
    actual_rsl = df.iloc[idx + sequence_length - 1]['RSL_Hours']

    # Create sequence from scaled features
    sequence = create_test_sequence(features_scaled, idx, sequence_length)

    # Get prediction
    prob, pred_class, pred_rsl = predict(sequence)

    # Print results
    print(f"\n--- Test {i} (Index {idx}) ---")
    print(f"Last 3 sensor values in sequence:")
    print(f"  MQ135: {df.iloc[idx+sequence_length-1]['MQ135_Analog']:.0f}")
    print(f"  MQ3:   {df.iloc[idx+sequence_length-1]['MQ3_Analog']:.0f}")
    print(f"  MiCS:  {df.iloc[idx+sequence_length-1]['MiCS5524_Analog']:.0f}")
    print(f"\nClassification:")
    print(f"  Actual:     {'Fresh (1)' if actual_class == 1 else 'Bad (0)'}")
    print(f"  Predicted:  {'Fresh (1)' if pred_class == 1 else 'Bad (0)'}")
    print(f"  Confidence: {prob:.4f}")
    print(f"  Status:     {'✓ CORRECT' if pred_class == actual_class else '✗ WRONG'}")
    print(f"\nRemaining Shelf Life:")
    print(f"  Actual:     {actual_rsl:.1f} hours")
    print(f"  Predicted:  {pred_rsl:.1f} hours")
    print(f"  Error:      {abs(pred_rsl - actual_rsl):.1f} hours")

print("\n" + "=" * 80)


## Test with Custom Sensor Values

In [None]:
# --- TEST 2: Custom Sensor Values (Dual Output) ---
print("=" * 80)
print("TEST 2: Testing with custom sensor values (showing dual outputs)")
print("=" * 80)

# Define test scenarios
test_scenarios = [
    {
        "name": "Very Fresh Food",
        "values": (150, 120, 180),
        "description": "All sensors in fresh range"
    },
    {
        "name": "Slightly Fresh Food",
        "values": (300, 280, 320),
        "description": "Sensors near upper fresh range"
    },
    {
        "name": "Borderline Food",
        "values": (400, 380, 420),
        "description": "Sensors at threshold"
    },
    {
        "name": "Spoiled Food",
        "values": (700, 650, 720),
        "description": "All sensors indicating spoilage"
    },
    {
        "name": "Very Spoiled Food",
        "values": (950, 900, 980),
        "description": "Sensors at maximum spoilage"
    }
]

for scenario in test_scenarios:
    print(f"\n--- {scenario['name']} ---")
    print(f"Description: {scenario['description']}")
    print(f"Sensor Values: MQ135={scenario['values'][0]}, MQ3={scenario['values'][1]}, MiCS={scenario['values'][2]}")

    # Create a sequence with the same values repeated
    custom_values = np.array([scenario['values']] * sequence_length)
    custom_scaled = scaler.transform(custom_values)
    sequence = custom_scaled.reshape(1, sequence_length, -1).astype(np.float32)

    # Get prediction
    prob, pred_class, pred_rsl = predict(sequence)

    # Print results
    print(f"\nClassification:")
    print(f"  Predicted:  {'Fresh (1)' if pred_class == 1 else 'Bad (0)'}")
    print(f"  Confidence: {prob:.4f}")
    print(f"\nRemaining Shelf Life:")
    print(f"  Predicted:  {pred_rsl:.1f} hours")
    print(f"  Status:     {'Expected: Long shelf life' if pred_class == 1 else 'Expected: Short/No shelf life'}")

print("\n" + "=" * 80)


## Visualize Predictions on Sample Data

In [None]:
# --- TEST 3: Batch Testing (Dual Output) ---
print("=" * 80)
print("TEST 3: Batch testing on test set (showing dual outputs)")
print("=" * 80)

# Select a random batch of sequences for testing
batch_size = 100
start_indices = np.random.choice(len(df) - sequence_length, batch_size, replace=False)

classification_correct = 0
classification_probs = []
classification_actuals = []
classification_preds = []

rsl_errors = []
rsl_actuals = []
rsl_preds = []

for idx in start_indices:
    # Get actual values
    actual_class = df.iloc[idx + sequence_length - 1]['Output']
    actual_rsl = df.iloc[idx + sequence_length - 1]['RSL_Hours']

    # Create sequence and predict
    sequence = create_test_sequence(features_scaled, idx, sequence_length)
    prob, pred_class, pred_rsl = predict(sequence)

    # Track classification metrics
    classification_probs.append(prob)
    classification_actuals.append(actual_class)
    classification_preds.append(pred_class)
    if pred_class == actual_class:
        classification_correct += 1

    # Track RSL metrics
    rsl_actuals.append(actual_rsl)
    rsl_preds.append(pred_rsl)
    rsl_errors.append(abs(pred_rsl - actual_rsl))

# Calculate metrics
classification_accuracy = (classification_correct / batch_size) * 100
rsl_mae = np.mean(rsl_errors)
rsl_rmse = np.sqrt(np.mean(np.array(rsl_errors) ** 2))

print(f"\nClassification Results:")
print(f"  Batch Size: {batch_size}")
print(f"  Accuracy:   {classification_accuracy:.2f}%")
print(f"  Correct:    {classification_correct}/{batch_size}")

print(f"\nRSL Regression Results:")
print(f"  MAE (Mean Absolute Error): {rsl_mae:.2f} hours")
print(f"  RMSE (Root Mean Squared Error): {rsl_rmse:.2f} hours")
print(f"  Min Error:  {min(rsl_errors):.2f} hours")
print(f"  Max Error:  {max(rsl_errors):.2f} hours")

print("\n" + "=" * 80)


In [None]:
# --- Visualize Batch Results (Dual Output) ---
import matplotlib.pyplot as plt

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Plot 1: Classification Confusion Matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(classification_actuals, classification_preds)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0, 0])
axes[0, 0].set_title('Classification Confusion Matrix')
axes[0, 0].set_xlabel('Predicted')
axes[0, 0].set_ylabel('Actual')
axes[0, 0].set_xticklabels(['Bad (0)', 'Fresh (1)'])
axes[0, 0].set_yticklabels(['Bad (0)', 'Fresh (1)'])

# Plot 2: Classification Confidence Distribution
axes[0, 1].hist(classification_probs, bins=20, edgecolor='black', alpha=0.7)
axes[0, 1].set_title('Classification Confidence Distribution')
axes[0, 1].set_xlabel('Confidence (Probability)')
axes[0, 1].set_ylabel('Frequency')
axes[0, 1].axvline(x=0.5, color='r', linestyle='--', label='Threshold')
axes[0, 1].legend()

# Plot 3: RSL Actual vs Predicted
axes[1, 0].scatter(rsl_actuals, rsl_preds, alpha=0.5)
axes[1, 0].plot([0, max(rsl_actuals)], [0, max(rsl_actuals)], 'r--', label='Perfect Prediction')
axes[1, 0].set_title('RSL: Actual vs Predicted')
axes[1, 0].set_xlabel('Actual RSL (hours)')
axes[1, 0].set_ylabel('Predicted RSL (hours)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: RSL Error Distribution
axes[1, 1].hist(rsl_errors, bins=20, edgecolor='black', alpha=0.7, color='orange')
axes[1, 1].set_title('RSL Prediction Error Distribution')
axes[1, 1].set_xlabel('Absolute Error (hours)')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].axvline(x=rsl_mae, color='r', linestyle='--', label=f'MAE: {rsl_mae:.2f}h')
axes[1, 1].legend()

plt.tight_layout()
plt.show()

print("Visualizations complete!")


## Test Single Reading Function (Arduino-like)

This simulates how you would use the model with real-time sensor readings from Arduino.

In [None]:
# --- Arduino Simulation: SensorBuffer Class (Dual Output) ---
class SensorBuffer:
    """
    Simulates Arduino's rolling buffer for sensor readings.
    Maintains last 10 readings for each sensor.
    """
    def __init__(self, buffer_size=10):
        self.buffer_size = buffer_size
        self.mq135_buffer = []
        self.mq3_buffer = []
        self.mics_buffer = []
        self.scaler = scaler  # Use the loaded scaler

    def add_reading(self, mq135, mq3, mics):
        """Add new sensor readings to buffers"""
        self.mq135_buffer.append(mq135)
        self.mq3_buffer.append(mq3)
        self.mics_buffer.append(mics)

        # Keep only last buffer_size readings
        if len(self.mq135_buffer) > self.buffer_size:
            self.mq135_buffer.pop(0)
            self.mq3_buffer.pop(0)
            self.mics_buffer.pop(0)

    def is_ready(self):
        """Check if buffer is full"""
        return len(self.mq135_buffer) == self.buffer_size

    def get_prediction(self):
        """Get prediction from current buffer (dual outputs)"""
        if not self.is_ready():
            return None, None, None

        # Prepare sequence
        sequence_data = np.column_stack([
            self.mq135_buffer,
            self.mq3_buffer,
            self.mics_buffer
        ])

        # Scale the sequence
        scaled_sequence = self.scaler.transform(sequence_data)

        # Reshape for model input
        sequence = scaled_sequence.reshape(1, self.buffer_size, -1).astype(np.float32)

        # Get prediction
        ort_inputs = {input_name: sequence}
        ort_outputs = ort_session.run(None, ort_inputs)

        # Parse outputs
        classification_prob = ort_outputs[0][0][0]
        classification_pred = 1 if classification_prob > 0.5 else 0
        rsl_hours = max(0, ort_outputs[1][0][0])

        return classification_prob, classification_pred, rsl_hours


# --- Simulate Arduino readings (Dual Output) ---
print("=" * 80)
print("ARDUINO SIMULATION: Rolling buffer predictions (showing dual outputs)")
print("=" * 80)

buffer = SensorBuffer(buffer_size=10)

# Simulate 15 sensor readings (from fresh to spoiled)
simulated_readings = [
    (150, 130, 170),  # Very fresh
    (160, 140, 180),
    (170, 150, 190),
    (200, 180, 220),
    (250, 230, 270),
    (300, 280, 320),  # Still fresh
    (350, 330, 370),
    (400, 380, 420),  # Borderline
    (500, 480, 520),  # Getting bad
    (600, 580, 620),  # Bad
    (700, 680, 720),  # Very bad
    (750, 730, 770),
    (800, 780, 820),
    (850, 830, 870),
    (900, 880, 920),  # Extremely spoiled
]

for i, (mq135, mq3, mics) in enumerate(simulated_readings, 1):
    buffer.add_reading(mq135, mq3, mics)

    print(f"\nReading {i}: MQ135={mq135}, MQ3={mq3}, MiCS={mics}")

    if buffer.is_ready():
        prob, pred_class, pred_rsl = buffer.get_prediction()
        status = "Fresh (1)" if pred_class == 1 else "Bad (0)"
        print(f"  Classification: {status} (Confidence: {prob:.4f})")
        print(f"  RSL: {pred_rsl:.1f} hours")
    else:
        print(f"  Buffer filling... ({len(buffer.mq135_buffer)}/{buffer.buffer_size} readings)")

print("\n" + "=" * 80)
print("Arduino simulation complete!")
print("This demonstrates how the model would work on Arduino with rolling buffer.")
print("=" * 80)


## Summary

The LSTM model has been successfully loaded and tested with:
1. ✅ Random samples from the dataset
2. ✅ Custom fresh food sensor values
3. ✅ Custom spoiled food sensor values
4. ✅ Batch predictions with visualization
5. ✅ Real-time sensor buffer simulation (Arduino-like)

The model is ready for deployment!