diff --git a/CHANGELOG.md b/CHANGELOG.md index 338ec90..90b3a91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Change Log +## 21.3.0 + +* Add new `Realtime` service with methods for subscribing to channels and receiving messages +* Fix `client.setSession` not working when using realtime +* Deprecate `client.subscribe` method in favor of `Realtime` service + +> Note: Deprecated methods are still available for backwards compatibility, but might be removed in future versions. + ## 21.2.1 * Add transaction support for Databases and TablesDB diff --git a/README.md b/README.md index 333d69f..0374158 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ import { Client, Account } from "appwrite"; To install with a CDN (content delivery network) add the following scripts to the bottom of your tag, but before you use any Appwrite services: ```html - + ``` diff --git a/package.json b/package.json index ace52de..a37763b 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "appwrite", "homepage": "https://appwrite.io/support", "description": "Appwrite is an open-source self-hosted backend server that abstract and simplify complex and repetitive development tasks behind a very simple REST API", - "version": "21.2.1", + "version": "21.3.0", "license": "BSD-3-Clause", "main": "dist/cjs/sdk.js", "exports": { diff --git a/src/client.ts b/src/client.ts index 0bdbc46..afa25ba 100644 --- a/src/client.ts +++ b/src/client.ts @@ -300,7 +300,11 @@ class Client { /** * Holds configuration such as project. */ - config = { + config: { + endpoint: string; + endpointRealtime: string; + [key: string]: string | undefined; + } = { endpoint: 'https://cloud.appwrite.io/v1', endpointRealtime: '', project: '', @@ -316,7 +320,7 @@ class Client { 'x-sdk-name': 'Web', 'x-sdk-platform': 'client', 'x-sdk-language': 'web', - 'x-sdk-version': '21.2.1', + 'x-sdk-version': '21.3.0', 'X-Appwrite-Response-Format': '1.8.0', }; @@ -473,7 +477,9 @@ class Client { } const channels = new URLSearchParams(); - channels.set('project', this.config.project); + if (this.config.project) { + channels.set('project', this.config.project); + } this.realtime.channels.forEach(channel => { channels.append('channels[]', channel); }); @@ -528,10 +534,13 @@ class Client { this.realtime.lastMessage = message; switch (message.type) { case 'connected': - const cookie = JSON.parse(window.localStorage.getItem('cookieFallback') ?? '{}'); - const session = cookie?.[`a_session_${this.config.project}`]; - const messageData = message.data; + let session = this.config.session; + if (!session) { + const cookie = JSON.parse(window.localStorage.getItem('cookieFallback') ?? '{}'); + session = cookie?.[`a_session_${this.config.project}`]; + } + const messageData = message.data; if (session && !messageData.user) { this.realtime.socket?.send(JSON.stringify({ type: 'authentication', @@ -582,6 +591,9 @@ class Client { /** * Subscribes to Appwrite events and passes you the payload in realtime. * + * @deprecated Use the Realtime service instead. + * @see Realtime + * * @param {string|string[]} channels * Channel to subscribe - pass a single channel as a string or multiple with an array of strings. * diff --git a/src/index.ts b/src/index.ts index 8cae5d3..d6b8acd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -16,6 +16,7 @@ export { Messaging } from './services/messaging'; export { Storage } from './services/storage'; export { TablesDB } from './services/tables-db'; export { Teams } from './services/teams'; +export { Realtime } from './services/realtime'; export type { Models, Payload, RealtimeResponseEvent, UploadProgress } from './client'; export type { QueryTypes, QueryTypesList } from './query'; export { Permission } from './permission'; diff --git a/src/services/realtime.ts b/src/services/realtime.ts new file mode 100644 index 0000000..7c1c403 --- /dev/null +++ b/src/services/realtime.ts @@ -0,0 +1,437 @@ +import { AppwriteException, Client } from '../client'; + +export type RealtimeSubscription = { + close: () => Promise; +} + +export type RealtimeCallback = { + channels: Set; + callback: (event: RealtimeResponseEvent) => void; +} + +export type RealtimeResponse = { + type: string; + data?: any; +} + +export type RealtimeResponseEvent = { + events: string[]; + channels: string[]; + timestamp: string; + payload: T; +} + +export type RealtimeResponseConnected = { + channels: string[]; + user?: object; +} + +export type RealtimeRequest = { + type: 'authentication'; + data: { + session: string; + }; +} + +export enum RealtimeCode { + NORMAL_CLOSURE = 1000, + POLICY_VIOLATION = 1008, + UNKNOWN_ERROR = -1 +} + +export class Realtime { + private readonly TYPE_ERROR = 'error'; + private readonly TYPE_EVENT = 'event'; + private readonly TYPE_PONG = 'pong'; + private readonly TYPE_CONNECTED = 'connected'; + private readonly DEBOUNCE_MS = 1; + private readonly HEARTBEAT_INTERVAL = 20000; // 20 seconds in milliseconds + + private client: Client; + private socket?: WebSocket; + private activeChannels = new Set(); + private activeSubscriptions = new Map>(); + private heartbeatTimer?: number; + + private subCallDepth = 0; + private reconnectAttempts = 0; + private subscriptionsCounter = 0; + private reconnect = true; + + private onErrorCallbacks: Array<(error?: Error, statusCode?: number) => void> = []; + private onCloseCallbacks: Array<() => void> = []; + private onOpenCallbacks: Array<() => void> = []; + + constructor(client: Client) { + this.client = client; + } + + /** + * Register a callback function to be called when an error occurs + * + * @param {Function} callback - Callback function to handle errors + * @returns {void} + */ + public onError(callback: (error?: Error, statusCode?: number) => void): void { + this.onErrorCallbacks.push(callback); + } + + /** + * Register a callback function to be called when the connection closes + * + * @param {Function} callback - Callback function to handle connection close + * @returns {void} + */ + public onClose(callback: () => void): void { + this.onCloseCallbacks.push(callback); + } + + /** + * Register a callback function to be called when the connection opens + * + * @param {Function} callback - Callback function to handle connection open + * @returns {void} + */ + public onOpen(callback: () => void): void { + this.onOpenCallbacks.push(callback); + } + + private startHeartbeat(): void { + this.stopHeartbeat(); + this.heartbeatTimer = window.setInterval(() => { + if (this.socket && this.socket.readyState === WebSocket.OPEN) { + this.socket.send(JSON.stringify({ type: 'ping' })); + } + }, this.HEARTBEAT_INTERVAL); + } + + private stopHeartbeat(): void { + if (this.heartbeatTimer) { + window.clearInterval(this.heartbeatTimer); + this.heartbeatTimer = undefined; + } + } + + private async createSocket(): Promise { + if (this.activeChannels.size === 0) { + this.reconnect = false; + await this.closeSocket(); + return; + } + + const projectId = this.client.config.project; + if (!projectId) { + throw new AppwriteException('Missing project ID'); + } + + let queryParams = `project=${projectId}`; + for (const channel of this.activeChannels) { + queryParams += `&channels[]=${encodeURIComponent(channel)}`; + } + + const endpoint = + this.client.config.endpointRealtime !== '' + ? this.client.config.endpointRealtime + : this.client.config.endpoint || ''; + const realtimeEndpoint = endpoint + .replace('https://', 'wss://') + .replace('http://', 'ws://'); + const url = `${realtimeEndpoint}/realtime?${queryParams}`; + + if (this.socket) { + this.reconnect = false; + await this.closeSocket(); + } + + return new Promise((resolve, reject) => { + try { + this.socket = new WebSocket(url); + + this.socket.addEventListener('open', () => { + this.reconnectAttempts = 0; + this.onOpenCallbacks.forEach(callback => callback()); + this.startHeartbeat(); + resolve(); + }); + + this.socket.addEventListener('message', (event: MessageEvent) => { + try { + const message = JSON.parse(event.data) as RealtimeResponse; + this.handleMessage(message); + } catch (error) { + console.error('Failed to parse message:', error); + } + }); + + this.socket.addEventListener('close', async (event: CloseEvent) => { + this.stopHeartbeat(); + this.onCloseCallbacks.forEach(callback => callback()); + + if (!this.reconnect || event.code === RealtimeCode.POLICY_VIOLATION) { + this.reconnect = true; + return; + } + + const timeout = this.getTimeout(); + console.log(`Realtime disconnected. Re-connecting in ${timeout / 1000} seconds.`); + + await this.sleep(timeout); + this.reconnectAttempts++; + + try { + await this.createSocket(); + } catch (error) { + console.error('Failed to reconnect:', error); + } + }); + + this.socket.addEventListener('error', (event: Event) => { + this.stopHeartbeat(); + const error = new Error('WebSocket error'); + console.error('WebSocket error:', error.message); + this.onErrorCallbacks.forEach(callback => callback(error)); + reject(error); + }); + } catch (error) { + reject(error); + } + }); + } + + private async closeSocket(): Promise { + this.stopHeartbeat(); + + if (this.socket) { + return new Promise((resolve) => { + if (!this.socket) { + resolve(); + return; + } + + if (this.socket.readyState === WebSocket.OPEN || + this.socket.readyState === WebSocket.CONNECTING) { + this.socket.addEventListener('close', () => { + resolve(); + }, { once: true }); + this.socket.close(RealtimeCode.NORMAL_CLOSURE); + } else { + resolve(); + } + }); + } + } + + private getTimeout(): number { + if (this.reconnectAttempts < 5) { + return 1000; + } else if (this.reconnectAttempts < 15) { + return 5000; + } else if (this.reconnectAttempts < 100) { + return 10000; + } else { + return 60000; + } + } + + private sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + /** + * Subscribe to a single channel + * + * @param {string} channel - Channel name to subscribe to + * @param {Function} callback - Callback function to handle events + * @returns {Promise} Subscription object with close method + */ + public async subscribe( + channel: string, + callback: (event: RealtimeResponseEvent) => void + ): Promise; + + /** + * Subscribe to multiple channels + * + * @param {string[]} channels - Array of channel names to subscribe to + * @param {Function} callback - Callback function to handle events + * @returns {Promise} Subscription object with close method + */ + public async subscribe( + channels: string[], + callback: (event: RealtimeResponseEvent) => void + ): Promise; + + /** + * Subscribe to a single channel with typed payload + * + * @param {string} channel - Channel name to subscribe to + * @param {Function} callback - Callback function to handle events with typed payload + * @returns {Promise} Subscription object with close method + */ + public async subscribe( + channel: string, + callback: (event: RealtimeResponseEvent) => void + ): Promise; + + /** + * Subscribe to multiple channels with typed payload + * + * @param {string[]} channels - Array of channel names to subscribe to + * @param {Function} callback - Callback function to handle events with typed payload + * @returns {Promise} Subscription object with close method + */ + public async subscribe( + channels: string[], + callback: (event: RealtimeResponseEvent) => void + ): Promise; + + public async subscribe( + channelsOrChannel: string | string[], + callback: (event: RealtimeResponseEvent) => void + ): Promise { + const channels = Array.isArray(channelsOrChannel) + ? new Set(channelsOrChannel) + : new Set([channelsOrChannel]); + + this.subscriptionsCounter++; + const count = this.subscriptionsCounter; + + for (const channel of channels) { + this.activeChannels.add(channel); + } + + this.activeSubscriptions.set(count, { + channels, + callback + }); + + this.subCallDepth++; + + await this.sleep(this.DEBOUNCE_MS); + + if (this.subCallDepth === 1) { + await this.createSocket(); + } + + this.subCallDepth--; + + return { + close: async () => { + this.activeSubscriptions.delete(count); + this.cleanUp(channels); + await this.createSocket(); + } + }; + } + + private cleanUp(channels: Set): void { + this.activeChannels = new Set( + Array.from(this.activeChannels).filter(channel => { + if (!channels.has(channel)) { + return true; + } + + const subsWithChannel = Array.from(this.activeSubscriptions.values()) + .filter(sub => sub.channels.has(channel)); + + return subsWithChannel.length > 0; + }) + ); + } + + private handleMessage(message: RealtimeResponse): void { + if (!message.type) { + return; + } + + switch (message.type) { + case this.TYPE_CONNECTED: + this.handleResponseConnected(message); + break; + case this.TYPE_ERROR: + this.handleResponseError(message); + break; + case this.TYPE_EVENT: + this.handleResponseEvent(message); + break; + case this.TYPE_PONG: + // Handle pong response if needed + break; + } + } + + private handleResponseConnected(message: RealtimeResponse): void { + if (!message.data) { + return; + } + + const messageData = message.data as RealtimeResponseConnected; + + let session = this.client.config.session; + if (!session) { + try { + const cookie = JSON.parse(window.localStorage.getItem('cookieFallback') ?? '{}'); + session = cookie?.[`a_session_${this.client.config.project}`]; + } catch (error) { + console.error('Failed to parse cookie fallback:', error); + } + } + + if (session && !messageData.user) { + this.socket?.send(JSON.stringify({ + type: 'authentication', + data: { + session + } + })); + } + } + + private handleResponseError(message: RealtimeResponse): void { + const error = new AppwriteException( + message.data?.message || 'Unknown error' + ); + const statusCode = message.data?.code; + this.onErrorCallbacks.forEach(callback => callback(error, statusCode)); + } + + private handleResponseEvent(message: RealtimeResponse): void { + const data = message.data; + if (!data) { + return; + } + + const channels = data.channels as string[]; + const events = data.events as string[]; + const payload = data.payload; + const timestamp = data.timestamp as string; + + if (!channels || !events || !payload) { + return; + } + + const hasActiveChannel = channels.some(channel => + this.activeChannels.has(channel) + ); + + if (!hasActiveChannel) { + return; + } + + for (const [_, subscription] of this.activeSubscriptions) { + const hasSubscribedChannel = channels.some(channel => + subscription.channels.has(channel) + ); + + if (hasSubscribedChannel) { + const response: RealtimeResponseEvent = { + events, + channels, + timestamp, + payload + }; + subscription.callback(response); + } + } + } +}