# Diffusion generation

In [None]:
# Run scripts/exps/expand_diff.sh through `sh`.
# NOTE(review): presumably this performs the diffusion-based dataset expansion
# named in the section heading; confirm against the script.
!sh scripts/exps/expand_diff.sh

# Cross Validation

In [None]:
# Run classification/classificationAtExpanedM_Rock.py as a standalone script.
# NOTE(review): going by the file name, this is presumably the cross-validation
# run on the expanded M_Rock data; confirm against the script.
!python classification/classificationAtExpanedM_Rock.py

In [None]:
# Run classification/classificationAtM_Rock.py as a standalone script.
# NOTE(review): going by the file name, this is presumably the cross-validation
# run on the original (non-expanded) M_Rock data; confirm against the script.
!python classification/classificationAtM_Rock.py 

# FID

In [None]:
# Run fid/fid.py as a standalone script.
# NOTE(review): presumably computes the FID score named in the section
# heading; inputs and outputs are defined inside the script.
!python fid/fid.py

# VAE fitting

In [None]:
# Fine-tune the Stable Diffusion v1-4 VAE on a custom dataset, save it, and
# load a Stable Diffusion pipeline that uses the fine-tuned VAE.
#
# Prerequisite: `dataset` must be defined before this cell runs. Each item is
# indexed with the key 'image' below.
# NOTE(review): the SD VAE presumably expects float image tensors scaled to
# [-1, 1] with shape (3, H, W) -- confirm against the dataset preprocessing.
import torch
from torch import optim
from torch.utils.data import DataLoader
from diffusers import AutoencoderKL, StableDiffusionPipeline

# Hyperparameters. These were previously undefined in this cell; the values
# below are starting points and should be tuned for the data at hand.
num_epochs = 10
kl_weight = 1e-6
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vae = AutoencoderKL.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="vae")
# The batches are moved to `device` below, so the model has to live there too.
vae.to(device)

train_loader = DataLoader(dataset, batch_size=8, shuffle=True)
optimizer = optim.Adam(vae.parameters(), lr=1e-5)

vae.train()
for epoch in range(num_epochs):
    for batch in train_loader:
        images = batch['image'].to(device)

        # encode() returns the posterior distribution; its mean/logvar are what
        # the KL term needs (the VAE module itself has no mean/logvar).
        posterior = vae.encode(images).latent_dist
        latents = posterior.sample()

        # decode() returns an output object whose `.sample` is the decoded
        # tensor -- an attribute, not a method.
        recon_images = vae.decode(latents).sample

        recon_loss = torch.nn.functional.mse_loss(recon_images, images)
        # KL divergence between the diagonal Gaussian posterior and N(0, I),
        # summed over every latent element in the batch.
        kl_loss = -0.5 * torch.sum(
            1 + posterior.logvar - posterior.mean.pow(2) - posterior.logvar.exp()
        )
        loss = recon_loss + kl_weight * kl_loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Reports the loss of the last batch of the epoch.
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}")

vae.save_pretrained("path/to/your_custom_vae")

# Pipeline components must be passed as loaded modules, not as path strings,
# so reload the fine-tuned VAE before handing it to the pipeline.
custom_vae = AutoencoderKL.from_pretrained("path/to/your_custom_vae")
pipeline = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4", vae=custom_vae)


# Classification validation

In [None]:
import os
import random
import shutil
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Subset
from sklearn.metrics import classification_report
import pandas as pd
from sklearn.model_selection import KFold

# ---------------------------------------------------------------------------
# Experiment configuration: original (non-expanded) dataset
# ---------------------------------------------------------------------------
# Root folder handed to ImageFolder (one sub-folder per class).
# Alternative dataset root: "data/m_rock/train"
data_dir = "data/rock_minerals/train"

# Output folder for the results CSV; created up front if it does not exist.
save_dir = "figure"
os.makedirs(save_dir, exist_ok=True)

# Optimisation settings shared by every architecture and every fold.
batch_size, epochs, learning_rate = 64, 20, 0.001

# Use the GPU whenever CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Preprocessing applied to every image: fixed 224x224 resize, tensor
# conversion, then per-channel normalisation with the mean/std values
# commonly used for ImageNet-style torchvision models.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Full labelled dataset; the folds below are index subsets of it.
train_dataset = datasets.ImageFolder(data_dir, transform=transform)

# 3-fold split, shuffled with a fixed seed so the folds are reproducible.
kf = KFold(n_splits=3, shuffle=True, random_state=42)

# Display name -> torchvision constructor. A new network is built from the
# constructor each time train_and_evaluate is called.
models_dict = {
    "ResNet50": models.resnet50,
    "ResNeXt-50": models.resnext50_32x4d,
    "WideResNet-50": models.wide_resnet50_2,
    "MobileNetv2": models.mobilenet_v2,
}

# One row per (model, fold), filled in by the cross-validation loop.
results = []

# Training and evaluation function
def train_and_evaluate(model_name, model, train_loader, test_loader):
    """Train a newly built classifier and score it on a held-out loader.

    Relies on the module-level ``train_dataset`` (for the class names),
    ``device``, ``epochs`` and ``learning_rate``.

    Args:
        model_name: Label used in the progress and summary messages.
        model: Constructor (not an instance) that accepts ``num_classes`` and
            returns the network to train.
        train_loader: Iterable of ``(images, labels)`` batches used for training.
        test_loader: Iterable of ``(images, labels)`` batches used for evaluation.

    Returns:
        Tuple ``(accuracy, recall, f1)``: overall accuracy plus macro-averaged
        recall and F1 over the classes seen in the evaluation labels or
        predictions.
    """
    print(f"Training {model_name}...")

    class_names = train_dataset.classes

    # Build a new network for this call; keep the constructor and the instance
    # under different names.
    net = model(num_classes=len(class_names)).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)

    # Training loop
    for epoch in range(epochs):
        net.train()
        total_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            loss = criterion(net(images), labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}")

    # Evaluation: collect ground-truth and predicted class indices.
    net.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for images, labels in test_loader:
            preds = net(images.to(device)).argmax(dim=1)
            all_labels.extend(labels.tolist())
            all_preds.extend(preds.tolist())

    # A fold may lack some classes in both its labels and its predictions.
    # Passing every class name would then make classification_report raise,
    # because the number of names no longer matches the labels it finds.
    # Restrict the report to the classes that actually occur.
    present = sorted(set(all_labels) | set(all_preds))
    report = classification_report(
        all_labels,
        all_preds,
        labels=present,
        target_names=[class_names[i] for i in present],
        output_dict=True,
        zero_division=0,  # same value as the default, without the warning
    )
    accuracy = report['accuracy']
    # 'macro avg' is the unweighted mean of the per-class scores.
    recall = report['macro avg']['recall']
    f1 = report['macro avg']['f1-score']

    print(f"{model_name} - Accuracy: {accuracy:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")

    return accuracy, recall, f1

# Cross-validation loop
# Cross-validation driver: run every architecture over every fold, report the
# per-model averages, then write all per-fold rows to a CSV.
for model_name, model_fn in models_dict.items():
    fold_results = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
        print(f"\nFold {fold+1} - Model: {model_name}")

        # Index-based views of the full dataset for this fold.
        train_loader = DataLoader(
            Subset(train_dataset, train_idx), batch_size=batch_size, shuffle=True
        )
        val_loader = DataLoader(
            Subset(train_dataset, val_idx), batch_size=batch_size, shuffle=False
        )

        accuracy, recall, f1 = train_and_evaluate(model_name, model_fn, train_loader, val_loader)
        fold_results.append(
            {'Fold': fold + 1, 'Accuracy': accuracy, 'Recall': recall, 'F1-Score': f1}
        )

    # Mean of each metric across this model's folds.
    avg_accuracy, avg_recall, avg_f1 = (
        np.mean([entry[metric] for entry in fold_results])
        for metric in ('Accuracy', 'Recall', 'F1-Score')
    )

    print(f"\n{model_name} - Average Accuracy: {avg_accuracy:.4f}, Average Recall: {avg_recall:.4f}, Average F1-score: {avg_f1:.4f}")

    # Tag each fold row with the model name and add it to the global table.
    results.extend({'Model': model_name, **entry} for entry in fold_results)

# Persist the per-fold metrics.
results_df = pd.DataFrame(results)
csv_path = os.path.join(save_dir, "results_fold_data2_org.csv")
results_df.to_csv(csv_path, index=False)

print("Training and evaluation completed. Results saved.")


In [None]:
import os
import random
import shutil
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Subset
from sklearn.metrics import classification_report
import pandas as pd
from sklearn.model_selection import KFold

# ---------------------------------------------------------------------------
# Experiment configuration: expanded dataset
# ---------------------------------------------------------------------------
# Root folder handed to ImageFolder (one sub-folder per class).
# Alternative dataset root: "data/m_rock_expansion/save/distdiff_batch_3x"
data_dir = "data/rock_minerals_expansion/save/distdiff_batch_3x(0.2)"

# Output folder for the results CSV; created up front if it does not exist.
save_dir = "figure"
os.makedirs(save_dir, exist_ok=True)

# Optimisation settings shared by every architecture and every fold.
batch_size, epochs, learning_rate = 64, 20, 0.001

# Use the GPU whenever CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Preprocessing applied to every image: fixed 224x224 resize, tensor
# conversion, then per-channel normalisation with the mean/std values
# commonly used for ImageNet-style torchvision models.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Full labelled dataset; the folds below are index subsets of it.
train_dataset = datasets.ImageFolder(data_dir, transform=transform)

# 3-fold split, shuffled with a fixed seed so the folds are reproducible.
kf = KFold(n_splits=3, shuffle=True, random_state=42)

# Display name -> torchvision constructor. A new network is built from the
# constructor each time train_and_evaluate is called.
models_dict = {
    "ResNet50": models.resnet50,
    "ResNeXt-50": models.resnext50_32x4d,
    "WideResNet-50": models.wide_resnet50_2,
    "MobileNetv2": models.mobilenet_v2,
}

# One row per (model, fold), filled in by the cross-validation loop.
results = []

# Training and evaluation function
def train_and_evaluate(model_name, model, train_loader, test_loader):
    """Train a newly built classifier and score it on a held-out loader.

    Relies on the module-level ``train_dataset`` (for the class names),
    ``device``, ``epochs`` and ``learning_rate``.

    Args:
        model_name: Label used in the progress and summary messages.
        model: Constructor (not an instance) that accepts ``num_classes`` and
            returns the network to train.
        train_loader: Iterable of ``(images, labels)`` batches used for training.
        test_loader: Iterable of ``(images, labels)`` batches used for evaluation.

    Returns:
        Tuple ``(accuracy, recall, f1)``: overall accuracy plus macro-averaged
        recall and F1 over the classes seen in the evaluation labels or
        predictions.
    """
    print(f"Training {model_name}...")

    class_names = train_dataset.classes

    # Build a new network for this call; keep the constructor and the instance
    # under different names.
    net = model(num_classes=len(class_names)).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)

    # Training loop
    for epoch in range(epochs):
        net.train()
        total_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            loss = criterion(net(images), labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}")

    # Evaluation: collect ground-truth and predicted class indices.
    net.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for images, labels in test_loader:
            preds = net(images.to(device)).argmax(dim=1)
            all_labels.extend(labels.tolist())
            all_preds.extend(preds.tolist())

    # A fold may lack some classes in both its labels and its predictions.
    # Passing every class name would then make classification_report raise,
    # because the number of names no longer matches the labels it finds.
    # Restrict the report to the classes that actually occur.
    present = sorted(set(all_labels) | set(all_preds))
    report = classification_report(
        all_labels,
        all_preds,
        labels=present,
        target_names=[class_names[i] for i in present],
        output_dict=True,
        zero_division=0,  # same value as the default, without the warning
    )
    accuracy = report['accuracy']
    # 'macro avg' is the unweighted mean of the per-class scores.
    recall = report['macro avg']['recall']
    f1 = report['macro avg']['f1-score']

    print(f"{model_name} - Accuracy: {accuracy:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")

    return accuracy, recall, f1

# Cross-validation loop
# Cross-validation driver: run every architecture over every fold, report the
# per-model averages, then write all per-fold rows to a CSV.
for model_name, model_fn in models_dict.items():
    fold_results = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
        print(f"\nFold {fold+1} - Model: {model_name}")

        # Index-based views of the full dataset for this fold.
        train_loader = DataLoader(
            Subset(train_dataset, train_idx), batch_size=batch_size, shuffle=True
        )
        val_loader = DataLoader(
            Subset(train_dataset, val_idx), batch_size=batch_size, shuffle=False
        )

        accuracy, recall, f1 = train_and_evaluate(model_name, model_fn, train_loader, val_loader)
        fold_results.append(
            {'Fold': fold + 1, 'Accuracy': accuracy, 'Recall': recall, 'F1-Score': f1}
        )

    # Mean of each metric across this model's folds.
    avg_accuracy, avg_recall, avg_f1 = (
        np.mean([entry[metric] for entry in fold_results])
        for metric in ('Accuracy', 'Recall', 'F1-Score')
    )

    print(f"\n{model_name} - Average Accuracy: {avg_accuracy:.4f}, Average Recall: {avg_recall:.4f}, Average F1-score: {avg_f1:.4f}")

    # Tag each fold row with the model name and add it to the global table.
    results.extend({'Model': model_name, **entry} for entry in fold_results)

# Persist the per-fold metrics.
results_df = pd.DataFrame(results)
csv_path = os.path.join(save_dir, "results_fold_data2_exp.csv")
results_df.to_csv(csv_path, index=False)

print("Training and evaluation completed. Results saved.")
