In [None]:
import torch
import h5py

In [None]:
import os
import zipfile

# URL for the dataset
url = "https://zenodo.org/records/15732622/files/datasetTrident_univ2.zip?download=1"

# Download the file using wget
!wget -O /content/Train.zip "$url"

# Define the extraction path
extract_path = './'

# Create the extraction directory if it doesn't exist
os.makedirs(extract_path, exist_ok=True)

# Extract the ZIP file
with zipfile.ZipFile('./Train.zip', 'r') as zip_ref:
    zip_ref.extractall(extract_path)

# List the contents of the extracted folder
extracted_files = os.listdir(extract_path)
print("Extracted files:", extracted_files)


--2025-06-26 15:06:03--  https://zenodo.org/records/15732622/files/datasetTrident_univ2.zip?download=1
Resolving zenodo.org (zenodo.org)... 188.185.48.194, 188.185.43.25, 188.185.45.92, ...
Connecting to zenodo.org (zenodo.org)|188.185.48.194|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1469096082 (1.4G) [application/octet-stream]
Saving to: ‘/content/Train.zip’


2025-06-26 15:07:18 (18.9 MB/s) - ‘/content/Train.zip’ saved [1469096082/1469096082]

Extracted files: ['.config', 'trident_processed_univ2', 'Train.zip', 'sample_data']


In [None]:

import numpy as np
from sklearn.neighbors import NearestNeighbors
from typing import Tuple, List, Union

class WSIFeatureExtrapolation:
    """
    Feature-space extrapolation for patch features extracted from WSIs.

    Every patch feature vector ``c_j`` is pushed away from one of its nearest
    neighbours ``c_k`` with ``c'_j = (c_j - c_k) * λ + c_j``.

    Usage:
        extrapolator = WSIFeatureExtrapolation()
        augmented_features, indices = extrapolator.generate_extrapolated_features(
            your_wsi_features,
            lambda_values=[0.3, 0.5],
            n_augmentations_per_patch=2
        )
    """

    def __init__(self, n_neighbors: int = 8, random_state: Union[int, None] = None):
        """
        Args:
            n_neighbors: Number of neighbours requested from the k-NN query.
                The query is run on the fitted points themselves, so the patch
                itself is normally one of the results and is discarded.
            random_state: Optional seed for the neighbour sampling. When None
                (default) the global ``np.random`` state is used, exactly as
                before; pass an int to make the augmentation reproducible.
        """
        self.n_neighbors = n_neighbors
        self.random_state = random_state
        # A private generator is only created when a seed is requested, so the
        # default behaviour keeps honouring an external np.random.seed(...).
        self._rng = None if random_state is None else np.random.RandomState(random_state)

    def extrapolate_patch(self, c_j: np.ndarray, c_k: np.ndarray, lambda_param: float) -> np.ndarray:
        """Formula: c'_j = (c_j - c_k) * λ + c_j"""
        return (c_j - c_k) * lambda_param + c_j

    def generate_extrapolated_features(self,
                                     features: np.ndarray,
                                     lambda_values: Union[float, List[float]] = 0.5,
                                     n_augmentations_per_patch: int = 2) -> Tuple[np.ndarray, np.ndarray]:
        """
        Generate extrapolated features.

        Args:
            features: Array-like of shape (n_patches, n_features) with the
                original features. It is converted with ``np.asarray`` once.
            lambda_values: One λ value or a sequence of λ values. Each sampled
                neighbour is reused for every λ.
            n_augmentations_per_patch: Number of neighbours sampled per patch.

        Returns:
            (extrapolated_features, original_indices): a float32 array with one
            row per generated sample and an int32 array giving, for each row,
            the index of the patch it was derived from. Both are empty when
            there are fewer than two patches (no neighbour exists).

        Raises:
            ValueError: if ``features`` is not two-dimensional.
        """
        features = np.asarray(features)
        if features.ndim != 2:
            raise ValueError(
                f"features must be 2-D (n_patches, n_features), got shape {features.shape}"
            )

        # atleast_1d accepts Python scalars, NumPy scalars (e.g. np.float32),
        # lists and arrays alike.
        lambda_list = [float(value) for value in np.atleast_1d(lambda_values)]

        n_patches, n_features = features.shape

        # Output size if every patch yields all of its augmentations
        total_augmentations = n_patches * max(n_augmentations_per_patch, 0) * len(lambda_list)

        # With zero or one patch there is nothing to extrapolate from; return
        # early instead of asking the k-NN model for neighbours of nothing.
        if n_patches < 2 or total_augmentations == 0:
            return (np.zeros((0, n_features), dtype=np.float32),
                    np.zeros(0, dtype=np.int32))

        # Set up nearest neighbours and query them for every patch
        nn_model = NearestNeighbors(n_neighbors=min(self.n_neighbors, n_patches))
        nn_model.fit(features)
        _, neighbor_indices = nn_model.kneighbors(features)

        # Pre-allocate the outputs for efficiency
        extrapolated = np.zeros((total_augmentations, n_features), dtype=np.float32)
        original_indices = np.zeros(total_augmentations, dtype=np.int32)

        choose = np.random.choice if self._rng is None else self._rng.choice
        augmentation_idx = 0

        for patch_idx in range(n_patches):
            c_j = features[patch_idx]

            # Drop the patch itself wherever it appears: with duplicated
            # feature vectors (tied distances) it is not guaranteed to be the
            # first neighbour, and picking it would yield c'_j == c_j.
            neighbors = neighbor_indices[patch_idx]
            available_neighbors = neighbors[neighbors != patch_idx]
            if available_neighbors.size == 0:
                continue

            for _ in range(n_augmentations_per_patch):
                c_k = features[choose(available_neighbors)]

                for lambda_val in lambda_list:
                    extrapolated[augmentation_idx] = self.extrapolate_patch(c_j, c_k, lambda_val)
                    original_indices[augmentation_idx] = patch_idx
                    augmentation_idx += 1

        # Trim the rows left unused by patches that had no valid neighbour
        return extrapolated[:augmentation_idx], original_indices[:augmentation_idx]

In [None]:
# Build an extrapolator with the class defaults (n_neighbors=8).
extrapolator = WSIFeatureExtrapolation()

In [None]:
# USAGE EXAMPLE on a single slide:
file_h5 = "./trident_processed_univ2/B/20x_256px_0px_overlap/features_uni_v2/M-104.h5"

with h5py.File(file_h5, 'r') as f:
    # Read the whole 'features' dataset into memory as a float32 tensor
    features = torch.from_numpy(f['features'][:]).float()
    num_patch = features.shape[0]
    # All-zero placeholder labels, one per patch
    labels = torch.zeros(num_patch, dtype=torch.int64)

wsi_features = features  # shape: (n_patches, n_features)

# Apply the extrapolation. The extrapolator works on NumPy arrays, so hand it
# the NumPy view of the tensor instead of the tensor itself.
extrapolated_features, indices = extrapolator.generate_extrapolated_features(
     wsi_features.numpy(),
     lambda_values=[0.3],  # Conservative value
     n_augmentations_per_patch=1
)

# All-zero placeholder labels for the augmented samples
extrapolated_labels = torch.zeros(extrapolated_features.shape[0], dtype=torch.int64)


  extrapolated[augmentation_idx] = self.extrapolate_patch(c_j, c_k, lambda_val)


In [None]:
!mkdir datasetUNIV2Extrapolation


In [None]:
# Export class B: for every slide, save the original features and one
# extrapolated copy (λ = 0.3) as .pt tensors, and record the saved ids in B.
folder_h5 = "./trident_processed_univ2/B/20x_256px_0px_overlap/features_uni_v2/"
folder_dest = "./datasetUNIV2Extrapolation/"
extrapolator = WSIFeatureExtrapolation()
B = []

# Make sure the destination exists so this cell can run on its own
os.makedirs(folder_dest, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting keeps the order of B
# (and of the manifest built from it) stable across runs and machines.
for fname in sorted(os.listdir(folder_h5)):
    if not fname.endswith('.h5'):
        continue
    file_h5 = os.path.join(folder_h5, fname)
    with h5py.File(file_h5, 'r') as f:
        features_np = f['features'][:]
    features = torch.from_numpy(features_np).float()

    # Only the augmented features are needed; the source-patch indices are discarded
    extrapolated_features_np, _ = extrapolator.generate_extrapolated_features(
        features_np,
        lambda_values=[0.3],
        n_augmentations_per_patch=1
    )
    extrapolated_features = torch.from_numpy(extrapolated_features_np).float()

    # Slide id = file name up to the first dot
    fname = fname.split('.')[0]
    torch.save(features, os.path.join(folder_dest, fname + ".pt"))
    torch.save(extrapolated_features, os.path.join(folder_dest, fname + "_ext.pt"))
    B.append(fname)
    B.append(fname + "_ext")


In [None]:
# Export class E: for every slide, save the original features and one
# extrapolated copy (λ = 0.3) as .pt tensors, and record the saved ids in E.
folder_h5 = "./trident_processed_univ2/E/20x_256px_0px_overlap/features_uni_v2/"
folder_dest = "./datasetUNIV2Extrapolation/"
extrapolator = WSIFeatureExtrapolation()

E = []

# Make sure the destination exists so this cell can run on its own
os.makedirs(folder_dest, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting keeps the order of E
# (and of the manifest built from it) stable across runs and machines.
for fname in sorted(os.listdir(folder_h5)):
    if not fname.endswith('.h5'):
        continue
    file_h5 = os.path.join(folder_h5, fname)
    with h5py.File(file_h5, 'r') as f:
        features_np = f['features'][:]
    features = torch.from_numpy(features_np).float()

    # Only the augmented features are needed; the source-patch indices are discarded
    extrapolated_features_np, _ = extrapolator.generate_extrapolated_features(
        features_np,
        lambda_values=[0.3],
        n_augmentations_per_patch=1
    )
    extrapolated_features = torch.from_numpy(extrapolated_features_np).float()

    # Slide id = file name up to the first dot
    fname = fname.split('.')[0]
    torch.save(features, os.path.join(folder_dest, fname + ".pt"))
    torch.save(extrapolated_features, os.path.join(folder_dest, fname + "_ext.pt"))

    E.append(fname)
    E.append(fname + "_ext")


In [None]:
# Export class S: for every slide, save the original features plus TWO
# extrapolated copies (λ = 0.3 -> "_ext_01", λ = 0.5 -> "_ext_02") and record
# the saved ids in S.
# NOTE(review): S gets two augmentations while B and E get one — presumably
# to balance the classes; confirm.
folder_h5 = "./trident_processed_univ2/S/20x_256px_0px_overlap/features_uni_v2/"
folder_dest = "./datasetUNIV2Extrapolation/"
extrapolator = WSIFeatureExtrapolation()
S = []

# (λ, file-name suffix) of every augmented copy, in the order they are generated
s_augmentations = ((0.3, "_ext_01"), (0.5, "_ext_02"))

# Make sure the destination exists so this cell can run on its own
os.makedirs(folder_dest, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting keeps the order of S
# (and of the manifest built from it) stable across runs and machines.
for fname in sorted(os.listdir(folder_h5)):
    if not fname.endswith('.h5'):
        continue
    file_h5 = os.path.join(folder_h5, fname)
    with h5py.File(file_h5, 'r') as f:
        features_np = f['features'][:]
    features = torch.from_numpy(features_np).float()

    # Slide id = file name up to the first dot
    fname = fname.split('.')[0]
    torch.save(features, os.path.join(folder_dest, fname + ".pt"))
    S.append(fname)

    for lambda_val, suffix in s_augmentations:
        # Only the augmented features are needed; the source-patch indices are discarded
        extrapolated_features_np, _ = extrapolator.generate_extrapolated_features(
            features_np,
            lambda_values=[lambda_val],
            n_augmentations_per_patch=1
        )
        extrapolated_features = torch.from_numpy(extrapolated_features_np).float()

        torch.save(extrapolated_features, os.path.join(folder_dest, fname + suffix + ".pt"))
        S.append(fname + suffix)



In [None]:
# Sanity check: ids saved for class B (each slide followed by its "_ext" copy)
print(B)

['M-30', 'M-30_ext', 'M-32', 'M-32_ext', 'M-108', 'M-108_ext', 'M-112', 'M-112_ext', 'M-121', 'M-121_ext', 'M-105', 'M-105_ext', 'M-24', 'M-24_ext', 'M-104', 'M-104_ext']


In [None]:
# Manifest rows [case_id, slide_id, label]; case_id and slide_id are both the
# saved id (text before the first dot). Classes are emitted in the order B, S, E.
dataset_csv = [
    [slide, slide, label]
    for label, names in (('B', B), ('S', S), ('E', E))
    for slide in (name.split('.')[0] for name in names)
]

In [None]:
import csv
# Path of the CSV manifest, stored next to the exported tensors
file_csv = f"{folder_dest}datasetComposition.csv"

# Header row followed by one row per saved tensor
header = ['case_id', 'slide_id', 'label']

# newline='' lets the csv module control line endings itself
with open(file_csv, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows([header, *dataset_csv])

print(f"File CSV salvato: {file_csv}")

File CSV salvato: ./datasetUNIV2Extrapolation/datasetComposition.csv


In [None]:
# Create a ZIP archive of a folder
def create_zip_archive(folder_path, zip_name=None):
    """
    Create a ZIP archive of a folder, keeping its structure.

    Entries are stored relative to the PARENT of ``folder_path``, so every
    archived name starts with the folder's own name. The path is normalized
    first, so ``"data"`` and ``"data/"`` give the same archive; previously a
    trailing slash dropped the root folder from the entries and turned the
    default archive name into ``".zip"``.

    Args:
        folder_path: Folder to archive (a trailing separator is allowed).
        zip_name: Path of the archive to write. Defaults to
            ``"<folder name>.zip"`` in the current working directory.

    Returns:
        The path of the archive that was written (``zip_name``).
    """
    # abspath also strips a trailing separator and resolves "." / "..",
    # which keeps basename/dirname below meaningful.
    folder_path = os.path.abspath(folder_path)
    parent_dir = os.path.dirname(folder_path) or os.curdir

    if zip_name is None:
        zip_name = f"{os.path.basename(folder_path)}.zip"
    zip_abspath = os.path.abspath(zip_name)

    print(f"🗜️ Creando archivio ZIP: {zip_name}")

    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _dirs, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                # Never add the archive to itself when it lives inside the folder
                if os.path.abspath(file_path) == zip_abspath:
                    continue
                # Keep the folder structure inside the archive
                arc_name = os.path.relpath(file_path, parent_dir)
                zipf.write(file_path, arc_name)
                print(f"  📁 Aggiunto: {arc_name}")

    # Report the size of the archive
    zip_size = os.path.getsize(zip_name)
    print(f"✅ Archivio creato: {zip_name} ({zip_size/1024/1024:.2f} MB)")
    return zip_name

In [None]:
# Archive the export folder (folder_dest) so it can be uploaded as one file
zip_filename = 'datasetCompleted.zip'  # Name of the archive
zip_path = create_zip_archive(folder_dest, zip_filename)


🗜️ Creando archivio ZIP: datasetCompleted.zip
  📁 Aggiunto: M-86_ext_01.pt
  📁 Aggiunto: M-65_ext_01.pt
  📁 Aggiunto: M-11.pt
  📁 Aggiunto: M-109.pt
  📁 Aggiunto: M-87_ext_01.pt
  📁 Aggiunto: M-103_ext.pt
  📁 Aggiunto: M-108.pt
  📁 Aggiunto: M-10_ext.pt
  📁 Aggiunto: M-105_ext.pt
  📁 Aggiunto: M-110_ext.pt
  📁 Aggiunto: M-101_ext_02.pt
  📁 Aggiunto: M-30.pt
  📁 Aggiunto: datasetComposition.csv
  📁 Aggiunto: M-87_ext_02.pt
  📁 Aggiunto: M-113.pt
  📁 Aggiunto: M-114_ext_01.pt
  📁 Aggiunto: M-104_ext.pt
  📁 Aggiunto: M-114.pt
  📁 Aggiunto: M-32_ext.pt
  📁 Aggiunto: M-110.pt
  📁 Aggiunto: M-100_ext.pt
  📁 Aggiunto: M-112_ext.pt
  📁 Aggiunto: M-10.pt
  📁 Aggiunto: M-101_ext_01.pt
  📁 Aggiunto: M-121.pt
  📁 Aggiunto: M-87.pt
  📁 Aggiunto: M-24.pt
  📁 Aggiunto: M-108_ext.pt
  📁 Aggiunto: M-103.pt
  📁 Aggiunto: M-109_ext.pt
  📁 Aggiunto: M-86.pt
  📁 Aggiunto: M-114_ext_02.pt
  📁 Aggiunto: M-32.pt
  📁 Aggiunto: M-121_ext.pt
  📁 Aggiunto: M-24_ext.pt
  📁 Aggiunto: M-105.pt
  📁 Aggiunto: M-104.pt

In [None]:
import torch
import requests
import json
import os

# Zenodo access token. It is read from the ZENODO_ACCESS_TOKEN environment
# variable instead of being hard-coded: a token committed in a notebook is
# exposed to everyone who can read the file.
# SECURITY: the token that used to be written here must be considered leaked
# and should be revoked from the Zenodo account settings.
# (For interactive use, getpass.getpass() is an alternative way to supply it.)
ACCESS_TOKEN = os.environ.get('ZENODO_ACCESS_TOKEN', '')
if not ACCESS_TOKEN:
    print("WARNING: ZENODO_ACCESS_TOKEN is not set; Zenodo API calls will be rejected.")

# Create a new deposition
def create_deposition(title):
    """
    Create a new Zenodo deposition with fixed dataset metadata.

    Args:
        title: Title of the deposition.

    Returns:
        The decoded JSON body of the response (the caller reads its 'id').

    Raises:
        requests.HTTPError: if Zenodo answers with a 4xx/5xx status. Previously
            the error payload was returned as if it were a deposition and the
            failure only surfaced later as a KeyError on 'id'.
    """
    url = 'https://zenodo.org/api/deposit/depositions'
    headers = {"Content-Type": "application/json"}
    params = {'access_token': ACCESS_TOKEN}

    data = {
        'metadata': {
            'title': title,
            'upload_type': 'dataset',
            'description': 'Dataset WSI project MLiA',
            'creators': [{'name': 'Raf-Tony-Luca'}]
        }
    }

    r = requests.post(url, params=params, data=json.dumps(data), headers=headers)
    # Fail loudly on rejected requests (bad token, invalid metadata, ...)
    r.raise_for_status()
    return r.json()

# Upload the file
def upload_file(deposition_id, file_path):
    """
    Upload one file to the bucket of an existing deposition.

    Args:
        deposition_id: Id of the deposition to upload to.
        file_path: Local path of the file; it is stored under its base name.

    Returns:
        The decoded JSON body of the upload response.

    Raises:
        requests.HTTPError: if either the deposition lookup or the upload is
            answered with a 4xx/5xx status, so a failed upload can no longer
            be followed by a publish of an incomplete deposition.
    """
    # Get bucket URL
    url = f'https://zenodo.org/api/deposit/depositions/{deposition_id}'
    params = {'access_token': ACCESS_TOKEN}
    r = requests.get(url, params=params)
    r.raise_for_status()
    bucket_url = r.json()["links"]["bucket"]

    # Upload file; passing the open file object lets requests stream it
    # instead of loading it into memory first
    filename = os.path.basename(file_path)
    with open(file_path, "rb") as fp:
        r = requests.put(f"{bucket_url}/{filename}",
                        data=fp,
                        params=params)
    r.raise_for_status()
    return r.json()

# Publish the dataset
def publish_deposition(deposition_id):
    """
    Publish a deposition.

    Args:
        deposition_id: Id of the deposition to publish.

    Returns:
        The decoded JSON body of the response (the caller reads 'doi' and
        'links').

    Raises:
        requests.HTTPError: if Zenodo answers with a 4xx/5xx status, instead
            of returning the error payload as if publication had succeeded.
    """
    url = f'https://zenodo.org/api/deposit/depositions/{deposition_id}/actions/publish'
    params = {'access_token': ACCESS_TOKEN}
    r = requests.post(url, params=params)
    r.raise_for_status()
    return r.json()


In [None]:
# Run the upload: create the deposition, attach the archive, then publish it
print("Creando deposizione...")
deposition = create_deposition("dataset_trident_univ2_extrapolation")
# The id returned by Zenodo identifies the deposition in the next two calls
deposition_id = deposition['id']

print(f"Caricando file... (ID: {deposition_id})")
# NOTE(review): upload_result is never inspected before publishing
upload_result = upload_file(deposition_id, zip_filename)

print("Pubblicando dataset...")
publication = publish_deposition(deposition_id)

# Report the DOI and the public record page from the publish response
print(f"Dataset pubblicato! DOI: {publication['doi']}")
print(f"URL: {publication['links']['record_html']}")

Creando deposizione...
Caricando file... (ID: 15747608)
Pubblicando dataset...
Dataset pubblicato! DOI: 10.5281/zenodo.15747608
URL: https://zenodo.org/record/15747608
