## Entropy-based method to generate CLIP embedding

In [None]:
import os
import numpy as np
from open_clip import create_model_and_transforms, tokenize
import torch
from PIL import Image
from tqdm import tqdm
import json
import re
from statistics import mean, stdev, median
from sklearn.cluster import KMeans

# --- Run configuration ---
n_classes = 3  # NOTE(review): not referenced anywhere in the visible code
top_k = 10  # Number of highest-entropy frames kept per TF folder
# Parent directory that contains the TF folders
output_path = "../ImgData/supernovaRGBa_tf_class3"

# --- CLIP backbone ---
# The factory returns the model together with two preprocessing pipelines.
model, preprocess_train, preprocess_val = create_model_and_transforms(
    "ViT-B-32",
    pretrained="openai",
)
model.eval()  # Inference only: switch the model to evaluation mode

class Entropy(torch.nn.Module):
    """
    Computes the entropy of the input tensor:
    $H(x) = -sum_i (p_i log_2(p_i))$ where $p_i$ is the input tensor
    (divided by its sum over ``dim`` when ``normalize_input`` is set).

    Args:
        dim: Axis (int) or axes (tuple of ints) to reduce over. ``None``
            reduces over every axis of the input.
        keepdim: Whether the reduced axes are kept (with size 1) in the output.
        normalize_input: If True, divide the input by its sum over ``dim``
            before computing the entropy.
        normalize_output: If True, divide the entropy by ``log2(N)``, where
            ``N`` is the number of elements reduced over, so that a uniform
            input yields 1.
    """
    def __init__(self,
                 dim=None,
                 keepdim: bool = False,
                 normalize_input: bool = True,
                 normalize_output: bool = True):
        super().__init__()
        self._dim = dim
        self._keepdim = keepdim
        self._normalize_input = normalize_input
        self._normalize_output = normalize_output

    def forward(self, x):
        """Return the (optionally normalized) entropy of ``x`` over ``dim``."""
        # Always work with a tuple of axes so that a plain int ``dim`` is
        # accepted both by torch and by the element count below.
        if self._dim is None:
            dim = tuple(range(len(x.shape)))
        elif isinstance(self._dim, int):
            dim = (self._dim,)
        else:
            dim = tuple(self._dim)

        if self._normalize_input:
            scale = torch.sum(x, dim, keepdim=True)
            p = x / scale
        else:
            p = x

        # nansum skips NaN terms such as 0 * log2(0), so zero-probability
        # entries contribute nothing to the entropy.
        entropy = -torch.nansum(p * torch.log2(p), dim, keepdim=self._keepdim)

        if self._normalize_output:
            n_elements = int(np.prod([x.shape[i] for i in dim]))
            # log2(1) == 0: with a single element there is nothing to
            # normalize by, so skip the division instead of producing NaN/inf.
            if n_elements > 1:
                entropy = entropy / float(np.log2(n_elements))
        return entropy

class EntropyLosses(torch.nn.Module):
    """
    Per-sample score built from the normalized entropy of one input channel.

    ``forward`` reads ``x[:, :, :, 3]`` and reduces it over axes 1 and 2, so
    the input must have at least four axes with index 3 valid on the fourth
    (the caller in this file passes batched RGBA images, channel last).

    Args:
        opacity_weight: Multiplier applied to the entropy term. A weight that
            is not greater than zero disables the term and yields all zeros.
    """
    def __init__(self, opacity_weight=1):
        super().__init__()
        self._opacity_weight = opacity_weight
        self._opacity_entropy = Entropy(dim=(1, 2), normalize_input=True, normalize_output=True)

    def forward(self, x):
        """Return one value per leading-axis entry of ``x``."""
        batch_size = x.shape[0]
        total = torch.zeros(batch_size, dtype=x.dtype, device=x.device)

        # Guard clause: nothing to add when the term is switched off.
        if not self._opacity_weight > 0:
            return total

        alpha = x[:, :, :, 3]
        weighted_entropy = self._opacity_weight * self._opacity_entropy(alpha)
        return total + weighted_entropy

# Function to calculate cosine similarity
def cosine_similarity(vec1, vec2):
    """Return the dot product of the two vectors divided by the product of their norms."""
    dot_product = np.dot(vec1, vec2)
    norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return dot_product / norm_product

def extract_frame_number(file_path):
    """
    Return the integer found in the first ``r_<digits>.png`` occurrence in
    ``file_path``, or ``None`` when the path contains no such pattern.
    """
    found = re.search(r'r_(\d+)\.png', file_path)
    if found is None:
        return None
    digits = found.group(1)
    return int(digits)

# Get the best view out of top k frames
def get_best_frame(top_frames):
    """
    Derive a single "best" frame number from a list of frame paths.

    The frame numbers are parsed from the paths, values further than one
    standard deviation from the mean are dropped, and the remaining values
    are averaged.

    Args:
        top_frames: Iterable of file paths. Paths from which
            ``extract_frame_number`` cannot parse a number are ignored.

    Returns:
        A tuple ``(average_frame_number, filtered_frames)``: the truncated
        integer mean of the kept frame numbers and the list of kept frame
        numbers. The average is not guaranteed to be one of the inputs.

    Raises:
        statistics.StatisticsError: If no frame number can be parsed.
    """
    # Parse every path once instead of running the regex twice per path.
    parsed = (extract_frame_number(frame) for frame in top_frames)
    frame_numbers = [number for number in parsed if number is not None]
    print(f"Top frame numbers: {frame_numbers}")

    # Calculate mean and standard deviation
    mean_frame = mean(frame_numbers)
    # stdev() requires at least two data points; a single frame has no
    # spread, so treat its deviation as zero rather than raising.
    std_dev = stdev(frame_numbers) if len(frame_numbers) > 1 else 0.0

    # Exclude outliers (those beyond 1 standard deviation from the mean)
    filtered_frames = [frame for frame in frame_numbers if abs(frame - mean_frame) <= std_dev]
    print(f"Filtered frame numbers: {filtered_frames}")

    # Calculate the average of the filtered frame numbers
    average_frame_number = int(mean(filtered_frames))
    return average_frame_number, filtered_frames

# Score every frame by the entropy of its alpha channel, keep the top_k
# highest-scoring frames per TF folder, and save three CLIP image embeddings
# for each folder (all top frames, outlier-filtered frames, single best frame).
entropy_loss = EntropyLosses()


def _clip_image_embedding(image_path):
    """Return the CLIP image embedding of one image file as a NumPy array."""
    # Close the file handle as soon as the pixel data has been converted.
    with Image.open(image_path) as img:
        rgb = img.convert("RGB")
    # Use open_clip's evaluation-time transform: preprocess_train applies
    # training augmentation, which is not wanted when producing reference
    # embeddings.
    img_tensor = preprocess_val(rgb).unsqueeze(0)
    with torch.no_grad():
        return model.encode_image(img_tensor).squeeze(0).numpy()


for folder_name in os.listdir(output_path):
    if not folder_name.startswith("TF"):
        continue
    folder_path = os.path.join(output_path, folder_name, "train")
    if not os.path.isdir(folder_path):
        continue

    print(f"Processing folder: {folder_name}")
    # Sorted so that ties in the entropy ranking are broken reproducibly
    # (os.listdir returns entries in arbitrary order).
    image_files = sorted(
        os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(".png")
    )

    if len(image_files) < top_k:
        # This aborts the run, so the message must not claim the folder is skipped.
        raise ValueError(f"Not enough images in {folder_path} to select {top_k}.")

    entropy_scores = []
    for img_path in tqdm(image_files):
        with Image.open(img_path) as img:
            rgba = np.array(img.convert("RGBA"))
        # Add a batch axis and scale the 8-bit channel values to [0, 1].
        img_tensor = torch.from_numpy(rgba).float().unsqueeze(0) / 255.0

        with torch.no_grad():
            entropy = entropy_loss(img_tensor).item()
        entropy_scores.append((img_path, entropy))

    # Sort by entropy and select top k frames
    entropy_scores.sort(key=lambda x: x[1], reverse=True)
    top_frames = [path for path, _ in entropy_scores[:top_k]]
    best_frame_number, filtered_frame_number = get_best_frame(top_frames)
    print(f"Best frame number: {best_frame_number}")

    tf_dir = os.path.join(output_path, folder_name)
    with open(os.path.join(tf_dir, "best_frames.txt"), "w") as f:
        f.write(f"{best_frame_number}\n")
        f.write("\n".join(str(frame) for frame in filtered_frame_number if frame != best_frame_number))

    # Map frame numbers back to the real files instead of assuming a fixed
    # 4-digit file name pattern.
    frame_to_path = {}
    for path in image_files:
        number = extract_frame_number(path)
        if number is not None:
            frame_to_path.setdefault(number, path)

    # NOTE: the best frame number is an average and may not correspond to an
    # existing file; in that case the conventional r_XXXX.png name is tried
    # and Image.open raises FileNotFoundError if it does not exist.
    best_frame_path = frame_to_path.get(
        best_frame_number,
        os.path.join(folder_path, f"r_{best_frame_number:04}.png"),
    )
    filtered_frames = [
        frame_to_path.get(frame, os.path.join(folder_path, f"r_{frame:04}.png"))
        for frame in filtered_frame_number
    ]

    # Save unfiltered and filtered embeddings (mean over the selected frames)
    combined_embedding = np.mean([_clip_image_embedding(p) for p in top_frames], axis=0)
    np.save(os.path.join(tf_dir, "image_embedding_entropy.npy"), combined_embedding)

    combined_embedding = np.mean([_clip_image_embedding(p) for p in filtered_frames], axis=0)
    np.save(os.path.join(tf_dir, "image_filtered_embedding_entropy.npy"), combined_embedding)

    # Save the embedding of the single best frame
    top1_embedding = _clip_image_embedding(best_frame_path)
    embedding_path = os.path.join(tf_dir, "image_top1_embedding_entropy.npy")
    np.save(embedding_path, top1_embedding)
    print(f"Saved embedding to {embedding_path}")

print("Processing complete.")

## Combine text embedding with image embedding

In [None]:
import os
import random
import numpy as np
from PIL import Image
from open_clip import create_model_and_transforms, tokenize
import torch

# --- Configuration for combining image and text embeddings ---
# Parent directory that contains the TF folders
output_path = "../ImgData/supernovaRGBa_tf_class3"
# Number of images sampled at random per folder when no precomputed image
# embedding file is given
top_k = 10
# File name of the precomputed image embedding to combine with the text embedding
image_embedding_name = "image_embedding_entropy.npy"

# Class description used as text prompt for each TF folder
tf_to_class = {
    "TF00": "expanding shockwave",
    "TF01": "turbulent plasma",
    "TF02": "dense core ejecta",
}


# --- CLIP backbone ---
model, preprocess_train, preprocess_val = create_model_and_transforms(
    "ViT-B-32",
    pretrained="openai",
)
model.eval()  # Inference only: switch the model to evaluation mode

def get_text_embedding(text):
    """Encode ``text`` with the module-level CLIP model and return a NumPy array."""
    # The single string is wrapped in a list, i.e. tokenized as a batch of one.
    tokens = tokenize([text])
    with torch.no_grad():
        encoded = model.encode_text(tokens)
        # Drop the batch axis again before converting to NumPy.
        as_array = encoded.squeeze(0).numpy()
    return as_array

def process_folder(folder_path, class_text, image_embedding_name=None):
    """
    Build a combined CLIP embedding from a folder's images and a class text.

    Args:
        folder_path: Directory containing the ``.png`` images of one class.
        class_text: Text description that is embedded with the CLIP text encoder.
        image_embedding_name: File name of a precomputed image embedding stored
            in the parent directory of ``folder_path``. When ``None``, ``top_k``
            images are sampled at random from ``folder_path`` and their
            embeddings are averaged instead.

    Returns:
        The element-wise mean of the image embedding and the text embedding,
        or ``None`` when random sampling is requested but the folder holds
        fewer than ``top_k`` images.
    """
    if image_embedding_name is None:
        image_files = [f for f in os.listdir(folder_path) if f.endswith(".png")]
        if len(image_files) < top_k:
            print(f"Not enough images in {folder_path} to select {top_k}. Skipping.")
            return None

        image_embeddings = []
        # Select random images
        for img_file in random.sample(image_files, top_k):
            # Close the file handle as soon as the pixel data has been converted.
            with Image.open(os.path.join(folder_path, img_file)) as img:
                rgb = img.convert("RGB")
            # Use open_clip's evaluation-time transform: preprocess_train
            # applies training augmentation, which is not wanted when
            # producing reference embeddings.
            img_tensor = preprocess_val(rgb).unsqueeze(0)  # add batch dimension

            with torch.no_grad():
                embedding = model.encode_image(img_tensor).squeeze(0).numpy()
            image_embeddings.append(embedding)

        # Average the image embeddings
        combined_image_embedding = np.mean(image_embeddings, axis=0)
    else:
        # Load the precomputed image embedding from the parent of folder_path
        image_embedding_path = os.path.join(os.path.dirname(folder_path), image_embedding_name)
        combined_image_embedding = np.load(image_embedding_path)

    # Generate the text embedding
    text_embedding = get_text_embedding(class_text)

    # Combine image and text embeddings with equal weights
    return (combined_image_embedding + text_embedding) / 2

# Process each folder starting with "TF" and save its combined image+text embedding.
# process_folder supports image_embedding_name=None (random image sampling),
# so the output file name must not assume a string here.
if image_embedding_name is None:
    output_stem = "image_embedding_random"
else:
    # splitext removes only the final extension, unlike splitting on the first dot.
    output_stem = os.path.splitext(image_embedding_name)[0]

for folder_name in os.listdir(output_path):
    if not folder_name.startswith("TF"):
        continue
    folder_path = os.path.join(output_path, folder_name, "train")
    if not os.path.isdir(folder_path):
        continue

    class_text = tf_to_class.get(folder_name, "")
    if not class_text:
        # Make the fallback visible: the text half of the embedding is then
        # computed from an empty prompt.
        print(f"Warning: no class text defined for {folder_name}; using empty text.")
    print(f"Processing folder: {folder_name}, Class text: {class_text}")
    combined_embedding = process_folder(folder_path, class_text, image_embedding_name)

    if combined_embedding is not None:
        # Save the combined embedding as a .npy file in the "TF" folder
        embedding_path = os.path.join(output_path, folder_name, f"{output_stem}_plus_text.npy")
        np.save(embedding_path, combined_embedding)
        print(f"Saved combined embedding to {embedding_path}")

print("Processing complete.")

In [None]:
# Long-form class descriptions, written verbatim into the dataset info file
tf_to_class = {
    "TF00": "expanding shockwave (green), illustrates lower-density, cooler material expanding outward at extremely high velocities, representing the supernova's shockwave interacting with surrounding interstellar gas.",
    "TF01": "turbulent plasma (blue), represent intermediate-density gases, likely consisting of heated stellar material, freshly fused elements created by the supernova itself, and plasma undergoing turbulent mixing.",
    "TF02": "dense core ejecta (red), indicate dense, hotter core regions and heavier elements synthesized during the explosion. These areas are marked by intricate, filamentary structures caused by powerful shockwaves and fluid instabilities.",
}

# The dataset name is whatever precedes "RGBa" in the last path component
dataset_name = os.path.basename(output_path).partition("RGBa")[0]

# Assemble the file content first, then write it in one go
output_file = os.path.join(output_path, "dataset_info.txt")
info_lines = [
    f"Dataset Name: {dataset_name}\n This is a dataset of a supernova, which is an enormous stellar explosion that occurs at the end of a massive star's lifecycle, briefly outshining entire galaxies.\n",
    "Classes:\n",
]
info_lines.extend(f"{key}: {value}\n" for key, value in tf_to_class.items())

with open(output_file, "w") as f:
    f.writelines(info_lines)

print(f"Dataset information written to {output_file}")