From 4bb9a39359c569eb97f9bed5d7832057f39638c4 Mon Sep 17 00:00:00 2001 From: Shubhashish-Chakraborty Date: Sun, 19 Apr 2026 17:00:25 +0530 Subject: [PATCH 1/7] base poc working - core virtual classroom, durable objection --- public/classroom_poc.html | 733 ++++++++++++++++++++++++++++++++++++++ src/worker.py | 330 ++++++++++++++++- wrangler.toml | 10 + 3 files changed, 1072 insertions(+), 1 deletion(-) create mode 100644 public/classroom_poc.html diff --git a/public/classroom_poc.html b/public/classroom_poc.html new file mode 100644 index 0000000..6007344 --- /dev/null +++ b/public/classroom_poc.html @@ -0,0 +1,733 @@ + + + + + + + Virtual Classroom + + + + + + + + + + + + +
+
+ +
+

+ Virtual Classroom +

+

Real Time Classroom Experience

+
+ + +
+
+
+ +
+ + +
+
+
+ +
+ + +
+
+ +
+
+
+
+ + + + + +
+
+

+ + Active Participants +

+ +
+
+ +
+
+ + +
+ + + + + diff --git a/src/worker.py b/src/worker.py index 49e3951..08bb180 100644 --- a/src/worker.py +++ b/src/worker.py @@ -43,10 +43,12 @@ from typing import Any, Dict from urllib.parse import urlparse, parse_qs -from workers import Response +from workers import Response, DurableObject import js from pyodide.ffi import to_js +from js import WebSocketPair, WebSocketRequestResponsePair +import uuid _SENTRY_INITIALIZED = False _SENTRY_DSN: str = "" @@ -1329,6 +1331,321 @@ async def serve_static(path: str, env): mime = _MIME.get(ext, "text/plain") return Response(content, headers={"Content-Type": mime, **_CORS}) +class ClassroomDO(DurableObject): + """WebSocket based virtual classroom Durable Object. + + Each room_id maps to one DO instance. Connected clients share: + - room_state (participant list, broadcast on join/leave) + - position_update (x/y movement relay) + - chat_message (basic text relay) + - seat mgmt (update_seat / leave_seat) + """ + + def __init__(self, ctx, env): + super().__init__(ctx, env) + # sessions: session_id -> {ws, participant_id, display_name, position, direction, is_moving, seat_id} + self.sessions = {} + + # Restore hibernated WebSocket connections + for ws in self.ctx.getWebSockets(): + attachment = ws.deserializeAttachment() + if attachment: + try: + data = json.loads(attachment) if isinstance(attachment, str) else attachment + sid = data.get("session_id", str(uuid.uuid4())) + self.sessions[sid] = { + "ws": ws, + "participant_id": data.get("participant_id", "unknown"), + "display_name": data.get("display_name", "Unknown"), + "position": data.get("position", {"x": 400, "y": 300}), + "direction": data.get("direction", "down"), + "is_moving": False, + "seat_id": data.get("seat_id", ""), + } + except Exception: + pass + + # Auto respond to keepAlive pings without waking up the instance + self.ctx.setWebSocketAutoResponse( + WebSocketRequestResponsePair.new("ping", "pong") + ) + + async def on_fetch(self, request): + url = request.url + upgrade = request.headers.get("Upgrade") or "" + + if upgrade.lower() != "websocket": + return Response(json.dumps({"error": "Expected WebSocket upgrade"}), + status=426, + headers={"Content-Type": "application/json"}) + + # Extract participant_id and display_name from query string + parsed = urlparse(url) + qs = parse_qs(parsed.query) + participant_id = (qs.get("participant_id") or ["anon"])[0] + display_name = (qs.get("display_name") or [participant_id])[0] + + # Create WebSocket pair + client, server = WebSocketPair.new().object_values() + + # Accept with hibernation support + self.ctx.acceptWebSocket(server) + + # Generate session ID (supports multi-tab: same participant_id, different session) + session_id = str(uuid.uuid4()) + + # Attachment data: survives hibernation + attachment = json.dumps({ + "session_id": session_id, + "participant_id": participant_id, + "display_name": display_name, + "position": {"x": 400, "y": 300}, + "direction": "down", + "seat_id": "", + }) + server.serializeAttachment(attachment) + + # Track locally + self.sessions[session_id] = { + "ws": server, + "participant_id": participant_id, + "display_name": display_name, + "position": {"x": 400, "y": 300}, + "direction": "down", + "is_moving": False, + "seat_id": "", + } + + # Send user_info to the newly connected client + try: + server.send(json.dumps({ + "type": "user_info", + "session_id": session_id, + "participant_id": participant_id, + "display_name": display_name, + })) + except Exception: + pass + + # Broadcast updated room_state to EVERYONE + self._broadcast_room_state() + + # Notifying others that someone joined! + self._broadcast(json.dumps({ + "type": "participant_joined", + "participant_id": participant_id, + "display_name": display_name, + }), exclude_session_id=session_id) + + return Response(None, status=101, web_socket=client) + + # Event Router + async def on_webSocketMessage(self, ws, message): + try: + data = json.loads(message) if isinstance(message, str) else json.loads(message.decode("utf-8")) + except Exception: + return + + msg_type = data.get("type", "") + session = self._session_for_ws(ws) + if not session: + return + + sid, info = session + + if msg_type == "position_update": + # Update stored position + info["position"] = data.get("position", info["position"]) + info["direction"] = data.get("direction", info["direction"]) + info["is_moving"] = data.get("isMoving", False) + self._persist_attachment(sid, info) + + # Broadcast to all (except sender) + self._broadcast(json.dumps({ + "type": "position_update", + "participant_id": info["participant_id"], + "display_name": info["display_name"], + "position": info["position"], + "direction": info["direction"], + "isMoving": info["is_moving"], + }), exclude_session_id=sid) + + elif msg_type == "chat_message": + text = (data.get("text") or "").strip() + if not text: + return + self._broadcast(json.dumps({ + "type": "chat_message", + "participant_id": info["participant_id"], + "display_name": info["display_name"], + "text": text, + "timestamp": data.get("timestamp", ""), + })) + + elif msg_type == "update_seat": + seat_id = data.get("seat_id", "") + if not seat_id: + return + + # Check if seat is occupied by someone else + for other_sid, other_info in self.sessions.items(): + if (other_info["seat_id"] == seat_id + and other_info["participant_id"] != info["participant_id"]): + try: + ws.send(json.dumps({ + "type": "seat_occupied", + "message": "This seat is already taken by another student.", # Seat Occupied! + "seat_id": seat_id, + })) + except Exception: + pass + return + + # Assign seat to this participant + for s_id, s_info in self.sessions.items(): + if s_info["participant_id"] == info["participant_id"]: + s_info["seat_id"] = seat_id + self._persist_attachment(s_id, s_info) + + self._broadcast(json.dumps({ + "type": "seat_updated", + "participant_id": info["participant_id"], + "display_name": info["display_name"], + "seat_id": seat_id, + })) + self._broadcast_room_state() + + elif msg_type == "leave_seat": + old_seat = info["seat_id"] + # Clear seat for all tabs of this participant + for s_id, s_info in self.sessions.items(): + if s_info["participant_id"] == info["participant_id"]: + s_info["seat_id"] = "" + self._persist_attachment(s_id, s_info) + + self._broadcast(json.dumps({ + "type": "seat_left", + "participant_id": info["participant_id"], + "display_name": info["display_name"], + "seat_id": old_seat, + })) + self._broadcast_room_state() + + async def on_webSocketClose(self, ws, code, reason, wasClean): + try: + ws.close(code, reason) + except Exception: + pass + + session = self._session_for_ws(ws) + if not session: + return + + sid, info = session + participant_id = info["participant_id"] + display_name = info["display_name"] + + # Remove this specific session + self.sessions.pop(sid, None) + + # Check if participant still has other tabs open + still_connected = any( + s["participant_id"] == participant_id + for s in self.sessions.values() + ) + + if not still_connected: + self._broadcast(json.dumps({ + "type": "participant_left", + "participant_id": participant_id, + "display_name": display_name, + })) + + self._broadcast_room_state() + + async def on_webSocketError(self, ws, error): + """Handle WebSocket errors — treat as close.""" + await self.on_webSocketClose(ws, 1011, "WebSocket error", False) + + # HELPERS: + + def _session_for_ws(self, ws): + """Find (session_id, info) for a given WebSocket object. + Uses deserializeAttachment so it works after hibernation wakeUp + """ + try: + raw = ws.deserializeAttachment() + if raw: + data = json.loads(raw) if isinstance(raw, str) else raw + sid = data.get("session_id", "") + if sid and sid in self.sessions: + return (sid, self.sessions[sid]) + except Exception: + pass + + for sid, info in self.sessions.items(): + try: + if info["ws"] == ws: + return (sid, info) + except Exception: + pass + return None + + def _broadcast(self, msg, exclude_session_id=None): + """Send a JSON string to all connected WebSockets. + exclude_session_id: session_id string to skip (the sender). + """ + dead = [] + for sid, info in self.sessions.items(): + if sid == exclude_session_id: + continue + try: + info["ws"].send(msg) + except Exception: + dead.append(sid) + for sid in dead: + self.sessions.pop(sid, None) + + def _broadcast_room_state(self): + """Build and broadcast the current room state.""" + # De-duplicate by participant_id (multi-tab) + seen = {} + for info in self.sessions.values(): + pid = info["participant_id"] + if pid not in seen: + seen[pid] = { + "participant_id": pid, + "display_name": info["display_name"], + "position": info["position"], + "direction": info["direction"], + "is_moving": info.get("is_moving", False), + "seat_id": info.get("seat_id", ""), + } + + state_msg = json.dumps({ + "type": "room_state", + "participants": list(seen.values()), + "count": len(seen), + }) + self._broadcast(state_msg) + + def _persist_attachment(self, session_id, info): + """Persist session data in the WebSocket attachment.""" + ws = self.sessions.get(session_id, {}).get("ws") + if not ws: + return + try: + ws.serializeAttachment(json.dumps({ + "session_id": session_id, + "participant_id": info["participant_id"], + "display_name": info["display_name"], + "position": info["position"], + "direction": info["direction"], + "seat_id": info.get("seat_id", ""), + })) + except Exception: + pass + # --------------------------------------------------------------------------- # Main dispatcher @@ -1346,6 +1663,17 @@ async def _dispatch(request, env): if not _is_basic_auth_valid(request, env): return _unauthorized_basic() return await serve_static("/admin.html", env) + + m_classroom = re.fullmatch(r"/api/classroom/([A-Za-z0-9_-]+)", path) + if m_classroom: + room_id = m_classroom.group(1) + try: + do_id = env.CLASSROOM_DO.idFromName(room_id) + stub = env.CLASSROOM_DO.get(do_id) + return await stub.fetch(request) + except Exception as e: + await capture_exception(e, request, env, "classroom_do_dispatch") + return err("Failed to connect to classroom", 500) if path.startswith("/api/"): if path == "/api/init" and method == "POST": diff --git a/wrangler.toml b/wrangler.toml index c281b20..516154e 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -18,6 +18,16 @@ database_name = "education_db" database_id = "a0021f2e-a8cc-4e20-8910-3c7290ba47a6" +# Virtual Classroom Durable Object +[[durable_objects.bindings]] +name = "CLASSROOM_DO" +class_name = "ClassroomDO" + +[[migrations]] +tag = "v1" +new_sqlite_classes = ["ClassroomDO"] + + [observability] enabled = false head_sampling_rate = 1 From d72a6e257d7cb083fae82e43bf355b62e75fca71 Mon Sep 17 00:00:00 2001 From: Shubhashish-Chakraborty Date: Sun, 19 Apr 2026 17:30:06 +0530 Subject: [PATCH 2/7] added durableObject stub, pytest fix --- tests/conftest.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 2c15bd5..8d4995d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -35,8 +35,17 @@ def __repr__(self): return f"" +class _DurableObject: + """Stub for workers.DurableObject base class.""" + + def __init__(self, ctx=None, env=None): + self.ctx = ctx + self.env = env + + class _WorkersModule: Response = _Response + DurableObject = _DurableObject sys.modules["workers"] = _WorkersModule() @@ -112,9 +121,32 @@ def getRandomValues(buf): return b"\x00" * 12 +class _WebSocketPair: + """Stub for js.WebSocketPair.""" + + @staticmethod + def new(): + from unittest.mock import MagicMock + pair = MagicMock() + pair.object_values.return_value = (MagicMock(), MagicMock()) + return pair + + +class _WebSocketRequestResponsePair: + """Stub for js.WebSocketRequestResponsePair.""" + + @staticmethod + def new(request, response): + from unittest.mock import MagicMock + return MagicMock() + + class _JsModule: crypto = _Crypto() Uint8Array = _Uint8Array() + WebSocketPair = _WebSocketPair() + WebSocketRequestResponsePair = _WebSocketRequestResponsePair() + class Object: @staticmethod def fromEntries(entries): From 8149e4d1016cdfc3a1e3147d0e9ea7a5eab894c2 Mon Sep 17 00:00:00 2001 From: Shubhashish-Chakraborty Date: Mon, 20 Apr 2026 00:35:14 +0530 Subject: [PATCH 3/7] implemented valid coderabbit suggestions, made the position update more accurate --- public/classroom_poc.html | 348 +++++++++++++++++++++++++------------- src/worker.py | 303 ++++++++++++++++----------------- 2 files changed, 375 insertions(+), 276 deletions(-) diff --git a/public/classroom_poc.html b/public/classroom_poc.html index 6007344..d506e59 100644 --- a/public/classroom_poc.html +++ b/public/classroom_poc.html @@ -27,7 +27,6 @@ };