In [None]:
import os
import json
import logging
import sys
from datetime import datetime
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from collections import defaultdict
import uuid
import re
from difflib import SequenceMatcher
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager, config_list_from_json
from autogen.agentchat.contrib.capabilities.transform_messages import TransformMessages
from autogen.agentchat.contrib.capabilities.agent_capability import AgentCapability
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
from langchain_openai import AzureChatOpenAI
from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain.memory import ConversationBufferMemory as LCConversationBufferMemory
from langchain.memory import ConversationSummaryBufferMemory








In [None]:
# Send INFO-and-above log records to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# Module-level logger shared by the code in this notebook.
logger = logging.getLogger(__name__)


In [6]:
# Exploratory check: print the attribute names currently visible on the
# autogen capabilities package (dir() lists what is bound on the package object).
import autogen.agentchat.contrib.capabilities
print(dir(autogen.agentchat.contrib.capabilities))


['__all__', '__annotations__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'text_compressors', 'transform_messages', 'transforms', 'transforms_util']


In [7]:
# Exploratory check: print the names defined in the transform_messages module,
# e.g. to confirm that TransformMessages can be imported from it.
from autogen.agentchat.contrib.capabilities import transform_messages
print(dir(transform_messages))


['Any', 'MessageTransform', 'TYPE_CHECKING', 'TransformMessages', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'colored', 'copy']


In [3]:
# Apply nest_asyncio to allow running asyncio in Jupyter-like environments.
# The import lives in this cell because the notebook's import cell does not
# bring `nest_asyncio` into scope; without it this cell fails with NameError
# on a fresh kernel.
import nest_asyncio

nest_asyncio.apply()

In [None]:

# Set up logging (same configuration as the earlier logging cell, repeated so
# this cell can also be run on its own).
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)

# Azure OpenAI Configuration
# The key and endpoint are read from the environment so that secrets are never
# stored in the notebook file. When a variable is unset the value falls back to
# the empty string, which is what was previously hard-coded here.
llm_config = {
    "config_list": [
        {
            "model": "gpt-4o",
            "api_key": os.environ.get("AZURE_OPENAI_API_KEY", ""),
            "base_url": os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
            "api_type": "azure",
            "api_version": "2025-01-01-preview"
        }
    ],
    "temperature": 0.7,
    "timeout": 120,
    "cache_seed": 42
}

# Surface a missing credential early instead of failing on the first LLM call.
if not llm_config["config_list"][0]["api_key"] or not llm_config["config_list"][0]["base_url"]:
    logger.warning(
        "AZURE_OPENAI_API_KEY and/or AZURE_OPENAI_ENDPOINT are not set; "
        "LLM requests will fail until they are provided."
    )




In [None]:
# Make sure each on-disk storage location exists before anything writes to it.
for storage_dir in (
    "data/conversation_history",
    "data/progress_data",
    "data/student_conversations",
):
    os.makedirs(storage_dir, exist_ok=True)

In [None]:
# Agent Configuration
class AgentConfig:
    """Holds the system prompts used for the tutor, student and progress-tracker agents."""

    # Prompt text lives in private class attributes; the public static getters
    # below simply return it.
    _TUTOR_PROMPT = """You are an expert Educational Tutor Agent with deep knowledge across academic subjects. Adapt explanations to the student's grade level, use multiple teaching approaches, provide step-by-step breakdowns, and encourage critical thinking. Evaluate responses, identify knowledge gaps, and suggest practice. Use age-appropriate language, provide examples, and maintain a supportive tone. Personalize responses based on student needs and track progress."""

    _STUDENT_PROMPT = """You are a Student Agent representing a learner. Ask clarifying questions, request examples, express learning needs, and engage actively in problem-solving. Share thought processes, seek feedback, and show curiosity. Admit when you don't understand and build on previous knowledge."""

    _PROGRESS_TRACKER_PROMPT = """You are a Progress Tracker Agent. Monitor student performance, identify patterns, and track improvement. Evaluate responses, identify gaps, and provide data-driven insights. Generate detailed, student-friendly progress reports, suggest focus areas, and recommend personalized learning paths based on the provided data."""

    @staticmethod
    def get_tutor_system_message() -> str:
        """Return the system prompt for the tutor agent."""
        return AgentConfig._TUTOR_PROMPT

    @staticmethod
    def get_student_system_message() -> str:
        """Return the system prompt for the student agent."""
        return AgentConfig._STUDENT_PROMPT

    @staticmethod
    def get_progress_tracker_system_message() -> str:
        """Return the system prompt for the progress-tracker agent."""
        return AgentConfig._PROGRESS_TRACKER_PROMPT

In [None]:
# Conversation Memory
class EnhancedConversationMemory:
    """Per-session conversation log persisted as JSON.

    Each message is kept as a dict in ``conversation_history``. The history and
    the session ``metadata`` are rewritten to
    ``<storage_path>/<session_id>.json`` after every change, and an existing
    file for the same session id is loaded on construction.
    ``get_langchain_format`` converts the history to LangChain message objects.
    """

    def __init__(
        self,
        session_id: Optional[str] = None,
        max_token_limit: int = 4000,
        storage_path: str = "data/conversation_history"
    ):
        """Create (or reopen) the memory for one session.

        Args:
            session_id: Identifier used for the storage file name; a random
                UUID is generated when omitted.
            max_token_limit: Stored on the instance for callers that need it;
                this class does not itself truncate or summarise by tokens.
            storage_path: Directory holding the per-session JSON files. It is
                created if it does not exist.
        """
        self.session_id = session_id or str(uuid.uuid4())
        # Previously accepted but discarded; keep it so callers can read it back.
        self.max_token_limit = max_token_limit
        self.storage_path = storage_path
        self.conversation_history: List[Dict[str, Any]] = []
        self.metadata: Dict[str, Any] = {
            "session_id": self.session_id,
            "created_at": datetime.now().isoformat(),
            "updated_at": datetime.now().isoformat(),
            "message_count": 0,
            "topics_covered": [],
            "subjects_discussed": []
        }
        os.makedirs(self.storage_path, exist_ok=True)
        self._load_conversation()

    def add_message(self, role: str, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Append a message, update session metadata and persist to disk.

        Args:
            role: Speaker role, e.g. "user", "tutor" or "system".
            content: Message text.
            metadata: Optional extra data. "subject" and "topic" entries, when
                present, are recorded once each in the session metadata.
        """
        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "message_id": str(uuid.uuid4()),
            "metadata": metadata or {}
        }

        self.conversation_history.append(message)
        self.metadata["message_count"] += 1
        self.metadata["updated_at"] = datetime.now().isoformat()

        if metadata and "subject" in metadata and metadata["subject"] not in self.metadata["subjects_discussed"]:
            self.metadata["subjects_discussed"].append(metadata["subject"])
        if metadata and "topic" in metadata and metadata["topic"] not in self.metadata["topics_covered"]:
            self.metadata["topics_covered"].append(metadata["topic"])

        self._save_conversation()

    def get_messages(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Return conversation messages, oldest first.

        Args:
            limit: Maximum number of most-recent messages to return. ``None``
                returns everything; zero or a negative value returns an empty
                list.

        Returns:
            A new list, so adding or removing items on it does not change the
            stored history. The message dicts themselves are shared.
        """
        if limit is None:
            return list(self.conversation_history)
        if limit <= 0:
            # ``history[-0:]`` would be the whole list, so handle this explicitly.
            return []
        return self.conversation_history[-limit:]

    def get_langchain_format(self) -> "List[BaseMessage]":
        """Convert the history to LangChain message objects.

        "system" maps to SystemMessage, "user"/"human" to HumanMessage and
        "assistant"/"ai"/"tutor" to AIMessage. Messages with any other role are
        skipped.
        """
        # The return annotation is quoted so that defining this class does not
        # require the LangChain names to be bound yet.
        messages = []
        for msg in self.conversation_history:
            role = msg["role"]
            content = msg["content"]

            if role == "system":
                messages.append(SystemMessage(content=content))
            elif role in ("user", "human"):
                messages.append(HumanMessage(content=content))
            elif role in ("assistant", "ai", "tutor"):
                messages.append(AIMessage(content=content))

        return messages

    def clear_memory(self):
        """Drop all messages, reset the message counter and persist the result."""
        self.conversation_history = []
        self.metadata["message_count"] = 0
        self.metadata["updated_at"] = datetime.now().isoformat()
        self._save_conversation()

    def get_conversation_summary(self) -> str:
        """Return a short multi-line text overview built from the session metadata."""
        if not self.conversation_history:
            return "No conversation history available."

        summary_parts = [
            f"Session ID: {self.session_id}",
            f"Messages: {self.metadata['message_count']}",
            f"Topics: {', '.join(self.metadata.get('topics_covered', []))}",
            f"Subjects: {', '.join(self.metadata.get('subjects_discussed', []))}"
        ]

        return "\n".join(summary_parts)

    def _session_file(self) -> str:
        """Return the path of this session's JSON file."""
        return os.path.join(self.storage_path, f"{self.session_id}.json")

    def _save_conversation(self):
        """Write metadata and history to the session file; failures are logged, not raised."""
        data = {
            "metadata": self.metadata,
            "conversation_history": self.conversation_history
        }

        try:
            with open(self._session_file(), 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error saving conversation: {str(e)}")

    def _load_conversation(self):
        """Load the session file if it exists; failures are logged, not raised.

        Stored metadata is merged over the defaults created in ``__init__`` so
        that a file written without some keys (for example ``message_count``)
        cannot cause a KeyError in later calls.
        """
        filepath = self._session_file()
        if not os.path.exists(filepath):
            return

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            logger.error(f"Error loading conversation: {str(e)}")
            return

        if not isinstance(data, dict):
            logger.error(f"Error loading conversation: unexpected content in {filepath}")
            return

        stored_metadata = data.get("metadata")
        if isinstance(stored_metadata, dict):
            self.metadata = {**self.metadata, **stored_metadata}

        stored_history = data.get("conversation_history", [])
        if isinstance(stored_history, list):
            self.conversation_history = stored_history

In [None]:
# Progress Memory
@dataclass
class LearningProgress:
    """Running proficiency record for one (subject, topic) pair."""
    subject: str
    topic: str
    skill_level: float  # smoothed performance score, see ProgressMemory.update_progress
    confidence_level: float  # smoothed confidence, updated the same way as skill_level
    attempts: int
    successful_attempts: int
    last_interaction: str  # ISO-8601 timestamp of the latest update
    improvement_trend: str  # 'improving', 'stable' or 'declining'
    recommended_actions: List[str]

@dataclass
class LearningSession:
    """Bookkeeping for a single tutoring session of one student."""
    session_id: str
    student_id: str
    start_time: str  # ISO-8601 timestamp
    end_time: Optional[str]  # None until end_session() is called
    subjects_covered: List[str]
    topics_covered: List[str]
    questions_asked: int
    questions_answered_correctly: int
    engagement_score: float  # value capped at 1.0, see _calculate_engagement_score
    session_summary: str

class ProgressMemory:
    """Tracks one student's per-topic progress and session history, persisted as JSON.

    State is stored in ``<storage_path>/<student_id>_progress.json`` and loaded
    from there on construction when the file exists.
    """

    def __init__(self, student_id: str, storage_path: str = "data/progress_data"):
        """Create (or reopen) the progress store for ``student_id``.

        Args:
            student_id: Identifier used in the storage file name.
            storage_path: Directory for progress files; created if missing.
        """
        self.student_id = student_id
        self.storage_path = storage_path
        self.progress_data: Dict[str, Dict[str, LearningProgress]] = defaultdict(dict)
        self.learning_sessions: List[LearningSession] = []
        self.analytics: Dict[str, Any] = {
            "total_sessions": 0,
            "total_questions": 0,
            "average_accuracy": 0.0,
            "favorite_subjects": [],
            "challenging_topics": []
        }
        os.makedirs(self.storage_path, exist_ok=True)
        self._load_progress()

    def _find_session(self, session_id: str) -> Optional[LearningSession]:
        """Return the first recorded session with this id, or None."""
        for session in self.learning_sessions:
            if session.session_id == session_id:
                return session
        return None

    def start_session(self, session_id: str) -> LearningSession:
        """Register a new session starting now and return it.

        The session is kept in memory; it is written to disk by the next call
        that saves progress.
        """
        session = LearningSession(
            session_id=session_id,
            student_id=self.student_id,
            start_time=datetime.now().isoformat(),
            end_time=None,
            subjects_covered=[],
            topics_covered=[],
            questions_asked=0,
            questions_answered_correctly=0,
            engagement_score=0.0,
            session_summary=""
        )
        self.learning_sessions.append(session)
        self.analytics["total_sessions"] += 1
        return session

    def end_session(self, session_id: str, session_summary: str):
        """Close a session, refresh the analytics and persist.

        An unknown ``session_id`` is logged and otherwise ignored.
        """
        session = self._find_session(session_id)
        if session is None:
            logger.warning(f"end_session called for unknown session: {session_id}")
            return
        session.end_time = datetime.now().isoformat()
        session.session_summary = session_summary
        self._update_analytics()
        self._save_progress()

    def update_progress(self, subject: str, topic: str, performance_score: float, confidence_level: float, was_successful: bool):
        """Record one attempt on a topic and persist.

        Args:
            subject: Subject the topic belongs to.
            topic: Topic that was practised.
            performance_score: Score for this attempt, blended into skill_level.
            confidence_level: Confidence for this attempt, blended likewise.
            was_successful: Whether the attempt counts as a success.
        """
        topics = self.progress_data.setdefault(subject, {})
        if topic not in topics:
            topics[topic] = LearningProgress(
                subject=subject,
                topic=topic,
                skill_level=0.0,
                confidence_level=0.0,
                attempts=0,
                successful_attempts=0,
                last_interaction=datetime.now().isoformat(),
                improvement_trend='stable',
                recommended_actions=[]
            )

        progress = topics[topic]
        progress.attempts += 1
        if was_successful:
            progress.successful_attempts += 1

        # Exponential smoothing: the new observation contributes 30%.
        # NOTE(review): levels start at 0.0, so a first perfect score yields 0.3
        # and the topic is immediately reported as struggling - confirm that
        # this start-up bias is intended.
        weight = 0.3
        progress.skill_level = progress.skill_level * (1 - weight) + performance_score * weight
        progress.confidence_level = progress.confidence_level * (1 - weight) + confidence_level * weight
        progress.last_interaction = datetime.now().isoformat()
        progress.improvement_trend = self._calculate_improvement_trend(progress)
        progress.recommended_actions = self._generate_recommendations(progress)
        self._save_progress()

    def update_session(self, session_id: str, subject: str, topic: str, question_asked: bool = False, correct_answer: bool = False):
        """Record activity in a session and persist.

        Args:
            session_id: Session to update; an unknown id is logged and ignored.
            subject: Subject touched; added to the session once.
            topic: Topic touched; added to the session once.
            question_asked: Count one more question for the session.
            correct_answer: Count one more correct answer for the session.
        """
        session = self._find_session(session_id)
        if session is None:
            logger.warning(f"update_session called for unknown session: {session_id}")
            return
        if subject not in session.subjects_covered:
            session.subjects_covered.append(subject)
        if topic not in session.topics_covered:
            session.topics_covered.append(topic)
        if question_asked:
            session.questions_asked += 1
            self.analytics["total_questions"] += 1
        if correct_answer:
            session.questions_answered_correctly += 1
        session.engagement_score = self._calculate_engagement_score(session)
        self._save_progress()

    def _analytics_snapshot(self) -> Dict[str, Any]:
        """Return a copy of the analytics whose lists are independent of the live ones."""
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in self.analytics.items()
        }

    def get_progress_report(self) -> Dict[str, Any]:
        """Build a report with analytics, per-subject averages and the last five sessions.

        The returned ``overall_analytics`` is a copy, so editing the report
        cannot alter this object's state.
        """
        report = {
            "student_id": self.student_id,
            "generated_at": datetime.now().isoformat(),
            "overall_analytics": self._analytics_snapshot(),
            "subject_progress": {},
            "recent_sessions": [asdict(session) for session in self.learning_sessions[-5:]],
            "recommendations": self._generate_overall_recommendations()
        }

        for subject, topics in self.progress_data.items():
            subject_data = {
                "average_skill_level": 0.0,
                "average_confidence": 0.0,
                "topics_count": len(topics),
                "topics_detail": {topic: asdict(progress) for topic, progress in topics.items()}
            }
            if topics:
                subject_data["average_skill_level"] = sum(p.skill_level for p in topics.values()) / len(topics)
                subject_data["average_confidence"] = sum(p.confidence_level for p in topics.values()) / len(topics)
            report["subject_progress"][subject] = subject_data

        return report

    def get_struggling_areas(self) -> List[Dict[str, Any]]:
        """Return topics with low skill (< 0.6), low confidence (< 0.5) or a declining trend.

        The result is sorted by ascending skill level.
        """
        struggling_areas = []
        for subject, topics in self.progress_data.items():
            for topic, progress in topics.items():
                if progress.skill_level < 0.6 or progress.confidence_level < 0.5 or progress.improvement_trend == 'declining':
                    struggling_areas.append({
                        "subject": subject,
                        "topic": topic,
                        "skill_level": progress.skill_level,
                        "confidence_level": progress.confidence_level,
                        "trend": progress.improvement_trend,
                        "recommendations": progress.recommended_actions
                    })
        struggling_areas.sort(key=lambda x: x["skill_level"])
        return struggling_areas

    def _calculate_improvement_trend(self, progress: LearningProgress) -> str:
        """Classify a topic as 'improving', 'declining' or 'stable'.

        Fewer than three attempts is always 'stable'.
        """
        if progress.attempts < 3:
            return 'stable'
        success_rate = progress.successful_attempts / progress.attempts
        if success_rate > 0.8 and progress.skill_level > 0.7:
            return 'improving'
        if success_rate < 0.4 or progress.skill_level < 0.3:
            return 'declining'
        return 'stable'

    def _generate_recommendations(self, progress: LearningProgress) -> List[str]:
        """Return advice strings selected by the topic's skill, confidence and trend."""
        recommendations = []
        if progress.skill_level < 0.4:
            recommendations.extend([f"Focus on fundamentals in {progress.topic}", "Request step-by-step explanations"])
        if progress.confidence_level < 0.5:
            recommendations.extend(["Practice easier problems", "Seek encouragement"])
        if progress.improvement_trend == 'declining':
            recommendations.extend(["Review basics", "Try different approaches"])
        if progress.skill_level > 0.8:
            recommendations.extend([f"Explore advanced {progress.subject} topics", "Apply concepts practically"])
        return recommendations

    def _generate_overall_recommendations(self) -> List[str]:
        """Return advice strings based on overall accuracy and the number of challenging topics."""
        recommendations = []
        if self.analytics["average_accuracy"] < 0.6:
            recommendations.extend(["Focus on understanding over speed", "Ask clarifying questions"])
        if len(self.analytics["challenging_topics"]) > 3:
            recommendations.extend(["Schedule regular reviews", "Break down complex topics"])
        return recommendations

    def _calculate_engagement_score(self, session: LearningSession) -> float:
        """Combine question count, accuracy and topic breadth into a score capped at 1.0."""
        questions_factor = min(session.questions_asked * 0.2, 0.5)
        # A correct answer can be recorded without a matching question, so cap
        # the ratio at 100% before weighting it.
        accuracy_ratio = min(session.questions_answered_correctly / max(session.questions_asked, 1), 1.0)
        accuracy_factor = accuracy_ratio * 0.3
        topics_factor = len(session.topics_covered) * 0.1
        return min(questions_factor + accuracy_factor + topics_factor, 1.0)

    def _update_analytics(self):
        """Recompute question totals, accuracy, favourite subjects and challenging topics."""
        total_questions = sum(session.questions_asked for session in self.learning_sessions)
        total_correct = sum(session.questions_answered_correctly for session in self.learning_sessions)
        self.analytics["total_questions"] = total_questions
        # Capped for the same reason as the engagement accuracy ratio.
        self.analytics["average_accuracy"] = min(total_correct / max(total_questions, 1), 1.0)
        # Same criterion the report uses (average skill above 0.7), computed
        # directly instead of building a whole report.
        self.analytics["favorite_subjects"] = [
            subject for subject, topics in self.progress_data.items()
            if topics and sum(p.skill_level for p in topics.values()) / len(topics) > 0.7
        ]
        self.analytics["challenging_topics"] = [
            f"{area['subject']}: {area['topic']}" for area in self.get_struggling_areas()
        ]

    def _progress_file(self) -> str:
        """Return the path of this student's progress file."""
        return os.path.join(self.storage_path, f"{self.student_id}_progress.json")

    def _save_progress(self):
        """Write all state to the progress file; failures are logged, not raised."""
        data = {
            "student_id": self.student_id,
            "progress_data": {subject: {topic: asdict(progress) for topic, progress in topics.items()} for subject, topics in self.progress_data.items()},
            "learning_sessions": [asdict(session) for session in self.learning_sessions],
            "analytics": self.analytics,
            "last_updated": datetime.now().isoformat()
        }
        try:
            with open(self._progress_file(), 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error(f"Error saving progress: {str(e)}")

    def _load_progress(self):
        """Load state from the progress file if it exists; failures are logged, not raised.

        Everything is parsed first and only then applied, so a malformed file
        cannot leave the object half-loaded. Stored analytics are merged over
        the defaults so that missing keys cannot cause a KeyError later.
        """
        filepath = self._progress_file()
        if not os.path.exists(filepath):
            return
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            loaded_progress = {
                subject: {topic: LearningProgress(**progress) for topic, progress in topics.items()}
                for subject, topics in data.get("progress_data", {}).items()
            }
            loaded_sessions = [LearningSession(**session) for session in data.get("learning_sessions", [])]
            loaded_analytics = data.get("analytics", {})
        except Exception as e:
            logger.error(f"Error loading progress: {str(e)}")
            return

        for subject, topics in loaded_progress.items():
            self.progress_data[subject] = topics
        self.learning_sessions = loaded_sessions
        if isinstance(loaded_analytics, dict):
            self.analytics.update(loaded_analytics)

In [None]:
# Create agents with enhanced capabilities
def create_educational_agents(student_id: str, session_id: str, grade_level: str = "middle_school"):
    """Build the agents, group chat manager and memories for one tutoring session.

    Args:
        student_id: Identifier passed to ProgressMemory.
        session_id: Identifier passed to EnhancedConversationMemory and used by
            the registered ``explain_concept`` function when updating the session.
        grade_level: Appended to the student agent's system message.

    Returns:
        Dict with keys "student", "tutor", "progress_tracker", "manager",
        "memory" (conversation memory) and "progress_memory".
    """
    # Create conversation memory
    conv_memory = EnhancedConversationMemory(session_id=session_id)
    progress_memory = ProgressMemory(student_id=student_id)

    # Create tutor agent with enhanced capabilities
    tutor = AssistantAgent(
        name="Educational_Tutor",
        system_message=AgentConfig.get_tutor_system_message(),
        llm_config=llm_config,
        human_input_mode="NEVER",
        max_consecutive_auto_reply=5,
        description="Expert educational tutor with subject matter expertise"
    )

    # Create student proxy agent (no llm_config is passed to it)
    student = UserProxyAgent(
        name="Student_Learner",
        system_message=AgentConfig.get_student_system_message() + f"\nGrade Level: {grade_level}",
        human_input_mode="NEVER",
        max_consecutive_auto_reply=3,
        description="Student agent representing a learner"
    )

    # Create progress tracker agent
    progress_tracker = AssistantAgent(
        name="Progress_Tracker",
        system_message=AgentConfig.get_progress_tracker_system_message(),
        llm_config=llm_config,
        human_input_mode="NEVER",
        description="Agent that tracks and analyzes student progress"
    )

    # Register functions to agents
    def explain_concept(subject: str, topic: str, difficulty_level: str = "medium", learning_style: str = "mixed"):
        """Ask the tutor for an explanation of ``topic`` and record it.

        On success the explanation is stored in the conversation memory and the
        progress/session records are updated; on failure an error string is
        returned instead of raising.
        """
        logger.info(f"Generating explanation for {subject}: {topic}")
        progress_report = progress_memory.get_progress_report()
        subject_progress = progress_report.get('subject_progress', {}).get(subject, {})

        prompt = f"""
        Create a comprehensive explanation for {topic} in {subject}.
        Student Context:
        - Difficulty Level: {difficulty_level}
        - Learning Style: {learning_style}
        - Previous Knowledge: {json.dumps(subject_progress, indent=2) if subject_progress else 'No previous data'}
        
        Include:
        1. Clear introduction
        2. Step-by-step breakdown
        3. Examples and applications
        4. Practice questions
        5. Summary
        """

        try:
            # Using AutoGen's built-in LLM instead of LangChain
            response = tutor.generate_reply(
                messages=[{"content": prompt, "role": "user"}],
                sender=student
            )
            # AutoGen documents generate_reply as returning a string, a message
            # dict or None; reduce a dict to its text so that the conversation
            # memory always stores a plain string.
            if isinstance(response, dict):
                response = response.get("content")
            explanation = response if response else "No response generated"

            conv_memory.add_message("tutor", explanation, {"subject": subject, "topic": topic})
            # A generated explanation is logged as a successful attempt with a
            # fixed mid-range score and confidence of 0.5.
            progress_memory.update_progress(subject, topic, 0.5, 0.5, True)
            progress_memory.update_session(session_id, subject, topic)
            return explanation
        except Exception as e:
            logger.error(f"Error generating explanation: {str(e)}")
            return f"Error generating explanation: {str(e)}"

    # NOTE(review): this adds the callable to the tutor's function map only.
    # Nothing visible here declares a matching function/tool schema in
    # llm_config, so the model may never request `explain_concept` - confirm
    # against the AutoGen function-calling documentation.
    tutor.register_function(
        function_map={
            "explain_concept": explain_concept
        }
    )

    # Create group chat with agents
    groupchat = GroupChat(
        agents=[student, tutor, progress_tracker],
        messages=[],
        max_round=20,
        speaker_selection_method="round_robin"
    )

    # Create group chat manager
    manager = GroupChatManager(
        groupchat=groupchat,
        llm_config=llm_config,
        name="Education_Manager",
        description="Manages educational session flow"
    )

    return {
        "student": student,
        "tutor": tutor,
        "progress_tracker": progress_tracker,
        "manager": manager,
        "memory": conv_memory,
        "progress_memory": progress_memory
    }

In [None]:
def main():
    """Run one scripted tutoring session and print the resulting progress report.

    Any exception raised while the session runs is logged and printed rather
    than propagated.
    """
    logger.info("Starting educational session")
    print("Starting educational session...", flush=True)

    student_id = "test_student_001"
    session_id = str(uuid.uuid4())

    # Messages sent to the manager, in order, after the opening request.
    follow_up_messages = (
        # Student asks a question
        "Can you show a real-world example of quadratic equations?",
        # Generate practice problems
        "Could you create 3 practice problems on quadratic equations at medium difficulty?",
    )

    try:
        # Build the agents and memories for this session
        agents = create_educational_agents(
            student_id=student_id,
            session_id=session_id,
            grade_level="high_school"
        )
        progress_store = agents["progress_memory"]

        # Open the session record
        progress_store.start_session(session_id)
        print("Agents initialized successfully!", flush=True)

        learner = agents["student"]
        chat_manager = agents["manager"]

        # Opening request: the tutor explains a concept
        learner.initiate_chat(
            chat_manager,
            message="Please explain quadratic equations to me at a medium difficulty level with visual learning style.",
            clear_history=False
        )

        for follow_up in follow_up_messages:
            learner.send(
                message=follow_up,
                recipient=chat_manager,
                request_reply=True
            )

        # Close the session with a fixed summary
        progress_store.end_session(
            session_id,
            "Student engaged with quadratic equations, asked questions, and completed practice problems."
        )

        # Print the progress report as JSON
        report = progress_store.get_progress_report()
        print("\n=== PROGRESS REPORT ===", flush=True)
        print(json.dumps(report, indent=2), flush=True)

    except Exception as e:
        logger.error(f"Error in main execution: {str(e)}")
        print(f"Error: {str(e)}", flush=True)

    logger.info("Educational session completed")
    print("\nEducational session completed.", flush=True)

In [None]:

if __name__ == "__main__":
    # Set before main() builds any agents.
    # NOTE(review): presumably tells AutoGen not to use Docker for code
    # execution - confirm against the AutoGen documentation.
    os.environ["AUTOGEN_USE_DOCKER"] = "False"
    main()

INFO:__main__:Starting educational session
Starting educational session...


Agents initialized successfully!
[33mStudent_Learner[0m (to Education_Manager):

Please explain quadratic equations to me at a medium difficulty level with visual learning style.

--------------------------------------------------------------------------------
[32m
Next speaker: Educational_Tutor
[0m
INFO:httpx:HTTP Request: POST https://idkrag.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
[33mEducational_Tutor[0m (to Education_Manager):

Sure! Let’s explore quadratic equations step by step, and since you’re more of a visual learner, I’ll incorporate diagrams and examples you can imagine or sketch out. Let’s start!

---

### **What is a Quadratic Equation?**
A quadratic equation is any equation that can be written in the form:

\[
ax^2 + bx + c = 0
\]

Where:
- \(x\) represents the variable (what you’re solving for),
- \(a\), \(b\), and \(c\) are constants (numbers),
- \(a \neq 0\) (because if \(a = 0\), it would just