In [25]:
import pandas as pd

def load_and_prepare_data(file_path, tokenizer):
    df = pd.read_csv(file_path)
    texts = df['input'].tolist()  # Replace 'input_column_name' with your actual text column name
    # Tokenization to create a dictionary of tensors
    inputs = tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt")
    return inputs


In [65]:
def load_and_prepare_data(file_path, tokenizer, device, fraction=0.005):
    df = pd.read_csv(file_path)
    if not df['input'].dropna().shape[0] == df.shape[0]:
        raise ValueError("Data contains NaN values.")
    
    # Filter out empty strings or possibly problematic texts
    df = df[df['input'].str.strip() != ""]
    if df.empty:
        raise ValueError("All data filtered out after removing empty inputs.")

    df_sampled = df.sample(frac=fraction, random_state=42)
    texts = df_sampled['input'].tolist()
    inputs = tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt")
    inputs = {key: val.to(device) for key, val in inputs.items()}
    
    # Diagnostic to check tokenization results
    if (inputs['input_ids'] == 0).all():
        raise ValueError("Tokenization resulted in all zero input_ids, check tokenizer settings and inputs.")
    
    return inputs


## Reviewed functions:

In [59]:
def normalize(R):
    with torch.no_grad():
        if R.dim() == 1:
            R = R.unsqueeze(0)  # Add a batch dimension if R is 1-dimensional
        mean = R.mean(dim=0)
        R = R - mean
        norms = torch.norm(R, p=2, dim=1, keepdim=True)
        if any(norms == 0):
            print("Zero norms detected, problematic tensor:", R)
            return None  # Return None if norms are zero (avoid division by zero)
        R = R / norms
        if not torch.isfinite(R).all():
            print("Normalization resulted in non-finite values, input tensor:", R)
            return None
    return R


In [60]:
def check_data_integrity(inputs):
    if not torch.isfinite(inputs['input_ids']).all():
        print("Non-finite values found in input_ids:", inputs['input_ids'])
    if 'attention_mask' in inputs and not torch.isfinite(inputs['attention_mask']).all():
        print("Non-finite values found in attention_mask:", inputs['attention_mask'])


In [54]:
def cal_cov(R):
    with torch.no_grad():
        if R is None or R.nelement() == 0:
            return None  # Return None if R is None or empty
        Z = torch.nn.functional.normalize(R, dim=1)
        if not torch.isfinite(Z).all():
            return None  # Check if Z contains non-finite values
        A = torch.matmul(Z.T, Z) / Z.shape[0]
        if not torch.isfinite(A).all():
            return None
    return A


In [49]:
def cal_entropy(A):
    with torch.no_grad():
        if torch.trace(A) == 0:
            raise ValueError("Trace of the matrix A is zero, which can cause division by zero in normalization.")
        normalized_A = A / torch.trace(A)
        try:
            eig_val = torch.svd(normalized_A).S
        except RuntimeError as e:
            print("SVD failed:", e)
            print("Matrix A:", A)
            raise

        eig_val = eig_val[eig_val > 0]  # Remove zero or negative eigenvalues
        entropy = - (eig_val * torch.log(eig_val)).sum().item()
        normalized_entropy = entropy / math.log(A.shape[0])
        normalized_entropy1 = math.exp(entropy) / A.shape[0]
    return normalized_entropy, normalized_entropy1



In [66]:
def calculate_matrix_entropy(inputs, model):
    ls1, ls3 = [], []
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)
        last_hidden_states = outputs.last_hidden_state

        for hidden_state in last_hidden_states:
            if torch.all(hidden_state == 0):
                print("Skipping zero vector.")
                continue  # Skip this loop iteration if hidden_state is a zero vector
            hidden_state = hidden_state.mean(dim=0, keepdim=True)
            R = normalize(hidden_state)
            if R is None:
                continue  # Skip this batch if normalization failed
            A = cal_cov(R)
            if A is None:
                continue  # Skip this batch if covariance calculation failed
            entropy1, entropy3 = cal_entropy(A)
            ls1.append(entropy1)
            ls3.append(entropy3)

    entropy_avg1 = sum(ls1) / len(ls1) if ls1 else float('nan')
    entropy_avg3 = sum(ls3) / len(ls3) if ls3 else float('nan')
    return entropy_avg1, entropy_avg3




In [68]:
def extract_embeddings(data_loader, model):
    model.eval()
    embeddings = []
    with torch.no_grad():
        for inputs, _ in data_loader:
            inputs = {k: v.to(model.device) for k, v in inputs.items()}  # Ensure inputs are on the same device as model
            output = model(**inputs)
            # Extract embeddings from a specific layer, e.g., last hidden state or any intermediate layer
            embeddings.append(output.last_hidden_state.mean(dim=1).detach().cpu().numpy())
    return np.vstack(embeddings)


In [85]:
def calculate_covariance_and_entropy(embeddings):
    # Ensure embeddings are real numbers
    embeddings = np.real(embeddings)

    # Calculate the covariance matrix and ensure it's symmetric
    covariance_matrix = np.cov(embeddings, rowvar=False)
    covariance_matrix = (covariance_matrix + covariance_matrix.T) / 2

    # Calculate eigenvalues
    eigenvalues = np.linalg.eigvalsh(covariance_matrix)  # Use eigvalsh for Hermitian matrices
    eigenvalues = eigenvalues[eigenvalues > 0]  # Filter out non-positive to avoid log of non-positive numbers

    # Calculate entropy
    entropy = -np.sum(eigenvalues * np.log(eigenvalues))
    normalized_entropy = entropy / np.log(len(eigenvalues))

    return covariance_matrix, entropy, normalized_entropy


In [72]:
from torch.utils.data import Subset, DataLoader
import numpy as np

def create_subset_loader(full_dataset, fraction=0.005):
    # Determine the number of samples to include
    subset_size = int(len(full_dataset) * fraction)
    # Randomly sample indices for the subset
    subset_indices = np.random.choice(len(full_dataset), subset_size, replace=False)
    # Create a subset dataset
    subset_dataset = Subset(full_dataset, subset_indices)
    # Create a DataLoader for the subset dataset
    subset_loader = DataLoader(subset_dataset, batch_size=32, shuffle=True)
    return subset_loader


In [None]:
def main(model, data_loader):
    embeddings = extract_embeddings(data_loader, model)
    covariance_matrix, entropy, normalized_entropy = calculate_covariance_and_entropy(embeddings)
    print(f"Entropy: {entropy}, Normalized Entropy: {normalized_entropy}")
    return entropy, normalized_entropy

# Example usage, assuming you have 'model' and 'data_loader' ready
main(model, data_loader)


In [75]:
from torch.utils.data import Dataset, DataLoader
import torch

class DataFrameDataset(Dataset):
    def __init__(self, dataframe, input_column, output_column=None):
        """
        Args:
            dataframe (DataFrame): Source data
            input_column (str): Column name for the model input data
            output_column (str, optional): Column name for the model output/target data
        """
        self.dataframe = dataframe
        self.input_column = input_column
        self.output_column = output_column

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Fetch the input data using the specified column
        input_data = self.dataframe.iloc[idx][self.input_column]
        
        # Process the input data through tokenizer or any preprocessing here if necessary
        # Example: input_tensor = tokenizer.encode(input_data, ...)
        # For simplicity, assume it's pre-processed or handled elsewhere

        if self.output_column is not None:
            # Fetch the output data if an output column is specified
            output_data = self.dataframe.iloc[idx][self.output_column]
            return input_data, output_data
        
        return input_data

# Example usage
# Assuming 'df' is your pandas DataFrame with columns 'input' for inputs and 'output' for expected outputs
df = pd.read_csv('/Users/thomasbush/langmodel/direct_data.csv')
full_dataset = DataFrameDataset(df, 'input', 'output')


In [78]:
import torch

def extract_embeddings(data_loader, model):
    model.eval()
    embeddings = []
    with torch.no_grad():
        for data in data_loader:
            # Check if data is a tuple (input, output) and extract input part
            if isinstance(data, tuple):
                inputs = data[0]
            else:
                inputs = data

            # Ensure inputs are tensors; convert if not
            if not isinstance(inputs, torch.Tensor):
                inputs = torch.tensor(inputs, dtype=torch.float32)  # You might need to adjust dtype based on your model's expected input
                
            # Move inputs to the correct device
            inputs = inputs.to(model.device)
            
            # Forward pass to get outputs from the model
            outputs = model(inputs)
            
            # Assuming you want to extract something specific from the outputs; adjust accordingly
            embedding = outputs.last_hidden_state.mean(dim=1)
            embeddings.append(embedding)

    # Concatenate all batch embeddings into a single tensor
    embeddings = torch.cat(embeddings, dim=0)
    return embeddings


### Logic Model:

In [104]:
# Create a DataLoader for extracting embeddings (inputs only)
subset_loader = create_subset_loader(full_dataset, fraction=1.0)

def extract_embeddings(data_loader, model, tokenizer):
    model.eval()
    embeddings = []
    with torch.no_grad():
        for data in data_loader:
            input_texts, _ = data  # Unpack the tuple containing inputs and labels

            # Tokenize inputs
            inputs = tokenizer(input_texts, padding=True, truncation=True, return_tensors="pt", max_length=512)
            inputs = {k: v.to(model.device) for k, v in inputs.items()}  # Move tokenized inputs to the correct device

            # Forward pass to get outputs from the model
            outputs = model(**inputs)
            
            # Extract embeddings; adjust according to what you actually need, here using the last hidden state
            embedding = outputs.last_hidden_state.mean(dim=1)
            embeddings.append(embedding)

    # Concatenate all batch embeddings into a single tensor
    embeddings = torch.cat(embeddings, dim=0)
    return embeddings

# Example usage
def main(model, data_loader, tokenizer):
    embeddings = extract_embeddings(data_loader, model, tokenizer)
    covariance_matrix, entropy, normalized_entropy = calculate_covariance_and_entropy(embeddings.detach().cpu().numpy())
    print(f"Entropy: {entropy}, Normalized Entropy: {normalized_entropy}")

# Assuming 'model' is already defined and properly configured
model_path = "/Users/thomasbush/langmodel/logic_model"
dataset_path = "/Users/thomasbush/langmodel/direct_data.csv"
    
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModel.from_pretrained(model_path)
main(model, subset_loader, tokenizer)


Entropy: -787.4395632753549, Normalized Entropy: -118.52264970798292


### Redundand Model:

In [105]:
subset_loader = create_subset_loader(full_dataset, fraction=1.0)

def extract_embeddings(data_loader, model, tokenizer):
    model.eval()
    embeddings = []
    with torch.no_grad():
        for data in data_loader:
            input_texts, _ = data  # Unpack the tuple containing inputs and labels

            # Tokenize inputs
            inputs = tokenizer(input_texts, padding=True, truncation=True, return_tensors="pt", max_length=512)
            inputs = {k: v.to(model.device) for k, v in inputs.items()}  # Move tokenized inputs to the correct device

            # Forward pass to get outputs from the model
            outputs = model(**inputs)
            
            # Extract embeddings; adjust according to what you actually need, here using the last hidden state
            embedding = outputs.last_hidden_state.mean(dim=1)
            embeddings.append(embedding)

    # Concatenate all batch embeddings into a single tensor
    embeddings = torch.cat(embeddings, dim=0)
    return embeddings

# Example usage
def main(model, data_loader, tokenizer):
    embeddings = extract_embeddings(data_loader, model, tokenizer)
    covariance_matrix, entropy, normalized_entropy = calculate_covariance_and_entropy(embeddings.detach().cpu().numpy())
    print(f"Entropy: {entropy}, Normalized Entropy: {normalized_entropy}")

# Assuming 'model' is already defined and properly configured
model_path = "/Users/thomasbush/langmodel/redu_model"
dataset_path = "/Users/thomasbush/langmodel/redundant_data.csv"
    
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModel.from_pretrained(model_path)
main(model, subset_loader, tokenizer)

Entropy: 1.593753028287761, Normalized Entropy: 0.23988613311106072
