Skip to content

Commit dcfc9aa

Browse files
committed
feat(backend): implement satellite event handling and processing
1 parent ef0d19b commit dcfc9aa

File tree

7 files changed

+921
-0
lines changed

7 files changed

+921
-0
lines changed
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/**
2+
* Satellite Event Dispatcher
3+
*
4+
* Convention-based event handler system that auto-discovers and routes events
5+
* to appropriate handlers based on event type.
6+
*/
7+
8+
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
9+
import type {
10+
EventHandlerRegistry,
11+
SatelliteEvent,
12+
EventProcessingResult
13+
} from './types';
14+
import Ajv from 'ajv';
15+
import addFormats from 'ajv-formats';
16+
import type { FastifyBaseLogger } from 'fastify';
17+
18+
// Auto-discover event handlers from this directory
19+
// Convention: Each handler exports EVENT_TYPE, SCHEMA, and handle()
20+
const handlerModules = [
21+
() => import('./mcp-server-started'),
22+
() => import('./mcp-tool-executed'),
23+
() => import('./mcp-server-crashed'),
24+
// Add new handlers here - they will be automatically registered
25+
];
26+
27+
// Initialize AJV validator
28+
const ajv = new Ajv({
29+
allErrors: true,
30+
strict: false,
31+
strictTypes: false
32+
});
33+
addFormats(ajv);
34+
35+
/**
36+
* Build event handler registry by loading all handler modules
37+
*/
38+
async function buildEventRegistry(): Promise<EventHandlerRegistry> {
39+
const registry: EventHandlerRegistry = {};
40+
41+
for (const loadHandler of handlerModules) {
42+
try {
43+
const handler = await loadHandler();
44+
45+
// Validate handler exports required fields
46+
if (!handler.EVENT_TYPE || !handler.SCHEMA || !handler.handle) {
47+
console.error('Invalid event handler - missing required exports:', handler);
48+
continue;
49+
}
50+
51+
registry[handler.EVENT_TYPE] = {
52+
EVENT_TYPE: handler.EVENT_TYPE,
53+
SCHEMA: handler.SCHEMA,
54+
handle: handler.handle
55+
};
56+
} catch (error) {
57+
console.error('Failed to load event handler:', error);
58+
}
59+
}
60+
61+
return registry;
62+
}
63+
64+
// Singleton registry instance
65+
let eventRegistry: EventHandlerRegistry | null = null;
66+
67+
/**
68+
* Get or initialize event handler registry
69+
*/
70+
async function getEventRegistry(): Promise<EventHandlerRegistry> {
71+
if (!eventRegistry) {
72+
eventRegistry = await buildEventRegistry();
73+
}
74+
return eventRegistry;
75+
}
76+
77+
/**
78+
* Process a single event
79+
*/
80+
async function processEvent(
81+
satelliteId: string,
82+
event: SatelliteEvent,
83+
db: LibSQLDatabase,
84+
logger: FastifyBaseLogger
85+
): Promise<EventProcessingResult> {
86+
try {
87+
const registry = await getEventRegistry();
88+
89+
// Check if event type is registered
90+
const handler = registry[event.type];
91+
if (!handler) {
92+
logger.warn({ eventType: event.type }, 'Unknown event type');
93+
return {
94+
success: false,
95+
error: `Unknown event type: ${event.type}`
96+
};
97+
}
98+
99+
// Validate event data against handler schema
100+
const validate = ajv.compile(handler.SCHEMA);
101+
const valid = validate(event.data);
102+
103+
if (!valid) {
104+
const errors = validate.errors?.map(e => `${e.instancePath} ${e.message}`).join(', ');
105+
logger.warn({
106+
eventType: event.type,
107+
validationErrors: errors
108+
}, 'Event validation failed');
109+
return {
110+
success: false,
111+
error: `Validation failed: ${errors}`
112+
};
113+
}
114+
115+
// Parse event timestamp
116+
const eventTimestamp = new Date(event.timestamp);
117+
if (isNaN(eventTimestamp.getTime())) {
118+
return {
119+
success: false,
120+
error: 'Invalid timestamp format'
121+
};
122+
}
123+
124+
// Execute handler
125+
await handler.handle(satelliteId, event.data, db, eventTimestamp);
126+
127+
logger.info({
128+
satelliteId,
129+
eventType: event.type
130+
}, 'Event processed successfully');
131+
132+
return {
133+
success: true,
134+
eventId: `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
135+
};
136+
137+
} catch (error) {
138+
logger.error({
139+
satelliteId,
140+
eventType: event.type,
141+
error: error instanceof Error ? error.message : 'Unknown error'
142+
}, 'Event processing failed');
143+
144+
return {
145+
success: false,
146+
error: error instanceof Error ? error.message : 'Unknown error'
147+
};
148+
}
149+
}
150+
151+
/**
152+
* Process batch of events from satellite
153+
*
154+
* @param satelliteId - Satellite identifier
155+
* @param events - Array of events to process
156+
* @param db - Database instance
157+
* @param logger - Fastify logger
158+
* @returns Batch processing results
159+
*/
160+
export async function processBatch(
161+
satelliteId: string,
162+
events: SatelliteEvent[],
163+
db: LibSQLDatabase,
164+
logger: FastifyBaseLogger
165+
): Promise<{
166+
processed: number;
167+
failed: number;
168+
eventIds: string[];
169+
failures: Array<{ index: number; type: string; error: string; }>;
170+
}> {
171+
const results = {
172+
processed: 0,
173+
failed: 0,
174+
eventIds: [] as string[],
175+
failures: [] as Array<{ index: number; type: string; error: string; }>
176+
};
177+
178+
// Process each event individually
179+
for (let i = 0; i < events.length; i++) {
180+
const event = events[i];
181+
const result = await processEvent(satelliteId, event, db, logger);
182+
183+
if (result.success && result.eventId) {
184+
results.processed++;
185+
results.eventIds.push(result.eventId);
186+
} else {
187+
results.failed++;
188+
results.failures.push({
189+
index: i,
190+
type: event.type,
191+
error: result.error || 'Unknown error'
192+
});
193+
}
194+
}
195+
196+
logger.info({
197+
satelliteId,
198+
batchSize: events.length,
199+
processed: results.processed,
200+
failed: results.failed
201+
}, 'Batch processing complete');
202+
203+
return results;
204+
}
205+
206+
/**
207+
* Get list of registered event types
208+
* Useful for debugging and documentation
209+
*/
210+
export async function getRegisteredEventTypes(): Promise<string[]> {
211+
const registry = await getEventRegistry();
212+
return Object.keys(registry);
213+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* MCP Server Crashed Event Handler
3+
*
4+
* Updates satelliteProcesses table when an MCP server crashes unexpectedly
5+
*/
6+
7+
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
8+
import { satelliteProcesses } from '../../db/schema.sqlite';
9+
import { eq } from 'drizzle-orm';
10+
11+
// Event type identifier
12+
export const EVENT_TYPE = 'mcp.server.crashed';
13+
14+
// JSON Schema for Fastify validation
15+
export const SCHEMA = {
16+
type: 'object',
17+
properties: {
18+
processId: {
19+
type: 'string',
20+
minLength: 1,
21+
description: 'Process identifier in satelliteProcesses table'
22+
},
23+
serverId: {
24+
type: 'string',
25+
minLength: 1,
26+
description: 'MCP server identifier that crashed'
27+
},
28+
serverName: {
29+
type: 'string',
30+
minLength: 1,
31+
description: 'Human-readable MCP server name'
32+
},
33+
teamId: {
34+
type: 'string',
35+
minLength: 1,
36+
description: 'Team identifier for the crashed server'
37+
},
38+
exitCode: {
39+
type: 'number',
40+
description: 'Process exit code'
41+
},
42+
signal: {
43+
type: 'string',
44+
description: 'Signal that terminated the process (e.g., SIGKILL, SIGSEGV)'
45+
},
46+
errorMessage: {
47+
type: 'string',
48+
description: 'Error message or crash reason'
49+
},
50+
stackTrace: {
51+
type: 'string',
52+
description: 'Stack trace if available'
53+
}
54+
},
55+
required: ['processId', 'serverId', 'serverName', 'teamId'],
56+
additionalProperties: true
57+
} as const;
58+
59+
// TypeScript interface for type safety
60+
interface ServerCrashedData {
61+
processId: string;
62+
serverId: string;
63+
serverName: string;
64+
teamId: string;
65+
exitCode?: number;
66+
signal?: string;
67+
errorMessage?: string;
68+
stackTrace?: string;
69+
}
70+
71+
/**
72+
* Handle mcp.server.crashed event
73+
*
74+
* Updates the satelliteProcesses table to mark the process as failed.
75+
* This event enables immediate alerting and crash analysis.
76+
*/
77+
export async function handle(
78+
satelliteId: string,
79+
eventData: Record<string, unknown>,
80+
db: LibSQLDatabase,
81+
eventTimestamp: Date
82+
): Promise<void> {
83+
const data = eventData as unknown as ServerCrashedData;
84+
85+
// Build error message from available data
86+
const errorDetails = [];
87+
if (data.exitCode !== undefined) errorDetails.push(`Exit code: ${data.exitCode}`);
88+
if (data.signal) errorDetails.push(`Signal: ${data.signal}`);
89+
if (data.errorMessage) errorDetails.push(data.errorMessage);
90+
91+
const errorMessage = errorDetails.length > 0
92+
? errorDetails.join(' | ')
93+
: 'Process crashed unexpectedly';
94+
95+
// Update process status to failed
96+
await db
97+
.update(satelliteProcesses)
98+
.set({
99+
status: 'failed',
100+
health_status: 'unhealthy',
101+
error_message: errorMessage,
102+
stopped_at: eventTimestamp,
103+
updated_at: new Date()
104+
})
105+
.where(eq(satelliteProcesses.id, data.processId));
106+
107+
// Future enhancement: Trigger alert notifications
108+
// Future enhancement: Create incident tracking record
109+
// Future enhancement: Automatic restart based on policies
110+
}

0 commit comments

Comments
 (0)