In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 9 2025

@author: Yaning
"""

# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
# import torch.nn.functional as F # mainly for ReLU
import numpy as np
import copy
import re
import torch.nn.functional as F


In [2]:
from transformers import AutoTokenizer, AutoModelForCausalLM, GPT2Model

tokenizer = AutoTokenizer.from_pretrained("dbmdz/german-gpt2")
model = AutoModelForCausalLM.from_pretrained("dbmdz/german-gpt2")
model.eval()

  from .autonotebook import tqdm as notebook_tqdm


GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(50265, 768)
    (wpe): Embedding(1024, 768)
    (drop): Dropout(p=0.0, inplace=False)
    (h): ModuleList(
      (0-11): 12 x GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D(nf=2304, nx=768)
          (c_proj): Conv1D(nf=768, nx=768)
          (attn_dropout): Dropout(p=0.0, inplace=False)
          (resid_dropout): Dropout(p=0.0, inplace=False)
        )
        (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D(nf=3072, nx=768)
          (c_proj): Conv1D(nf=768, nx=3072)
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.0, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=768, out_features=50265, bias=False)
)

In [3]:
def normalise_matrix(m):
    m_mean = m.mean()
    m_std = m.std()
    m = (m-m_mean)/m_std
    return m

In [5]:
batch_size = 537

In [6]:
# # Define the Transition Layer (T)
# class TransitionLayer(nn.Module):
#     def __init__(self, embedding_dim):
#         super(TransitionLayer, self).__init__()
#         # Trainable transition matrix to map standard to dialect
#         self.transition_matrix = nn.Parameter(torch.randn(537, embedding_dim))
    
#     def forward(self, standard_embeddings):
#         # Apply the transformation: H_dialect = T * H_standard
#         return torch.matmul(standard_embeddings, self.transition_matrix)

# # attention transition matrix
# class TransitionLayer(nn.Module):
#     def __init__(self, embedding_dim):
#         super(TransitionLayer, self).__init__()
#         # Trainable transition matrix to map standard to dialect
#         self.query = nn.Parameter(torch.randn(embedding_dim, embedding_dim))
#         self.key = nn.Parameter(torch.randn(embedding_dim, embedding_dim))
#         self.value = nn.Parameter(torch.randn(embedding_dim, embedding_dim))
#         self.transition_matrix = nn.Parameter(torch.randn(embedding_dim, embedding_dim))
        
    
#     def forward(self, standard_embeddings):
#         attention_matrix = torch.matmul(self.query, self.key)/(10 ** 0.5)
#         attention_weights = F.softmax(attention_matrix, dim=-1)
#         transition_matrix = torch.matmul(attention_weights, self.value)
#         return torch.matmul(standard_embeddings, transition_matrix)

# # Define the Transition Layer (T)
# class TransitionLayer(nn.Module):
#     def __init__(self, embedding_dim):
#         super(TransitionLayer, self).__init__()
#         # Trainable transition matrix to map standard to dialect
#         self.transition_matrix = nn.Parameter(torch.randn(embedding_dim, embedding_dim))
    
#     def forward(self, standard_embeddings):
#         # Apply the transformation: H_dialect = T * H_standard
#         # return torch.matmul(standard_embeddings, self.transition_matrix)
#         # print("first")
#         # print(standard_embeddings.shape)
#         # print("second")
#         # print(self.transition_matrix.shape)
#         standard_embeddings_mean = standard_embeddings.mean()
#         standard_embeddings_std = standard_embeddings.std()
#         standard_embeddings = (standard_embeddings - standard_embeddings_mean) / standard_embeddings_std

        
#         transition_matrix_mean = self.transition_matrix.mean()
#         transition_matrix_std = self.transition_matrix.std()
#         scaled_transition = (self.transition_matrix - transition_matrix_mean) / transition_matrix_std
        
#         return torch.matmul(standard_embeddings, scaled_transition)

# add transition layer to the standard embedding
class TransitionLayer(nn.Module):
    def __init__(self, embedding_dim):
        super(TransitionLayer, self).__init__()
        # Trainable transition matrix to map standard to dialect
        self.transition_matrix = nn.Parameter(torch.randn(batch_size, embedding_dim))
    
    def forward(self, standard_embeddings):

        scaled_standard = normalise_matrix(standard_embeddings)

        scaled_transition = normalise_matrix(self.transition_matrix)

        added = scaled_standard + scaled_transition/1
        
        return added

In [7]:
embedding_dim = model.config.hidden_size
# Initialize the Transition Layer with the same embedding dimension
transition_layer = TransitionLayer(embedding_dim)

# Load the saved transition matrix into the model
transition_layer.load_state_dict(torch.load('transition_matrix_new.pth'))


  transition_layer.load_state_dict(torch.load('transition_matrix_new.pth'))


<All keys matched successfully>

In [8]:
def ask(question, model, tokenizer, max_length=100, device="cpu"):
    # Tokenize the input question
    inputs = tokenizer(question, return_tensors="pt").to(device)
    
    # Generate the response
    with torch.no_grad():
        output = model.generate(
            inputs.input_ids,
            max_length=max_length,
            temperature=0.7,  # Controls randomness (lower = more deterministic)
            top_k=50,         # Top-k sampling for diversity
            do_sample=True,    # Enable sampling for less repetitive responses
            output_hidden_states=True,
            return_dict_in_generate=True 
        )
    
    hidden_states = output.hidden_states  # List of hidden states from each layer
    # last_layer_hidden_state = hidden_states[-1]
    # Decode the response
    response = tokenizer.decode(output.sequences[0], skip_special_tokens=True)
    return response, hidden_states

In [9]:
import torch.nn.functional as F

def ask_with_transition(question, model, tokenizer, transition_layer, max_length=100, device="cpu"):
    # Tokenize the input question
    inputs = tokenizer(question, return_tensors="pt").to(device)
    
    # Initialize the input ids and attention mask
    input_ids = inputs.input_ids
    attention_mask = inputs.attention_mask
    
    # Use the model to generate responses while controlling the generation loop
    model.eval()  # Set the model to evaluation mode
    generated_ids = input_ids
    for _ in range(max_length - input_ids.shape[1]):
        # Generate the logits and hidden states for the current input
        outputs = model(input_ids=generated_ids, attention_mask=attention_mask, output_hidden_states=True)
        
        # Get the last hidden state from the outputs
        hidden_states = outputs.hidden_states
        last_hidden_state = hidden_states[-1]  # [batch_size, seq_len, hidden_size]

        # print(last_hidden_state.shape)
        
        # Apply the transition matrix to the last hidden state
        mean = last_hidden_state[:, -1, :].mean()
        std = last_hidden_state[:, -1, :].std()
        scaled_lhs = (last_hidden_state[:, -1, :] - mean)/std

        trans_matrix = transition_layer.transition_matrix
        mean = trans_matrix.mean()
        std = trans_matrix.std()
        scaled_trans = (trans_matrix - mean)/std

        transformed_hidden_state = torch.matmul(scaled_lhs, scaled_trans)
        
        # Transform the hidden state to logits
        # logits = model.lm_head(transformed_hidden_state)  # Project the transformed hidden state to logits
        logits = model.lm_head(transformed_hidden_state)
        
        # Apply softmax to get probabilities for the next token
        next_token_logits = logits[:,:]  # Get the logits for the next token
        # print(next_token_logits)
        next_token_probs = F.softmax(next_token_logits, dim=-1)
        
        # Sample the next token from the probabilities
        next_token_id = torch.multinomial(next_token_probs, 1)
        
        # Append the predicted token to the generated sequence
        generated_ids = torch.cat([generated_ids, next_token_id], dim=1)
        print(generated_ids)
    
    # Decode the generated tokens into text
    response = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    
    # return response, transformed_hidden_state
    return response


In [52]:
import torch.nn.functional as F

def M_add(question, model, tokenizer, transition_layer, max_length=100, device="cpu"):
    # Tokenize the input question
    inputs = tokenizer(question, return_tensors="pt").to(device)
    
    # Initialize the input ids and attention mask
    input_ids = inputs.input_ids
    attention_mask = inputs.attention_mask
    
    # Use the model to generate responses while controlling the generation loop
    model.eval()  # Set the model to evaluation mode
    generated_ids = input_ids
    for _ in range(max_length - input_ids.shape[1]):
        # Generate the logits and hidden states for the current input
        outputs = model(input_ids=generated_ids, attention_mask=attention_mask, output_hidden_states=True)
        
        # Get the last hidden state from the outputs
        hidden_states = outputs.hidden_states
        last_hidden_state = hidden_states[-1]  # [batch_size, seq_len, hidden_size]

        # print(last_hidden_state.shape)
        
        # Apply the transition matrix to the last hidden state
        scaled_lhs = normalise_matrix(last_hidden_state[:,-1,:])
        # print(scaled_lhs.shape)
        scaled_trans = normalise_matrix(transition_layer.transition_matrix)
        # print(scaled_trans.shape)
        # transformed_hidden_state = transition_layer(scaled_lhs)
        # print(transformed_hidden_state.shape)
        transformed_hidden_state = scaled_lhs + scaled_trans/1000
        # Transform the hidden state to logits
        # logits = model.lm_head(transformed_hidden_state)  # Project the transformed hidden state to logits
        logits = model.lm_head(transformed_hidden_state)
        
        # Apply softmax to get probabilities for the next token
        next_token_logits = logits[:,-1, :]  # Get the logits for the next token
        # print(next_token_logits)
        next_token_probs = F.softmax(next_token_logits, dim=-1)
        
        # Sample the next token from the probabilities
        next_token_id = torch.multinomial(next_token_probs, 1)
        # print(generated_ids.shape)
        # print(next_token_id.shape)
        
        # Append the predicted token to the generated sequence
        generated_ids = torch.cat([generated_ids, next_token_id.unsqueeze(0)], dim=1)
        # print(generated_ids)
    
    # Decode the generated tokens into text
    response = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    
    # return response, transformed_hidden_state
    return response


In [29]:
# Example: Ask a question
question = "Geschichte der TU Dresden"
question = "Er da hat sich auf seiner Mary im Sattel aufrichten"
response, output = ask(question, model, tokenizer)
print("LLaMA's Response:", response)

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


LLaMA's Response: Er da hat sich auf seiner Mary im Sattel aufrichten lassen.
- Ja.
- Ja.
- Ja.
- Ja.
Also, was ist mit dir los?
- Du hast nicht mit mir geredet?
- Nein.
- Ich hab nicht mit dir geredet?
- Nein.
Ich hab nicht mit dir geredet, weil du mich nicht verstehst, also...
Weil ich...
- Nein, nein, ich verstehe.
- Nein, ich verstehe.


In [53]:
# Example: Ask a question
question = "Geschichte der TU Dresden"
# question = "Er da hat sich auf seiner Mary im Sattel aufrichten"
response = M_add(question, model, tokenizer, transition_layer)
print("LLaMA's Response:", response)

IndexError: too many indices for tensor of dimension 2