Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions packages/app/src/server/mcp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export const createServerFactory = (_webServerInstance: WebServer, sharedApiClie
options?: QueryLoggerOptions
) => void;

type BaseQueryLoggerOptions = Omit<QueryLoggerOptions, 'durationMs' | 'success' | 'error'>;
type BaseQueryLoggerOptions = Omit<QueryLoggerOptions, 'durationMs' | 'error'>;

interface QueryLoggingConfig<T> {
methodName: string;
Expand All @@ -175,17 +175,24 @@ export const createServerFactory = (_webServerInstance: WebServer, sharedApiClie
const start = performance.now();
try {
const result = await work();
const durationMs = performance.now() - start;
const durationMs = Math.round(performance.now() - start);
const successOptions = config.successOptions?.(result) ?? {};
const { success: successOverride, ...restSuccessOptions } = successOptions;
const resultHasError =
typeof result === 'object' &&
result !== null &&
'isError' in result &&
Boolean((result as { isError?: boolean }).isError);
const successFlag = successOverride ?? !resultHasError;
logFn(config.methodName, config.query, config.parameters, {
...config.baseOptions,
...successOptions,
...restSuccessOptions,
durationMs,
success: true,
success: successFlag,
});
return result;
} catch (error) {
const durationMs = performance.now() - start;
const durationMs = Math.round(performance.now() - start);
logFn(config.methodName, config.query, config.parameters, {
...config.baseOptions,
durationMs,
Expand Down
23 changes: 23 additions & 0 deletions packages/app/src/server/transport/base-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export interface SessionMetadata {
pingFailures?: number;
lastPingAttempt?: Date;
ipAddress?: string;
authToken?: string;
}

/**
Expand Down Expand Up @@ -172,6 +173,20 @@ export abstract class BaseTransport {
this.metrics.trackIpAddress(ipAddress);
}

/**
* Track an IP address for a specific client
*/
protected trackClientIpAddress(ipAddress: string | undefined, clientInfo?: { name: string; version: string }): void {
this.metrics.trackClientIpAddress(ipAddress, clientInfo);
}

/**
* Track auth status for a specific client
*/
protected trackClientAuth(authToken: string | undefined, clientInfo?: { name: string; version: string }): void {
this.metrics.trackClientAuth(authToken, clientInfo);
}

/**
* Extract IP address from request headers
* Handles x-forwarded-for, x-real-ip, and direct IP
Expand Down Expand Up @@ -454,6 +469,14 @@ export abstract class StatefulTransport<TSession extends BaseSession = BaseSessi
session.metadata.clientInfo = clientInfo;
// Associate session with real client for metrics tracking
this.metrics.associateSessionWithClient(clientInfo);

// Track IP address for this client if available
if (session.metadata.ipAddress) {
this.trackClientIpAddress(session.metadata.ipAddress, clientInfo);
}

// Track auth status for this client
this.trackClientAuth(session.metadata.authToken, clientInfo);
}

if (clientCapabilities) {
Expand Down
2 changes: 2 additions & 0 deletions packages/app/src/server/transport/sse-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export class SseTransport extends StatefulTransport<SSEConnection> {
const cleanup = this.createCleanupFunction(sessionId);

// Store connection with metadata
const authToken = headers['authorization']?.replace(/^Bearer\s+/i, '');
const connection: SSEConnection = {
transport,
server,
Expand All @@ -130,6 +131,7 @@ export class SseTransport extends StatefulTransport<SSEConnection> {
isAuthenticated: authResult.shouldContinue && !!headers['authorization'],
capabilities: {},
ipAddress,
authToken,
},
cleaningUp: false,
};
Expand Down
7 changes: 7 additions & 0 deletions packages/app/src/server/transport/stateless-http-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ export class StatelessHttpTransport extends BaseTransport {
this.associateSessionWithClient(extractedClientInfo);
this.updateClientActivity(extractedClientInfo);

// Track IP address for this client
this.trackClientIpAddress(ipAddress, extractedClientInfo);

// Track auth status for this client
const authToken = headers['authorization']?.replace(/^Bearer\s+/i, '');
this.trackClientAuth(authToken, extractedClientInfo);

// Update analytics session with client info
if (this.analyticsMode && sessionId) {
this.updateAnalyticsSessionClientInfo(sessionId, extractedClientInfo);
Expand Down
11 changes: 9 additions & 2 deletions packages/app/src/server/transport/streamable-http-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class StreamableHttpTransport extends StatefulTransport<Session> {
return;
}

transport = await this.createSession(headers);
transport = await this.createSession(headers, req);
} else if (!sessionId) {
// No session ID and not an initialization request
this.trackError(400);
Expand Down Expand Up @@ -220,12 +220,16 @@ export class StreamableHttpTransport extends StatefulTransport<Session> {
await this.removeSession(sessionId);
}

private async createSession(requestHeaders?: Record<string, string>): Promise<StreamableHTTPServerTransport> {
private async createSession(requestHeaders?: Record<string, string>, req?: Request): Promise<StreamableHTTPServerTransport> {
// Create server instance using factory with request headers
// Note: Auth validation is now done in handlePostRequest before calling this method
const result = await this.serverFactory(requestHeaders || null);
const server = result.server;

// Extract IP address and auth token for tracking
const ipAddress = req ? this.extractIpAddress(req.headers as Record<string, string | string[] | undefined>, req.ip) : undefined;
const authToken = requestHeaders?.['authorization']?.replace(/^Bearer\s+/i, '');

const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId: string) => {
Expand All @@ -235,6 +239,7 @@ export class StreamableHttpTransport extends StatefulTransport<Session> {
logSystemEvent('initialize', sessionId, {
clientSessionId: sessionId,
isAuthenticated: !!requestHeaders?.['authorization'],
ipAddress,
});

// Create session object and store it immediately
Expand All @@ -248,6 +253,8 @@ export class StreamableHttpTransport extends StatefulTransport<Session> {
requestCount: 0,
isAuthenticated: !!requestHeaders?.['authorization'],
capabilities: {},
ipAddress,
authToken,
},
cleaningUp: false,
};
Expand Down
8 changes: 6 additions & 2 deletions packages/app/src/server/utils/query-logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ export function logSearchQuery(
): void {
// Use a stable mcpServerSessionId per process/transport instance
const mcpServerSessionId = getMcpServerSessionId();
const normalizedDurationMs =
options?.durationMs !== undefined ? Math.round(options.durationMs) : undefined;
const serializedParameters = JSON.stringify(data);
const requestPayload = {
methodName,
Expand All @@ -230,7 +232,7 @@ export function logSearchQuery(
totalResults: options?.totalResults,
resultsShared: options?.resultsShared,
responseCharCount: options?.responseCharCount,
durationMs: options?.durationMs,
durationMs: normalizedDurationMs,
success: options?.success ?? true,
errorMessage: normalizedError,
});
Expand All @@ -247,6 +249,8 @@ export function logPromptQuery(
): void {
// Use a stable mcpServerSessionId per process/transport instance
const mcpServerSessionId = getMcpServerSessionId();
const normalizedDurationMs =
options?.durationMs !== undefined ? Math.round(options.durationMs) : undefined;
const serializedParameters = JSON.stringify(data);
const requestPayload = {
methodName,
Expand All @@ -269,7 +273,7 @@ export function logPromptQuery(
totalResults: options?.totalResults,
resultsShared: options?.resultsShared,
responseCharCount: options?.responseCharCount,
durationMs: options?.durationMs,
durationMs: normalizedDurationMs,
success: options?.success ?? true,
errorMessage: normalizedError,
});
Expand Down
85 changes: 85 additions & 0 deletions packages/app/src/shared/transport-metrics.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createHash } from 'node:crypto';
import type { TransportType } from './constants.js';

/**
Expand Down Expand Up @@ -77,6 +78,9 @@ export interface ClientMetrics {
activeConnections: number;
totalConnections: number;
toolCallCount: number;
newIpCount: number;
anonCount: number;
uniqueAuthCount: number;
}

/**
Expand Down Expand Up @@ -228,6 +232,9 @@ export interface TransportMetricsResponse {
activeConnections: number;
totalConnections: number;
toolCallCount: number;
newIpCount: number;
anonCount: number;
uniqueAuthCount: number;
}>;

sessions: SessionData[];
Expand Down Expand Up @@ -409,6 +416,13 @@ class RollingWindowCounter {
}
}

/**
* Hash auth tokens before counting them to avoid storing raw secrets.
*/
function hashToken(token: string): string {
return createHash('sha256').update(token).digest('hex');
}

/**
* Centralized metrics counter for transport operations
*/
Expand All @@ -418,13 +432,17 @@ export class MetricsCounter {
private rollingHour: RollingWindowCounter;
private rolling3Hours: RollingWindowCounter;
private uniqueIps: Set<string>;
private clientIps: Map<string, Set<string>>; // Map of clientKey -> Set of IPs
private clientAuthHashes: Map<string, Set<string>>; // Map of clientKey -> Set of auth token hashes

constructor() {
this.metrics = createEmptyMetrics();
this.rollingMinute = new RollingWindowCounter(1);
this.rollingHour = new RollingWindowCounter(60);
this.rolling3Hours = new RollingWindowCounter(180);
this.uniqueIps = new Set();
this.clientIps = new Map();
this.clientAuthHashes = new Map();
}

/**
Expand Down Expand Up @@ -509,6 +527,70 @@ export class MetricsCounter {
}
}

/**
* Track an IP address for a specific client
*/
trackClientIpAddress(ipAddress: string | undefined, clientInfo?: { name: string; version: string }): void {
// Always track globally
this.trackIpAddress(ipAddress);

// Track per-client if client info is available
if (ipAddress && clientInfo) {
const clientKey = getClientKey(clientInfo.name, clientInfo.version);
const clientMetrics = this.metrics.clients.get(clientKey);

if (clientMetrics) {
// Get or create the IP set for this client
let clientIpSet = this.clientIps.get(clientKey);
if (!clientIpSet) {
clientIpSet = new Set();
this.clientIps.set(clientKey, clientIpSet);
}

// Check if this is a new IP for this client
const isNewIp = !clientIpSet.has(ipAddress);
if (isNewIp) {
clientIpSet.add(ipAddress);
clientMetrics.newIpCount++;
}
}
}
}

/**
* Track auth status for a specific client
*/
trackClientAuth(authToken: string | undefined, clientInfo?: { name: string; version: string }): void {
if (!clientInfo) return;

const clientKey = getClientKey(clientInfo.name, clientInfo.version);
const clientMetrics = this.metrics.clients.get(clientKey);

if (clientMetrics) {
if (!authToken) {
// Anonymous request
clientMetrics.anonCount++;
} else {
// Authenticated request - hash the token for privacy
const tokenHash = hashToken(authToken);

// Get or create the auth hash set for this client
let clientAuthSet = this.clientAuthHashes.get(clientKey);
if (!clientAuthSet) {
clientAuthSet = new Set();
this.clientAuthHashes.set(clientKey, clientAuthSet);
}

// Check if this is a new auth token for this client
const isNewAuth = !clientAuthSet.has(tokenHash);
if (isNewAuth) {
clientAuthSet.add(tokenHash);
clientMetrics.uniqueAuthCount++;
}
}
}
}

/**
* Update active connection count
*/
Expand Down Expand Up @@ -544,6 +626,9 @@ export class MetricsCounter {
activeConnections: 1,
totalConnections: 1,
toolCallCount: 0,
newIpCount: 0,
anonCount: 0,
uniqueAuthCount: 0,
};
this.metrics.clients.set(clientKey, clientMetrics);
} else {
Expand Down
Loading