diff --git a/package/src/components/Chat/hooks/handleEventToSyncDB.ts b/package/src/components/Chat/hooks/handleEventToSyncDB.ts index 8214bfce10..2fa8420724 100644 --- a/package/src/components/Chat/hooks/handleEventToSyncDB.ts +++ b/package/src/components/Chat/hooks/handleEventToSyncDB.ts @@ -1,74 +1,152 @@ -import type { Event } from 'stream-chat'; +import { DefaultStreamChatGenerics } from 'src/types/types'; +import type { Event, StreamChat } from 'stream-chat'; import { deleteChannel } from '../../../store/apis/deleteChannel'; import { deleteMember } from '../../../store/apis/deleteMember'; import { deleteMessagesForChannel } from '../../../store/apis/deleteMessagesForChannel'; import { updateMessage } from '../../../store/apis/updateMessage'; import { upsertChannelData } from '../../../store/apis/upsertChannelData'; +import { upsertChannelDataFromChannel } from '../../../store/apis/upsertChannelDataFromChannel'; import { upsertChannels } from '../../../store/apis/upsertChannels'; import { upsertMembers } from '../../../store/apis/upsertMembers'; import { upsertMessages } from '../../../store/apis/upsertMessages'; import { upsertReads } from '../../../store/apis/upsertReads'; +import { QuickSqliteClient } from '../../../store/QuickSqliteClient'; +import { createSelectQuery } from '../../../store/sqlite-utils/createSelectQuery'; +import { PreparedQueries } from '../../../store/types'; -export const handleEventToSyncDB = (event: Event, flush?: boolean) => { +export const handleEventToSyncDB = < + StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics, +>( + event: Event, + client: StreamChat, + flush?: boolean, +) => { const { type } = event; + // This function is used to guard the queries that require channel to be present in the db first + // If channel is not present in the db, we first fetch the channel data from the channel object + // and then add the queries with a channel create query first + const queriesWithChannelGuard = ( + createQueries: (flushOverride?: boolean) => PreparedQueries[], + ) => { + const cid = event.cid || event.channel?.cid; + + if (!cid) return createQueries(flush); + const channels = QuickSqliteClient.executeSql.apply( + null, + createSelectQuery('channels', ['cid'], { + cid, + }), + ); + // a channel is not present in the db, we first fetch the channel data from the channel object. + // this can happen for example when a message.new event is received for a channel that is not in the db due to a channel being hidden. + if (channels.length === 0) { + const channel = + event.channel_type && event.channel_id + ? client.channel(event.channel_type, event.channel_id) + : undefined; + if (channel && channel.initialized && !channel.disconnected) { + const channelQuery = upsertChannelDataFromChannel({ + channel, + flush, + }); + if (channelQuery) { + const newQueries = [...channelQuery, ...createQueries(false)]; + if (flush !== false) { + QuickSqliteClient.executeSqlBatch(newQueries); + } + return newQueries; + } else { + console.warn( + `Couldnt create channel queries on ${type} event for an initialized channel that is not in DB, skipping event`, + { event }, + ); + return []; + } + } else { + console.warn( + `Received ${type} event for a non initialized channel that is not in DB, skipping event`, + { event }, + ); + return []; + } + } + return createQueries(flush); + }; + if (type === 'message.read') { - if (event.user?.id && event.cid) { - return upsertReads({ - cid: event.cid, - flush, - reads: [ - { - last_read: event.received_at as string, - unread_messages: 0, - user: event.user, - }, - ], - }); + const cid = event.cid; + const user = event.user; + if (user?.id && cid) { + return queriesWithChannelGuard((flushOverride) => + upsertReads({ + cid, + flush: flushOverride, + reads: [ + { + last_read: event.received_at as string, + unread_messages: 0, + user, + }, + ], + }), + ); } } if (type === 'message.new') { - if (event.message && (!event.message.parent_id || event.message.show_in_channel)) { - return upsertMessages({ - flush, - messages: [event.message], - }); + const message = event.message; + if (message && (!message.parent_id || message.show_in_channel)) { + return queriesWithChannelGuard((flushOverride) => + upsertMessages({ + flush: flushOverride, + messages: [message], + }), + ); } } if (type === 'message.updated' || type === 'message.deleted') { - if (event.message && !event.message.parent_id) { + const message = event.message; + if (message && !message.parent_id) { // Update only if it exists, otherwise event could be related // to a message which is not in database. - return updateMessage({ - flush, - message: event.message, - }); + return queriesWithChannelGuard((flushOverride) => + updateMessage({ + flush: flushOverride, + message, + }), + ); } } if (type === 'reaction.updated') { - if (event.message && event.reaction) { + const message = event.message; + if (message && event.reaction) { // We update the entire message to make sure we also update // reaction_counts. - return updateMessage({ - flush, - message: event.message, - }); + return queriesWithChannelGuard((flushOverride) => + updateMessage({ + flush: flushOverride, + message, + }), + ); } } if (type === 'reaction.new' || type === 'reaction.deleted') { - if (event.message && !event.message.parent_id) { + const message = event.message; + if (message && !message.parent_id) { // Here we are relying on the fact message.latest_reactions always includes // the new reaction. So we first delete all the existing reactions and populate // the reactions table with message.latest_reactions - return updateMessage({ - flush, - message: event.message, - }); + return queriesWithChannelGuard((flushOverride) => + updateMessage({ + flush: flushOverride, + message, + }), + ); } } @@ -119,22 +197,30 @@ export const handleEventToSyncDB = (event: Event, flush?: boolean) => { } if (type === 'member.added' || type === 'member.updated') { - if (event.member && event.cid) { - return upsertMembers({ - cid: event.cid, - flush, - members: [event.member], - }); + const member = event.member; + const cid = event.cid; + if (member && cid) { + return queriesWithChannelGuard((flushOverride) => + upsertMembers({ + cid, + flush: flushOverride, + members: [member], + }), + ); } } if (type === 'member.removed') { - if (event.member && event.cid) { - return deleteMember({ - cid: event.cid, - flush, - member: event.member, - }); + const member = event.member; + const cid = event.cid; + if (member && cid) { + return queriesWithChannelGuard((flushOverride) => + deleteMember({ + cid, + flush: flushOverride, + member, + }), + ); } } diff --git a/package/src/components/Chat/hooks/useSyncDatabase.ts b/package/src/components/Chat/hooks/useSyncDatabase.ts index 536319850d..9b69dbefa3 100644 --- a/package/src/components/Chat/hooks/useSyncDatabase.ts +++ b/package/src/components/Chat/hooks/useSyncDatabase.ts @@ -22,7 +22,7 @@ export const useSyncDatabase = < let listener: ReturnType | undefined; if (enableOfflineSupport && initialisedDatabase) { - listener = client?.on(handleEventToSyncDB); + listener = client?.on((event) => handleEventToSyncDB(event, client)); } return () => { diff --git a/package/src/store/apis/upsertChannelDataFromChannel.ts b/package/src/store/apis/upsertChannelDataFromChannel.ts new file mode 100644 index 0000000000..b8577e18e4 --- /dev/null +++ b/package/src/store/apis/upsertChannelDataFromChannel.ts @@ -0,0 +1,25 @@ +import { DefaultStreamChatGenerics } from 'src/types/types'; +import type { Channel } from 'stream-chat'; + +import { mapChannelToStorable } from '../mappers/mapChannelToStorable'; +import { QuickSqliteClient } from '../QuickSqliteClient'; +import { createUpsertQuery } from '../sqlite-utils/createUpsertQuery'; + +export const upsertChannelDataFromChannel = < + StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics, +>({ + channel, + flush = true, +}: { + channel: Channel; + flush?: boolean; +}) => { + const storableChannel = mapChannelToStorable(channel); + if (!storableChannel) return; + const query = createUpsertQuery('channels', storableChannel); + if (flush) { + QuickSqliteClient.executeSqlBatch([query]); + } + + return [query]; +}; diff --git a/package/src/store/mappers/mapChannelToStorable.ts b/package/src/store/mappers/mapChannelToStorable.ts new file mode 100644 index 0000000000..1eaf33efdf --- /dev/null +++ b/package/src/store/mappers/mapChannelToStorable.ts @@ -0,0 +1,68 @@ +import type { Channel, ChannelResponse } from 'stream-chat'; + +import { mapDateTimeToStorable } from './mapDateTimeToStorable'; + +import type { DefaultStreamChatGenerics } from '../../types/types'; + +import type { TableRow } from '../types'; + +export const mapChannelToStorable = < + StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics, +>( + channel: Channel, +): TableRow<'channels'> | undefined => { + if (!channel.data) return; + const { + auto_translation_enabled, + auto_translation_language, + cid, + config, + cooldown, + created_at, + deleted_at, + disabled, + frozen, + hidden, + id, + invites, + last_message_at, + member_count, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + members, + muted, + own_capabilities, + team, + truncated_at, + truncated_by, + truncated_by_id, + type, + updated_at, + ...extraData + } = channel.data as unknown as ChannelResponse; + + return { + autoTranslationEnabled: auto_translation_enabled, + autoTranslationLanguage: auto_translation_language, + cid, + config: config && JSON.stringify(config), + cooldown, + createdAt: mapDateTimeToStorable(created_at), + deletedAt: mapDateTimeToStorable(deleted_at), + disabled, + extraData: JSON.stringify(extraData), + frozen, + hidden, + id, + invites: invites && JSON.stringify(invites), + lastMessageAt: mapDateTimeToStorable(last_message_at), + memberCount: member_count, + muted, + ownCapabilities: own_capabilities && JSON.stringify(own_capabilities), + team, + truncatedAt: truncated_at, + truncatedBy: truncated_by && JSON.stringify(truncated_by), + truncatedById: truncated_by_id, + type, + updatedAt: updated_at, + }; +}; diff --git a/package/src/utils/DBSyncManager.ts b/package/src/utils/DBSyncManager.ts index 5fbf6c7bdf..e5638208b9 100644 --- a/package/src/utils/DBSyncManager.ts +++ b/package/src/utils/DBSyncManager.ts @@ -90,7 +90,11 @@ export class DBSyncManager { }; }; - static sync = async () => { + static sync = async < + StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics, + >( + client: StreamChat, + ) => { if (!this.client?.user) return; const cids = getAllChannelIds(); // If there are no channels, then there is no need to sync. @@ -113,7 +117,7 @@ export class DBSyncManager { try { const result = await this.client.sync(cids, lastSyncedAtDate.toISOString()); const queries = result.events.reduce((queries, event) => { - queries = queries.concat(handleEventToSyncDB(event, false)); + queries = queries.concat(handleEventToSyncDB(event, client, false)); return queries; }, []); @@ -137,7 +141,7 @@ export class DBSyncManager { if (!this.client) return; await this.executePendingTasks(this.client); - await this.sync(); + await this.sync(this.client); }; static queueTask = async <