In [1]:
import re
from collections import Counter
import json
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader as DL
import torch.utils.data
import math
import torch.nn.functional as F
import torch.optim as optim
import random

In [2]:
dataset_path = "/content/big_fitness_dataset.txt"

In [3]:
def load_glove_embeddings(glove_path="/content/glove.6B.100d.txt"):
    glove_dict = {}
    with open(glove_path, 'r', encoding="utf8") as f:
        for line in f:
            parts = line.strip().split()
            word = parts[0]
            vector = torch.tensor([float(val) for val in parts[1:]], dtype=torch.float32)
            glove_dict[word] = vector
    return glove_dict

glove_dict = load_glove_embeddings()

In [4]:
with open("/content/big_fitness_dataset.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()

questions, answers = [], []
for i in range(0, len(lines), 2):
    if i + 1 < len(lines) and lines[i].startswith("Q:") and lines[i + 1].startswith("A:"):
        # Clean questions
        q = lines[i].replace("Q:", "").strip().lower()
        # Remove multiple spaces
        q = " ".join(q.split())
        # Ensure question ends with question mark if it's an actual question
        if any(q.startswith(w) for w in ["what", "how", "why", "when", "where", "which", "can", "should", "is", "are", "do", "does"]) and not q.endswith("?"):
            q += "?"

        # Clean answers
        a = lines[i + 1].replace("A:", "").strip().lower()
        # Remove multiple spaces
        a = " ".join(a.split())
        # Ensure answers are complete sentences with proper punctuation
        if not a.endswith(('.', '!', '?')):
            a += '.'
        # Capitalize first letter of answer
        if a:
            a = a[0].upper() + a[1:]

        # Skip very short or low-quality pairs
        if len(q.split()) < 2 or len(a.split()) < 3:
            continue

        questions.append(q)
        answers.append(a)

print(f"Loaded and cleaned {len(questions)} questions and {len(answers)} answers.")

Loaded and cleaned 10110 questions and 10110 answers.


In [5]:
special_tokens = ["<pad>", "<sos>", "<eos>", "<unk>"]
word_freq = Counter(" ".join(questions + answers).split())
vocab = special_tokens + sorted(word_freq.keys())
word2index = {word: idx for idx, word in enumerate(vocab)}
index2word = {idx: word for word, idx in word2index.items()}

In [6]:
def encode_text(text, word_map, max_len=20):
    tokens = text.strip().lower().split()
    encoded = [word_map.get(word, word_map["<unk>"]) for word in tokens]
    encoded = [word_map["<sos>"]] + encoded[:max_len - 2] + [word_map["<eos>"]]
    return encoded + [word_map["<pad>"]] * (max_len - len(encoded))

encoded_questions = [encode_text(q, word2index) for q in questions]
encoded_answers = [encode_text(a, word2index) for a in answers]

print("Sample Encoded Q:", encoded_questions[0])
print("Decoded Q:", [index2word[i] for i in encoded_questions[0]])

Sample Encoded Q: [1, 14836, 2085, 13754, 2543, 9162, 11118, 5411, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Decoded Q: ['<sos>', 'what', 'are', 'the', 'benefits', 'of', 'regular', 'exercise?', '<eos>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>']


In [7]:
def create_embedding_matrix(word2index, glove_dict, embedding_dim=100):
    matrix = torch.randn(len(word2index), embedding_dim) * 0.1
    for word, idx in word2index.items():
        if word in glove_dict:
            matrix[idx] = glove_dict[word]
    return matrix

embedding_matrix = create_embedding_matrix(word2index, glove_dict, embedding_dim=100)

In [8]:
class FitnessDataset(Dataset):
    def __init__(self, questions, answers):
        self.questions = questions
        self.answers = answers

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        return torch.tensor(self.questions[idx]), torch.tensor(self.answers[idx])

dataset = FitnessDataset(encoded_questions, encoded_answers)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = DL(train_dataset, batch_size=32, shuffle=True)
val_loader = DL(val_dataset, batch_size=32)


In [9]:
class FitnessGRUBot(nn.Module):
    def __init__(self, vocab_size, embed_dim=100, hidden_dim=256, num_layers=2, dropout=0.3, pretrained_embeddings=None):
        super(FitnessGRUBot, self).__init__()

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        if pretrained_embeddings is not None:
            self.embedding.weight.data.copy_(pretrained_embeddings)
            self.embedding.weight.requires_grad = True  # Allow fine-tuning

        # Encoder GRU
        self.encoder = nn.GRU(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True  # Bidirectional for better context understanding
        )

        # Decoder GRU - Correct indentation here
        self.decoder = nn.GRU(
            input_size=embed_dim,
            hidden_size=hidden_dim * 2,  # Account for bidirectional encoder
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )

        # Attention mechanism - simple dot product attention
        self.attention = nn.Linear(hidden_dim * 2, hidden_dim * 2) # Correct indentation here as well

        # Output projection
        self.fc_out = nn.Linear(hidden_dim * 2, vocab_size)

        # Additional dropout for regularization
        self.dropout = nn.Dropout(dropout)

        # Initialize weights
        for name, param in self.named_parameters():
            if 'weight' in name and 'embedding' not in name:
                nn.init.xavier_uniform_(param)

    def forward(self, src, tgt):
        # Source and target shape: [batch_size, seq_len]
        batch_size = src.shape[0]
        # Embed source and target
        src_embedded = self.dropout(self.embedding(src))  # [batch_size, src_len, embed_dim]
        tgt_embedded = self.dropout(self.embedding(tgt))  # [batch_size, tgt_len, embed_dim]

        # Encode source
        encoder_outputs, encoder_hidden = self.encoder(src_embedded)
        # encoder_outputs: [batch_size, src_len, hidden_dim*2]
        # encoder_hidden: [num_layers*2, batch_size, hidden_dim]

        # Process encoder hidden state for decoder
        encoder_hidden = encoder_hidden.view(self.encoder.num_layers, 2, batch_size, -1)
        encoder_hidden = torch.cat([encoder_hidden[:, 0], encoder_hidden[:, 1]], dim=2)
        # Now encoder_hidden: [num_layers, batch_size, hidden_dim*2]

        # Decode
        decoder_output, _ = self.decoder(tgt_embedded, encoder_hidden)
        # decoder_output: [batch_size, tgt_len, hidden_dim*2]

        # Apply attention over encoder outputs (optional, more advanced)
        # This is a simplified attention mechanism
        attn_weights = torch.bmm(decoder_output, encoder_outputs.transpose(1, 2))
        attn_weights = F.softmax(attn_weights, dim=2)  # [batch_size, tgt_len, src_len]
        context = torch.bmm(attn_weights, encoder_outputs)  # [batch_size, tgt_len, hidden_dim*2]

        # Combine context with decoder output (simple concatenation+projection for this example)
        combined = decoder_output + context

        # Project to vocabulary size
        output = self.fc_out(self.dropout(combined))  # [batch_size, tgt_len, vocab_size]

        return output

In [10]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = FitnessGRUBot(
    vocab_size=len(word2index),
    embed_dim=100,  # Match GloVe embedding dimension
    hidden_dim=256,
    num_layers=2,
    dropout=0.01,
    pretrained_embeddings=embedding_matrix
).to(device)

In [11]:
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler=None, num_epochs=30, device="cuda" if torch.cuda.is_available() else "cpu"):
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        total_train_loss = 0

        for src, tgt in train_loader:
            src, tgt = src.to(device), tgt.to(device)

            # Zero gradients
            optimizer.zero_grad()

            # Forward pass
            output = model(src, tgt[:, :-1])

            # Flatten output and target for loss calculation
            output_flat = output.contiguous().view(-1, output.size(-1))
            target_flat = tgt[:, 1:].contiguous().view(-1)

            # Calculate loss
            loss = criterion(output_flat, target_flat)

            # Backward pass and optimize
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()

            total_train_loss += loss.item()

        # Validation phase
        model.eval()
        total_val_loss = 0

        with torch.no_grad():
            for src, tgt in val_loader:
                src, tgt = src.to(device), tgt.to(device)

                # Forward pass
                output = model(src, tgt[:, :-1])

                # Flatten output and target for loss calculation
                output_flat = output.contiguous().view(-1, output.size(-1))
                target_flat = tgt[:, 1:].contiguous().view(-1)

                # Calculate loss
                loss = criterion(output_flat, target_flat)

                total_val_loss += loss.item()

        # Calculate average losses
        avg_train_loss = total_train_loss / len(train_loader)
        avg_val_loss = total_val_loss / len(val_loader)

        # Print epoch results
        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

        # Learning rate scheduling
        if scheduler:
            scheduler.step(avg_val_loss)

    return model

In [12]:
criterion = nn.CrossEntropyLoss(ignore_index=0, label_smoothing=0.1)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, verbose=True
)



In [13]:
def generate_response_beam(
    model,
    user_input,
    max_length=20,
    beam_width=3,
    temperature=0.8,
    device="cuda" if torch.cuda.is_available() else "cpu"
):
    model.eval()
    input_ids = torch.tensor([encode_text(user_input.lower(), word2index)], dtype=torch.long).to(device)

    # Special token IDs
    sos_token = word2index["<sos>"]
    eos_token = word2index["<eos>"]
    pad_token = word2index["<pad>"]

    # Initialize beam search
    sequences = [(torch.tensor([[sos_token]], dtype=torch.long).to(device), 0.0)]
    finished_sequences = []

    with torch.no_grad():
        for step in range(max_length):
            all_candidates = []

            for seq, score in sequences:
                # Check if sequence has ended
                if seq[0, -1].item() == eos_token or seq.size(1) >= max_length:
                    finished_sequences.append((seq, score))
                    continue

                # Get model prediction
                output = model(input_ids, seq)
                logits = output[:, -1, :] / temperature
                probs = F.log_softmax(logits, dim=-1)

                # Get top k predictions
                topk_probs, topk_indices = torch.topk(probs, beam_width)

                # Create new candidate sequences
                for i in range(beam_width):
                    next_token_id = topk_indices[0, i].item()
                    next_score = topk_probs[0, i].item()

                    # Penalize repetition (simplified)
                    if seq.size(1) >= 2 and next_token_id == seq[0, -1].item() == seq[0, -2].item():
                        continue

                    # Create new sequence
                    new_seq = torch.cat([seq, torch.tensor([[next_token_id]], dtype=torch.long).to(device)], dim=1)

                    # Calculate score with length normalization
                    length_penalty = (5 + new_seq.size(1)) / 6
                    adjusted_score = (score + next_score) / length_penalty

                    all_candidates.append((new_seq, adjusted_score))

            # If no candidates left, break
            if not all_candidates:
                break

            # Keep top beam_width candidates
            sequences = sorted(all_candidates, key=lambda x: x[1], reverse=True)[:beam_width]

        # Add any remaining sequences to finished
        finished_sequences.extend(sequences)

        # Get best sequence
        if finished_sequences:
            finished_sequences = sorted(finished_sequences, key=lambda x: x[1], reverse=True)
            best_sequence = finished_sequences[0][0].squeeze().tolist()
        else:
            best_sequence = sequences[0][0].squeeze().tolist()

        # Decode
        decoded = [index2word.get(idx, "<unk>") for idx in best_sequence]

        # Clean up
        result = []
        for token in decoded:
            if token not in ["<sos>", "<eos>", "<pad>", "<unk>"]:
                result.append(token)

        return " ".join(result)

In [14]:
def post_process_response(response):
    if not response or len(response.split()) < 3:
        return "I don't have enough information to answer that question properly."

    # Basic cleaning
    response = ' '.join(response.split())  # Remove multiple spaces

    # Capitalize first letter
    response = response[0].upper() + response[1:]

    # Fix common issues
    response = response.replace(" i ", " I ")

    # Add period if missing final punctuation
    if not response[-1] in ['.', '!', '?']:
        response += '.'

    # Remove repeated sentences if any
    sentences = response.split('. ')
    unique_sentences = []
    for sentence in sentences:
        if sentence not in unique_sentences:
            unique_sentences.append(sentence)
    response = '. '.join(unique_sentences)

    return response

In [15]:
def get_fallback_response(query):
    query = query.lower()

    # Expanded fallback dictionary
    fallbacks = {
        "flexibility": "To improve flexibility, try daily stretching routines, yoga, or Pilates. Hold each stretch for 15-30 seconds and focus on major muscle groups. Consistency is key.",

        "strength training": "A good beginner strength routine includes 2-3 sessions per week focusing on compound movements like squats, push-ups, rows, and lunges. Start with bodyweight exercises before adding weights.",

        "water": "During workouts, aim to drink about 7-10 ounces of water every 10-20 minutes. For workouts under an hour, water is sufficient. For longer sessions, consider sports drinks to replace electrolytes.",

        "supplement": "Supplements aren't necessary to get fit. Focus on a balanced diet with adequate protein, carbs, and healthy fats. If considering supplements, protein powder can be convenient, but whole foods should be your primary nutrition source.",

        "cardio": "For cardiovascular fitness, aim for 150 minutes of moderate activity weekly. Options include walking, running, cycling, swimming, or any activity that elevates your heart rate.",

        "protein": "Most active adults should aim for 0.6-0.8 grams of protein per pound of body weight daily. Good sources include lean meats, eggs, dairy, legumes, and plant-based options like tofu.",

        "weight loss": "For healthy weight loss, focus on a moderate calorie deficit of 300-500 calories per day, combined with regular exercise. Aim for 1-2 pounds of weight loss per week.",

        "recovery": "Proper recovery includes adequate sleep (7-9 hours), proper nutrition, hydration, and rest days between intense workouts. Active recovery like light walking or yoga can also help.",

        "beginner": "As a beginner, start with 2-3 days of exercise per week combining basic strength movements and moderate cardio. Focus on proper form rather than intensity and gradually increase duration and difficulty."
    }

    # Check if any keywords match
    for keyword, response in fallbacks.items():
        if keyword in query:
            return response

    return None

In [16]:
def chat_with_fitness_bot(model):
    print("FitBot")
    print("Ask me any fitness questions! (Type 'exit' to quit)")

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ["exit", "quit", "bye"]:
            print("Chatbot: Thanks for chatting! Stay fit and healthy!")
            break

        # First check if we should use a fallback response
        fallback = get_fallback_response(user_input)
        if fallback:
            print(f"Chatbot: {fallback}")
            continue

        # Otherwise generate response with the model
        try:
            raw_response = generate_response_beam(model, user_input)
            cleaned_response = post_process_response(raw_response)

            # Quality check
            if len(cleaned_response.split()) < 5 or cleaned_response.count(',') > 5:
                print("Chatbot: That's a great fitness question. For personalized advice, consider consulting with a certified fitness professional.")
            else:
                print(f"Chatbot: {cleaned_response}")
        except Exception as e:
            print(f"Chatbot: I'm having trouble processing that question. Could you rephrase it?")
            print(f"Error: {str(e)}")

In [None]:
print("Starting training...")
trained_model = train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    num_epochs=30,
)

# Save the final model
torch.save(trained_model.state_dict(), "final_fitbot_gru_model.pth")
print("Training complete! Model saved as 'final_fitbot_gru_model.pth'")

Starting training...
Epoch 1/30 - Train Loss: 6.6370, Val Loss: 6.0210
Epoch 2/30 - Train Loss: 5.6309, Val Loss: 5.5345
Epoch 3/30 - Train Loss: 5.0610, Val Loss: 5.3178
Epoch 4/30 - Train Loss: 4.5683, Val Loss: 5.2324
Epoch 5/30 - Train Loss: 4.0839, Val Loss: 5.2133
Epoch 6/30 - Train Loss: 3.6267, Val Loss: 5.2354
Epoch 7/30 - Train Loss: 3.2468, Val Loss: 5.2724
Epoch 8/30 - Train Loss: 2.9287, Val Loss: 5.3273
Epoch 9/30 - Train Loss: 2.6574, Val Loss: 5.3963
Epoch 10/30 - Train Loss: 2.4233, Val Loss: 5.4723


In [None]:
try:
    model.load_state_dict(torch.load("final_fitbot_gru_model.pth"))
    print("Loaded pretrained model successfully!")
    chat_with_fitness_bot(model)
except:
    print("No pretrained model found. Please train the model first.")

Loaded pretrained model successfully!
FitBot
Ask me any fitness questions! (Type 'exit' to quit)

You: What are the benefits of cross-training?
Chatbot: It reduces overuse injuries, improves overall fitness, and keeps workouts mentally engaging.
No pretrained model found. Please train the model first.
