In [1]:
import numpy as np
import torch
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, log_loss
import matplotlib.pyplot as plt

In [None]:
def find_best_split(s_entropy,
                    plot=True,
                    label="",
                    n_splits=100):
    """Find the threshold that best separates entropy values into a low and a high group.

    ``n_splits`` evenly spaced candidate thresholds between 1e-10 and the
    largest entropy value are scored. A candidate's score is the sum of squared
    deviations of each group from its own mean (values ``< split`` form the low
    group, values ``>= split`` the high group). Note that this is a *summed*
    squared error -- it is never divided by the group size -- even though the
    plot axis is labelled "Mean Squared Error".

    Args:
        s_entropy: Array-like of entropy values (anything ``np.asarray`` accepts).
        plot: When true, draw score vs. threshold on the current matplotlib figure.
        label: Legend label for the plotted curve; a legend is only drawn when
            this is non-empty.
        n_splits: Number of candidate thresholds to evaluate.

    Returns:
        The candidate threshold with the lowest score. Candidates that leave one
        group empty score ``inf``; if every candidate does (e.g. all values are
        identical), the first candidate, 1e-10, is returned.

    Raises:
        ValueError: If ``s_entropy`` is empty.
    """
    # Coerce once so plain lists support boolean masking and .max() below.
    s_entropy = np.asarray(s_entropy)
    if s_entropy.size == 0:
        raise ValueError("s_entropy must contain at least one value")

    splits = np.linspace(1e-10, s_entropy.max(), n_splits)
    # Start every candidate at inf; only splits with two non-empty groups get a score.
    split_mses = np.full(splits.shape, np.inf)

    for idx, split in enumerate(splits):
        low_idxs, high_idxs = s_entropy < split, s_entropy >= split

        # ndarray.any() is vectorised and, unlike builtin any(), also works on
        # masks that are not 1-D.
        if not low_idxs.any() or not high_idxs.any():
            continue

        low_vals = s_entropy[low_idxs]
        high_vals = s_entropy[high_idxs]
        split_mses[idx] = (np.sum((low_vals - np.mean(low_vals))**2)
                           + np.sum((high_vals - np.mean(high_vals))**2))

    best_split = splits[np.argmin(split_mses)]

    if plot:
        plt.plot(splits, split_mses, label=label)
        plt.xlabel('Split Thresholds')
        plt.ylabel('Mean Squared Error')
        plt.title('MSE vs Split Threshold')
        if label:
            plt.legend()

    return best_split

def binarize_entropy(entropy, threshold):
    """Label each entropy value 1 if it is at or above ``threshold``, else 0.

    Relies on ``entropy`` supporting an elementwise ``>=`` whose result has
    ``.astype`` (e.g. a NumPy array).
    """
    at_or_above = entropy >= threshold
    return at_or_above.astype(int)

def train_and_evaluate_model(X, y, test_size=0.2, random_state=42):
    """Fit a logistic regression on a train split and score it on the held-out split.

    Args:
        X: Feature matrix passed to ``train_test_split``.
        y: Binary labels aligned with ``X`` (e.g. binarized entropy).
        test_size: Fraction (or count) of samples held out for evaluation.
        random_state: Seed used for both the split and the classifier.

    Returns:
        Tuple ``(model, test_accuracy, test_auroc, test_loss)``. ``test_auroc``
        is NaN when the held-out split contains a single class, since AUROC is
        undefined there.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    model = LogisticRegression(random_state=random_state)
    model.fit(X_train, y_train)

    y_pred_proba = model.predict_proba(X_test)
    # The split is not stratified, so y_test may contain only one class. Passing
    # the classifier's own classes keeps log_loss aligned with the predict_proba
    # columns instead of inferring labels from y_test (which raises in that case).
    test_loss = log_loss(y_test, y_pred_proba, labels=model.classes_)
    test_accuracy = model.score(X_test, y_test)

    # AUROC needs both classes in the held-out labels; report NaN rather than
    # aborting the whole run when they are not both present.
    if np.unique(y_test).size < 2:
        test_auroc = float('nan')
    else:
        test_auroc = roc_auc_score(y_test, y_pred_proba[:, 1])

    return model, test_accuracy, test_auroc, test_loss

In [None]:

def train_entropy_predictors(hidden_states_dict, entropies_dict):
    """Train one high/low-entropy classifier per action dimension.

    For every key in ``hidden_states_dict`` the matching entry of
    ``entropies_dict`` is thresholded with ``find_best_split``, binarized, and
    used as the target of ``train_and_evaluate_model``. All threshold-search
    curves are drawn on one shared figure, which is shown once at the end.

    Returns:
        Dict keyed by action dimension; each value holds the fitted ``model``,
        the ``split_threshold``, the test metrics, and ``high_entropy_ratio``
        (the mean of the binarized labels).
    """
    results = {}

    plt.figure(figsize=(15, 5))

    for action_dim, X in hidden_states_dict.items():
        print(f"\nProcessing action dimension {action_dim}")

        raw_entropy = entropies_dict[action_dim]

        # Threshold search also adds this dimension's curve to the shared figure.
        threshold = find_best_split(raw_entropy, plot=True, label=f"Action {action_dim}")
        targets = binarize_entropy(raw_entropy, threshold)

        model, accuracy, auroc, loss = train_and_evaluate_model(X, targets)
        high_ratio = np.mean(targets)

        results[action_dim] = {
            'model': model,
            'split_threshold': threshold,
            'test_accuracy': accuracy,
            'test_auroc': auroc,
            'test_loss': loss,
            'high_entropy_ratio': high_ratio
        }

        report_lines = (
            f"Action {action_dim} Results:",
            f"Test Accuracy: {accuracy:.4f}",
            f"Test AUROC: {auroc:.4f}",
            f"Test Loss: {loss:.4f}",
            f"High Entropy Ratio: {high_ratio:.4f}",
        )
        for line in report_lines:
            print(line)

    plt.tight_layout()
    plt.show()

    return results

"""
# assumes the inputs are dictionaries keyed by action dimension, like:
hidden_states_dict = {
    0: X_0,  # hidden states for action dimension 0
    1: X_1,  # hidden states for action dimension 1
    ...
    6: X_6   # hidden states for action dimension 6
}

entropies_dict = {
    0: y_0,  # semantic entropy or probability values for action dimension 0
    1: y_1,  # semantic entropy or probability values for action dimension 1
    ...
    6: y_6   # semantic entropy or probability values for action dimension 6
}

# train models for all action dimensions
results = train_entropy_predictors(hidden_states_dict, entropies_dict)

# get an action specific trained model and metrics
action_dim = 0
model = results[action_dim]['model']
accuracy = results[action_dim]['test_accuracy']
"""

In [None]:
def get_semantic_entropy(prob_vector, k):
    """Average the entropy of a probability vector over ``k`` levels of coarse clusters.

    At level ``i`` (0-based) consecutive entries are grouped into clusters of
    size ``len(prob_vector) // 2**(i + 1)``. Each cluster's probability is the
    total mass of its entries divided by the total mass of the vector, so the
    cluster probabilities always sum to 1 before the entropy is taken. (Taking
    the per-cluster mean instead would yield values summing to
    ``n_clusters / len(prob_vector)``, which is not a distribution.)

    Args:
        prob_vector: 1-D array-like of non-negative weights, typically a
            probability distribution over bins.
        k: Number of hierarchy levels to average over.

    Returns:
        Mean of the ``k`` per-level entropies (natural log).

    Raises:
        ValueError: If the vector has no positive mass, or if a level's cluster
            size is zero or does not evenly divide the vector.
    """
    # essentially an action level approximation of the trajectory level semantic entropy
    # principled way of choosing k? probably something to do with dirichlet process...
    def entropy(p):
        # Small epsilon keeps log finite for zero-probability clusters.
        return np.sum(-p * np.log(p + 1e-10))

    prob_vector = np.asarray(prob_vector, dtype=float)
    total_mass = prob_vector.sum()
    if not total_mass > 0:
        raise ValueError("prob_vector must have positive total mass")

    ses = []
    for i in range(k):
        split = prob_vector.shape[0] // (2 ** (i + 1))
        if split == 0 or prob_vector.size % split != 0:
            raise ValueError(
                f"level {i}: cluster size {split} does not evenly divide "
                f"a vector of length {prob_vector.size}"
            )
        # Probability of each cluster = its share of the total mass.
        clustered_prob_vector = prob_vector.reshape(-1, split).sum(axis=1) / total_mass
        ses.append(entropy(clustered_prob_vector))  # entropy of the ith level of the hierarchy

    return np.mean(np.array(ses))

In [3]:
# Scratch check of the clustering step: pool 256 random values into 4 groups of 64
# and display the per-group means (the values are random, not a distribution).
random_arr = np.random.rand(256)
split = len(random_arr) // 4
random_arr.reshape(-1, split).mean(axis=1)

array([0.5270786 , 0.46039868, 0.54184972, 0.51950227])

In [12]:
def entropy(p):
    """Return -sum(p * log(p + 1e-10)); the epsilon keeps log finite at p == 0."""
    log_p = np.log(p + 1e-10)
    return -np.sum(p * log_p)

# Reproduce hierarchy level 0 (two clusters) on the scratch array, then print the
# entropy of the per-cluster means followed by the means themselves.
i = 0
split = random_arr.shape[0] // (2 ** (i + 1))
clustered_prob_vector = random_arr.reshape(-1, split).mean(axis=1)
print(entropy(clustered_prob_vector))
print(clustered_prob_vector)


0.684693768449274
[0.49373864 0.530676  ]
