In [None]:
import os
from google.colab import userdata

# Kaggle kimlik bilgilerini ortam değişkenlerine ayarla
os.environ['KAGGLE_USERNAME'] = userdata.get('KAGGLE_USERNAME')
os.environ['KAGGLE_KEY'] = userdata.get('KAGGLE_KEY')

print("Kaggle kimlik bilgileri başarıyla ayarlandı.")

In [None]:
# Veri setini indir
!kaggle datasets download -d ipythonx/mvtec-ad

# İndirilen zip dosyasını aç
!unzip mvtec-ad.zip -d /content/mvtec-ad

print("Veri seti başarıyla indirildi ve açıldı.")

In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import cv2
import os
from pathlib import Path
import matplotlib.pyplot as plt

def ssim_loss(y_true, y_pred):
    return 1 - tf.image.ssim(y_true, y_pred, max_val=1.0)

In [None]:
def load_images(data_path, img_size=(256, 256)):
    images = []
    for img_file in Path(data_path).glob('**/*.png'):
        img = cv2.imread(str(img_file))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, img_size)
        img = img.astype('float32') / 255.0
        images.append(img)
    return np.array(images)

#Büyük işlemlerde DataLoader kullanmak ram için daha verimli
category = 'leather'  # kategori
train_path = f'/content/mvtec-ad/{category}/train/good'

train_images = load_images(train_path)

In [None]:
def build_autoencoder(input_shape=(256, 256, 3)):
    # Encoder
    encoder_input = keras.Input(shape=input_shape)

    x = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(encoder_input)
    x = keras.layers.MaxPooling2D((2, 2), padding='same')(x)

    x = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = keras.layers.MaxPooling2D((2, 2), padding='same')(x)

    x = keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    encoded = keras.layers.MaxPooling2D((2, 2), padding='same')(x)

    # Decoder
    x = keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(encoded)
    x = keras.layers.UpSampling2D((2, 2))(x)

    x = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = keras.layers.UpSampling2D((2, 2))(x)

    x = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = keras.layers.UpSampling2D((2, 2))(x)

    decoded = keras.layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

    autoencoder = keras.Model(encoder_input, decoded)
    return autoencoder

model = build_autoencoder()
model.summary()

In [None]:
# Model derleme
model.compile(
    optimizer='adam',
    loss=ssim_loss,  # SSIM loss kullanıldı
    metrics=['mae']
)

# Eğitim
history = model.fit(
    train_images, train_images,  # Input ve output aynı
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    shuffle=True,
    callbacks=[
        keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5)
    ]
)

# Modeli kaydetme (Legacy format düzeltildi: .h5 -> .keras)
model.save('autoencoder_model.keras')

In [None]:
def calculate_anomaly_scores(model, images):
    """
    Verilen görüntüler için modelin rekonstrüksiyonuna göre
    SSIM tabanlı anomali skorlarını hesaplar.
    Skor = 1 - SSIM (0'a yakınsa iyi, 1'e yakınsa kötü/anomali)
    """
    reconstructed = model.predict(images, verbose=0)
    # TF SSIM hesaplaması
    # SSIM (Batch, ) boyutunda bir tensör döndürür
    ssim_values = tf.image.ssim(images, reconstructed, max_val=1.0)
    # Loss'a çeviriyoruz (Anomali skoru)
    scores = 1.0 - ssim_values.numpy()
    return scores, reconstructed

def detect_anomalies(model, test_images, threshold):
    """
    Belirlenen threshold'a göre anomali tespiti yapar.
    """
    scores, reconstructed = calculate_anomaly_scores(model, test_images)
    is_anomaly = scores > threshold
    return scores, is_anomaly, reconstructed

# 1. Threshold Hesaplama (TEMİZ EĞİTİM SETİ ÜZERİNDEN)
# Issue Düzeltmesi: Threshold test setinden değil, train setinden hesaplanmalı.
print("Threshold temiz eğitim seti üzerinden hesaplanıyor...")
train_scores, _ = calculate_anomaly_scores(model, train_images)

# Ortalama + 2 Standart Sapma (veya 3) kuralı
threshold = np.mean(train_scores) + 2 * np.std(train_scores)
print(f"Belirlenen Threshold (SSIM Loss): {threshold:.4f}")

# 2. Test Aşaması
test_path = f'/content/mvtec-ad/{category}/test'
test_images = load_images(test_path)

# Hesaplanan threshold değerini kullanıyoruz
test_scores, is_anomaly, reconstructed = detect_anomalies(model, test_images, threshold=threshold)

In [None]:
def visualize_results(original, reconstructed, mse, n_samples=5):
    plt.figure(figsize=(15, 6))

    for i in range(n_samples):
        # Orijinal görüntü
        plt.subplot(3, n_samples, i + 1)
        plt.imshow(original[i])
        plt.title(f'Original')
        plt.axis('off')

        # Rekonstrüksiyon
        plt.subplot(3, n_samples, n_samples + i + 1)
        plt.imshow(reconstructed[i])
        plt.title(f'Reconstructed')
        plt.axis('off')

        # Fark haritası
        plt.subplot(3, n_samples, 2*n_samples + i + 1)
        diff = np.abs(original[i] - reconstructed[i])
        plt.imshow(diff)
        plt.title(f'MSE: {mse[i]:.4f}')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

visualize_results(test_images, reconstructed, mse_scores)

In [None]:
def evaluate_model(model, threshold, category=category):
    # Normal örnekler
    normal_path = f'/content/mvtec-ad/{category}/test/good'
    normal_images = load_images(normal_path)

    # SSIM Skorlarını al
    normal_scores, _ = calculate_anomaly_scores(model, normal_images)

    # Anomalili örnekler (tüm defekt tipleri)
    anomaly_scores_list = []

    test_path = Path(f'/content/mvtec-ad/{category}/test')
    for defect_dir in test_path.iterdir():
        if defect_dir.name != 'good' and defect_dir.is_dir():
            anomaly_imgs = load_images(str(defect_dir))
            if len(anomaly_imgs) > 0:
                scores, _ = calculate_anomaly_scores(model, anomaly_imgs)
                anomaly_scores_list.extend(scores)

    anomaly_scores_np = np.array(anomaly_scores_list)

    # Görselleştirme
    plt.figure(figsize=(12, 6))
    plt.hist(normal_scores, bins=30, alpha=0.7, label='Normal (Clean)', color='green', density=True)
    plt.hist(anomaly_scores_np, bins=30, alpha=0.7, label='Anomaly (Defect)', color='red', density=True)

    # Threshold çizgisini ekle
    plt.axvline(threshold, color='black', linestyle='--', label=f'Threshold: {threshold:.3f}')

    plt.xlabel('Anomaly Score (1 - SSIM)')
    plt.ylabel('Frequency (Density)')
    plt.legend()
    plt.title('SSIM Score Distribution: Normal vs Anomaly')
    plt.show()

    # İstatistikler
    print(f"Normal Score Mean: {normal_scores.mean():.6f} ± {normal_scores.std():.6f}")
    print(f"Anomaly Score Mean: {anomaly_scores_np.mean():.6f} ± {anomaly_scores_np.std():.6f}")

    return normal_scores, anomaly_scores_np

# Fonksiyonu yeni threshold ile çağır
evaluate_model(model, threshold)

In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import cv2
import os
from pathlib import Path
import matplotlib.pyplot as plt

def ssim_loss(y_true, y_pred):
    return 1 - tf.image.ssim(y_true, y_pred, max_val=1.0)

In [None]:
def load_images(data_path, img_size=(256, 256)):
    images = []
    path_obj = Path(data_path)
    if not path_obj.exists():
        print(f"Uyarı: {data_path} yolu bulunamadı!")
        return np.array([])

    for img_file in path_obj.glob('**/*.png'):
        img = cv2.imread(str(img_file))
        if img is None: continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, img_size)
        img = img.astype('float32') / 255.0
        images.append(img)
    return np.array(images)

# NOISE INJECTION FUNCTION
def add_noise(images, noise_factor=0.1):
    noisy_images = images + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=images.shape)
    noisy_images = np.clip(noisy_images, 0., 1.)
    return noisy_images

category = 'leather'  # kategori
base_path = '/content/mvtec-ad' if os.path.exists('/content/mvtec-ad') else '.'
train_path = f'{base_path}/{category}/train/good'

train_images_clean = load_images(train_path)
# Add noise to create training inputs
train_images_noisy = add_noise(train_images_clean, noise_factor=0.2)

print(f"Eğitim için {len(train_images_clean)} görüntü yüklendi.")

# Show example of noise
if len(train_images_clean) > 0:
    plt.figure(figsize=(10, 4))
    plt.subplot(1, 2, 1); plt.imshow(train_images_clean[0]); plt.title('Clean')
    plt.subplot(1, 2, 2); plt.imshow(train_images_noisy[0]); plt.title('Noisy (Input)')
    plt.show()

In [None]:
def build_autoencoder_simple(input_shape=(256, 256, 3)):
    # Standard Autoencoder (Simpler than before to prevent Identity Mapping)
    encoder_input = keras.Input(shape=input_shape)

    x = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(encoder_input)
    x = keras.layers.MaxPooling2D((2, 2), padding='same')(x) # 128

    x = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = keras.layers.MaxPooling2D((2, 2), padding='same')(x) # 64

    x = keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    encoded = keras.layers.MaxPooling2D((2, 2), padding='same')(x) # 32x32 BOttleneck
    # Note: We kept it slightly larger (32x32) because Denoising is the main regularizer now.
    # If it still copies, we will reduce to 16x16.

    x = keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(encoded)
    x = keras.layers.UpSampling2D((2, 2))(x)

    x = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = keras.layers.UpSampling2D((2, 2))(x)

    x = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = keras.layers.UpSampling2D((2, 2))(x)

    decoded = keras.layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

    autoencoder = keras.Model(encoder_input, decoded)
    return autoencoder

model = build_autoencoder_simple()
model.compile(optimizer='adam', loss=ssim_loss)
model.summary()

In [None]:
# Eğitim
if len(train_images_clean) > 0:
    history = model.fit(
        train_images_noisy, train_images_clean,  # Input: Noisy, Target: Clean
        epochs=100,
        batch_size=32,
        validation_split=0.2,
        shuffle=True,
        callbacks=[
            keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
            keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5)
        ]
    )
    model.save('denoising_autoencoder.keras')
else:
    print("Eğitilecek veri bulunamadı.")

In [None]:
def detect_anomalies_ssim(model, test_images):
    if len(test_images) == 0: return [], [], []

    # Rekonstrüksiyon
    reconstructed = model.predict(test_images)

    # SSIM MAP Computation
    # tf.image.ssim returns a single score per image. We want a MAP.
    # Unfortunately standard TF doesn't give Map easily. We can use abs diff for viz,
    # but for scoring we use 1 - ssim.

    # 1. SSIM Score (Global)
    ssim_scores = tf.image.ssim(test_images, reconstructed, max_val=1.0).numpy()
    anomaly_score = 1 - ssim_scores  # Higher score = More Anomaly

    # 2. Difference Map (for Visualization)
    diff_map = np.abs(test_images - reconstructed)
    diff_map = np.mean(diff_map, axis=-1) # Grayscale difference

    return anomaly_score, diff_map, reconstructed

# Test veri yükleme
test_path = f'{base_path}/{category}/test'
test_images = load_images(test_path)

if 'model' in locals() and len(test_images) > 0:
    scores, diff_maps, reconstructed = detect_anomalies_ssim(model, test_images)

    # Thresholding based on Scores
    print(f"Mean Score: {np.mean(scores):.4f}, Max Score: {np.max(scores):.4f}")

In [None]:
def visualize_ssim_results(original, reconstructed, diff_maps, scores, n_samples=5):
    if len(original) == 0: return
    n_samples = min(n_samples, len(original))

    plt.figure(figsize=(15, 6))

    for i in range(n_samples):
        # Orijinal
        plt.subplot(3, n_samples, i + 1)
        plt.imshow(original[i])
        plt.title('Original')
        plt.axis('off')

        # Rekonstrüksiyon
        plt.subplot(3, n_samples, n_samples + i + 1)
        plt.imshow(reconstructed[i])
        plt.title('Reconstructed')
        plt.axis('off')

        # Hata Haritası
        plt.subplot(3, n_samples, 2*n_samples + i + 1)
        plt.imshow(diff_maps[i], cmap='jet', vmin=0, vmax=0.3) # Jet cmap highlights defects
        plt.title(f'Score: {scores[i]:.4f}')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

if 'scores' in locals() and len(test_images) > 0:
    visualize_ssim_results(test_images, reconstructed, diff_maps, scores)

In [None]:
def evaluate_model(model, category=category):
    # Normal örnekler
    normal_path = f'/content/mvtec-ad/{category}/test/good'
    normal_images = load_images(normal_path)
    normal_recon = model.predict(normal_images)
    normal_mse = np.mean(np.square(normal_images - normal_recon), axis=(1,2,3))

    # Anomalili örnekler (tüm defekt tipleri)
    anomaly_mse = []
    defect_types = []

    test_path = Path(f'/content/mvtec-ad/{category}/test')
    for defect_dir in test_path.iterdir():
        if defect_dir.name != 'good' and defect_dir.is_dir():
            anomaly_images = load_images(str(defect_dir))
            anomaly_recon = model.predict(anomaly_images)
            mse = np.mean(np.square(anomaly_images - anomaly_recon), axis=(1,2,3))
            anomaly_mse.extend(mse)
            defect_types.extend([defect_dir.name] * len(mse))

    anomaly_mse = np.array(anomaly_mse)

    # Görselleştirme
    plt.figure(figsize=(12, 6))
    plt.hist(normal_mse, bins=30, alpha=0.7, label='Normal', color='green')
    plt.hist(anomaly_mse, bins=30, alpha=0.7, label='Anomaly', color='red')
    plt.xlabel('MSE')
    plt.ylabel('Frequency')
    plt.legend()
    plt.title('MSE Distribution: Normal vs Anomaly')
    plt.show()

    # İstatistikler
    print(f"Normal MSE: {normal_mse.mean():.6f} ± {normal_mse.std():.6f}")
    print(f"Anomaly MSE: {anomaly_mse.mean():.6f} ± {anomaly_mse.std():.6f}")
    print(f"Separation ratio: {anomaly_mse.mean() / normal_mse.mean():.2f}x")

    return normal_mse, anomaly_mse

normal_scores, anomaly_scores = evaluate_model(model)