# Otokodlayıcılar (Autoencoders)

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms , datasets
import matplotlib.pyplot as plt
import numpy as np
from scipy.ndimage import gaussian_filter


**Problem Tanımı:** Veri boyut küçültme \
Veri : FashionMNIST

In [2]:
transform = transforms.Compose([transforms.ToTensor()]) # Görüntüleri tensore çevir [0-1] aralığında

# Eğitim ve test veri setini indir
train_dataset = datasets.FashionMNIST(root = "FashionMNIST_data" , train = True , transform = transform , download = True )
test_dataset = datasets.FashionMNIST(root = "FashionMNIST_data" , train = False , transform = transform , download = True )

100%|█████████████████████████████████████████████████████████████████████████████| 26.4M/26.4M [00:20<00:00, 1.27MB/s]
100%|██████████████████████████████████████████████████████████████████████████████| 29.5k/29.5k [00:00<00:00, 414kB/s]
100%|█████████████████████████████████████████████████████████████████████████████| 4.42M/4.42M [00:03<00:00, 1.15MB/s]
100%|█████████████████████████████████████████████████████████████████████████████████████| 5.15k/5.15k [00:00<?, ?B/s]


**Eğitim ve test veri seti yükleyicilerini oluşturma**

In [3]:
batch_size = 128 
train_loader = DataLoader(train_dataset , batch_size = batch_size , shuffle = True)
test_loader = DataLoader(test_dataset , batch_size = batch_size , shuffle = False)

**Auto Encoder Geliştirme**

In [16]:
class AutoEncoder(nn.Module):

    def __init__(self):
        super(AutoEncoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Flatten(), # 28 x 28 -> 784 (1D) Vektör
            nn.Linear(28 * 28 , 256), # Tam bağlı katman 784 -> 256
            nn.ReLU(), # Aktivasyon fonk.
            nn.Linear(256 , 64), # Tam bağlı katman 256 -> 64
            nn.ReLU()
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(64,256), # Tam bağlı katman 64 -> 256
            nn.ReLU(), # Aktivasyon Fonk.
            nn.Linear(256 , 28 * 28), # Tam bağlı katman
            nn.Sigmoid(), # Sigmoid 0-1 aralığında tutmak için kullanılır
            nn.Unflatten(1 , (1 , 28 , 28)) # Tek boyutlu çıktıyı tekrar 28 x 28 yapar
        
        )
    def forward(self,x):
        encoded = self.encoder(x) # Giriş verisini kodlar
        decoded = self.decoder(encoded) # Kodlanmış veriyi tekrar görüntüye dönüştürür 
        return decoded

**Callback : Early Stopping**

In [17]:
class EarlyStopping():
    
    def __init__(self, patience = 3 , min_delta = 0.001 ):
        # Kaç epoch boyunca gelişme olmazsa durduracağını belirleyen parametre
        self.patience = patience

        # Kayıptaki min. iyileşme sayısı
        self.min_delta = min_delta

        # En iyi kayıp değerleri
        self.best_loss = None
    
        # Sabit kalan epoch sayısı (counter)
        self.counter = 0
    
    def __call__(self,loss):
        # Gelişme var
        if self.best_loss is None or loss < self.best_loss - self.min_delta:
            self.best_loss = loss
            self.counter = 0 
        # Gelişme yok
        else:
            self.counter += 1

        # Sabit kalan epoch sayısı patience i aşarsa -> Durdur
        if self.counter >= self.patience :
            return True # Training'i durdur
            
        return False
        

**Modelin Eğitimi**

In [18]:
epochs = 50
learning_rate = 1e-3 

# Model , Loss , Optimizer Tanımlama
model = AutoEncoder() # Modeli tanımlama
criterion = nn.MSELoss() # Kayıp Fonksiyonu -> MSE : ortalama kare hata
optimizer = optim.Adam(model.parameters(),lr = learning_rate) # Optimizer
early_stopping = EarlyStopping(patience = 5 , min_delta = 0.001) # Erken durdurma

# Eğitim Fonk.

def training(model,train_loader , optimizer , criterion , early_stopping , epochs):
    model.train() # Modeli Eğitim moduna al

    for epoch in range(epochs):
        total_loss = 0 #Epoch başına toplam kayıt

        for inputs , _ in train_loader:
            optimizer.zero_grad() # Gradyanları sıfırla
            outputs = model(inputs)
            loss = criterion(outputs , inputs) # Gerçek ve tahmini veriler arasındaki kayıp
            loss.backward() # Gradyanları hesaplama
            optimizer.step() # Ağırlıkları güncelle
            total_loss += loss.item() # Her bir epoch için toplam loss değeri

        avg_loss = total_loss / len(train_loader) # Epoch başına ortalama kayıp hesaplama
        print(f"Epoch :{epoch+1}/{epochs} - Loss : {avg_loss:5f} ")

        # Early Stopping
        if early_stopping(avg_loss):
            print(f"Early Stopping at epoch {epoch+1} ")
            break

In [19]:
training(model , train_loader , optimizer , criterion , early_stopping , epochs)

Epoch :1/50 - Loss : 0.033305 
Epoch :2/50 - Loss : 0.017248 
Epoch :3/50 - Loss : 0.014544 
Epoch :4/50 - Loss : 0.013141 
Epoch :5/50 - Loss : 0.012193 
Epoch :6/50 - Loss : 0.011461 
Epoch :7/50 - Loss : 0.010888 
Epoch :8/50 - Loss : 0.010485 
Epoch :9/50 - Loss : 0.010068 
Epoch :10/50 - Loss : 0.009789 
Epoch :11/50 - Loss : 0.009479 
Epoch :12/50 - Loss : 0.009245 
Epoch :13/50 - Loss : 0.008994 
Epoch :14/50 - Loss : 0.008819 
Epoch :15/50 - Loss : 0.008643 
Epoch :16/50 - Loss : 0.008519 
Epoch :17/50 - Loss : 0.008390 
Epoch :18/50 - Loss : 0.008274 
Early Stopping at epoch 18 


**Modelin Test Edilmesi**

In [1]:
def compute_ssim(img1, img2, sigma=1.5):
    """
    iki goruntu arasindaki benzerligi hesaplar
    """
    C1 = (0.01*255)**2 # ssim sabitlerinden bir tanesi
    C2 = (0.03*255)**2 # diger bir sabit
    
    img1 = img1.astype(np.float64)
    img2 = img2.astype(np.float64)
    
    # goruntulerin ortalamalari
    mu1 = gaussian_filter(img1, sigma)
    mu2 = gaussian_filter(img2, sigma)
    
    # 
    mu1_sq = mu1**2 # ilk goruntunun ortalamasinin karesini aldik
    mu2_sq = mu2**2 
    mu1_mu2 = mu1 * mu2
    
    sigma1_sq = gaussian_filter(img1 **2, sigma) - mu1_sq # varyans hesabi
    sigma2_sq = gaussian_filter(img2 **2, sigma) - mu2_sq
    sigma12 = gaussian_filter(img1*img2, sigma) - mu1_mu2 # kovaryans hesabi
    
    # ssim haritasi hesaplama
    ssim_map = ((2*mu1_mu2 + C1) * (2 * sigma12 + C2))/((mu1_sq + mu2_sq + C1)*(sigma1_sq + sigma2_sq + C2))
    
    return ssim_map.mean()

def evaluate(model , test_loader , n_images = 10):
    model.eval() # Modelin değerlendirme moduna al

    with torch.no_grad():
        for batch in test_loader:
            inputs , _ = batch
            outputs = model(inputs) # Modelin çıktılarını üretiyoruz
            break
    inputs = inputs.numpy() # Numpy array'e çevir
    outputs = outputs.numpy() # Numpy array'e çevir

    fig , axes = plt.subplots(2 , n_images , figsize = (n_images , 3)) # Görselleştirme için sublot
    ssim_scores = []

    for i in range(n_images) :
        img1 = np.squeeze(inputs[i]) # Orijinal görüntü
        img2 = np.squeeze(outputs[i]) # Decoded edilen görüntü

        ssim_score = compute_ssim(img1, img2) # ssim skoru yani benzerlik hesapla
        ssim_scores.append(ssim_score) # ssim skorunu listeye ekle
        
        axes[0,i].imshow(img1, cmap = "gray")
        axes[0,i].axis("off")
        axes[1,i].imshow(img2, cmap = "gray")
        axes[1,i].axis("off")
        
    axes[0,0].set_title("Original")
    axes[1,0].set_title("Decoded image")
    plt.show()
    
    avg_ssim = np.mean(ssim_scores)
    print(f"Average SSIM: {avg_ssim}")

In [3]:
# evaluate(model, test_loader, n_images = 10)