In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import math
import plotly.graph_objects as go
from plotly.subplots import make_subplots


In [2]:
# Define a small dataset
sentences = [
    "I'm extremly frustrated with your service. My order has been delayed for the third time without explanation.",
    #"Recent studies on climate change indicate a significant impact on global biodiversity.",
    "The brilliantly terrible movie had the audience laughing hysterically, though not for the reasons the director intended",
    #"Despite the scathing criticism, the artist felt a bittersweet satisfaction knowing her controversial work had sparked intense debate."
]
# Labels: [valence, arousal, relevance]
# valence: -1 to 1 (negative to positive)
# arousal: 0 to 1 (low to high intensity)
# relevance: 0 to 1 (low to high personal/social significance)
labels = torch.tensor([
    [-0.9, 0.9, 0.8],
    #[-0.3, 0.2, 0.5],
    [-0.3, 0.7, 0.5],
    #[ 0.2, 0.2, 0.5],
], dtype=torch.float)

In [3]:
# Tokenize sentences (very simple tokenization for demonstration)
vocab = set(" ".join(sentences).split())
word_to_ix = {word: i for i, word in enumerate(vocab)}
ix_to_word = {i: word for word, i in word_to_ix.items()}

# Convert sentences to tensors
max_len = max(len(s.split()) for s in sentences)
tensor_sentences = torch.zeros((len(sentences), max_len), dtype=torch.long)
for i, sentence in enumerate(sentences):
    for j, word in enumerate(sentence.split()):
        tensor_sentences[i, j] = word_to_ix[word]

# Convert labels to tensors
tensor_labels = torch.tensor(labels, dtype=torch.float)

# Define the standard transformer
class StandardTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_layers):
        super(StandardTransformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model, nhead),
            num_layers
        )
        self.fc = nn.Linear(d_model, 1)

    def forward(self, x):
        x = self.embedding(x)
        x = self.transformer(x)
        x = x.mean(dim=1)  # Average pooling
        x = self.fc(x)
        return torch.sigmoid(x)

# Define the valuation-based transformer
class ValuationTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_layers):
        super(ValuationTransformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.emotion_embedding = nn.Embedding(vocab_size, d_model)
        self.valuation_fc = nn.Linear(2 * d_model, d_model)
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model, nhead),
            num_layers
        )
        self.fc = nn.Linear(d_model, 3)  # Output 3 values

    def forward(self, x):
        word_emb = self.embedding(x)
        emotion_emb = self.emotion_embedding(x)
        combined_emb = torch.cat((word_emb, emotion_emb), dim=-1)
        valuation = self.valuation_fc(combined_emb)
        x = self.transformer(valuation)
        x = x.mean(dim=1)  # Average pooling
        x = self.fc(x)
        return torch.tanh(x[:, 0]).unsqueeze(1), torch.sigmoid(x[:, 1:])  # tanh for valence, sigmoid for arousal and relevance



  tensor_labels = torch.tensor(labels, dtype=torch.float)


In [4]:
# Instantiate models
vocab_size = len(word_to_ix)
d_model = 16
nhead = 2
num_layers = 3
epochs=200

standard_model = StandardTransformer(vocab_size, d_model, nhead, num_layers)
valuation_model = ValuationTransformer(vocab_size, d_model, nhead, num_layers)

def train_model(model, epochs):
    optimizer = optim.Adam(model.parameters())
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        optimizer.zero_grad()
        if isinstance(model, StandardTransformer):
            outputs = model(tensor_sentences)
            loss = criterion(outputs, labels[:, 0].unsqueeze(1))  # Compare only with valence
        else:
            valence, arousal_relevance = model(tensor_sentences)
            outputs = torch.cat((valence, arousal_relevance), dim=1)
            loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

print("Training Standard Transformer:")
train_model(standard_model, epochs)

print("\nTraining Valuation-based Transformer:")
train_model(valuation_model, epochs)

# Analyze weights
def analyze_weights(model, model_name):
    print(f"\nAnalyzing {model_name} weights:")
    for name, param in model.named_parameters():
        if 'weight' in name:
            print(f"{name}:")
            print(f"  Mean: {param.data.mean().item():.4f}")
            print(f"  Std: {param.data.std().item():.4f}")
            print(f"  Max: {param.data.max().item():.4f}")
            print(f"  Min: {param.data.min().item():.4f}")

analyze_weights(standard_model, "Standard Transformer")
analyze_weights(valuation_model, "Valuation-based Transformer")





Training Standard Transformer:
Epoch [10/200], Loss: 0.5387
Epoch [20/200], Loss: 0.5237
Epoch [30/200], Loss: 0.5106
Epoch [40/200], Loss: 0.5036
Epoch [50/200], Loss: 0.4965
Epoch [60/200], Loss: 0.4899
Epoch [70/200], Loss: 0.4854
Epoch [80/200], Loss: 0.4797
Epoch [90/200], Loss: 0.4761
Epoch [100/200], Loss: 0.4742
Epoch [110/200], Loss: 0.4701
Epoch [120/200], Loss: 0.4692
Epoch [130/200], Loss: 0.4670
Epoch [140/200], Loss: 0.4653
Epoch [150/200], Loss: 0.4638
Epoch [160/200], Loss: 0.4625
Epoch [170/200], Loss: 0.4613
Epoch [180/200], Loss: 0.4605
Epoch [190/200], Loss: 0.4597
Epoch [200/200], Loss: 0.4589

Training Valuation-based Transformer:
Epoch [10/200], Loss: 0.0535
Epoch [20/200], Loss: 0.0443
Epoch [30/200], Loss: 0.0454
Epoch [40/200], Loss: 0.0446
Epoch [50/200], Loss: 0.0390
Epoch [60/200], Loss: 0.0360
Epoch [70/200], Loss: 0.0274
Epoch [80/200], Loss: 0.0130
Epoch [90/200], Loss: 0.0071
Epoch [100/200], Loss: 0.0036
Epoch [110/200], Loss: 0.0025
Epoch [120/200], L

In [5]:


def plot_sentence_embedding_heatmaps(standard_model, valuation_model, sentences, word_to_ix):
    # Extract embedding weights
    standard_weights = standard_model.embedding.weight.detach().numpy()
    valuation_weights = valuation_model.embedding.weight.detach().numpy()

    for sentence in sentences:
        words = sentence.split()
        word_indices = [word_to_ix[word] for word in words]

        # Extract weights for words in the sentence
        standard_sentence_weights = standard_weights[word_indices]
        valuation_sentence_weights = valuation_weights[word_indices]

        # Create figure with two subplots
        fig = make_subplots(rows=1, cols=2,
                            subplot_titles=('Standard Transformer', 'Valuation Transformer'),
                            shared_yaxes=True)

        # Plot heatmap for Standard Transformer
        fig.add_trace(
            go.Heatmap(z=standard_sentence_weights,
                       colorscale='RdBu',
                       zmid=0),
            row=1, col=1
        )

        # Plot heatmap for Valuation Transformer
        fig.add_trace(
            go.Heatmap(z=valuation_sentence_weights,
                       colorscale='RdBu',
                       zmid=0),
            row=1, col=2
        )

        # Update layout
        fig.update_layout(
            title_text=f'Embedding Weights for: "{sentence}"',
            height=max(500, 30*len(words)),  # Adjust height based on number of words
            width=1200
        )

        # Update axes
        for i in range(1, 3):
            fig.update_xaxes(title_text='Embedding Dimensions', row=1, col=i)
            fig.update_yaxes(title_text='Words' if i == 1 else '',
                             ticktext=words,
                             tickvals=list(range(len(words))),
                             row=1, col=i)

        fig.show()

# Call the function
plot_sentence_embedding_heatmaps(standard_model, valuation_model, sentences, word_to_ix)

In [6]:
tokens = sentence.split()
input_ids = torch.tensor([[word_to_ix.get(word, 0) for word in tokens]])

def visualize_transformer_outputs(standard_model, valuation_model, sentence, word_to_ix):
    # Tokenize the sentence, keeping duplicates
    tokens = sentence.split()
    input_ids = torch.tensor([[word_to_ix.get(word, 0) for word in tokens]])

    def get_output_magnitudes(model):
        embeddings = model.embedding(input_ids)
        model.eval()
        with torch.no_grad():
            output = model.transformer(embeddings)
        output = output.squeeze(0)
        return torch.norm(output, dim=0).detach().numpy()

    standard_magnitudes = get_output_magnitudes(standard_model)
    valuation_magnitudes = get_output_magnitudes(valuation_model)

    # Create unique labels for repeated words
    unique_tokens = []
    for i, token in enumerate(tokens):
        count = tokens[:i].count(token)
        unique_tokens.append(f"{token}_{count+1}" if count > 0 else token)

    fig = go.Figure()

    # Add bars for both models
    fig.add_trace(
        go.Bar(x=unique_tokens, y=standard_magnitudes, name="Standard Transformer", marker_color='blue')
    )
    fig.add_trace(
        go.Bar(x=unique_tokens, y=valuation_magnitudes, name="Valuation Transformer", marker_color='red')
    )

    # Update layout
    fig.update_layout(
        title_text=f"Token Output Magnitude for:<br>'{sentence}'",
        xaxis_title="Tokens",
        yaxis_title="Output Magnitude",
        barmode='group',
        height=600,
        width=1000,
        legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01)
    )

    fig.update_xaxes(tickangle=45)

    fig.show()

# Visualize transformer outputs for both sentences
for sentence in sentences:
    visualize_transformer_outputs(standard_model, valuation_model, sentence, word_to_ix)