# Installation of necessary libraries

In [None]:
!pip uninstall -y torch torchvision torchaudio
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128
!pip install anomalib opencv-python numpy
!pip install Pillow

!python -c "import torch; print(torch.__version__); print('cuda:', torch.version.cuda); print('available:', torch.cuda.is_available()); print('gpu:', (torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'NO GPU'))"
!python -c "from anomalib.models import Patchcore; print('PatchCore OK')"

!pip install kagglehub


# Importing and setting up global variables

## Imports

In [None]:
import os
import time

import kagglehub
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

from anomalib.data import Folder
from anomalib.data.utils import TestSplitMode
from anomalib.engine import Engine
from anomalib.models import Patchcore
from anomalib.utils.post_processing import superimpose_anomaly_map, add_normal_label, add_anomalous_label, compute_mask

from preprocess import preprocess

## Download Kaggle dataset

In [None]:


# Download latest version
# kagglehub returns the local location of the downloaded dataset files.
# NOTE(review): the raw_* directories defined further down are relative paths
# ("raw/good", "raw/test") and do not reference `path` -- confirm how the
# downloaded files are expected to end up under ./raw.
path = kagglehub.dataset_download("nileshamarathunge/3d-print-failure-detection")

print("Path to dataset files:", path)

## Global Variables

In [None]:
# Ask libraries for plain console output inside the notebook:
#   TERM=dumb  -> disables rich features like progress bars in many notebook contexts
#   NO_COLOR=1 -> optional, reduces fancy output further
os.environ.update(
    {
        "TERM": "dumb",
        "NO_COLOR": "1",
    }
)

In [None]:
# Input images as provided (before preprocessing).
raw_train_dir = "raw/good"
raw_test_dir = "raw/test"

# Output locations for the preprocessed images; "data" / "good" is also what
# the anomalib Folder datamodule is pointed at later in the notebook.
processed_train_dir = "data/good"
processed_test_dir = "data/test"

# Figures produced by the inference cell are saved here.
result_dir = "data/results"

# CSV that keeps one row per trained model configuration.
# Kept relative to the working directory like every other path above; the
# previous value started with "/" and therefore pointed at the filesystem root.
overview_file = "model_overview.csv"

In [None]:
# Load the model overview table, or create an empty one on the first run.
# pandas (`pd`) is imported in the notebook's import cell.
print(f"Loading model overview... ({overview_file})")
if os.path.exists(overview_file):
    model_versions_df = pd.read_csv(overview_file)
    print(f"Loaded model_versions from {overview_file} ({len(model_versions_df)} rows)")
else:
    # Schema of the overview: hyperparameters first, then timings and accuracies.
    cols = [
        "batch_size",
        "sampling_ratio",
        "num_neighbors",
        "training_time",
        "testing_time",
        "train_accuracy",
        "test_accuracy",
    ]
    model_versions_df = pd.DataFrame(columns=cols)
    # Persist the header-only file right away so later runs find it.
    model_versions_df.to_csv(overview_file, index=False)
    print(f"Created new model_versions file at {overview_file}")

# Processing

## Training data

In [None]:
# Make sure the destination for the preprocessed training images exists.
if os.path.exists(processed_train_dir):
    print(f"Output directory already exists: {processed_train_dir}")
else:
    os.makedirs(processed_train_dir)
    print(f"Created output directory: {processed_train_dir}")

# Collect the .jpg files (case-insensitive) in a stable, sorted order.
image_files = sorted(
    f for f in os.listdir(raw_train_dir) if f.lower().endswith(".jpg")
)

# `counter` is the 1-based position of the image and is handed to preprocess().
counter = 1

# tqdm shows: progress bar, n/N, %, elapsed, ETA by default
for filename in tqdm(
    image_files,
    desc="Processing images",
    unit="img",
    dynamic_ncols=True,
):
    preprocess(raw_train_dir, filename, processed_train_dir, counter)
    counter += 1

print(f"Finished resizing and saving {counter - 1} images to {processed_train_dir}")

## Test data

In [None]:
# Make sure the destination for the preprocessed test images exists.
if not os.path.exists(processed_test_dir):
    os.makedirs(processed_test_dir)
    print(f"Created output directory: {processed_test_dir}")
else:
    print(f"Output directory already exists: {processed_test_dir}")

# Collect the .jpg files (case-insensitive) in a stable, sorted order.
image_files = sorted(f for f in os.listdir(raw_test_dir) if f.lower().endswith(".jpg"))

# Same loop as for the training data: progress bar plus a 1-based index that
# is handed to preprocess() as its last argument.
for counter, filename in enumerate(
    tqdm(image_files, desc="Processing images", unit="img", dynamic_ncols=True),
    start=1,
):
    preprocess(raw_test_dir, filename, processed_test_dir, counter)

print(f"Finished resizing and saving {len(image_files)} images to {processed_test_dir}")

## Output data

In [None]:
# Directory that receives the figures saved by the inference cell.
if os.path.exists(result_dir):
    print(f"Output directory already exists: {result_dir}")
else:
    os.makedirs(result_dir)
    print(f"Created output directory: {result_dir}")

# Training

In [None]:
# Hyperparameters for this run; tune them here, the cells below only read them.

# Passed to the Folder datamodule as train_batch_size / eval_batch_size.
train_batch_size = 2
test_batch_size = 2
# Passed to Patchcore(coreset_sampling_ratio=...).
coreset_sampling_ratio = 0.02
# Passed to Patchcore(num_neighbors=...).
num_neighbors = 2

### Data Folder setup

In [None]:
# Folder datamodule over the preprocessed images: root "data" + normal_dir
# "good" is the directory the training preprocessing cell writes to.
# No directory with abnormal images is passed; the test split mode is SYNTHETIC
# (presumably anomalib derives the test set from the normal images -- confirm
# against the anomalib documentation).
datamodule = Folder(
    root="data",
    name="printcam",
    normal_dir="good",
    # One worker per CPU core; fall back to 4 when the core count is unknown.
    num_workers=os.cpu_count() or 4,
    train_batch_size=train_batch_size,
    eval_batch_size=test_batch_size,
    test_split_mode=TestSplitMode.SYNTHETIC,
)
datamodule.setup()

### Model setup
* GPU will be used if available for training

In [None]:
# PatchCore model configured from the hyperparameter cell.
model = Patchcore(
    coreset_sampling_ratio=coreset_sampling_ratio,
    num_neighbors=num_neighbors
)

# "auto" lets the trainer pick a GPU when one is available and fall back to
# the CPU otherwise; a hard-coded "gpu" raises on machines without one.
# Progress bar and logger are switched off to keep the cell output short.
engine = Engine(
    accelerator="auto",
    devices=1,
    enable_progress_bar=False,
    logger=False,
)

### Model training

In [None]:
# ---- Train + Test ----
# NOTE: a `!VAR=value` shell line runs in a separate subshell and does not
# change this kernel's environment. If CUDA memory fragmentation is a problem,
# set os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
# before the first CUDA allocation instead.

# Time the fit and the test pass separately.
startTime = time.time()
engine.fit(datamodule=datamodule, model=model)
trainEndTime = time.time()
test_results = engine.test(datamodule=datamodule, model=model)
# Name kept for compatibility; this marks the end of the *test* pass.
validationEndTime = time.time()

print("train time:", trainEndTime - startTime)
# The measured span is engine.test(), so it is reported as test time.
print("Test time:", validationEndTime - trainEndTime)
print("Test results:", test_results)
print("Best checkpoint:", engine.best_model_path)

# Testing

Inference on a single image

* Inference runs on the CPU here; set `accelerator="gpu"` in the next cell to use a GPU instead.

In [None]:
# Fresh PatchCore instance with default arguments; the inference cell passes a
# checkpoint path to predict() alongside it.
predModel = Patchcore()

# Separate engine for inference, running on the CPU (set "gpu" if you want).
predEngine = Engine(accelerator="cpu")

In [None]:
# Checkpoint and image used for the single-image inference demo.
CKPT_PATH = "results/Patchcore/printcam/latest/weights/lightning/model.ckpt"
# NOTE(review): assumes the preprocessing step wrote a file with this name
# (frame_000002.jpg) into processed_train_dir -- confirm against preprocess.py.
FILENAME = f"frame_{2:06d}.jpg"
IMAGE_PATH = f"{processed_train_dir}/{FILENAME}"

# Scores below this value get the "normal" label, all others "anomalous".
# NOTE(review): this is independent of `pred_label`, which is only printed.
SCORE_THRESHOLD = 0.75
# Threshold handed to compute_mask() for the mask panel.
MASK_THRESHOLD = 0.5

# Run prediction on the single image with the weights from CKPT_PATH and time it.
startTime = time.time()
preds = predEngine.predict(model=predModel, data_path=IMAGE_PATH, ckpt_path=CKPT_PATH)
endTime = time.time()
print(f"Elapsed time: {endTime-startTime}")

# Fail with a clear message instead of an IndexError/TypeError on preds[0].
if not preds:
    raise RuntimeError(f"No predictions were returned for {IMAGE_PATH}")
pred = preds[0]

# Anomaly map of the first item, moved to the CPU as a NumPy array.
anomaly_map = pred.anomaly_map[0].squeeze().detach().cpu().numpy()

# Load the original image and resize it to the spatial size of pred.image
# (`Image` is PIL.Image, imported in the notebook's import cell).
h, w = pred.image.shape[-2], pred.image.shape[-1]
image = np.array(Image.open(pred.image_path[0]).convert("RGB").resize((w, h)))

# Overlay the heatmap on the image.
overlay = superimpose_anomaly_map(
    anomaly_map=anomaly_map,
    image=image,
    alpha=0.2,
    normalize=True,
)

score = float(pred.pred_score[0])
label = bool(pred.pred_label[0])

print(f"pred_label={label}  pred_score={score:.6f}")

# Stamp the input image with a normal/anomalous label based on the score.
if score < SCORE_THRESHOLD:
    labeled_image = add_normal_label(image, score)
else:
    labeled_image = add_anomalous_label(image, score)

# Show + save: one row with four panels.
panels = [
    ("Input", labeled_image),
    ("Anomaly Map", anomaly_map),
    ("Overlay", overlay),
    ("Mask", compute_mask(anomaly_map, MASK_THRESHOLD)),
]
fig, axes = plt.subplots(1, 4, figsize=(12, 4))
for ax, (title, panel) in zip(axes, panels):
    ax.set_title(title)
    ax.imshow(panel)
    ax.axis("off")
fig.tight_layout()
fig.savefig(f"{result_dir}/{FILENAME}", dpi=200)
plt.show()