In [26]:
# 🧠 Cell 1: Core Libraries
import html
import os
import pathlib
import random
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from torchvision import transforms


In [11]:
# 🌱 Cell 2: Reproducibility
SEED = 42  # single source of truth for every RNG seeded in this cell

random.seed(SEED)        # Python's built-in RNG
np.random.seed(SEED)     # NumPy's global RNG: NumPy is imported above but was never seeded
torch.manual_seed(SEED)  # PyTorch RNG; returns the default Generator (hence the cell output)


<torch._C.Generator at 0x15905e585d0>

In [12]:
# 📂 Cell 3: Project Paths
# --- Filesystem layout --------------------------------------------------------
# The project root is the parent of the directory the kernel was started in.
PROJECT_ROOT = pathlib.Path.cwd().resolve().parent
DATA_DIR = PROJECT_ROOT.joinpath("data", "preprocessed", "fertilizer", "YaraMila")
IMAGE_DIR = str(DATA_DIR)  # plain-string form of DATA_DIR

# --- Compute device -----------------------------------------------------------
# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

# --- Hyperparameters ----------------------------------------------------------
IMAGE_SIZE = (128, 128)  # target size handed to transforms.Resize
NUM_EPISODES = 5         # number of training episodes
MAX_STEPS = 20           # upper bound on steps per training episode
LR = 1e-4                # Adam learning rate
THRESHOLD = 0.0148       # anomaly cut-off; could be injected from settings.yaml instead

print(f"🛠 Using image folder: {IMAGE_DIR}")


🛠 Using image folder: D:\Documents\CODE_WITH_ERICADESHH\GitHub\HarvestGuard\data\preprocessed\fertilizer\YaraMila


In [14]:
# 🔍 Cell 4: Load Trained Autoencoder with Scoring Logic

import sys

# Add project root to sys.path so we can import from models/
# (guarded so re-running the cell does not append the same entry twice).
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

# Project-local import: only resolvable once PROJECT_ROOT is on sys.path (above).
from models.autoencoder import ConvAutoencoder

# Load the autoencoder
# Build the network on the selected device, then restore its trained weights.
autoencoder = ConvAutoencoder().to(DEVICE)
autoencoder_path = PROJECT_ROOT / "models" / "checkpoints" / "autoencoder.pth"
# map_location=DEVICE lets a checkpoint saved on another device load here.
# NOTE(review): torch.load unpickles the file, so only load checkpoints you trust;
# consider weights_only=True if the installed PyTorch version supports it.
autoencoder.load_state_dict(torch.load(autoencoder_path, map_location=DEVICE))
# Switch to evaluation mode; the model is only used for inference in this notebook.
autoencoder.eval()

# Scoring function (MSE)
def anomaly_score(img_tensor):
    """Return the autoencoder's reconstruction error for ``img_tensor``.

    The tensor is passed through the notebook-level ``autoencoder`` with
    gradient tracking disabled, and the mean squared error between the
    reconstruction and the input is returned as a Python float.
    Lower values mean the input was reconstructed more faithfully.
    """
    with torch.no_grad():
        reconstruction = autoencoder(img_tensor)
        mse = torch.nn.functional.mse_loss(reconstruction, img_tensor)
    return mse.item()


In [15]:
# 🧪 Cell 5: Enhanced RL Environment with anomaly scoring

class SmarterImageScanEnv:
    """Sequential image-scanning environment rewarded via autoencoder anomaly scores.

    The environment walks through the image files of ``image_dir`` in sorted
    order. Each observation is one image as a ``(1, 3, *IMAGE_SIZE)`` float
    tensor on ``DEVICE``. The agent answers with an action
    (0 = mark as REAL, 1 = mark as FAKE) and is rewarded ``+1`` when the
    action agrees with the autoencoder verdict and ``-1`` otherwise.

    Args:
        image_dir: Directory containing ``.jpg`` / ``.jpeg`` / ``.png`` files.
        autoencoder_model: Callable model used to reconstruct each image; its
            reconstruction MSE is the anomaly score.
        threshold: Scores strictly above this value count as anomalies (FAKE).
            Defaults to the notebook-level ``THRESHOLD`` so the cut-off is
            configured in a single place.

    Raises:
        FileNotFoundError: If ``image_dir`` does not exist.
        ValueError: If ``image_dir`` contains no supported image files.
    """

    def __init__(self, image_dir, autoencoder_model, threshold=None):
        self.image_dir = image_dir
        self.autoencoder = autoencoder_model
        self.threshold = THRESHOLD if threshold is None else threshold
        self.index = 0

        if not os.path.exists(image_dir):
            raise FileNotFoundError(f"❌ Directory not found: {image_dir}")

        # Sorted so that episodes always visit the images in the same order.
        self.images = sorted(
            f for f in os.listdir(image_dir)
            if f.lower().endswith(('.jpg', '.png', '.jpeg'))
        )

        if not self.images:
            raise ValueError(f"❌ No valid image files in: {image_dir}")

        self.transform = transforms.Compose([
            transforms.Resize(IMAGE_SIZE),
            transforms.ToTensor()
        ])

    def reset(self):
        """Rewind to the first image and return its observation."""
        self.index = 0
        return self._get_obs()

    def _get_obs(self):
        """Return the current image as a batched tensor, or None past the last image."""
        if self.index >= len(self.images):
            return None
        path = os.path.join(self.image_dir, self.images[self.index])
        # The context manager releases the file handle as soon as the pixels are copied.
        with Image.open(path) as raw_image:
            image = raw_image.convert("RGB")
        return self.transform(image).unsqueeze(0).to(DEVICE)

    def step(self, action):
        """Score the current image, reward the action, and advance to the next image.

        Action: 0 = mark as REAL, 1 = mark as FAKE

        Returns:
            ``(next_observation, reward, done)`` where ``next_observation`` is
            None once the last image has been consumed.

        Raises:
            RuntimeError: If called after the episode has ended without reset().
        """
        if self.index >= len(self.images):
            # Without this guard a None observation would be fed to the model.
            raise RuntimeError("❌ Episode finished: call reset() before step()")

        img_tensor = self._get_obs()

        # Compute anomaly score (lower = more real) with the model given to the
        # constructor, so the environment does not depend on notebook globals.
        with torch.no_grad():
            recon = self.autoencoder(img_tensor)
            score = torch.nn.functional.mse_loss(recon, img_tensor).item()

        is_anomaly = score > self.threshold

        # Reward logic: +1 for agreeing with the autoencoder verdict, -1 otherwise
        # (any action other than 0 or 1 is always counted as wrong).
        flagged_fake = action == 1 and is_anomaly
        accepted_real = action == 0 and not is_anomaly
        reward = +1 if (flagged_fake or accepted_real) else -1

        self.index += 1
        done = self.index >= len(self.images)
        return self._get_obs(), reward, done


In [17]:
# 🏋️ Cell 6: Setup training with smarter environment
# 🤖 Cell X: Define Enhanced Agent
class Agent(nn.Module):
    """Small convolutional Q-network mapping an image batch to per-action values.

    Args:
        input_shape: ``(channels, height, width)`` of a single observation.
            The size of the first fully connected layer is derived from it, so
            resolutions other than the default 128x128 work as well.
        num_actions: Number of discrete actions (output units).

    The layer layout (and therefore the ``state_dict`` keys and, for the
    default ``input_shape``, every parameter shape) is unchanged.
    """

    def __init__(self, input_shape=(3, 128, 128), num_actions=2):
        super().__init__()
        c, h, w = input_shape

        def _conv_out(size):
            # Output length of Conv2d(kernel_size=3, stride=2, padding=1) along one axis.
            return (size + 2 * 1 - 3) // 2 + 1

        # Two stride-2 convolutions are applied along each spatial axis.
        feat_h = _conv_out(_conv_out(h))
        feat_w = _conv_out(_conv_out(w))

        self.net = nn.Sequential(
            nn.Conv2d(c, 16, kernel_size=3, stride=2, padding=1),   # 128x128 -> (B, 16, 64, 64)
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),  # 128x128 -> (B, 32, 32, 32)
            nn.ReLU(),
            nn.Flatten(),
            # Previously hard-coded to 32 * 32 * 32, which only fits 128x128 inputs.
            nn.Linear(32 * feat_h * feat_w, 256),
            nn.ReLU(),
            nn.Linear(256, num_actions)
        )

    def forward(self, x):
        """Return Q-values of shape ``(batch, num_actions)`` for image batch ``x``."""
        return self.net(x)

# Environment that walks the image folder; the trained autoencoder is handed in as its model.
env = SmarterImageScanEnv(IMAGE_DIR, autoencoder)
# Q-network with default settings (3x128x128 input, 2 actions), moved to the active device.
model = Agent().to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LR)
# Squared error between the predicted Q-values and the target Q-values built in the training loop.
loss_fn = nn.MSELoss()
gamma = 0.95  # Discount factor


In [19]:
# 🔁 Cell 7: Train Agent with Smart Reward Logic
from collections import deque

# NOTE(review): `memory` is created here but nothing in this cell appends to it or
# samples from it, so the training below is purely online (one update per step).
memory = deque(maxlen=1000)

for episode in range(NUM_EPISODES):
    # Every episode restarts at the first image of the folder.
    state = env.reset()
    total_reward = 0

    # At most MAX_STEPS images are visited per episode.
    for step in range(MAX_STEPS):
        # Greedy action selection: always pick the highest predicted Q-value
        # (no exploration term is applied here).
        q_values = model(state)
        action = torch.argmax(q_values).item()

        next_state, reward, done = env.step(action)
        total_reward += reward

        # Estimate future reward (Q-learning)
        with torch.no_grad():
            # When there is no next image, the future value is taken to be zero.
            next_q_values = model(next_state) if next_state is not None else torch.zeros_like(q_values)
            max_next_q = torch.max(next_q_values)
            # Start from the current predictions and overwrite only the taken
            # action, so the untaken action contributes zero error to the loss.
            target_q = q_values.clone()
            target_q[0][action] = reward + gamma * max_next_q

        # Loss & Backprop
        loss = loss_fn(q_values, target_q)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Stop once the environment has run out of images.
        if done or next_state is None:
            break

        state = next_state

    print(f"📈 Episode {episode+1}/{NUM_EPISODES} | Total Reward: {total_reward}")


📈 Episode 1/5 | Total Reward: 20
📈 Episode 2/5 | Total Reward: 20
📈 Episode 3/5 | Total Reward: 20
📈 Episode 4/5 | Total Reward: 20
📈 Episode 5/5 | Total Reward: 20


In [20]:
# 💾 Cell 8: Save the trained model
# Checkpoints are written under <project root>/models/checkpoints.
CHECKPOINT_DIR = PROJECT_ROOT.joinpath("models", "checkpoints")
CHECKPOINT_DIR.mkdir(exist_ok=True, parents=True)  # create the folder (and parents) if missing

# Persist only the learned parameters (the state_dict) of the agent.
checkpoint_path = CHECKPOINT_DIR.joinpath("rl_agent_smart.pth")
agent_weights = model.state_dict()
torch.save(agent_weights, checkpoint_path)
print(f"✅ Smarter RL agent saved to: {checkpoint_path}")


✅ Smarter RL agent saved to: D:\Documents\CODE_WITH_ERICADESHH\GitHub\HarvestGuard\models\checkpoints\rl_agent_smart.pth


In [21]:
# 🧠 Cell 9: Evaluate Smarter Agent
print("🔍 Agent Evaluation Run:")

total_reward = 0
done = False
state = env.reset()

# Greedy rollout over the image list: forward passes only, no weight updates.
while state is not None and not done:
    with torch.no_grad():
        q_values = model(state)
    action = q_values.argmax().item()  # index of the highest Q-value

    next_state, reward, done = env.step(action)
    total_reward = total_reward + reward
    state = next_state

print(f"✅ Total Reward Collected by Smart Agent: {total_reward}")


🔍 Agent Evaluation Run:
✅ Total Reward Collected by Smart Agent: 30


In [27]:
# 📦 Imports
import matplotlib.pyplot as plt
import torchvision.transforms.functional as TF
from datetime import datetime
from pathlib import Path
from PIL import Image
import torch
import os

from matplotlib.backends.backend_pdf import PdfPages  # For PDF export
from IPython.display import display, HTML  # For inline display or HTML export


In [28]:
# 📊 RL Agent Demo with Visual Output
print("🎞️ RL Agent Visual Inference Demo:")

# reset() already returns the first observation, so the first image is decoded only once.
state = env.reset()
done = False
total_reward = 0
step = 0

results = []  # one dict per step (step, image_path, action, error)
figures = []  # every generated figure, in step order (each one is closed after saving)

pdf_path = "rl_agent_report.pdf"

# The context manager finalises the PDF even if a step raises part-way through,
# instead of leaving a half-written report behind.
with PdfPages(pdf_path) as pdf_report:
    while not done and state is not None:
        # Observations arrive batched as (1, C, H, W); tolerate an unbatched (C, H, W) tensor.
        state_input = state.unsqueeze(0) if state.dim() == 3 else state

        # Only the two forward passes need gradient tracking disabled.
        with torch.no_grad():
            action_values = model(state_input)
            recon = autoencoder(state_input)

        action = torch.argmax(action_values).item()
        # Mean squared reconstruction error of the autoencoder for this image.
        error = torch.mean((recon - state_input) ** 2).item()

        # Emoji label for the result log; plain-text label for the figure title.
        # NOTE(review): the warning emitted at pdf_report.savefig(fig) is most likely a
        # missing-glyph warning for the emoji in the title -- confirm after re-running.
        result = "✅ Accept" if action == 0 else "🚨 Flag"
        title_label = "Accept" if action == 0 else "Flag"

        # Record the decision before env.step() advances env.index.
        results.append({
            "step": step,
            "image_path": env.images[env.index],
            "action": result,
            "error": round(error, 5)
        })

        # Side-by-side view: input image vs. autoencoder reconstruction.
        fig, axs = plt.subplots(1, 2, figsize=(8, 4))
        axs[0].imshow(TF.to_pil_image(state_input.squeeze(0).cpu()))
        axs[0].set_title("Original")
        axs[0].axis('off')

        axs[1].imshow(TF.to_pil_image(recon.squeeze(0).cpu()))
        axs[1].set_title(f"Reconstruction\nError: {round(error, 5)}")
        axs[1].axis('off')

        fig.suptitle(f"Step {step} | {title_label}", fontsize=12)
        figures.append(fig)
        pdf_report.savefig(fig)  # add this figure as one page of the PDF
        plt.close(fig)           # avoid accumulating open figures in the loop

        # Advance the environment with the chosen action.
        next_state, reward, done = env.step(action)
        total_reward += reward
        state = next_state
        step += 1

print(f"📄 PDF report saved: {pdf_path}")
print(f"🏁 Demo finished — Total Reward Collected: {total_reward}")


🎞️ RL Agent Visual Inference Demo:


  pdf_report.savefig(fig)  # Save this fig to PDF


📄 PDF report saved: rl_agent_report.pdf
🏁 Demo finished — Total Reward Collected: 30


In [30]:
# 📁 Save results as HTML (inline + file)
# Build one table row per recorded step. Every value is HTML-escaped because file
# names and labels are data, not markup: a name containing "<" or "&" would
# otherwise corrupt the table.
html_rows = "".join(
    "<tr>"
    f"<td>{html.escape(str(r['step']))}</td>"
    f"<td>{html.escape(str(r['image_path']))}</td>"
    f"<td>{html.escape(str(r['action']))}</td>"
    f"<td>{html.escape(str(r['error']))}</td>"
    "</tr>"
    for r in results
)
html_table = f"""
<h3>RL Agent Decision Log</h3>
<table border='1' style='border-collapse: collapse;'>
<tr><th>Step</th><th>Image</th><th>Action</th><th>Recon Error</th></tr>
{html_rows}
</table>
"""

# Inline display
display(HTML(html_table))

# Save as UTF-8 and declare the charset inside the file, so a browser opening the
# saved log does not have to guess the encoding of the emoji labels.
with open("rl_agent_results.html", "w", encoding="utf-8") as f:
    f.write('<meta charset="utf-8">\n' + html_table)

print("✅ HTML log saved: rl_agent_results.html")



Step,Image,Action,Recon Error
0,000001.jpg,✅ Accept,0.00663
1,000002.jpg,✅ Accept,0.007
2,000003.jpg,✅ Accept,0.00787
3,000004.jpg,✅ Accept,0.00766
4,000005.jpg,✅ Accept,0.00251
5,000006.jpg,✅ Accept,0.00703
6,000007.jpg,✅ Accept,0.00182
7,000008.jpg,✅ Accept,0.00243
8,000009.jpg,✅ Accept,0.00646
9,000010.jpg,✅ Accept,0.0029


✅ HTML log saved: rl_agent_results.html
