Version DEC (Deep Embedded Clustering)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

class Autoencoder(nn.Module):
    """Fully connected autoencoder with a mirrored encoder/decoder.

    The encoder maps ``input_dim -> 512 -> 256 -> embedding_dim`` and the
    decoder maps ``embedding_dim -> 256 -> 512 -> input_dim``. A ReLU follows
    every linear layer except the last one of each half.

    Args:
        input_dim: Size of each input feature vector.
        embedding_dim: Size of the latent code produced by the encoder.
    """

    def __init__(self, input_dim, embedding_dim):
        super().__init__()
        hidden_widths = (512, 256)
        # The encoder is built before the decoder so that parameter creation
        # order (and therefore seeded initialisation) stays the same.
        self.encoder = self._stack((input_dim, *hidden_widths, embedding_dim))
        # NOTE: the decoder output is left linear; an output sigmoid was an
        # open question in the original code and has not been added.
        self.decoder = self._stack(
            (embedding_dim, *reversed(hidden_widths), input_dim)
        )

    @staticmethod
    def _stack(widths):
        """Chain Linear+ReLU layers over consecutive widths, dropping the last ReLU."""
        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        layers.pop()  # no activation after the final projection
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return ``(reconstruction, latent_code)`` for the input batch ``x``."""
        latent = self.encoder(x)
        return self.decoder(latent), latent

# Autoencoder pre-training: learn to reconstruct the concatenated features
# with an MSE objective before the clustering fine-tuning stage.

# NOTE: these feature dimensions are provisional and still need to be confirmed.
F_global_dim = 2048
F_important_dim = 512
F_local_dim = 1024

input_dim = F_global_dim + F_important_dim + F_local_dim  # size of the concatenated input
emb_dim = 128  # size of the latent space used for clustering

autoencoder = Autoencoder(input_dim, emb_dim)

criterion_pretrain = nn.MSELoss()  # other reconstruction losses could be tried
optimizer_pretrain = optim.Adam(autoencoder.parameters(), lr=0.001)

# TODO: assign the float tensor of concatenated features here.
# The original line had no right-hand side (a syntax error); fail with an
# explicit message instead until the real tensor is provided.
data = None
if data is None:
    raise NotImplementedError(
        "Assign the feature tensor to `data` before running pre-training."
    )
if data.dim() != 2 or data.size(1) != input_dim:
    raise ValueError(
        f"`data` must have shape (num_samples, {input_dim}), got {tuple(data.shape)}"
    )

dataset = TensorDataset(data)
dataloader_pretrain = DataLoader(dataset, batch_size=256, shuffle=True)

epochs = 50

autoencoder.train()
for epoch in range(epochs):
    total_loss = 0.0
    # The loop variable is named `batch` so it does not overwrite the full
    # feature tensor `data` defined above.
    for (batch,) in dataloader_pretrain:
        optimizer_pretrain.zero_grad()
        reconstructed_data, _ = autoencoder(batch)
        loss = criterion_pretrain(reconstructed_data, batch)
        loss.backward()
        optimizer_pretrain.step()
        # Weight by batch size so the epoch average is per sample even when
        # the last batch is smaller.
        total_loss += loss.item() * batch.size(0)

    avg_loss = total_loss / len(dataloader_pretrain.dataset)
    print(f"Epoch Pré-entraînement [{epoch+1}/{epochs}], Perte: {avg_loss:.4f}")


In [None]:
import torch.nn.functional as F
from sklearn.cluster import KMeans

class DEC(nn.Module):
    """Encoder plus learnable cluster centres producing soft assignments.

    Soft assignments use a Student's t kernel between each embedding and each
    cluster centre, normalised so every row sums to 1.

    Args:
        input_dim: Input feature size; only used to build a fresh encoder when
            ``pretrained_ae`` is ``None``.
        embedding_dim: Size of the latent space and of each cluster centre.
        num_clusters: Number of cluster centres.
        pretrained_ae: Optional object exposing an ``encoder`` attribute. The
            encoder is reused (shared), not copied.
        alpha: Degrees of freedom of the Student's t kernel. The default of
            1.0 reproduces the previously hard-coded behaviour.

    Raises:
        ValueError: If ``alpha`` is not strictly positive.
    """

    def __init__(self, input_dim, embedding_dim, num_clusters, pretrained_ae=None, alpha=1.0):
        super().__init__()
        if alpha <= 0:
            raise ValueError(f"alpha must be > 0, got {alpha}")

        # Explicit None check: do not rely on the truthiness of a module object.
        if pretrained_ae is not None:
            self.encoder = pretrained_ae.encoder
        else:
            self.encoder = Autoencoder(input_dim, embedding_dim).encoder

        self.num_clusters = num_clusters
        self.embedding_dim = embedding_dim
        self.alpha = alpha

        # torch.Tensor(k, d) would leave the centres as uninitialised memory
        # (possibly NaN/inf); give them finite values until they are
        # overwritten, e.g. by k-means centroids.
        self.cluster_layer = nn.Parameter(torch.empty(num_clusters, embedding_dim))
        nn.init.xavier_normal_(self.cluster_layer)

    def forward(self, x):
        """Return ``(embeddings, q)``; ``q[i, j]`` is the soft assignment of sample i to cluster j."""
        embeddings = self.encoder(x)
        # Squared Euclidean distance from every embedding to every centre,
        # summed over the embedding axis: result is (batch, num_clusters).
        sq_dist = (embeddings.unsqueeze(1) - self.cluster_layer).pow(2).sum(dim=2)
        # Student's t kernel.
        q = 1.0 / (1.0 + sq_dist / self.alpha)
        q = q.pow((self.alpha + 1.0) / 2.0)
        # Normalise each row into a probability distribution over clusters.
        q = q / q.sum(dim=1, keepdim=True)

        return embeddings, q

def target_distribution(q):
    """Compute the sharpened target distribution from soft assignments ``q``.

    Each entry is squared and divided by its column sum (the soft frequency of
    that cluster), then every row is renormalised to sum to 1.

    Args:
        q: 2-D tensor with one row per sample and one column per cluster.

    Returns:
        Tensor with the same shape as ``q`` whose rows sum to 1.
    """
    cluster_mass = q.sum(dim=0)
    sharpened = q.pow(2) / cluster_mass
    row_totals = sharpened.sum(dim=1, keepdim=True)
    return sharpened / row_totals

# DEC fine-tuning: initialise cluster centres with k-means on the pre-trained
# embeddings, then refine encoder and centres by minimising KL(P || Q).

k = 10  # number of clusters; placeholder value that still needs tuning

dec_model = DEC(input_dim, emb_dim, k, pretrained_ae=autoencoder)


def _encode_in_order(model, loader):
    """Run ``model`` over ``loader`` without gradients.

    Returns ``(embeddings, q)`` as CPU tensors concatenated in loader order,
    and restores the model's previous train/eval mode afterwards.
    """
    was_training = model.training
    model.eval()
    emb_chunks, q_chunks = [], []
    with torch.no_grad():
        for (batch,) in loader:
            emb, q = model(batch)
            emb_chunks.append(emb.cpu())
            q_chunks.append(q.cpu())
    model.train(was_training)
    return torch.cat(emb_chunks, dim=0), torch.cat(q_chunks, dim=0)


# Ordered (non-shuffled) loader: every full pass yields samples in dataset
# order, so row i of any collected tensor corresponds to sample i.
dataloader_dec = DataLoader(dataset, batch_size=256, shuffle=False)

# Shuffled training loader that also yields each sample's dataset index, so
# the matching rows of the global target distribution can be looked up.
features = dataset.tensors[0]
indexed_dataset = TensorDataset(features, torch.arange(features.size(0)))
dataloader_dec_train = DataLoader(indexed_dataset, batch_size=256, shuffle=True)

# Embeddings from the pre-trained autoencoder, used to seed the centres.
autoencoder.eval()
all_embeddings_pretrain = []
with torch.no_grad():
    for (batch,) in dataloader_dec:
        _, encoded = autoencoder(batch)
        all_embeddings_pretrain.append(encoded.cpu().numpy())
all_embeddings_pretrain = np.concatenate(all_embeddings_pretrain, axis=0)

kmeans = KMeans(n_clusters=k, n_init=20, random_state=0)
kmeans.fit(all_embeddings_pretrain)
initial_cluster_centroids = torch.tensor(kmeans.cluster_centers_, dtype=torch.float)

# Copy in place so the parameter keeps its identity, dtype and device.
with torch.no_grad():
    dec_model.cluster_layer.copy_(initial_cluster_centroids)

optimizer_dec = optim.Adam(dec_model.parameters(), lr=0.001)

num_epochs_dec = 100  # more epochs for the refinement stage
update_interval = 140  # currently unused: the target distribution is refreshed every epoch
previous_cluster_assignments = None

for epoch in range(num_epochs_dec):
    total_loss_dec = 0.0

    # Refresh the target distribution P from the current soft assignments Q,
    # computed over the whole dataset in dataset order.
    _, all_q = _encode_in_order(dec_model, dataloader_dec)
    p_target = target_distribution(all_q)
    current_cluster_assignments = torch.argmax(all_q, dim=1).numpy()
    if previous_cluster_assignments is not None:
        # Both arrays are in dataset order, so this counts samples whose hard
        # assignment actually changed.
        n_changes = np.sum(current_cluster_assignments != previous_cluster_assignments)
        print(f"Epoch {epoch}: {n_changes} assignations ont changé.")
        # Stop once fewer than 0.1% of the samples change cluster.
        if n_changes < 0.001 * len(dataset):
            break
    previous_cluster_assignments = current_cluster_assignments
    dec_model.train()

    for batch, sample_idx in dataloader_dec_train:
        optimizer_dec.zero_grad()

        _, q_batch = dec_model(batch)

        # Look the targets up by sample index: this stays aligned under
        # shuffling and with a smaller last batch. p_target was computed
        # under no_grad, so it is a constant for the loss.
        p_batch = p_target[sample_idx].to(q_batch.device)
        loss_kl = F.kl_div(q_batch.log(), p_batch, reduction='batchmean')

        loss_kl.backward()
        optimizer_dec.step()
        total_loss_dec += loss_kl.item() * batch.size(0)

    avg_loss_dec = total_loss_dec / len(dataloader_dec_train.dataset)
    print(f"Epoch DEC [{epoch+1}/{num_epochs_dec}], Perte KL: {avg_loss_dec:.4f}")

# Final embeddings and hard assignments, in dataset order.
dec_model.eval()
final_embeddings_t, final_q = _encode_in_order(dec_model, dataloader_dec)
final_embeddings_dec = final_embeddings_t.numpy()
final_cluster_assignments = torch.argmax(final_q, dim=1).numpy()

print(f"Exemple {final_cluster_assignments[:20]}")

Version GMM : c'est plus simple, mais en fait c'est peut-être moins adapté