In [1]:
import os

In [2]:
import torch 
import torch.nn as nn
import torch.nn.functional as F 
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms as T 
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
import hdbscan
import umap
from sklearn.manifold import TSNE

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from torch.distributions import Normal
from sklearn.mixture import GaussianMixture
from tqdm import tqdm

In [4]:
torch.manual_seed(42)
np.random.seed(42)

In [5]:
config = {
    # GLOW model parameter
    'img_size': (128*128),
    'in_channels': 3,
    'hidden_channels': 512,
    'K': 32, 
    'L': 3,
    'coupling_layer':'affine',
    
    # Training parameter
    'batch_size': 64,
    'lr': 1e-4,
    'epochs': 30,
    'weight_decay': 1e-5,
    
    # Clustering parameters
    'n_cluster':3,
    'gmm_covariance_type':'full'
}

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [7]:
data_transform = T.Compose([
    T.Resize(config['img_size']),
    T.RandomAdjustSharpness(0.5),
    T.RandomHorizontalFlip(),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
base_folder = os.path.join("..", "Merged Files")
my_data = []

for i in os.listdir(base_folder):
    img_path = os.path.join(base_folder, i)
    
    with Image.open(img_path) as image:
        image = image.convert("RGB")
        image_T = data_transform(image)
        my_data.append(image_T.unsqueeze(0))
    os.remove(img_path)  # ลบไฟล์ที่เปิด
    
    # เก็บกวาด memory
    del image_T
    gc.collect()

data = torch.cat(my_data, dim=0).to(device)
print(f"Dataset shape: {data.shape}")

RuntimeError: [enforce fail at alloc_cpu.cpp:115] data. DefaultCPUAllocator: not enough memory: you tried to allocate 4294901760 bytes.

In [None]:
dataset = TensorDataset(data)
dataloader = DataLoader(dataset, batch_size=config['batch_size'], shuffle=True)

In [None]:
# ActNorm Layer (Activation Normalization)
class ActNorm(nn.Module):
    def __init__(self, channels):
        super(ActNorm, self).__init__()
        self.loc = nn.Parameter(torch.zeros(1, channels, 1, 1))
        self.scale = nn.Parameter(torch.ones(1, channels, 1, 1))
        self.initialized = False

    def initialize(self, x):
        with torch.no_grad():
            flatten = x.permute(1, 0, 2, 3).contiguous().view(x.shape[1], -1)
            mean = flatten.mean(1).view(1, x.shape[1], 1, 1)
            std = flatten.std(1).view(1, x.shape[1], 1, 1)
            self.loc.data.copy_(-mean)
            self.scale.data.copy_(1 / (std + 1e-6))

    def forward(self, x, reverse=False):
        if not self.initialized:
            self.initialize(x)
            self.initialized = True

        if reverse:
            return (x / self.scale) + self.loc, 0
        else:
            log_abs_det = torch.sum(torch.log(torch.abs(self.scale)), dim=1)
            return (x - self.loc) * self.scale, log_abs_det.sum() * x.size(2) * x.size(3)

In [None]:
# Invertible 1x1 Convolution
class InvConv2d(nn.Module):
    def __init__(self, in_channels):
        super(InvConv2d, self).__init__()
        weight = torch.qr(torch.randn(in_channels, in_channels))[0]
        self.weight = nn.Parameter(weight)

    def forward(self, x, reverse=False):
        b, c, h, w = x.size()

        weight = self.weight

        if not reverse:
            z = F.conv2d(x, weight.view(c, c, 1, 1))
            log_det = torch.slogdet(weight)[1] * h * w
            return z, log_det
        else:
            weight_inv = torch.inverse(weight)
            z = F.conv2d(x, weight_inv.view(c, c, 1, 1))
            return z, 0

In [None]:
# Affine Coupling Layer
class AffineCoupling(nn.Module):
    def __init__(self, in_channels, hidden_channels):
        super(AffineCoupling, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_channels // 2, hidden_channels, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_channels, hidden_channels, 1),
            nn.ReLU(),
            nn.Conv2d(hidden_channels, in_channels, 3, padding=1)
        )

        # Initialize last layer with zeros
        self.net[-1].weight.data.zero_()
        self.net[-1].bias.data.zero_()

    def forward(self, x, reverse=False):
        x_a, x_b = torch.chunk(x, 2, dim=1)

        s_t = self.net(x_a)
        s, t = torch.chunk(s_t, 2, dim=1)
        s = torch.sigmoid(s + 2)  # Add +2 for numerical stability

        if not reverse:
            y_b = x_b * s + t
            log_det = torch.sum(torch.log(s), dim=[1, 2, 3])
        else:
            y_b = (x_b - t) / s
            log_det = -torch.sum(torch.log(s), dim=[1, 2, 3])

        y = torch.cat([x_a, y_b], dim=1)

        return y, log_det