# Fix and Test Real-Time Chat Application

This notebook will walk through the process of fixing and testing a real-time chat application with reaction support. We'll fix code issues, implement missing functionality, and test various features.

## 1. Setup Environment

First, let's install the required dependencies and configure our Python environment.

In [None]:
import sys
import subprocess
import venv
import os

def setup_venv():
    venv_path = os.path.join(os.getcwd(), '.venv')
    if not os.path.exists(venv_path):
        print("Creating virtual environment...")
        venv.create(venv_path, with_pip=True)
    
    # Get the Python interpreter path in the virtual environment
    if sys.platform == 'win32':
        python_path = os.path.join(venv_path, 'Scripts', 'python.exe')
    else:
        python_path = os.path.join(venv_path, 'bin', 'python')
    
    # Install dependencies
    requirements = [
        'fastapi>=0.110',
        'uvicorn[standard]>=0.29',
        'jinja2>=3.1',
        'python-multipart>=0.0.6',
        'starlette>=0.36',
        'pydantic>=2.7',
        'anyio>=4.0',
        'websockets>=10.4'
    ]
    
    print("Installing dependencies...")
    for req in requirements:
        subprocess.run([python_path, '-m', 'pip', 'install', req])
    
    return python_path

python_path = setup_venv()
print(f"Virtual environment is ready with Python at: {python_path}")

## 2. Fix Code Structure

Now let's fix the issues in the main.py file. We need to:
1. Remove the duplicate `if room in self.users:` block
2. Fix the `store_message` method definition
3. Ensure proper indentation and code organization

In [None]:
import os

def fix_main_py():
    file_path = os.path.join(os.getcwd(), 'app', 'main.py')
    fixed_content = '''from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from typing import Dict, List, Union, Optional
from starlette.requests import Request
from datetime import datetime
import uvicorn, json, uuid
from pydantic import ValidationError
from .schemas import Message, MessageBroadcast, ReactionRequest, MessageRequest, ReactionData, AddReactionRequest, RemoveReactionRequest

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

class ConnectionManager:
    def __init__(self):
        self.rooms: Dict[str, List[WebSocket]] = {}
        self.users: Dict[str, Dict[str, str]] = {}   # room ➞ {ws_id: username}
        self.messages: Dict[str, Dict[str, Message]] = {}  # room ➞ {message_id: Message}

    async def connect(self, room: str, username: str, websocket: WebSocket):
        await websocket.accept()
        self.rooms.setdefault(room, []).append(websocket)
        self.users.setdefault(room, {})[id(websocket)] = username
        await self.broadcast(room, {"type": "join", "user": username, "online": list(self.users[room].values())})

    async def disconnect(self, room: str, websocket: WebSocket):
        if room in self.rooms and websocket in self.rooms[room]:
            self.rooms[room].remove(websocket)
            if room in self.users and id(websocket) in self.users[room]:
                username = self.users[room].pop(id(websocket))
                await self.broadcast(room, {"type": "leave", "user": username, "online": list(self.users[room].values())})
            if not self.rooms[room]:
                del self.rooms[room]
                if room in self.users:
                    del self.users[room]

    def store_message(self, room: str, message: Message) -> None:
        """Store a message in the room's message history"""
        self.messages.setdefault(room, {})[message.id] = message
    
    def get_message(self, room: str, message_id: str) -> Optional[Message]:
        """Get a specific message by ID"""
        return self.messages.get(room, {}).get(message_id)
    
    def verify_user_in_room(self, room: str, username: str) -> bool:
        """Verify that a user is currently connected to the room"""
        if room not in self.users:
            return False
        return username in self.users[room].values()
    
    def add_reaction(self, room: str, message_id: str, emoji: str, username: str) -> bool:
        """Add a reaction to a message. Returns True if successful."""
        message = self.get_message(room, message_id)
        if not message:
            return False
        
        if emoji not in message.reactions.emoji:
            message.reactions.emoji[emoji] = []
        
        if username not in message.reactions.emoji[emoji]:
            message.reactions.emoji[emoji].append(username)
        
        return True
    
    def remove_reaction(self, room: str, message_id: str, emoji: str, username: str) -> bool:
        """Remove a reaction from a message. Returns True if successful."""
        message = self.get_message(room, message_id)
        if not message:
            return False
        
        if emoji in message.reactions.emoji and username in message.reactions.emoji[emoji]:
            message.reactions.emoji[emoji].remove(username)
            # Remove emoji key if no users have this reaction
            if not message.reactions.emoji[emoji]:
                del message.reactions.emoji[emoji]
            return True
        
        return False

    async def broadcast(self, room: str, message: Union[dict, MessageBroadcast]):
        """Broadcast a message to all clients in a room"""
        if room in self.rooms:
            # Convert to dict if it's a Pydantic model
            if isinstance(message, MessageBroadcast):
                message_data = message.model_dump(exclude_none=True)
                # Convert datetime to ISO string for JSON serialization
                if "timestamp" in message_data and message_data["timestamp"]:
                    message_data["timestamp"] = message_data["timestamp"].isoformat()
                # Convert reactions to dict format
                if "reactions" in message_data and message_data["reactions"]:
                    message_data["reactions"] = message_data["reactions"]["emoji"]
            else:
                message_data = message
            
            message_text = json.dumps(message_data)
            disconnected = []
            for websocket in self.rooms[room]:
                try:
                    await websocket.send_text(message_text)
                except:
                    disconnected.append(websocket)
            
            # Clean up disconnected websockets
            for websocket in disconnected:
                await self.disconnect(room, websocket)

manager = ConnectionManager()

@app.get("/")
async def get_index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.websocket("/ws/{room}/{username}")
async def websocket_endpoint(websocket: WebSocket, room: str, username: str):
    await manager.connect(room, username, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            try:
                if data["type"] == "message":
                    message_request = MessageRequest(**data)
                    message = Message(
                        id=str(uuid.uuid4()),
                        type="message",
                        user=username,
                        content=message_request.content,
                        timestamp=datetime.now(),
                    )
                    manager.store_message(room, message)
                    await manager.broadcast(room, MessageBroadcast(
                        type="message",
                        user=username,
                        content=message_request.content,
                        message_id=message.id,
                        timestamp=message.timestamp
                    ))
                elif data["type"] == "add_reaction":
                    request = AddReactionRequest(**data)
                    if manager.verify_user_in_room(room, username):
                        if manager.add_reaction(room, request.message_id, request.emoji, username):
                            message = manager.get_message(room, request.message_id)
                            if message:
                                await manager.broadcast(room, MessageBroadcast(
                                    type="reaction_update",
                                    message_id=request.message_id,
                                    emoji=request.emoji,
                                    users=message.reactions.emoji[request.emoji]
                                ))
                elif data["type"] == "remove_reaction":
                    request = RemoveReactionRequest(**data)
                    if manager.verify_user_in_room(room, username):
                        if manager.remove_reaction(room, request.message_id, request.emoji, username):
                            message = manager.get_message(room, request.message_id)
                            users = message.reactions.emoji.get(request.emoji, []) if message else []
                            await manager.broadcast(room, MessageBroadcast(
                                type="reaction_update",
                                message_id=request.message_id,
                                emoji=request.emoji,
                                users=users
                            ))
            except ValidationError as e:
                print(f"Validation error: {e}")
                        
    except WebSocketDisconnect:
        await manager.disconnect(room, websocket)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
'''
    
    with open(file_path, 'w') as f:
        f.write(fixed_content)
    
    print(f"Fixed main.py has been written to {file_path}")

fix_main_py()

## 3. Message Storage Implementation

Let's test the message storage functionality to make sure our fixes work correctly.

In [None]:
import sys
import os
sys.path.append(os.getcwd())

from datetime import datetime
from app.schemas import Message, ReactionData
from app.main import ConnectionManager

def test_message_storage():
    manager = ConnectionManager()
    room = "test_room"
    
    # Create a test message
    message = Message(
        id="test-message-1",
        type="message",
        user="test_user",
        content="Hello, testing message storage!",
        timestamp=datetime.now(),
        reactions=ReactionData()
    )
    
    # Store the message
    manager.store_message(room, message)
    
    # Retrieve the message
    retrieved_message = manager.get_message(room, "test-message-1")
    
    assert retrieved_message is not None, "Message should be retrieved successfully"
    assert retrieved_message.id == "test-message-1", "Message ID should match"
    assert retrieved_message.content == "Hello, testing message storage!", "Message content should match"
    
    print("Message storage test passed successfully!")

test_message_storage()

## 4. Test WebSocket Connection

Now let's test the WebSocket connection functionality.

In [None]:
import asyncio
import json
import websockets
import subprocess
import sys
import os
from threading import Thread
import time

async def test_websocket_connection():
    # Start the server in a separate process
    server_process = subprocess.Popen([sys.executable, "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"])
    time.sleep(2)  # Wait for server to start
    
    try:
        # Connect to the WebSocket
        uri = "ws://localhost:8000/ws/test_room/test_user"
        async with websockets.connect(uri) as websocket:
            print("Connected to WebSocket")
            
            # Send a test message
            test_message = {
                "type": "message",
                "content": "Test WebSocket message"
            }
            await websocket.send(json.dumps(test_message))
            print("Sent test message")
            
            # Wait for message broadcast
            response = await websocket.recv()
            data = json.loads(response)
            print(f"Received response: {data}")
            
            assert data["type"] == "message", "Should receive a message type response"
            assert data["content"] == "Test WebSocket message", "Message content should match"
            
            print("WebSocket connection test passed successfully!")
            
    except Exception as e:
        print(f"Error during WebSocket test: {e}")
    finally:
        # Stop the server
        server_process.terminate()
        server_process.wait()

# Run the test
asyncio.run(test_websocket_connection())

## 5. Run Integration Tests

Let's run the test_reactions.py script to verify the reaction handling functionality.

In [None]:
import asyncio
import subprocess
import os
import time

async def run_integration_tests():
    # Start the server
    server_process = subprocess.Popen([sys.executable, "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"])
    time.sleep(2)  # Wait for server to start
    
    try:
        # Run the test_reactions.py script
        test_script = os.path.join(os.getcwd(), "test_reactions.py")
        process = subprocess.run([sys.executable, test_script], capture_output=True, text=True)
        
        print("Test output:")
        print(process.stdout)
        
        if process.returncode == 0:
            print("Integration tests passed successfully!")
        else:
            print(f"Integration tests failed with error:\n{process.stderr}")
            
    finally:
        # Stop the server
        server_process.terminate()
        server_process.wait()

# Run the integration tests
asyncio.run(run_integration_tests())

## 6. Test Reaction System

Now let's run the demo_reaction_updates.py script to test the complete reaction system with multiple clients.

In [None]:
import asyncio
import subprocess
import os
import time

async def run_demo_reaction_updates():
    # Start the server
    server_process = subprocess.Popen([sys.executable, "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"])
    time.sleep(2)  # Wait for server to start
    
    try:
        # Run the demo_reaction_updates.py script
        demo_script = os.path.join(os.getcwd(), "demo_reaction_updates.py")
        process = subprocess.run([sys.executable, demo_script], capture_output=True, text=True)
        
        print("Demo output:")
        print(process.stdout)
        
        if process.returncode == 0:
            print("Demo reaction updates test passed successfully!")
        else:
            print(f"Demo reaction updates test failed with error:\n{process.stderr}")
            
    finally:
        # Stop the server
        server_process.terminate()
        server_process.wait()

# Run the demo
asyncio.run(run_demo_reaction_updates())

## 7. Run Full Application

Finally, let's start the FastAPI server and test the complete application with multiple browser clients.

In [None]:
import subprocess
import sys

# Start the FastAPI server
subprocess.run([sys.executable, "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"])