In [1]:
import os
import json
import random
import gym
import numpy as np
from gym import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
from dotenv import load_dotenv
import google.generativeai as genai
from typing import Dict, List, Tuple
import time
from collections import defaultdict

  _warn(("h5py is running against HDF5 {0} when it was built against {1}, "


In [4]:
import os
import json
import random
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
from dotenv import load_dotenv
import google.generativeai as genai
from typing import Dict, List, Tuple
import time
from collections import defaultdict

# Load Gemini API Key
def load_api_key() -> None:
    env_path = "../models/key.env"
    load_dotenv(env_path)
    api_key = os.getenv("GEMINI_API_KEY") or "AIzaSyCcE1d9GrpAGiQh3Xrqlhs5E_wx4oAZ4d8"
    if not api_key:
        raise ValueError("❌ Gemini API Key not found.")
    genai.configure(api_key=api_key)
    print("✅ Gemini API Key loaded successfully!")

# Load dataset
def load_dataset() -> List[Dict]:
    dataset_path = "../datasets/game.json"
    try:
        with open(dataset_path, "r") as file:
            dataset = json.load(file)
        if "intents" not in dataset:
            raise ValueError("❌ Dataset must have an 'intents' key.")
        for intent in dataset["intents"]:
            if "scenario" not in intent or "responses" not in intent:
                raise ValueError(f"❌ Each intent must have 'scenario' and 'responses': {intent}")
            for response in intent["responses"]:
                if "option" not in response or "reward" not in response:
                    raise ValueError("❌ Each response must have 'option' and 'reward'.")
        print(f"✅ Dataset loaded successfully with {len(dataset['intents'])} scenarios!")
        return dataset["intents"]
    except FileNotFoundError:
        raise FileNotFoundError(f"❌ Dataset file '{dataset_path}' not found.")
    except json.JSONDecodeError:
        raise ValueError(f"❌ Failed to parse JSON file '{dataset_path}'.")

# Stress tracking
class StressMetrics:
    def __init__(self):
        self.stress_level = 0
        self.stress_history = []
        self.response_times = []
        self.incorrect_choices = 0
        self.correct_choices = 0

    def update_stress(self, reward: float, response_time: float) -> None:
        stress_change = 0
        if reward < 0:
            self.incorrect_choices += 1
            stress_change = 0.2
        else:
            self.correct_choices += 1
            stress_change = -0.1
        stress_change += min(response_time / 10, 0.5)
        self.stress_level = max(0, min(1, self.stress_level + stress_change))
        self.stress_history.append(self.stress_level)
        self.response_times.append(response_time)

    def get_summary(self) -> Dict:
        return {
            "current_stress": self.stress_level,
            "average_stress": sum(self.stress_history) / len(self.stress_history) if self.stress_history else 0,
            "correct_choices": self.correct_choices,
            "incorrect_choices": self.incorrect_choices,
            "average_response_time": sum(self.response_times) / len(self.response_times) if self.response_times else 0
        }

# Environment
class EmotionGameEnv(gym.Env):
    metadata = {'render_modes': ['human']}

    def __init__(self):
        super().__init__()
        self.dataset = load_dataset()
        self.stress_metrics = StressMetrics()
        self.interaction_history = []
        self.current_episode = 0
        self.emotions = ["sad", "angry", "happy", "stress", "neutral", "fear", "anxious"]
        self.difficulty_levels = ["easy", "medium", "hard"]
        self.observation_space = spaces.Dict({
            "emotion": spaces.Discrete(len(self.emotions)),
            "difficulty": spaces.Discrete(len(self.difficulty_levels))
        })
        self.action_space = spaces.Discrete(4)
        self.max_episode_length = 10
        self.state = None
        self.current_scenario = None
        self.episode_step = 0
        self.available_scenarios = self._categorize_scenarios()

    def _categorize_scenarios(self) -> Dict[str, List[Dict]]:
        categorized = defaultdict(list)
        for scenario in self.dataset:
            categorized[scenario.get("emotion", "neutral")].append(scenario)
        return categorized

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        self.current_episode += 1
        self.episode_step = 0
        emotion = random.choice(self.emotions)
        difficulty = random.choice(self.difficulty_levels)
        possible_scenarios = [
            s for s in self.available_scenarios.get(emotion, [])
            if s.get("difficulty") == difficulty
        ]
        if not possible_scenarios:
            possible_scenarios = self.dataset
        self.current_scenario = random.choice(possible_scenarios)
        self.state = {
            "emotion": self.emotions.index(emotion),
            "difficulty": self.difficulty_levels.index(difficulty)
        }
        return self.state, {}

    def step(self, action: int) -> Tuple[Dict, float, bool, bool, Dict]:
        start_time = time.time()
        self.episode_step += 1
        try:
            responses = self.current_scenario["responses"]
            if action >= len(responses):
                reward = -1
                response_text = "Invalid choice"
            else:
                response_data = responses[action]
                reward = response_data["reward"]
                response_text = response_data.get("text", response_data.get("option", ""))
                difficulty = self.difficulty_levels[self.state["difficulty"]]
                if difficulty == "hard":
                    reward *= 1.5
                elif difficulty == "easy":
                    reward *= 0.8
        except Exception:
            reward = -1
            response_text = "Error in scenario"

        response_time = time.time() - start_time
        self.stress_metrics.update_stress(reward, response_time)

        self.interaction_history.append({
            "episode": self.current_episode,
            "step": self.episode_step,
            "emotion": self.emotions[self.state["emotion"]],
            "scenario": self.current_scenario["scenario"],
            "action": action,
            "response": response_text,
            "reward": reward,
            "response_time": response_time,
            "stress_level": self.stress_metrics.stress_level
        })

        done = self.episode_step >= self.max_episode_length or reward < -0.5

        info = {
            "scenario": self.current_scenario["scenario"],
            "response": response_text,
            "stress_level": self.stress_metrics.stress_level,
            "correct_response": self.current_scenario.get("correct_response", ""),
            "difficulty": self.current_scenario.get("difficulty", self.difficulty_levels[self.state["difficulty"]])
        }

        next_state = {
            "emotion": self.state["emotion"],
            "difficulty": random.choice([
                self.state["difficulty"]] * 3 + [
                max(0, self.state["difficulty"] - 1),
                min(len(self.difficulty_levels)-1, self.state["difficulty"] + 1)
            ])
        }

        return next_state, reward, done, False, info

    def render(self):
        print(f"\nEmotion: {self.emotions[self.state['emotion']]}, "
              f"Difficulty: {self.difficulty_levels[self.state['difficulty']]}, "
              f"Scenario: {self.current_scenario['scenario']}, "
              f"Stress Level: {self.stress_metrics.stress_level:.2f}")

    def get_history(self):
        return self.interaction_history

    def get_stress_summary(self):
        return self.stress_metrics.get_summary()

# Gemini-based therapy
def get_therapy_response(emotion: str, scenario: str, stress_level: float, history: List[Dict]) -> str:
    recent_history = history[-3:] if history else []
    context = "\n".join(
        f"Previous interaction: When feeling {h['emotion']} in situation '{h['scenario']}', "
        f"the response was '{h['response']}' resulting in reward {h['reward']:.1f} and stress {h['stress_level']:.2f}."
        for h in recent_history
    )

    therapy_approaches = {
        "stress": "cognitive behavioral therapy",
        "anxious": "grounding and mindfulness",
        "angry": "anger management",
        "sad": "positive reframing",
        "happy": "positive reinforcement",
        "fear": "exposure therapy",
        "neutral": "general counseling"
    }
    approach = therapy_approaches.get(emotion, "general counseling")

    prompt = (
        f"You are a compassionate AI therapist trained in CBT, DBT, and mindfulness.\n"
        f"The user feels {emotion} (stress level {stress_level:.2f}) in scenario: '{scenario}'.\n"
        f"Use {approach} to help them.\n"
        f"Here is context:\n{context}\nTherapeutic response:"
    )
    try:
        model = genai.GenerativeModel("gemini-pro")
        response = model.generate_content(prompt)
        return response.text.strip()
    except Exception as e:
        return f"⚠️ Error from Gemini: {e}"

# Training
def train_rl_model(env: EmotionGameEnv, total_timesteps: int = 20000) -> PPO:
    check_env(env)
    eval_callback = EvalCallback(
        env,
        callback_on_new_best=StopTrainingOnRewardThreshold(reward_threshold=5.0, verbose=1),
        verbose=1,
        eval_freq=1000,
        best_model_save_path="./best_models/"
    )
    model = PPO("MultiInputPolicy", env, verbose=1, learning_rate=0.0003, tensorboard_log="./tensorboard_logs/")
    model.learn(total_timesteps=total_timesteps, callback=eval_callback)
    model.save("emotional_rl_agent_enhanced")
    return model

# Evaluation
def evaluate_model(env: EmotionGameEnv, model_path: str = "emotional_rl_agent_enhanced", num_tests: int = 5):
    print("\n📊 Evaluation Started")
    try:
        model = PPO.load(model_path)
    except:
        print("Model not found.")
        return

    for test in range(num_tests):
        print(f"\n=== Test Case {test+1} ===")
        obs, _ = env.reset()
        done = False
        while not done:
            env.render()
            action, _ = model.predict(obs)
            obs, reward, done, _, info = env.step(action)
            emotion = env.emotions[obs["emotion"]]
            gpt_response = get_therapy_response(emotion, info["scenario"], env.stress_metrics.stress_level, env.get_history())
            print(f"\nTherapeutic Feedback: {gpt_response}")

# Entry point
def main():
    load_api_key()
    env = EmotionGameEnv()
    train_new = input("Train new model? (y/n): ").lower() == 'y'
    if train_new:
        model = train_rl_model(env)
    else:
        model = PPO.load("emotional_rl_agent_enhanced")
    evaluate_model(env, num_tests=3)

if __name__ == "__main__":
    main()


✅ Gemini API Key loaded successfully!
✅ Dataset loaded successfully with 79 scenarios!
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ./tensorboard_logs/PPO_1
Eval num_timesteps=1000, episode_reward=10.80 +/- 17.29
Episode length: 4.20 +/- 3.06
---------------------------------
| eval/              |          |
|    mean_ep_length  | 4.2      |
|    mean_reward     | 10.8     |
| time/              |          |
|    total_timesteps | 1000     |
---------------------------------




New best mean reward!
Stopping training because the mean reward 10.80  is above the threshold 5.0

📊 Evaluation Started

=== Test Case 1 ===

Emotion: angry, Difficulty: easy, Scenario: You recently lost something or someone important to you. How do you cope?, Stress Level: 0.40

Therapeutic Feedback: ⚠️ Error from Gemini: 404 models/gemini-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.

Emotion: angry, Difficulty: easy, Scenario: You recently lost something or someone important to you. How do you cope?, Stress Level: 0.30

Therapeutic Feedback: ⚠️ Error from Gemini: 404 models/gemini-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.

Emotion: angry, Difficulty: easy, Scenario: You recently lost something or someone important to you. How do you cope?, Stress Le