diff --git a/package-lock.json b/package-lock.json index ca86eef..b15cbd4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,8 @@ "version": "0.0.0-automated", "license": "MIT", "dependencies": { - "buffer": "^6.0.3" + "buffer": "^6.0.3", + "nanoid": "^5.1.5" }, "devDependencies": { "@commitlint/cli": "^19.3.0", @@ -4261,6 +4262,23 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", "dev": true }, + "node_modules/nanoid": { + "version": "5.1.5", + "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-5.1.5.tgz", + "integrity": "sha512-Ir/+ZpE9fDsNH0hQ3C68uyThDXzYcim2EqcZ8zn8Chtt1iylPT9xXJB0kPCnqzgcEGikO9RxSrh63MsmVCU7Fw==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/ai" + } + ], + "bin": { + "nanoid": "bin/nanoid.js" + }, + "engines": { + "node": "^18 || >=20" + } + }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", diff --git a/package.json b/package.json index 0966592..3a9ea81 100644 --- a/package.json +++ b/package.json @@ -65,6 +65,7 @@ "webpack-cli": "^5.1.4" }, "dependencies": { - "buffer": "^6.0.3" + "buffer": "^6.0.3", + "nanoid": "^5.1.5" } } diff --git a/src/AnamClient.ts b/src/AnamClient.ts index 579d087..417993d 100644 --- a/src/AnamClient.ts +++ b/src/AnamClient.ts @@ -1,11 +1,13 @@ +import { nanoid } from 'nanoid'; +import { ClientError, ErrorCode } from './lib/ClientError'; import { - ClientError, + ClientMetricMeasurement, DEFAULT_ANAM_API_VERSION, DEFAULT_ANAM_METRICS_BASE_URL, - ErrorCode, - setErrorMetricsBaseUrl, - setCurrentSessionInfo, -} from './lib/ClientError'; + setClientMetricsBaseUrl, + sendClientMetric, + setMetricsContext, +} from './lib/ClientMetrics'; import { CoreApiRestClient, InternalEventEmitter, @@ -21,6 +23,7 @@ import { PersonaConfig, StartSessionOptions, StartSessionResponse, + ConnectionClosedCode, } from './types'; import { TalkMessageStream } from './types/TalkMessageStream'; import { Buffer } from 'buffer'; @@ -64,7 +67,7 @@ export default class AnamClient { this.clientOptions = options; if (options?.api?.baseUrl || options?.api?.apiVersion) { - setErrorMetricsBaseUrl( + setClientMetricsBaseUrl( options.api.baseUrl || DEFAULT_ANAM_METRICS_BASE_URL, options.api.apiVersion || DEFAULT_ANAM_API_VERSION, ); @@ -114,7 +117,9 @@ export default class AnamClient { if (sessionToken) { const decodedToken = this.decodeJwt(sessionToken); this.organizationId = decodedToken.accountId; - setCurrentSessionInfo(this.sessionId, this.organizationId); + setMetricsContext({ + organizationId: this.organizationId, + }); const tokenType = decodedToken.type?.toLowerCase(); @@ -187,6 +192,11 @@ export default class AnamClient { const { heartbeatIntervalSeconds, maxWsReconnectionAttempts, iceServers } = clientConfig; + this.sessionId = sessionId; + setMetricsContext({ + sessionId: this.sessionId, + }); + try { this.streamingClient = new StreamingClient( sessionId, @@ -212,11 +222,22 @@ export default class AnamClient { audioDeviceId: this.clientOptions?.audioDeviceId, disableInputAudio: this.clientOptions?.disableInputAudio, }, + metrics: { + showPeerConnectionStatsReport: + this.clientOptions?.metrics?.showPeerConnectionStatsReport ?? + false, + peerConnectionStatsReportOutputFormat: + this.clientOptions?.metrics + ?.peerConnectionStatsReportOutputFormat ?? 'console', + }, }, this.publicEventEmitter, this.internalEventEmitter, ); } catch (error) { + setMetricsContext({ + sessionId: null, + }); throw new ClientError( 'Failed to initialize streaming client', ErrorCode.CLIENT_ERROR_CODE_SERVER_ERROR, @@ -228,8 +249,6 @@ export default class AnamClient { ); } - this.sessionId = sessionId; - setCurrentSessionInfo(this.sessionId, this.organizationId); return sessionId; } @@ -253,15 +272,26 @@ export default class AnamClient { public async stream( userProvidedAudioStream?: MediaStream, ): Promise { + if (this._isStreaming) { + throw new Error('Already streaming'); + } + // generate a new ID here to track the attempt + const attemptCorrelationId = nanoid(); + setMetricsContext({ + attemptCorrelationId, + sessionId: null, // reset sessionId + }); + sendClientMetric( + ClientMetricMeasurement.CLIENT_METRIC_MEASUREMENT_SESSION_ATTEMPT, + '1', + ); if (this.clientOptions?.disableInputAudio && userProvidedAudioStream) { console.warn( 'AnamClient:Input audio is disabled. User provided audio stream will be ignored.', ); } await this.startSessionIfNeeded(userProvidedAudioStream); - if (this._isStreaming) { - throw new Error('Already streaming'); - } + this._isStreaming = true; return new Promise((resolve) => { // set stream callbacks to capture the stream @@ -311,6 +341,16 @@ export default class AnamClient { videoElementId: string, userProvidedAudioStream?: MediaStream, ): Promise { + // generate a new ID here to track the attempt + const attemptCorrelationId = nanoid(); + setMetricsContext({ + attemptCorrelationId, + sessionId: null, // reset sessionId + }); + sendClientMetric( + ClientMetricMeasurement.CLIENT_METRIC_MEASUREMENT_SESSION_ATTEMPT, + '1', + ); if (this.clientOptions?.disableInputAudio && userProvidedAudioStream) { console.warn( 'AnamClient:Input audio is disabled. User provided audio stream will be ignored.', @@ -371,10 +411,18 @@ export default class AnamClient { public async stopStreaming(): Promise { if (this.streamingClient) { - this.streamingClient.stopConnection(); + this.publicEventEmitter.emit( + AnamEvent.CONNECTION_CLOSED, + ConnectionClosedCode.NORMAL, + ); + await this.streamingClient.stopConnection(); this.streamingClient = null; this.sessionId = null; - setCurrentSessionInfo(null, this.organizationId); + setMetricsContext({ + attemptCorrelationId: null, + sessionId: null, + organizationId: this.organizationId, + }); this._isStreaming = false; } } diff --git a/src/lib/ClientError.ts b/src/lib/ClientError.ts index 06d39b5..85f0f65 100644 --- a/src/lib/ClientError.ts +++ b/src/lib/ClientError.ts @@ -1,4 +1,4 @@ -import { CLIENT_METADATA } from './constants'; +import { ClientMetricMeasurement, sendClientMetric } from './ClientMetrics'; export enum ErrorCode { CLIENT_ERROR_CODE_USAGE_LIMIT_REACHED = 'CLIENT_ERROR_CODE_USAGE_LIMIT_REACHED', @@ -12,75 +12,6 @@ export enum ErrorCode { CLIENT_ERROR_CODE_CONFIGURATION_ERROR = 'CLIENT_ERROR_CODE_CONFIGURATION_ERROR', } -export const DEFAULT_ANAM_METRICS_BASE_URL = 'https://api.anam.ai'; -export const DEFAULT_ANAM_API_VERSION = '/v1'; - -export enum ClientMetricMeasurement { - CLIENT_METRIC_MEASUREMENT_ERROR = 'client_error', - CLIENT_METRIC_MEASUREMENT_CONNECTION_CLOSED = 'client_connection_closed', - CLIENT_METRIC_MEASUREMENT_CONNECTION_ESTABLISHED = 'client_connection_established', -} - -let anamCurrentBaseUrl = DEFAULT_ANAM_METRICS_BASE_URL; -let anamCurrentApiVersion = DEFAULT_ANAM_API_VERSION; - -let currentSessionId: string | null = null; -let currentOrganizationId: string | null = null; - -export const setErrorMetricsBaseUrl = ( - baseUrl: string, - apiVersion: string = DEFAULT_ANAM_API_VERSION, -) => { - anamCurrentBaseUrl = baseUrl; - anamCurrentApiVersion = apiVersion; -}; - -export const setCurrentSessionInfo = ( - sessionId: string | null, - organizationId: string | null, -) => { - currentSessionId = sessionId; - currentOrganizationId = organizationId; -}; - -export const sendErrorMetric = async ( - name: string, - value: string, - tags?: Record, -) => { - try { - const metricTags: Record = { - ...CLIENT_METADATA, - ...tags, - }; - - // Add session and organization IDs if available - if (currentSessionId) { - metricTags.sessionId = currentSessionId; - } - if (currentOrganizationId) { - metricTags.organizationId = currentOrganizationId; - } - - await fetch( - `${anamCurrentBaseUrl}${anamCurrentApiVersion}/metrics/client`, - { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - name, - value, - tags: metricTags, - }), - }, - ); - } catch (error) { - console.error('Failed to send error metric:', error); - } -}; - export class ClientError extends Error { code: ErrorCode; statusCode: number; @@ -101,7 +32,7 @@ export class ClientError extends Error { Object.setPrototypeOf(this, ClientError.prototype); // Send error metric when error is created - sendErrorMetric( + sendClientMetric( ClientMetricMeasurement.CLIENT_METRIC_MEASUREMENT_ERROR, code, { diff --git a/src/lib/ClientMetrics.ts b/src/lib/ClientMetrics.ts new file mode 100644 index 0000000..2c9b71e --- /dev/null +++ b/src/lib/ClientMetrics.ts @@ -0,0 +1,394 @@ +import { CLIENT_METADATA } from './constants'; + +export const DEFAULT_ANAM_METRICS_BASE_URL = 'https://api.anam.ai'; +export const DEFAULT_ANAM_API_VERSION = '/v1'; + +export enum ClientMetricMeasurement { + CLIENT_METRIC_MEASUREMENT_ERROR = 'client_error', + CLIENT_METRIC_MEASUREMENT_CONNECTION_CLOSED = 'client_connection_closed', + CLIENT_METRIC_MEASUREMENT_CONNECTION_ESTABLISHED = 'client_connection_established', + CLIENT_METRIC_MEASUREMENT_SESSION_ATTEMPT = 'client_session_attempt', + CLIENT_METRIC_MEASUREMENT_SESSION_SUCCESS = 'client_session_success', +} + +let anamCurrentBaseUrl = DEFAULT_ANAM_METRICS_BASE_URL; +let anamCurrentApiVersion = DEFAULT_ANAM_API_VERSION; + +export const setClientMetricsBaseUrl = ( + baseUrl: string, + apiVersion: string = DEFAULT_ANAM_API_VERSION, +) => { + anamCurrentBaseUrl = baseUrl; + anamCurrentApiVersion = apiVersion; +}; + +export interface AnamMetricsContext { + sessionId: string | null; + organizationId: string | null; + attemptCorrelationId: string | null; +} + +let anamMetricsContext: AnamMetricsContext = { + sessionId: null, + organizationId: null, + attemptCorrelationId: null, +}; + +export const setMetricsContext = (context: Partial) => { + anamMetricsContext = { ...anamMetricsContext, ...context }; +}; + +export const sendClientMetric = async ( + name: string, + value: string, + tags?: Record, +) => { + try { + const metricTags: Record = { + ...CLIENT_METADATA, + ...tags, + }; + + // Add session and organization IDs if available + if (anamMetricsContext.sessionId) { + metricTags.sessionId = anamMetricsContext.sessionId; + } + if (anamMetricsContext.organizationId) { + metricTags.organizationId = anamMetricsContext.organizationId; + } + if (anamMetricsContext.attemptCorrelationId) { + metricTags.attemptCorrelationId = anamMetricsContext.attemptCorrelationId; + } + + await fetch( + `${anamCurrentBaseUrl}${anamCurrentApiVersion}/metrics/client`, + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + name, + value, + tags: metricTags, + }), + }, + ); + } catch (error) { + console.error('Failed to send error metric:', error); + } +}; + +export interface RTCStatsJsonReport { + personaVideoStream?: { + framesReceived: number | string; + framesDropped: number | string; + framesPerSecond: number | string; + packetsReceived: number | string; + packetsLost: number | string; + resolution?: string; + jitter?: number; + }[]; + personaAudioStream?: { + packetsReceived: number | string; + packetsLost: number | string; + audioLevel: number | string; + jitter?: number; + totalAudioEnergy?: number; + }[]; + userAudioInput?: { + packetsSent: number | string; + retransmittedPackets?: number; + avgPacketSendDelay?: number; + }[]; + codecs?: { + status: string; + mimeType: string; + payloadType: string | number; + clockRate?: number; + channels?: number; + }[]; + transportLayer?: { + dtlsState: string; + iceState: string; + bytesSent?: number; + bytesReceived?: number; + }[]; + issues: string[]; +} + +export const createRTCStatsReport = ( + stats: RTCStatsReport, + outputFormat: 'console' | 'json' = 'console', +): RTCStatsJsonReport | void => { + /** + * constructs a report of the RTC stats for logging to the console or returns as JSON + */ + + // Collect stats by type for organized reporting + const statsByType: Record = {}; + + stats.forEach((report) => { + if (!statsByType[report.type]) { + statsByType[report.type] = []; + } + statsByType[report.type].push(report); + }); + + // Initialize JSON report structure + const jsonReport: RTCStatsJsonReport = { + issues: [], + }; + + // Build video statistics (Persona video output) + const inboundVideo = + statsByType['inbound-rtp']?.filter((r) => r.kind === 'video') || []; + if (inboundVideo.length > 0) { + jsonReport.personaVideoStream = []; + + inboundVideo.forEach((report) => { + const videoData = { + framesReceived: report.framesReceived ?? 'unknown', + framesDropped: report.framesDropped ?? 'unknown', + framesPerSecond: report.framesPerSecond ?? 'unknown', + packetsReceived: report.packetsReceived ?? 'unknown', + packetsLost: report.packetsLost ?? 'unknown', + resolution: + report.frameWidth && report.frameHeight + ? `${report.frameWidth}x${report.frameHeight}` + : undefined, + jitter: report.jitter !== undefined ? report.jitter : undefined, + }; + + jsonReport.personaVideoStream!.push(videoData); + }); + } + + // Build audio statistics (Persona audio output) + const inboundAudio = + statsByType['inbound-rtp']?.filter((r) => r.kind === 'audio') || []; + if (inboundAudio.length > 0) { + jsonReport.personaAudioStream = []; + + inboundAudio.forEach((report) => { + const audioData = { + packetsReceived: report.packetsReceived ?? 'unknown', + packetsLost: report.packetsLost ?? 'unknown', + audioLevel: report.audioLevel ?? 'unknown', + jitter: report.jitter !== undefined ? report.jitter : undefined, + totalAudioEnergy: + report.totalAudioEnergy !== undefined + ? report.totalAudioEnergy + : undefined, + }; + + jsonReport.personaAudioStream!.push(audioData); + }); + } + + // Build user audio input statistics + const outboundAudio = + statsByType['outbound-rtp']?.filter((r) => r.kind === 'audio') || []; + if (outboundAudio.length > 0) { + jsonReport.userAudioInput = []; + + outboundAudio.forEach((report) => { + const userAudioData = { + packetsSent: report.packetsSent ?? 'unknown', + retransmittedPackets: report.retransmittedPacketsSent ?? undefined, + avgPacketSendDelay: + report.totalPacketSendDelay !== undefined + ? (report.totalPacketSendDelay / (report.packetsSent || 1)) * 1000 + : undefined, + }; + + jsonReport.userAudioInput!.push(userAudioData); + }); + } + + // Build codec information + if (statsByType['codec']) { + jsonReport.codecs = []; + + statsByType['codec'].forEach((report) => { + const codecData = { + status: report.payloadType ? 'Active' : 'Available', + mimeType: report.mimeType || 'Unknown', + payloadType: report.payloadType || 'N/A', + clockRate: report.clockRate || undefined, + channels: report.channels || undefined, + }; + + jsonReport.codecs!.push(codecData); + }); + } + + // Build transport layer information + if (statsByType['transport']) { + jsonReport.transportLayer = []; + + statsByType['transport'].forEach((report) => { + const transportData = { + dtlsState: report.dtlsState || 'unknown', + iceState: report.iceState || 'unknown', + bytesSent: report.bytesSent || undefined, + bytesReceived: report.bytesReceived || undefined, + }; + + jsonReport.transportLayer!.push(transportData); + }); + } + + // Build issues summary + const issues: string[] = []; + + // Check for video issues + inboundVideo.forEach((report) => { + if (typeof report.framesDropped === 'number' && report.framesDropped > 0) { + issues.push(`Video: ${report.framesDropped} frames dropped`); + } + if (typeof report.packetsLost === 'number' && report.packetsLost > 0) { + issues.push(`Video: ${report.packetsLost} packets lost`); + } + if ( + typeof report.framesPerSecond === 'number' && + report.framesPerSecond < 23 + ) { + issues.push(`Video: Low frame rate (${report.framesPerSecond} fps)`); + } + }); + + // Check for audio issues + inboundAudio.forEach((report) => { + if (typeof report.packetsLost === 'number' && report.packetsLost > 0) { + issues.push(`Audio: ${report.packetsLost} packets lost`); + } + if (typeof report.jitter === 'number' && report.jitter > 0.1) { + issues.push( + `Audio: High jitter (${(report.jitter * 1000).toFixed(1)}ms)`, + ); + } + }); + + jsonReport.issues = issues; + + // Return JSON if requested + if (outputFormat === 'json') { + return jsonReport; + } + + // Generate console output from JSON report + console.group('📊 WebRTC Session Statistics Report'); + + // Console output for video stream + if ( + jsonReport.personaVideoStream && + jsonReport.personaVideoStream.length > 0 + ) { + console.group('📹 Persona Video Stream (Inbound)'); + jsonReport.personaVideoStream.forEach((videoData) => { + console.log(`Frames Received: ${videoData.framesReceived}`); + console.log(`Frames Dropped: ${videoData.framesDropped}`); + console.log(`Frames Per Second: ${videoData.framesPerSecond}`); + console.log( + `Packets Received: ${typeof videoData.packetsReceived === 'number' ? videoData.packetsReceived.toLocaleString() : videoData.packetsReceived}`, + ); + console.log(`Packets Lost: ${videoData.packetsLost}`); + if (videoData.resolution) { + console.log(`Resolution: ${videoData.resolution}`); + } + if (videoData.jitter !== undefined) { + console.log(`Jitter: ${videoData.jitter.toFixed(5)}ms`); + } + }); + console.groupEnd(); + } + + // Console output for audio stream + if ( + jsonReport.personaAudioStream && + jsonReport.personaAudioStream.length > 0 + ) { + console.group('🔊 Persona Audio Stream (Inbound)'); + jsonReport.personaAudioStream.forEach((audioData) => { + console.log( + `Packets Received: ${typeof audioData.packetsReceived === 'number' ? audioData.packetsReceived.toLocaleString() : audioData.packetsReceived}`, + ); + console.log(`Packets Lost: ${audioData.packetsLost}`); + console.log(`Audio Level: ${audioData.audioLevel}`); + if (audioData.jitter !== undefined) { + console.log(`Jitter: ${audioData.jitter.toFixed(5)}ms`); + } + if (audioData.totalAudioEnergy !== undefined) { + console.log( + `Total Audio Energy: ${audioData.totalAudioEnergy.toFixed(6)}`, + ); + } + }); + console.groupEnd(); + } + + // Console output for user audio input + if (jsonReport.userAudioInput && jsonReport.userAudioInput.length > 0) { + console.group('🎤 User Audio Input (Outbound)'); + jsonReport.userAudioInput.forEach((userAudioData) => { + console.log( + `Packets Sent: ${typeof userAudioData.packetsSent === 'number' ? userAudioData.packetsSent.toLocaleString() : userAudioData.packetsSent}`, + ); + if (userAudioData.retransmittedPackets) { + console.log( + `Retransmitted Packets: ${userAudioData.retransmittedPackets}`, + ); + } + if (userAudioData.avgPacketSendDelay !== undefined) { + console.log( + `Avg Packet Send Delay: ${userAudioData.avgPacketSendDelay.toFixed(5)}ms`, + ); + } + }); + console.groupEnd(); + } + + // Console output for codecs + if (jsonReport.codecs && jsonReport.codecs.length > 0) { + console.group('🔧 Codecs Used'); + jsonReport.codecs.forEach((codecData) => { + console.log( + `${codecData.status} ${codecData.mimeType} - Payload Type: ${codecData.payloadType}`, + ); + if (codecData.clockRate) { + console.log(` Clock Rate: ${codecData.clockRate}Hz`); + } + if (codecData.channels) { + console.log(` Channels: ${codecData.channels}`); + } + }); + console.groupEnd(); + } + + // Console output for transport layer + if (jsonReport.transportLayer && jsonReport.transportLayer.length > 0) { + console.group('🚚 Transport Layer'); + jsonReport.transportLayer.forEach((transportData) => { + console.log(`DTLS State: ${transportData.dtlsState}`); + console.log(`ICE State: ${transportData.iceState}`); + if (transportData.bytesReceived || transportData.bytesSent) { + console.log( + `Data Transfer (bytes) - Sent: ${(transportData.bytesSent || 0).toLocaleString()}, Received: ${(transportData.bytesReceived || 0).toLocaleString()}`, + ); + } + }); + console.groupEnd(); + } + + // Console output for issues + if (jsonReport.issues.length > 0) { + console.group('⚠️ Potential Issues Detected'); + jsonReport.issues.forEach((issue) => console.warn(issue)); + console.groupEnd(); + } else { + console.log('✅ No significant issues detected'); + } + + console.groupEnd(); +}; diff --git a/src/lib/constants.ts b/src/lib/constants.ts index 0144f76..099be98 100644 --- a/src/lib/constants.ts +++ b/src/lib/constants.ts @@ -6,15 +6,6 @@ export const DEFAULT_HEADERS = { export const DEFAULT_API_BASE_URL = 'https://api.anam.ai'; export const DEFAULT_API_VERSION = '/v1'; // include the leading slash -// Connection closed codes -export const CONNECTION_CLOSED_CODE_NORMAL = 'CONNECTION_CLOSED_CODE_NORMAL'; -export const CONNECTION_CLOSED_CODE_MICROPHONE_PERMISSION_DENIED = - 'CONNECTION_CLOSED_CODE_MICROPHONE_PERMISSION_DENIED'; -export const CONNECTION_CLOSED_CODE_SIGNALLING_CLIENT_CONNECTION_FAILURE = - 'CONNECTION_CLOSED_CODE_SIGNALLING_CLIENT_CONNECTION_FAILURE'; -export const CONNECTION_CLOSED_CODE_WEBRTC_FAILURE = - 'CONNECTION_CLOSED_CODE_WEBRTC_FAILURE'; - export const CLIENT_METADATA = { client: 'js-sdk', version: '0.0.0-automated', diff --git a/src/modules/MessageHistoryClient.ts b/src/modules/MessageHistoryClient.ts index fe53c0e..09d7af3 100644 --- a/src/modules/MessageHistoryClient.ts +++ b/src/modules/MessageHistoryClient.ts @@ -6,7 +6,7 @@ import { InternalEvent, AnamEvent, } from '../types'; -import { PublicEventEmitter, InternalEventEmitter } from '../modules'; +import { PublicEventEmitter, InternalEventEmitter } from '.'; export class MessageHistoryClient { private publicEventEmitter: PublicEventEmitter; private internalEventEmitter: InternalEventEmitter; diff --git a/src/modules/PublicEventEmitter.ts b/src/modules/PublicEventEmitter.ts index 913282a..c267c71 100644 --- a/src/modules/PublicEventEmitter.ts +++ b/src/modules/PublicEventEmitter.ts @@ -1,4 +1,7 @@ -import { ClientMetricMeasurement, sendErrorMetric } from '../lib/ClientError'; +import { + ClientMetricMeasurement, + sendClientMetric, +} from '../lib/ClientMetrics'; import { AnamEvent, EventCallbacks } from '../types'; export class PublicEventEmitter { @@ -33,16 +36,18 @@ export class PublicEventEmitter { ...args: EventCallbacks[K] extends (...args: infer P) => any ? P : never ): void { if (event === AnamEvent.CONNECTION_ESTABLISHED) { - sendErrorMetric( + sendClientMetric( ClientMetricMeasurement.CLIENT_METRIC_MEASUREMENT_CONNECTION_ESTABLISHED, '1', ); } if (event === AnamEvent.CONNECTION_CLOSED) { - sendErrorMetric( + const [closeCode, details] = args; + sendClientMetric( ClientMetricMeasurement.CLIENT_METRIC_MEASUREMENT_CONNECTION_CLOSED, - args[0] as string, + closeCode as string, + details ? { details: details as string } : undefined, ); } diff --git a/src/modules/SignallingClient.ts b/src/modules/SignallingClient.ts index 573a661..a663308 100644 --- a/src/modules/SignallingClient.ts +++ b/src/modules/SignallingClient.ts @@ -1,11 +1,11 @@ -import { CONNECTION_CLOSED_CODE_SIGNALLING_CLIENT_CONNECTION_FAILURE } from '../lib/constants'; -import { InternalEventEmitter, PublicEventEmitter } from '../modules'; +import { InternalEventEmitter, PublicEventEmitter } from '.'; import { AnamEvent, InternalEvent, SignalMessage, SignalMessageAction, SignallingClientOptions, + ConnectionClosedCode, } from '../types'; import { TalkMessageStreamPayload } from '../types/signalling/TalkMessageStreamPayload'; @@ -146,7 +146,7 @@ export class SignallingClient { console.error('SignallingClient - onOpen: error in onOpen', e); this.publicEventEmitter.emit( AnamEvent.CONNECTION_CLOSED, - CONNECTION_CLOSED_CODE_SIGNALLING_CLIENT_CONNECTION_FAILURE, + ConnectionClosedCode.SIGNALLING_CLIENT_CONNECTION_FAILURE, ); } } @@ -168,7 +168,7 @@ export class SignallingClient { } this.publicEventEmitter.emit( AnamEvent.CONNECTION_CLOSED, - CONNECTION_CLOSED_CODE_SIGNALLING_CLIENT_CONNECTION_FAILURE, + ConnectionClosedCode.SIGNALLING_CLIENT_CONNECTION_FAILURE, ); } } diff --git a/src/modules/StreamingClient.ts b/src/modules/StreamingClient.ts index f808fc6..138ff8a 100644 --- a/src/modules/StreamingClient.ts +++ b/src/modules/StreamingClient.ts @@ -1,8 +1,3 @@ -import { - CONNECTION_CLOSED_CODE_MICROPHONE_PERMISSION_DENIED, - CONNECTION_CLOSED_CODE_NORMAL, - CONNECTION_CLOSED_CODE_WEBRTC_FAILURE, -} from '../lib/constants'; import { EngineApiRestClient, InternalEventEmitter, @@ -17,10 +12,17 @@ import { SignalMessageAction, StreamingClientOptions, WebRtcTextMessageEvent, + ConnectionClosedCode, } from '../types'; import { TalkMessageStream } from '../types/TalkMessageStream'; import { TalkStreamInterruptedSignalMessage } from '../types/signalling/TalkStreamInterruptedSignalMessage'; +import { + ClientMetricMeasurement, + createRTCStatsReport, + sendClientMetric, +} from '../lib/ClientMetrics'; +const SUCCESS_METRIC_POLLING_TIMEOUT_MS = 15000; // After this time we will stop polling for the first frame and consider the session a failure. export class StreamingClient { private publicEventEmitter: PublicEventEmitter; private internalEventEmitter: InternalEventEmitter; @@ -38,6 +40,10 @@ export class StreamingClient { private inputAudioState: InputAudioState = { isMuted: false }; private audioDeviceId: string | undefined; private disableInputAudio: boolean; + private successMetricPoller: ReturnType | null = null; + private successMetricFired = false; + private showPeerConnectionStatsReport: boolean = false; + private peerConnectionStatsReportOutputFormat: 'console' | 'json' = 'console'; constructor( sessionId: string, @@ -78,6 +84,10 @@ export class StreamingClient { sessionId, ); this.audioDeviceId = options.inputAudio.audioDeviceId; + this.showPeerConnectionStatsReport = + options.metrics?.showPeerConnectionStatsReport ?? false; + this.peerConnectionStatsReportOutputFormat = + options.metrics?.peerConnectionStatsReportOutputFormat ?? 'console'; } private onInputAudioStateChange( @@ -106,6 +116,80 @@ export class StreamingClient { }); } + private startSuccessMetricPolling() { + if (this.successMetricPoller || this.successMetricFired) { + return; + } + + const timeoutId = setTimeout(() => { + if (this.successMetricPoller) { + console.warn( + 'No video frames received, there is a problem with the connection.', + ); + clearInterval(this.successMetricPoller); + this.successMetricPoller = null; + } + }, SUCCESS_METRIC_POLLING_TIMEOUT_MS); + + this.successMetricPoller = setInterval(async () => { + if (!this.peerConnection || this.successMetricFired) { + if (this.successMetricPoller) { + clearInterval(this.successMetricPoller); + } + clearTimeout(timeoutId); + return; + } + + try { + const stats = await this.peerConnection.getStats(); + + let videoDetected = false; + let detectionMethod = null; + + stats.forEach((report) => { + // Find the report for inbound video + if (report.type === 'inbound-rtp' && report.kind === 'video') { + // Method 1: Try framesDecoded (most reliable when available) + if ( + report.framesDecoded !== undefined && + report.framesDecoded > 0 + ) { + videoDetected = true; + detectionMethod = 'framesDecoded'; + } else if ( + report.framesReceived !== undefined && + report.framesReceived > 0 + ) { + videoDetected = true; + detectionMethod = 'framesReceived'; + } else if ( + report.bytesReceived > 0 && + report.packetsReceived > 0 && + // Additional check: ensure we've received enough data for actual video + report.bytesReceived > 100000 // rough threshold + ) { + videoDetected = true; + detectionMethod = 'bytesReceived'; + } + } + }); + if (videoDetected && !this.successMetricFired) { + this.successMetricFired = true; + sendClientMetric( + ClientMetricMeasurement.CLIENT_METRIC_MEASUREMENT_SESSION_SUCCESS, + '1', + detectionMethod ? { detectionMethod } : undefined, + ); + if (this.successMetricPoller) { + clearInterval(this.successMetricPoller); + } + clearTimeout(timeoutId); + this.successMetricPoller = null; + } + } catch (error) {} + }, 500); + } + public muteInputAudio(): InputAudioState { const oldAudioState: InputAudioState = this.inputAudioState; const newAudioState: InputAudioState = { @@ -183,8 +267,8 @@ export class StreamingClient { } } - public stopConnection() { - this.shutdown(); + public async stopConnection() { + await this.shutdown(); } public async sendTalkCommand(content: string): Promise { @@ -265,7 +349,8 @@ export class StreamingClient { console.log('StreamingClient - onSignalMessage: reason', reason); this.publicEventEmitter.emit( AnamEvent.CONNECTION_CLOSED, - CONNECTION_CLOSED_CODE_NORMAL, + ConnectionClosedCode.SERVER_CLOSED_CONNECTION, + reason, ); // close the peer connection this.shutdown(); @@ -352,12 +437,12 @@ export class StreamingClient { if (err.name === 'NotAllowedError' && err.message === 'Permission denied') { this.publicEventEmitter.emit( AnamEvent.CONNECTION_CLOSED, - CONNECTION_CLOSED_CODE_MICROPHONE_PERMISSION_DENIED, + ConnectionClosedCode.MICROPHONE_PERMISSION_DENIED, ); } else { this.publicEventEmitter.emit( AnamEvent.CONNECTION_CLOSED, - CONNECTION_CLOSED_CODE_WEBRTC_FAILURE, + ConnectionClosedCode.WEBRTC_FAILURE, ); } @@ -373,6 +458,9 @@ export class StreamingClient { private onTrackEventHandler(event: RTCTrackEvent) { if (event.track.kind === 'video') { + // start polling stats to detect successful video data received + this.startSuccessMetricPolling(); + this.videoStream = event.streams[0]; this.publicEventEmitter.emit( AnamEvent.VIDEO_STREAM_STARTED, @@ -384,6 +472,14 @@ export class StreamingClient { // unregister the callback after the first frame this.videoElement?.cancelVideoFrameCallback(handle); this.publicEventEmitter.emit(AnamEvent.VIDEO_PLAY_STARTED); + if (!this.successMetricFired) { + this.successMetricFired = true; + sendClientMetric( + ClientMetricMeasurement.CLIENT_METRIC_MEASUREMENT_SESSION_SUCCESS, + '1', + { detectionMethod: 'videoElement' }, + ); + } }); } } else if (event.track.kind === 'audio') { @@ -501,7 +597,27 @@ export class StreamingClient { await this.signallingClient.sendOffer(this.peerConnection.localDescription); } - private shutdown() { + private async shutdown() { + if (this.showPeerConnectionStatsReport) { + const stats = await this.peerConnection?.getStats(); + if (stats) { + const report = createRTCStatsReport( + stats, + this.peerConnectionStatsReportOutputFormat, + ); + if (report) { + console.log(report, undefined, 2); + } + } + } + // reset video frame polling + if (this.successMetricPoller) { + clearInterval(this.successMetricPoller); + this.successMetricPoller = null; + } + this.successMetricFired = false; + + // stop the input audio stream try { if (this.inputAudioStream) { this.inputAudioStream.getTracks().forEach((track) => { @@ -515,6 +631,8 @@ export class StreamingClient { error, ); } + + // stop the signalling client try { this.signallingClient.stop(); } catch (error) { @@ -523,6 +641,8 @@ export class StreamingClient { error, ); } + + // close the peer connection try { if ( this.peerConnection && diff --git a/src/types/AnamPublicClientOptions.ts b/src/types/AnamPublicClientOptions.ts index a88366e..1368ef2 100644 --- a/src/types/AnamPublicClientOptions.ts +++ b/src/types/AnamPublicClientOptions.ts @@ -5,4 +5,8 @@ export interface AnamPublicClientOptions { voiceDetection?: VoiceDetectionOptions; audioDeviceId?: string; disableInputAudio?: boolean; + metrics?: { + showPeerConnectionStatsReport?: boolean; + peerConnectionStatsReportOutputFormat?: 'console' | 'json'; + }; } diff --git a/src/types/events/index.ts b/src/types/events/index.ts index e748075..73576fd 100644 --- a/src/types/events/index.ts +++ b/src/types/events/index.ts @@ -3,3 +3,4 @@ export type { EventCallbacks } from './public/EventCallbacks'; export { InternalEvent } from './internal/InternalEvent'; export type { InternalEventCallbacks } from './internal/InternalEventCallbacks'; export type { EventCallback } from './EventCallback'; +export { ConnectionClosedCode } from './public/ConnectionClosedCodes'; diff --git a/src/types/events/public/ConnectionClosedCodes.ts b/src/types/events/public/ConnectionClosedCodes.ts new file mode 100644 index 0000000..a1f3736 --- /dev/null +++ b/src/types/events/public/ConnectionClosedCodes.ts @@ -0,0 +1,7 @@ +export enum ConnectionClosedCode { + NORMAL = 'CONNECTION_CLOSED_CODE_NORMAL', + MICROPHONE_PERMISSION_DENIED = 'CONNECTION_CLOSED_CODE_MICROPHONE_PERMISSION_DENIED', + SIGNALLING_CLIENT_CONNECTION_FAILURE = 'CONNECTION_CLOSED_CODE_SIGNALLING_CLIENT_CONNECTION_FAILURE', + WEBRTC_FAILURE = 'CONNECTION_CLOSED_CODE_WEBRTC_FAILURE', + SERVER_CLOSED_CONNECTION = 'CONNECTION_CLOSED_CODE_SERVER_CLOSED_CONNECTION', +} diff --git a/src/types/events/public/EventCallbacks.ts b/src/types/events/public/EventCallbacks.ts index 74b73c3..ff24c40 100644 --- a/src/types/events/public/EventCallbacks.ts +++ b/src/types/events/public/EventCallbacks.ts @@ -1,3 +1,4 @@ +import { ConnectionClosedCode } from './ConnectionClosedCodes'; import { Message, MessageStreamEvent, AnamEvent } from '../../index'; export type EventCallbacks = { @@ -6,7 +7,10 @@ export type EventCallbacks = { messageEvent: MessageStreamEvent, ) => void; [AnamEvent.CONNECTION_ESTABLISHED]: () => void; - [AnamEvent.CONNECTION_CLOSED]: (reason: string) => void; + [AnamEvent.CONNECTION_CLOSED]: ( + reason: ConnectionClosedCode, + details?: string, + ) => void; [AnamEvent.INPUT_AUDIO_STREAM_STARTED]: (audioStream: MediaStream) => void; [AnamEvent.VIDEO_STREAM_STARTED]: (videoStream: MediaStream) => void; [AnamEvent.VIDEO_PLAY_STARTED]: () => void; diff --git a/src/types/index.ts b/src/types/index.ts index e017fa5..da22b65 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -10,3 +10,4 @@ export { MessageRole } from './messageHistory'; // need to export this explicitl export type * from './events'; export { AnamEvent } from './events'; // need to export this explicitly to avoid enum import issues export { InternalEvent } from './events'; // need to export this explicitly to avoid enum import issues +export { ConnectionClosedCode } from './events'; // need to export this explicitly to avoid enum import issues diff --git a/src/types/streaming/StreamingClientOptions.ts b/src/types/streaming/StreamingClientOptions.ts index 0126c90..e2a564d 100644 --- a/src/types/streaming/StreamingClientOptions.ts +++ b/src/types/streaming/StreamingClientOptions.ts @@ -7,4 +7,8 @@ export interface StreamingClientOptions { signalling: SignallingClientOptions; iceServers: RTCIceServer[]; inputAudio: InputAudioOptions; + metrics?: { + showPeerConnectionStatsReport?: boolean; + peerConnectionStatsReportOutputFormat?: 'console' | 'json'; + }; }