Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 132 additions & 46 deletions package/src/components/Chat/hooks/handleEventToSyncDB.ts
Original file line number Diff line number Diff line change
@@ -1,74 +1,152 @@
import type { Event } from 'stream-chat';
import { DefaultStreamChatGenerics } from 'src/types/types';
import type { Event, StreamChat } from 'stream-chat';

import { deleteChannel } from '../../../store/apis/deleteChannel';
import { deleteMember } from '../../../store/apis/deleteMember';
import { deleteMessagesForChannel } from '../../../store/apis/deleteMessagesForChannel';
import { updateMessage } from '../../../store/apis/updateMessage';
import { upsertChannelData } from '../../../store/apis/upsertChannelData';
import { upsertChannelDataFromChannel } from '../../../store/apis/upsertChannelDataFromChannel';
import { upsertChannels } from '../../../store/apis/upsertChannels';
import { upsertMembers } from '../../../store/apis/upsertMembers';
import { upsertMessages } from '../../../store/apis/upsertMessages';
import { upsertReads } from '../../../store/apis/upsertReads';
import { QuickSqliteClient } from '../../../store/QuickSqliteClient';
import { createSelectQuery } from '../../../store/sqlite-utils/createSelectQuery';
import { PreparedQueries } from '../../../store/types';

export const handleEventToSyncDB = (event: Event, flush?: boolean) => {
export const handleEventToSyncDB = <
StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics,
>(
event: Event,
client: StreamChat<StreamChatGenerics>,
flush?: boolean,
) => {
const { type } = event;

// This function is used to guard the queries that require channel to be present in the db first
// If channel is not present in the db, we first fetch the channel data from the channel object
// and then add the queries with a channel create query first
const queriesWithChannelGuard = (
createQueries: (flushOverride?: boolean) => PreparedQueries[],
) => {
const cid = event.cid || event.channel?.cid;

if (!cid) return createQueries(flush);
const channels = QuickSqliteClient.executeSql.apply(
null,
createSelectQuery('channels', ['cid'], {
cid,
}),
);
// a channel is not present in the db, we first fetch the channel data from the channel object.
// this can happen for example when a message.new event is received for a channel that is not in the db due to a channel being hidden.
if (channels.length === 0) {
const channel =
event.channel_type && event.channel_id
? client.channel(event.channel_type, event.channel_id)
: undefined;
if (channel && channel.initialized && !channel.disconnected) {
const channelQuery = upsertChannelDataFromChannel({
channel,
flush,
});
if (channelQuery) {
const newQueries = [...channelQuery, ...createQueries(false)];
if (flush !== false) {
QuickSqliteClient.executeSqlBatch(newQueries);
}
return newQueries;
} else {
console.warn(
`Couldnt create channel queries on ${type} event for an initialized channel that is not in DB, skipping event`,
{ event },
);
return [];
}
} else {
console.warn(
`Received ${type} event for a non initialized channel that is not in DB, skipping event`,
{ event },
);
return [];
}
}
return createQueries(flush);
};

if (type === 'message.read') {
if (event.user?.id && event.cid) {
return upsertReads({
cid: event.cid,
flush,
reads: [
{
last_read: event.received_at as string,
unread_messages: 0,
user: event.user,
},
],
});
const cid = event.cid;
const user = event.user;
if (user?.id && cid) {
return queriesWithChannelGuard((flushOverride) =>
upsertReads({
cid,
flush: flushOverride,
reads: [
{
last_read: event.received_at as string,
unread_messages: 0,
user,
},
],
}),
);
}
}

if (type === 'message.new') {
if (event.message && (!event.message.parent_id || event.message.show_in_channel)) {
return upsertMessages({
flush,
messages: [event.message],
});
const message = event.message;
if (message && (!message.parent_id || message.show_in_channel)) {
return queriesWithChannelGuard((flushOverride) =>
upsertMessages({
flush: flushOverride,
messages: [message],
}),
);
}
}

if (type === 'message.updated' || type === 'message.deleted') {
if (event.message && !event.message.parent_id) {
const message = event.message;
if (message && !message.parent_id) {
// Update only if it exists, otherwise event could be related
// to a message which is not in database.
return updateMessage({
flush,
message: event.message,
});
return queriesWithChannelGuard((flushOverride) =>
updateMessage({
flush: flushOverride,
message,
}),
);
}
}

if (type === 'reaction.updated') {
if (event.message && event.reaction) {
const message = event.message;
if (message && event.reaction) {
// We update the entire message to make sure we also update
// reaction_counts.
return updateMessage({
flush,
message: event.message,
});
return queriesWithChannelGuard((flushOverride) =>
updateMessage({
flush: flushOverride,
message,
}),
);
}
}

if (type === 'reaction.new' || type === 'reaction.deleted') {
if (event.message && !event.message.parent_id) {
const message = event.message;
if (message && !message.parent_id) {
// Here we are relying on the fact message.latest_reactions always includes
// the new reaction. So we first delete all the existing reactions and populate
// the reactions table with message.latest_reactions
return updateMessage({
flush,
message: event.message,
});
return queriesWithChannelGuard((flushOverride) =>
updateMessage({
flush: flushOverride,
message,
}),
);
}
}

Expand Down Expand Up @@ -119,22 +197,30 @@ export const handleEventToSyncDB = (event: Event, flush?: boolean) => {
}

if (type === 'member.added' || type === 'member.updated') {
if (event.member && event.cid) {
return upsertMembers({
cid: event.cid,
flush,
members: [event.member],
});
const member = event.member;
const cid = event.cid;
if (member && cid) {
return queriesWithChannelGuard((flushOverride) =>
upsertMembers({
cid,
flush: flushOverride,
members: [member],
}),
);
}
}

if (type === 'member.removed') {
if (event.member && event.cid) {
return deleteMember({
cid: event.cid,
flush,
member: event.member,
});
const member = event.member;
const cid = event.cid;
if (member && cid) {
return queriesWithChannelGuard((flushOverride) =>
deleteMember({
cid,
flush: flushOverride,
member,
}),
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion package/src/components/Chat/hooks/useSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const useSyncDatabase = <
let listener: ReturnType<StreamChat['on']> | undefined;

if (enableOfflineSupport && initialisedDatabase) {
listener = client?.on(handleEventToSyncDB);
listener = client?.on((event) => handleEventToSyncDB(event, client));
}

return () => {
Expand Down
25 changes: 25 additions & 0 deletions package/src/store/apis/upsertChannelDataFromChannel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { DefaultStreamChatGenerics } from 'src/types/types';
import type { Channel } from 'stream-chat';

import { mapChannelToStorable } from '../mappers/mapChannelToStorable';
import { QuickSqliteClient } from '../QuickSqliteClient';
import { createUpsertQuery } from '../sqlite-utils/createUpsertQuery';

export const upsertChannelDataFromChannel = <
StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics,
>({
channel,
flush = true,
}: {
channel: Channel<StreamChatGenerics>;
flush?: boolean;
}) => {
const storableChannel = mapChannelToStorable(channel);
if (!storableChannel) return;
const query = createUpsertQuery('channels', storableChannel);
if (flush) {
QuickSqliteClient.executeSqlBatch([query]);
}

return [query];
};
68 changes: 68 additions & 0 deletions package/src/store/mappers/mapChannelToStorable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type { Channel, ChannelResponse } from 'stream-chat';

import { mapDateTimeToStorable } from './mapDateTimeToStorable';

import type { DefaultStreamChatGenerics } from '../../types/types';

import type { TableRow } from '../types';

export const mapChannelToStorable = <
StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics,
>(
channel: Channel<StreamChatGenerics>,
): TableRow<'channels'> | undefined => {
if (!channel.data) return;
const {
auto_translation_enabled,
auto_translation_language,
cid,
config,
cooldown,
created_at,
deleted_at,
disabled,
frozen,
hidden,
id,
invites,
last_message_at,
member_count,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
members,
muted,
own_capabilities,
team,
truncated_at,
truncated_by,
truncated_by_id,
type,
updated_at,
...extraData
} = channel.data as unknown as ChannelResponse<StreamChatGenerics>;

return {
autoTranslationEnabled: auto_translation_enabled,
autoTranslationLanguage: auto_translation_language,
cid,
config: config && JSON.stringify(config),
cooldown,
createdAt: mapDateTimeToStorable(created_at),
deletedAt: mapDateTimeToStorable(deleted_at),
disabled,
extraData: JSON.stringify(extraData),
frozen,
hidden,
id,
invites: invites && JSON.stringify(invites),
lastMessageAt: mapDateTimeToStorable(last_message_at),
memberCount: member_count,
muted,
ownCapabilities: own_capabilities && JSON.stringify(own_capabilities),
team,
truncatedAt: truncated_at,
truncatedBy: truncated_by && JSON.stringify(truncated_by),
truncatedById: truncated_by_id,
type,
updatedAt: updated_at,
};
};
10 changes: 7 additions & 3 deletions package/src/utils/DBSyncManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ export class DBSyncManager {
};
};

static sync = async () => {
static sync = async <
StreamChatGenerics extends DefaultStreamChatGenerics = DefaultStreamChatGenerics,
>(
client: StreamChat<StreamChatGenerics>,
) => {
if (!this.client?.user) return;
const cids = getAllChannelIds();
// If there are no channels, then there is no need to sync.
Expand All @@ -113,7 +117,7 @@ export class DBSyncManager {
try {
const result = await this.client.sync(cids, lastSyncedAtDate.toISOString());
const queries = result.events.reduce<PreparedQueries[]>((queries, event) => {
queries = queries.concat(handleEventToSyncDB(event, false));
queries = queries.concat(handleEventToSyncDB(event, client, false));
return queries;
}, []);

Expand All @@ -137,7 +141,7 @@ export class DBSyncManager {
if (!this.client) return;

await this.executePendingTasks(this.client);
await this.sync();
await this.sync(this.client);
};

static queueTask = async <
Expand Down