<a href="https://colab.research.google.com/github/Abhay27273/Emotional-Intelligent-/blob/main/T5_layers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers datasets torch sentencepiece

Collecting datasets
  Downloading datasets-3.4.1-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.

In [None]:
import pandas as pd

# Read the emotion corpus into a DataFrame.
# NOTE(review): the path is absolute (filesystem root) -- point it at wherever the CSV really lives.
df = pd.read_csv(filepath_or_buffer="/combined_emotion.csv")

# Sanity check: first five rows, then the dtype of the `text` column.
print(df.head(n=5))
print(df["text"].dtype)

                                                text  emotion
0  say jim how about going for a few beers after ...  neutral
1  can you do pushups of course i can its a piece...  neutral
2  can you study with the radio on no i listen to...  neutral
3  are you all right i will be all right soon i w...  neutral
4  hey john nice skates are they new yeah i just ...  neutral
object


In [None]:
# Prompt = task prefix + utterance + emotion tag, built column-wise.
df['input_text'] = (
    df['text']
    .radd("generate empathetic response: ")
    .add(" [emotion: ")
    .add(df['emotion'])
    .add("]")
)
# Placeholder target: the utterance itself (replace with real responses for the actual task).
df['target_text'] = df['text']

# Show the first formatted rows, then the dtype of each new column (one per line).
print(df.loc[:, ['input_text', 'target_text']].head())
print(df['input_text'].dtype, df['target_text'].dtype, sep="\n")

                                          input_text  \
0  generate empathetic response: say jim how abou...   
1  generate empathetic response: can you do pushu...   
2  generate empathetic response: can you study wi...   
3  generate empathetic response: are you all righ...   
4  generate empathetic response: hey john nice sk...   

                                         target_text  
0  say jim how about going for a few beers after ...  
1  can you do pushups of course i can its a piece...  
2  can you study with the radio on no i listen to...  
3  are you all right i will be all right soon i w...  
4  hey john nice skates are they new yeah i just ...  
object
object


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        d_k: Dimension of the key vectors; raw scores are divided by ``sqrt(d_k)``.
    """

    def __init__(self, d_k):
        super(ScaledDotProductAttention, self).__init__()
        self.d_k = d_k  # Dimension of the key vectors

    def forward(self, Q, K, V, mask=None):
        """Attend from the queries to the keys and mix the values accordingly.

        Args:
            Q: Query tensor ``(..., q_len, d_k)``.
            K: Key tensor ``(..., k_len, d_k)``.
            V: Value tensor ``(..., k_len, d_v)``.
            mask: Optional tensor broadcastable to the score shape
                ``(..., q_len, k_len)``. Positions where ``mask == 0`` are hidden.

        Returns:
            Tuple ``(output, attn_weights)``: the weighted sum of ``V`` and the
            attention weights. A query row whose keys are *all* hidden gets
            all-zero weights (and therefore a zero output row) instead of NaN.
        """
        # Compute attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        hidden = None
        if mask is not None:
            hidden = mask == 0
            # Use the most negative finite value rather than -inf: softmax still
            # assigns exactly 0 to hidden keys, but a fully hidden row no longer
            # evaluates to NaN (exp(-inf - (-inf))).
            scores = scores.masked_fill(hidden, torch.finfo(scores.dtype).min)

        # Apply softmax to get attention weights
        attn_weights = F.softmax(scores, dim=-1)

        if hidden is not None:
            # Rows with no visible key would otherwise attend uniformly to hidden
            # keys; make them attend to nothing.
            no_visible_key = hidden.expand_as(scores).all(dim=-1, keepdim=True)
            attn_weights = attn_weights.masked_fill(no_visible_key, 0.0)

        # Apply attention weights to the value matrix
        output = torch.matmul(attn_weights, V)

        return output, attn_weights

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``ScaledDotProductAttention``.

    Args:
        d_model: Model (embedding) dimension.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        if d_model % num_heads != 0:
            # Otherwise d_k * num_heads != d_model and the head reshape in
            # forward() either fails or silently scrambles features.
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model  # Dimension of the model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # Dimension of each head

        # Linear layers for Q, K, V
        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)

        # Output linear layer
        self.W_O = nn.Linear(d_model, d_model)

        # Scaled Dot-Product Attention
        self.attention = ScaledDotProductAttention(self.d_k)

    @staticmethod
    def _align_mask(mask, batch_size, key_len):
        """Reshape per-batch masks so they broadcast over (batch, heads, q_len, k_len).

        * 2-D ``(batch, k_len)`` key-padding mask -> ``(batch, 1, 1, k_len)``
        * 3-D ``(batch, q_len, k_len)`` mask      -> ``(batch, 1, q_len, k_len)``
        * anything else is returned unchanged and left to normal broadcasting.

        A 2-D mask whose shape equals ``(batch, k_len)`` is always read as a
        key-padding mask, even if it could also be read as ``(q_len, k_len)``.
        """
        if mask.dim() == 2 and mask.size(0) == batch_size and mask.size(1) == key_len:
            return mask[:, None, None, :]
        if mask.dim() == 3 and mask.size(0) == batch_size:
            return mask[:, None, :, :]
        return mask

    def forward(self, Q, K, V, mask=None):
        """Project, split into heads, attend, and merge the heads again.

        Args:
            Q: Query input ``(batch, q_len, d_model)``.
            K: Key input ``(batch, k_len, d_model)``.
            V: Value input ``(batch, k_len, d_model)``.
            mask: Optional mask; zeros mark hidden positions. Accepts
                ``(batch, k_len)``, ``(batch, q_len, k_len)`` or any shape that
                already broadcasts to ``(batch, num_heads, q_len, k_len)``.

        Returns:
            Tuple ``(output, attn_weights)`` with ``output`` of shape
            ``(batch, q_len, d_model)`` and per-head attention weights.
        """
        batch_size = Q.size(0)

        # Linear projections for Q, K, V, reshaped to (batch, heads, len, d_k)
        Q = self.W_Q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_K(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_V(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        if mask is not None:
            # Without this a (batch, seq) mask is broadcast against the trailing
            # (q_len, k_len) axes and only works when batch == 1.
            mask = self._align_mask(mask, batch_size, K.size(2))

        # Apply Scaled Dot-Product Attention
        x, attn_weights = self.attention(Q, K, V, mask)

        # Concatenate heads and apply output linear layer
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.W_O(x)

        return output, attn_weights

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to token embeddings.

    Even feature indices hold ``sin(pos * w_i)`` and odd indices ``cos(pos * w_i)``
    with ``w_i = 10000^(-2i / d_model)``. The table is stored as a non-trainable
    buffer ``pe`` of shape ``(1, max_seq_len, d_model)``.

    Args:
        d_model: Embedding dimension (even or odd).
        max_seq_len: Number of positions pre-computed in the table.
    """

    def __init__(self, d_model, max_seq_len):
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.max_seq_len = max_seq_len

        # Create positional encodings
        pe = torch.zeros(max_seq_len, d_model)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim div_term; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the encoding for positions ``0 .. seq_len - 1`` to ``x``.

        Args:
            x: Input embeddings ``(batch_size, seq_len, d_model)``.

        Returns:
            Tensor of the same shape as ``x``.
        """
        return x + self.pe[:, :x.size(1)]

In [None]:
class FeedForward(nn.Module):
    """Two-layer position-wise network: Linear -> GELU -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden (inner) feature size.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.fc2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Expand to ``d_ff``, apply GELU, and project back to ``d_model``."""
        expanded = self.fc1(x)
        activated = F.gelu(expanded)
        return self.fc2(activated)

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward, each followed by
    dropout, a residual add and LayerNorm (post-norm ordering).

    Args:
        d_model: Feature size of the block's input and output.
        num_heads: Number of heads handed to ``MultiHeadAttention``.
        d_ff: Hidden size handed to ``FeedForward``.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, mask=None):
        """Run the block on ``x``; ``mask`` is forwarded to the self-attention."""
        # Sub-layer 1: self-attention (the attention weights are discarded).
        attended = self.self_attn(x, x, x, mask)[0]
        x = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward.
        transformed = self.feed_forward(x)
        x = self.norm2(x + self.dropout(transformed))

        return x

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder-decoder attention and
    feed-forward, each followed by dropout, a residual add and LayerNorm.

    Args:
        d_model: Feature size of the block's input and output.
        num_heads: Number of heads for both attention modules.
        d_ff: Hidden size handed to ``FeedForward``.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.encoder_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """Run the block.

        ``tgt_mask`` is forwarded to the self-attention over ``x``; ``src_mask``
        is forwarded to the attention over ``encoder_output``.
        """
        # Sub-layer 1: self-attention over the decoder input.
        self_attended = self.self_attn(x, x, x, tgt_mask)[0]
        x = self.norm1(x + self.dropout(self_attended))

        # Sub-layer 2: queries from the decoder, keys/values from the encoder.
        cross_attended = self.encoder_attn(x, encoder_output, encoder_output, src_mask)[0]
        x = self.norm2(x + self.dropout(cross_attended))

        # Sub-layer 3: position-wise feed-forward.
        transformed = self.feed_forward(x)
        x = self.norm3(x + self.dropout(transformed))

        return x

In [None]:
class Encoder(nn.Module):
    """A stack of ``num_layers`` ``EncoderLayer`` blocks applied in sequence."""

    def __init__(self, num_layers, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(EncoderLayer(d_model, num_heads, d_ff, dropout))

    def forward(self, x, mask=None):
        """Feed ``x`` through every layer, passing the same ``mask`` to each."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return hidden

class Decoder(nn.Module):
    """A stack of ``num_layers`` ``DecoderLayer`` blocks applied in sequence."""

    def __init__(self, num_layers, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(DecoderLayer(d_model, num_heads, d_ff, dropout))

    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """Feed ``x`` through every layer with the same encoder output and masks."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return hidden

In [None]:
class T5Model(nn.Module):
    """Encoder-decoder Transformer with a shared token embedding.

    Source and target ids go through the same ``nn.Embedding`` and
    ``PositionalEncoding``; the decoder output is projected to vocabulary logits.

    Args:
        vocab_size: Number of token ids (embedding rows and output logits).
        max_seq_len: Number of positions pre-computed by the positional encoding.
        d_model: Embedding / hidden size.
        num_heads: Attention heads per layer.
        d_ff: Feed-forward hidden size.
        num_layers: Number of encoder layers and of decoder layers.
        dropout: Dropout probability.
    """

    def __init__(self, vocab_size, max_seq_len, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super(T5Model, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_len)
        self.encoder = Encoder(num_layers, d_model, num_heads, d_ff, dropout)
        self.decoder = Decoder(num_layers, d_model, num_heads, d_ff, dropout)
        self.fc_out = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def _as_attention_mask(mask):
        """Lift a mask to 4-D so it broadcasts over (batch, heads, q_len, k_len).

        ``(batch, k_len)`` -> ``(batch, 1, 1, k_len)``;
        ``(batch, q_len, k_len)`` -> ``(batch, 1, q_len, k_len)``;
        ``None`` and other ranks are returned unchanged.
        """
        if mask is None:
            return None
        if mask.dim() == 2:
            return mask[:, None, None, :]
        if mask.dim() == 3:
            return mask[:, None, :, :]
        return mask

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        """Compute next-token logits for ``tgt`` conditioned on ``src``.

        Args:
            src: Source token ids ``(batch, src_len)``.
            tgt: Decoder input token ids ``(batch, tgt_len)``.
            src_mask: Optional source mask, zeros = hidden. Typically the
                tokenizer's ``(batch, src_len)`` attention mask.
            tgt_mask: Optional target mask, zeros = hidden. Typically the
                ``(batch, tgt_len)`` attention mask. It is always combined with
                a causal (lower-triangular) mask.

        Returns:
            Logits of shape ``(batch, tgt_len, vocab_size)``.
        """
        # Decoder self-attention must be causal: position i may only look at
        # positions <= i. Without this, teacher-forced training lets the decoder
        # read the very token it is asked to predict.
        tgt_len = tgt.size(1)
        causal = torch.tril(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt.device)
        ).view(1, 1, tgt_len, tgt_len)

        # Padding masks arrive as (batch, seq); in that shape they only broadcast
        # against the 4-D attention scores when batch == 1.
        src_mask = self._as_attention_mask(src_mask)
        tgt_mask = self._as_attention_mask(tgt_mask)
        tgt_mask = causal if tgt_mask is None else (tgt_mask != 0) & causal

        # Embed and add positional encoding
        src = self.dropout(self.positional_encoding(self.embedding(src)))
        tgt = self.dropout(self.positional_encoding(self.embedding(tgt)))

        # Pass through encoder and decoder
        encoder_output = self.encoder(src, src_mask)
        decoder_output = self.decoder(tgt, encoder_output, src_mask, tgt_mask)

        # Output layer
        output = self.fc_out(decoder_output)
        return output

In [None]:
from torch.utils.data import Dataset, DataLoader

class T5Dataset(Dataset):
    """Paired source/target text dataset that tokenizes on access.

    Args:
        src_texts: Indexable collection of source strings.
        tgt_texts: Indexable collection of target strings, aligned with ``src_texts``.
        tokenizer: Callable invoked as
            ``tokenizer(text, max_length=..., padding='max_length', truncation=True,
            return_tensors='pt')`` and returning a mapping with ``'input_ids'`` and
            ``'attention_mask'`` tensors that carry a leading batch axis of size 1.
        max_seq_len: Length every example is padded / truncated to.

    Raises:
        ValueError: If ``src_texts`` and ``tgt_texts`` differ in length.
    """

    def __init__(self, src_texts, tgt_texts, tokenizer, max_seq_len=128):
        if len(src_texts) != len(tgt_texts):
            # __len__ is driven by src_texts alone, so a mismatch would otherwise
            # surface later as an index error or silently drop targets.
            raise ValueError(
                f"src_texts and tgt_texts must have the same length "
                f"(got {len(src_texts)} and {len(tgt_texts)})"
            )
        self.src_texts = src_texts
        self.tgt_texts = tgt_texts
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len

    def __len__(self):
        """Number of (source, target) pairs."""
        return len(self.src_texts)

    def _encode(self, text):
        """Tokenize one string to fixed-length tensors (batch axis still present)."""
        return self.tokenizer(
            text,
            max_length=self.max_seq_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

    def __getitem__(self, idx):
        """Return the tokenized pair at ``idx`` as a dict of 1-D tensors."""
        # Tokenize source and target texts
        src = self._encode(self.src_texts[idx])
        tgt = self._encode(self.tgt_texts[idx])

        # Drop the tokenizer's leading batch axis so the DataLoader can stack examples
        return {
            'src_input_ids': src['input_ids'].squeeze(0),
            'src_attention_mask': src['attention_mask'].squeeze(0),
            'tgt_input_ids': tgt['input_ids'].squeeze(0),
            'tgt_attention_mask': tgt['attention_mask'].squeeze(0)
        }

In [None]:
# Example data
src_texts = [
    "generate empathetic response: I feel so happy today! [emotion: happiness]",
    "generate empathetic response: I am feeling sad. [emotion: sadness]"
]
tgt_texts = [
    "That's great to hear! Keep smiling!",
    "I'm sorry to hear that. I hope things get better soon."
]

# Initialize the tokenizer
from transformers import T5Tokenizer
tokenizer = T5Tokenizer.from_pretrained('t5-small', legacy=False)

# Create the dataset
dataset = T5Dataset(src_texts, tgt_texts, tokenizer)

# Split the dataset into training and validation sets (80% / 20%, rounded down for train)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
# Seed the split and the shuffle so "Restart & Run All" reproduces the same batches.
SEED = 42
data_rng = torch.Generator().manual_seed(SEED)
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=data_rng
)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, generator=data_rng)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False)

In [None]:
# Training loop
# NOTE(review): `model`, `optimizer` and `device` are not defined in any cell above this
# one, so this cell fails on a fresh kernel. Define them (and move the model to `device`)
# before running.
num_epochs = 3
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_loss = 0

    for batch in train_loader:
        # Move batch to GPU (if available)
        src_input_ids = batch['src_input_ids'].to(device)
        src_attention_mask = batch['src_attention_mask'].to(device)
        tgt_input_ids = batch['tgt_input_ids'].to(device)
        tgt_attention_mask = batch['tgt_attention_mask'].to(device)

        # Forward pass (teacher forcing): the decoder sees tokens [0 .. T-2] ...
        output = model(
            src=src_input_ids,
            tgt=tgt_input_ids[:, :-1],  # Exclude the last token
            src_mask=src_attention_mask,
            tgt_mask=tgt_attention_mask[:, :-1]  # Exclude the last token
        )

        # ... and is scored against tokens [1 .. T-1].
        # Targets are padded to max_length, so padding positions are excluded from
        # the loss; otherwise the average is dominated by predicting <pad>.
        loss = F.cross_entropy(
            output.view(-1, output.size(-1)),
            tgt_input_ids[:, 1:].contiguous().view(-1),  # Shifted target
            ignore_index=tokenizer.pad_token_id
        )

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Print average (per-batch) loss for the epoch
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch + 1}, Loss: {avg_loss}")

Epoch 1, Loss: 10.573751449584961
Epoch 2, Loss: 6.2018561363220215
Epoch 3, Loss: 3.4238131046295166


In [None]:
# Validation pass: same teacher-forced loss as training, without gradient tracking.
model.eval()  # Set the model to evaluation mode
total_loss = 0

with torch.no_grad():
    for batch in val_loader:
        # Move batch to GPU (if available)
        src_input_ids = batch['src_input_ids'].to(device)
        src_attention_mask = batch['src_attention_mask'].to(device)
        tgt_input_ids = batch['tgt_input_ids'].to(device)
        tgt_attention_mask = batch['tgt_attention_mask'].to(device)

        # Forward pass
        output = model(
            src=src_input_ids,
            tgt=tgt_input_ids[:, :-1],  # Exclude the last token
            src_mask=src_attention_mask,
            tgt_mask=tgt_attention_mask[:, :-1]  # Exclude the last token
        )

        # Compute loss on real tokens only: targets are padded to max_length, and
        # counting <pad> positions would make the loss look far better than it is.
        loss = F.cross_entropy(
            output.view(-1, output.size(-1)),
            tgt_input_ids[:, 1:].contiguous().view(-1),  # Shifted target
            ignore_index=tokenizer.pad_token_id
        )

        total_loss += loss.item()

# Print validation loss (mean over batches)
avg_loss = total_loss / len(val_loader)
print(f"Validation Loss: {avg_loss}")

Validation Loss: 2.278475761413574


In [None]:
# Persist the model's parameters (its state_dict) to disk
torch.save(obj=model.state_dict(), f='t5_custom_model.pth')

In [None]:
from transformers import T5ForConditionalGeneration, T5Tokenizer

# Pretrained t5-small tokenizer, then the matching seq2seq model.
# NOTE(review): this rebinds `model`, replacing whatever object the earlier cells trained and saved.
tokenizer = T5Tokenizer.from_pretrained('t5-small')
model = T5ForConditionalGeneration.from_pretrained('t5-small')

# Use CUDA when it is available, otherwise the CPU, and move the model there
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

T5ForConditionalGeneration(
  (shared): Embedding(32128, 512)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 512)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=512, out_features=512, bias=False)
              (k): Linear(in_features=512, out_features=512, bias=False)
              (v): Linear(in_features=512, out_features=512, bias=False)
              (o): Linear(in_features=512, out_features=512, bias=False)
              (relative_attention_bias): Embedding(32, 8)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseActDense(
              (wi): Linear(in_features=512, out_features=2048, bias=False)
              (wo): Linear(in_features=2048, out_features=512, bias=False)
              (dropout): Drop

In [None]:
# Prompt written as a plain-language instruction
input_text = "Someone said: I feel so happy today!. They are feeling happy. How would you respond empathetically?"
encoded_prompt = tokenizer(input_text, return_tensors='pt')
input_ids = encoded_prompt['input_ids'].to(device)

# Decode with the model's built-in generate() (max_length=50), then drop special tokens
output = model.generate(input_ids, max_length=50)
first_sequence = output[0]
generated_text = tokenizer.decode(first_sequence, skip_special_tokens=True)

print(generated_text)

Someone said: I feel so happy today!. They are feeling happy. How would you respond empathetically?


In [None]:
def greedy_decode(model, input_ids, max_length=50, pad_token_id=None, eos_token_id=None):
    """Greedy (arg-max) decoding for an encoder-decoder model.

    The model is called as ``model(input_ids=..., decoder_input_ids=...)`` and must
    return an object with a ``.logits`` tensor of shape ``(batch, dec_len, vocab)``.

    Args:
        model: The seq2seq model; put into eval mode by this function.
        input_ids: Encoder token ids ``(batch, src_len)``.
        max_length: Maximum number of tokens to generate (start token excluded).
        pad_token_id: Id used both as the decoder start token and to fill rows
            that have already finished. Defaults to the notebook-level
            ``tokenizer.pad_token_id``.
        eos_token_id: Id that ends a sequence. Defaults to the notebook-level
            ``tokenizer.eos_token_id``.

    Returns:
        LongTensor ``(batch, 1 + n_generated)`` starting with ``pad_token_id``.
        Generation stops once every row has produced ``eos_token_id`` or after
        ``max_length`` steps.
    """
    if pad_token_id is None:
        pad_token_id = tokenizer.pad_token_id
    if eos_token_id is None:
        eos_token_id = tokenizer.eos_token_id

    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        batch_size = input_ids.size(0)
        target_device = input_ids.device  # follow the inputs instead of a global `device`

        # One start token per batch row (previously hard-coded to a single row)
        output_ids = torch.full(
            (batch_size, 1), pad_token_id, dtype=torch.long, device=target_device
        )
        finished = torch.zeros(batch_size, dtype=torch.bool, device=target_device)

        for _ in range(max_length):
            output = model(
                input_ids=input_ids,
                decoder_input_ids=output_ids,
            )

            # Most likely token at the last decoder position, one per row
            next_token = output.logits[:, -1, :].argmax(dim=-1)
            # Rows that already emitted EOS only receive padding from now on
            next_token = next_token.masked_fill(finished, pad_token_id)

            # Append the predicted token to the output sequence
            output_ids = torch.cat([output_ids, next_token.unsqueeze(1)], dim=-1)

            # Stop once every row has produced the end-of-sequence token
            finished = finished | (next_token == eos_token_id)
            if bool(finished.all()):
                break

        return output_ids

In [None]:
# Prompt in the "task prefix + utterance + emotion tag" format
input_text = "generate empathetic response: I feel so happy today! [emotion: happiness]"
encoded_prompt = tokenizer(input_text, return_tensors='pt')
input_ids = encoded_prompt['input_ids'].to(device)

# Greedy decoding (at most 50 steps), then detokenize the first sequence
output_ids = greedy_decode(model, input_ids, max_length=50)
first_sequence = output_ids[0]
generated_text = tokenizer.decode(first_sequence, skip_special_tokens=True)

print(generated_text)

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


[emotion: happiness]


In [None]:
def beam_search_decode(model, input_ids, beam_width=5, max_length=50,
                       pad_token_id=None, eos_token_id=None):
    """Beam-search decoding for a single input sequence.

    The model is called as ``model(input_ids=..., decoder_input_ids=...)`` and must
    return an object with a ``.logits`` tensor of shape ``(1, dec_len, vocab)``.
    Beams are ranked by the sum of token log-probabilities (no length normalisation).

    Args:
        model: The seq2seq model; put into eval mode by this function.
        input_ids: Encoder token ids of shape ``(1, src_len)``.
        beam_width: Number of hypotheses kept after every step.
        max_length: Maximum number of decoding steps.
        pad_token_id: Decoder start token id. Defaults to the notebook-level
            ``tokenizer.pad_token_id``.
        eos_token_id: Id that ends a hypothesis. Defaults to the notebook-level
            ``tokenizer.eos_token_id``.

    Returns:
        LongTensor ``(1, length)`` holding the highest-scoring hypothesis,
        including the start token.
    """
    if pad_token_id is None:
        pad_token_id = tokenizer.pad_token_id
    if eos_token_id is None:
        eos_token_id = tokenizer.eos_token_id

    def has_ended(seq):
        # A hypothesis is complete once it has generated EOS (the start token does not count).
        return seq.size(1) > 1 and seq[0, -1].item() == eos_token_id

    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        # Initialize the beam with the start token, on the same device as the inputs
        start = torch.tensor([[pad_token_id]], device=input_ids.device)
        beams = [(start, 0.0)]  # (sequence, cumulative log-probability)

        for _ in range(max_length):
            candidates = []
            for beam_seq, beam_score in beams:
                if has_ended(beam_seq):
                    # Finished hypotheses are carried over unchanged. Expanding them
                    # appends tokens after EOS and keeps the stop condition from firing.
                    candidates.append((beam_seq, beam_score))
                    continue

                output = model(
                    input_ids=input_ids,
                    decoder_input_ids=beam_seq,
                )

                # Top-k continuations of this hypothesis and their log-probabilities
                log_probs = F.log_softmax(output.logits[:, -1, :], dim=-1)
                k = min(beam_width, log_probs.size(-1))  # topk cannot exceed the vocabulary
                topk_probs, topk_tokens = log_probs.topk(k, dim=-1)

                # Expand the beam
                for i in range(k):
                    new_seq = torch.cat([beam_seq, topk_tokens[:, i].unsqueeze(1)], dim=-1)
                    new_score = beam_score + topk_probs[:, i].item()
                    candidates.append((new_seq, new_score))

            # Keep the best `beam_width` hypotheses
            beams = sorted(candidates, key=lambda x: x[1], reverse=True)[:beam_width]

            # Stop if all beams have generated the end-of-sequence token
            if all(has_ended(beam_seq) for beam_seq, _ in beams):
                break

        # Return the best sequence
        best_sequence = beams[0][0]
        return best_sequence

In [None]:
# Prompt in the "task prefix + utterance + emotion tag" format
input_text = "generate empathetic response: I feel so happy today! [emotion: happiness]"
encoded_prompt = tokenizer(input_text, return_tensors='pt')
input_ids = encoded_prompt['input_ids'].to(device)

# Beam search (5 beams, at most 50 steps), then detokenize the winning sequence
output_ids = beam_search_decode(model, input_ids, beam_width=5, max_length=50)
best_sequence = output_ids[0]
generated_text = tokenizer.decode(best_sequence, skip_special_tokens=True)

print(generated_text)

[emotion: happiness] I feel so happy today! [emotion: happiness] : :) :) :) :) :) :) :)
