# **Install Dependencies**

In [1]:
!pip install pandas transformers torch gradio tqdm nltk rouge-score

Collecting gradio
  Downloading gradio-5.9.1-py3-none-any.whl.metadata (16 kB)
Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.6-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.5.2 (from gradio)
  Downloading gradio_client-1.5.2-py3-none-any.whl.metadata (7.1 kB)
Collecting markupsafe~=2.0 (from gradio)
  Downloading MarkupSafe-2.1.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)


# **Import Required Libraries**

In [2]:
import pandas as pd
from transformers import T5Tokenizer, T5ForConditionalGeneration, AdamW
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import os
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer

# **Load Dataset and Prepare Inputs/Outputs**

In [3]:
file_path = "Gym.csv"
dataset = pd.read_csv(file_path)

dataset["input"] = (
    "Sex: " + dataset["Sex"] +
    " | Age: " + dataset["Age"].astype(str) +
    " | Height: " + dataset["Height"].astype(str) +
    " | Weight: " + dataset["Weight"].astype(str) +
    " | Hypertension: " + dataset["Hypertension"] +
    " | Diabetes: " + dataset["Diabetes"] +
    " | BMI: " + dataset["BMI"].astype(str) +
    " | Level: " + dataset["Level"] +
    " | Fitness Goal: " + dataset["Fitness Goal"] +
    " | Fitness Type: " + dataset["Fitness Type"]
)

dataset["output"] = (
    "Exercises: " + dataset["Exercises"] +
    " | Equipment: " + dataset["Equipment"] +
    " | Diet: " + dataset["Diet"] +
    " | Recommendation: " + dataset["Recommendation"] +
    " | Caloric Intake: " + dataset["Caloric Intake"].astype(str)
)

dataset_for_t5 = dataset[["input", "output"]]
train_val_data, test_data = train_test_split(dataset_for_t5, test_size=0.1, random_state=42)
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

print(f"Training data size: {len(train_data)}")
print(f"Validation data size: {len(val_data)}")
print(f"Test data size: {len(test_data)}")

Training data size: 10504
Validation data size: 2626
Test data size: 1459


# **Create Dataset Class**

In [4]:
class GymDataset(Dataset):
    def __init__(self, inputs, outputs, tokenizer, max_length=512):
        self.inputs = inputs
        self.outputs = outputs
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_text = self.inputs[idx]
        output_text = self.outputs[idx]

        input_tokens = self.tokenizer(
            input_text, max_length=self.max_length, padding="max_length", truncation=True, return_tensors="pt"
        )
        output_tokens = self.tokenizer(
            output_text, max_length=self.max_length, padding="max_length", truncation=True, return_tensors="pt"
        )

        return {
            "input_ids": input_tokens["input_ids"].squeeze(),
            "attention_mask": input_tokens["attention_mask"].squeeze(),
            "labels": output_tokens["input_ids"].squeeze(),
        }

# **Initialize Model and Tokenizer**

In [5]:
model_name = "t5-small"
tokenizer = T5Tokenizer.from_pretrained(model_name)

train_dataset = GymDataset(train_data["input"].tolist(), train_data["output"].tolist(), tokenizer)
val_dataset = GymDataset(val_data["input"].tolist(), val_data["output"].tolist(), tokenizer)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/2.32k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


In [6]:
model = T5ForConditionalGeneration.from_pretrained(model_name)
model = model.to("cuda" if torch.cuda.is_available() else "cpu")

if torch.cuda.is_available():
    gpu_name = torch.cuda.get_device_name(0)
    print(f"Using GPU: {gpu_name}")
    if "T4" not in gpu_name:
        print("WARNING: You are not using a T4 GPU. Consider upgrading your runtime in Google Colab to a T4 GPU.")
else:
    print("No GPU found. Using CPU instead.")

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

Using GPU: Tesla T4


# **Set Optimizer and Hyperparameters/Validation Loss Calculation/Train Model Function**

In [9]:
learning_rate = 1e-5
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)

epochs = 42
gradient_clipping = 1

checkpoint_dir = "checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

# Validation loss calculation
def compute_validation_loss(val_loader):
    model.eval()
    val_loss = 0.0

    for batch in tqdm(val_loader, desc="Computing Validation Loss", leave=False):
        input_ids = batch["input_ids"].to(model.device)
        attention_mask = batch["attention_mask"].to(model.device)
        labels = batch["labels"].to(model.device)

        with torch.no_grad():
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            val_loss += outputs.loss.item()

    avg_val_loss = val_loss / len(val_loader)
    return avg_val_loss

# Training function
def train_model(save_training_file="training_state.pth"):
    start_epoch = 0
    best_val_loss = float("inf")

    # Check if a saved training state exists
    if os.path.exists(save_training_file):
        print(f"Loading training state from {save_training_file}")
        training_state = torch.load(save_training_file)
        model.load_state_dict(training_state["model_state_dict"])
        optimizer.load_state_dict(training_state["optimizer_state_dict"])
        start_epoch = training_state["epoch"]
        best_val_loss = training_state["best_val_loss"]
    else:
        print("No saved training state found. Starting from scratch.")

    for epoch in range(start_epoch, epochs):
        print(f"Epoch {epoch + 1}/{epochs}")
        train_loss = 0.0
        model.train()

        for batch in tqdm(train_loader, desc=f"Training Progress (Epoch {epoch + 1})", leave=False):
            input_ids = batch["input_ids"].to(model.device)
            attention_mask = batch["attention_mask"].to(model.device)
            labels = batch["labels"].to(model.device)

            # Forward pass
            optimizer.zero_grad()
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            loss = outputs.loss

            # Backward pass
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), gradient_clipping)
            optimizer.step()

            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = compute_validation_loss(val_loader)

        # Save the model checkpoint
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            checkpoint_path = os.path.join(checkpoint_dir, f"best_model_epoch_{epoch + 1}.pth")
            torch.save(model.state_dict(), checkpoint_path)
            print(f"Model checkpoint saved to {checkpoint_path}")

        # Save training
        training_state = {
            "epoch": epoch + 1,
            "best_val_loss": best_val_loss,
            "optimizer_state_dict": optimizer.state_dict(),
            "model_state_dict": model.state_dict(),
        }
        torch.save(training_state, save_training_file)
        print(f"Training state saved to {save_training_file}")

        print(f"Epoch {epoch + 1} Complete: Training Loss: {avg_train_loss:.4f} | Validation Loss: {avg_val_loss:.4f}\n")

In [10]:
train_model()
model.save_pretrained("gym_recommendation_t5")
tokenizer.save_pretrained("gym_recommendation_t5")

Loading training state from training_state.pth


  training_state = torch.load(save_training_file)


Epoch 41/42




Model checkpoint saved to checkpoints/best_model_epoch_41.pth
Training state saved to training_state.pth
Epoch 41 Complete: Training Loss: 0.0130 | Validation Loss: 0.0118

Epoch 42/42




Model checkpoint saved to checkpoints/best_model_epoch_42.pth
Training state saved to training_state.pth
Epoch 42 Complete: Training Loss: 0.0129 | Validation Loss: 0.0117



('gym_recommendation_t5/tokenizer_config.json',
 'gym_recommendation_t5/special_tokens_map.json',
 'gym_recommendation_t5/spiece.model',
 'gym_recommendation_t5/added_tokens.json')

# **Testing and Evaluation**

In [15]:
test_dataset = GymDataset(test_data["input"].tolist(), test_data["output"].tolist(), tokenizer)
test_loader = DataLoader(test_dataset, batch_size=8)

# Perform testing
model.eval()
predictions, ground_truths = [], []

for batch in tqdm(test_loader, desc="Testing Progress"):
    with torch.no_grad():
        outputs = model.generate(
            input_ids=batch["input_ids"].to(model.device),
            attention_mask=batch["attention_mask"].to(model.device),
            max_length=1024,
            num_beams=4
        )
    predictions.extend(tokenizer.batch_decode(outputs, skip_special_tokens=True))
    ground_truths.extend(tokenizer.batch_decode(batch["labels"], skip_special_tokens=True))

# Evaluate metrics
smoothing_function = SmoothingFunction().method1

# BLEU scores
bleu_scores = [
    sentence_bleu([truth.split()], pred.split(), smoothing_function=smoothing_function)
    for truth, pred in zip(ground_truths, predictions)
]
avg_bleu = sum(bleu_scores) / len(bleu_scores)

# ROUGE evaluation
scorer = rouge_scorer.RougeScorer(["rouge1", "rougeL"], use_stemmer=True)
rouge_scores = [scorer.score(truth, pred) for truth, pred in zip(ground_truths, predictions)]
avg_rouge1 = sum([score["rouge1"].fmeasure for score in rouge_scores]) / len(rouge_scores)
avg_rougeL = sum([score["rougeL"].fmeasure for score in rouge_scores]) / len(rouge_scores)

print(f"Average BLEU: {avg_bleu:.4f}, ROUGE-1: {avg_rouge1:.4f}, ROUGE-L: {avg_rougeL:.4f}")

Testing Progress: 100%|██████████| 183/183 [30:42<00:00, 10.07s/it]


Average BLEU: 0.9470, ROUGE-1: 0.9651, ROUGE-L: 0.9592


# **Reasoning and UI**

In [23]:
import gradio as gr
import requests

# Helper Functions
def determine_level(bmi):
    if bmi < 18.5:
        return "Underweight"
    elif 18.5 <= bmi < 24.9:
        return "Normal weight"
    elif 25 <= bmi < 29.9:
        return "Overweight"
    else:
        return "Obese"

def call_exercise_api(query):
    url = "https://api.api-ninjas.com/v1/exercises"
    headers = {"X-Api-Key": "CwRxTYkn8E5QEH/3dk9MsA==RGAXrWLIEyn63eaf"}
    params = {"muscle": query.lower()}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        exercises = response.json()
        if exercises:
            result = ["### Suggested Exercises\n"]
            for exercise in exercises:
                result.append(
                    f"**Exercise**: {exercise['name']}  \n"
                    f"**Muscle**: {exercise['muscle']}  \n"
                    f"**Type**: {exercise['type']}  \n"
                    f"**Difficulty**: {exercise['difficulty']}  \n"
                    f"---\n"
                )
            return "\n".join(result)
        else:
            return f"No exercises found for the muscle '{query}'. Please try a different query."
    else:
        return "Sorry, I couldn't fetch the workout details right now."

def call_exercise_instructions(exercise_name):
    url = "https://api.api-ninjas.com/v1/exercises"
    headers = {"X-Api-Key": "CwRxTYkn8E5QEH/3dk9MsA==RGAXrWLIEyn63eaf"}
    params = {"name": exercise_name}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        exercises = response.json()
        if exercises:
            for exercise in exercises:
                return (
                    f"### Instruction for {exercise['name']}\n"
                    f"{exercise['instructions']}"
                )
        else:
            return f"No instructions found for the exercise '{exercise_name}'."
    else:
        return "Sorry, I couldn't fetch the instructions right now."

def call_nutrition_api(query):
    url = "https://api.api-ninjas.com/v1/nutrition"
    headers = {"X-Api-Key": "CwRxTYkn8E5QEH/3dk9MsA==RGAXrWLIEyn63eaf"}
    params = {"query": query}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        nutrition_info = response.json()
        result = ["### Nutrition Information\n"]
        for item in nutrition_info:
            result.append(
                f"**Item**: {item['name']}  \n"
                f"**Calories**: {item['calories']} kcal  \n"
                f"**Protein**: {item['protein_g']} g  \n"
                f"**Fat**: {item['fat_total_g']} g  \n"
                f"**Carbs**: {item['carbohydrates_total_g']} g  \n"
                f"---\n"
            )
        return "\n".join(result)
    else:
        return "Sorry, I couldn't fetch the nutrition details right now."

def parse_response(response, question):
    question_keywords = {
        "equipment": "Equipment",
        "diet": "Diet",
        "exercise": "Exercises",
        "recommendation": "Recommendation",
        "caloric intake": "Caloric Intake"
    }

    introductions = {
        "Equipment": "Here is the information about the equipment:",
        "Diet": "Let me provide you with some dietary insights:",
        "Exercises": "Here are the suggested exercises:",
        "Recommendation": "Based on your preferences, here's a recommendation:",
        "Caloric Intake": "Here is the caloric intake information you requested (in kcal/day):"
    }

    for keyword, category in question_keywords.items():
        if keyword in question.lower():
            for part in response.split("|"):
                if part.strip().startswith(category):
                    introduction = introductions.get(category, "Here's what I found:")
                    return f"{introduction}\n{part.strip()}"

    return "I'm sorry, I couldn't find an answer to your question."

# Main chatbot logic
def chatbot_response(sex, age, height, weight, hypertension, diabetes, fitness_goal, fitness_type, question):
    bmi = round(weight / (height ** 2), 2)
    level = determine_level(bmi)

    if question.lower().startswith("workout for"):
        query = question[len("workout for"):].strip()
        return call_exercise_api(query)
    elif question.lower().startswith("instruction for"):
        exercise_name = question[len("instruction for"):].strip()
        return call_exercise_instructions(exercise_name)
    elif question.lower().startswith("nutrition for"):
        query = question[len("nutrition for"):].strip()
        return call_nutrition_api(query)
    else:
        user_input = (
            f"Sex: {sex} | Age: {age} | Height: {height} | Weight: {weight} | "
            f"BMI: {bmi} | Level: {level} | Hypertension: {hypertension} | "
            f"Diabetes: {diabetes} | Fitness Goal: {fitness_goal} | Fitness Type: {fitness_type} | Question: {question}"
        )
        input_tokens = tokenizer(
            user_input, max_length=1024, padding="max_length", truncation=True, return_tensors="pt"
        ).to(model.device)

        model.eval()
        with torch.no_grad():
            output_tokens = model.generate(
                input_ids=input_tokens["input_ids"].to(model.device),
                attention_mask=input_tokens["attention_mask"].to(model.device),
                max_length=1024,
                num_beams=4,
                length_penalty=2.0,
                early_stopping=True,
                temperature=1.2,
                top_k=50,
                top_p=0.95,
                do_sample=True
            )

        response = tokenizer.decode(output_tokens[0], skip_special_tokens=True)
        return parse_response(response, question)

# Gradio Interface
def chatbot_interface(sex, age, height, weight, hypertension, diabetes, fitness_goal, fitness_type, question):
    try:
        weight = float(weight)
        height = float(height)
        bmi = round(weight / (height ** 2), 2)
        level = determine_level(bmi)
        bmi_info = f"Your BMI: {bmi}, Level: {level}"
        chatbot_output = chatbot_response(sex, age, height, weight, hypertension, diabetes, fitness_goal, fitness_type, question)
        return bmi_info + "\n\n" + chatbot_output
    except Exception as e:
        return f"Error: {str(e)}"

# Gradio Inputs
inputs = [
    gr.Dropdown(["Male", "Female"], label="Sex"),
    gr.Textbox(label="Age", placeholder="Enter your age"),
    gr.Textbox(label="Height (in meters)", placeholder="e.g., 1.75"),
    gr.Textbox(label="Weight (in kg)", placeholder="e.g., 70"),
    gr.Dropdown(["Yes", "No"], label="Hypertension"),
    gr.Dropdown(["Yes", "No"], label="Diabetes"),
    gr.Dropdown(["Weight Gain", "Weight Loss"], label="Fitness Goal"),
    gr.Dropdown(["Muscular Fitness", "Cardio Fitness"], label="Fitness Type"),
    gr.Textbox(label="Your Question", placeholder="Ask about workouts or nutrition (e.g., 'workout for legs' or 'instruction for squats')")
]

outputs = gr.Markdown(label="Chatbot Response")

# Launch Interface
interface = gr.Interface(
    fn=chatbot_interface,
    inputs=inputs,
    outputs=outputs,
    title="Gym Chatbot",
    description="Ask fitness-related questions, get workout recommendations, and find nutrition details!"
)

In [24]:
if __name__ == "__main__":
    interface.launch()

Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://418bd7ab1a60f44bf9.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


UPDATED NEW 10:17PM