Skip to content

Commit

Permalink
pass down enum to syncengine
Browse files Browse the repository at this point in the history
  • Loading branch information
milaGGL committed Feb 7, 2024
1 parent 3383fce commit 9e3428d
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 174 deletions.
83 changes: 26 additions & 57 deletions packages/firestore/src/core/event_manager.ts
Expand Up @@ -60,11 +60,9 @@ export interface Observer<T> {
export interface EventManager {
onListen?: (
query: Query,
enableRemoteListen: boolean
listenStatus: ListenStatus
) => Promise<ViewSnapshot>;
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
onUnlisten?: (query: Query, unlistenStatus: UnlistenStatus) => Promise<void>;
}

export function newEventManager(): EventManager {
Expand All @@ -84,44 +82,25 @@ export class EventManagerImpl implements EventManager {
/** Callback invoked when a Query is first listen to. */
onListen?: (
query: Query,
enableRemoteListen: boolean
listenStatus: ListenStatus
) => Promise<ViewSnapshot>;
/** Callback invoked once all listeners to a Query are removed. */
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;

/**
* Callback invoked when a Query starts listening to the remote store, while
* already listening to the cache.
*/
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
/**
* Callback invoked when a Query stops listening to the remote store, while
* still listening to the cache.
*/
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
onUnlisten?: (query: Query, unlistenStatus: UnlistenStatus) => Promise<void>;
}

function validateEventManager(eventManagerImpl: EventManagerImpl): void {
debugAssert(!!eventManagerImpl.onListen, 'onListen not set');
debugAssert(
!!eventManagerImpl.onFirstRemoteStoreListen,
'onFirstRemoteStoreListen not set'
);
debugAssert(!!eventManagerImpl.onUnlisten, 'onUnlisten not set');
debugAssert(
!!eventManagerImpl.onLastRemoteStoreUnlisten,
'onLastRemoteStoreUnlisten not set'
);
}

const enum ListenStatus {
export const enum ListenStatus {
FirstListenAndRequireWatchConnection,
FirstListenAndNotRequireWatchConnection,
NotFirstListenButRequireWatchConnection,
NoActionRequired
}

const enum UnlistenStatus {
export const enum UnlistenStatus {
LastListenAndRequireWatchDisconnection,
LastListenAndNotRequireWatchDisconnection,
NotLastListenButRequireWatchDisconnection,
Expand Down Expand Up @@ -149,27 +128,20 @@ export async function eventManagerListen(
listenerStatus = ListenStatus.NotFirstListenButRequireWatchConnection;
}

try {
switch (listenerStatus) {
case ListenStatus.FirstListenAndRequireWatchConnection:
queryInfo.viewSnap = await eventManagerImpl.onListen!(query, true);
break;
case ListenStatus.FirstListenAndNotRequireWatchConnection:
queryInfo.viewSnap = await eventManagerImpl.onListen!(query, false);
break;
case ListenStatus.NotFirstListenButRequireWatchConnection:
await eventManagerImpl.onFirstRemoteStoreListen!(query);
break;
default:
break;
if (listenerStatus !== ListenStatus.NoActionRequired) {
try {
queryInfo.viewSnap = await eventManagerImpl.onListen!(
query,
listenerStatus
);
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e as Error,
`Initialization of query '${stringifyQuery(listener.query)}' failed`
);
listener.onError(firestoreError);
return;
}
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e as Error,
`Initialization of query '${stringifyQuery(listener.query)}' failed`
);
listener.onError(firestoreError);
return;
}

eventManagerImpl.queries.set(query, queryInfo);
Expand Down Expand Up @@ -222,17 +194,14 @@ export async function eventManagerUnlisten(
}
}
}
switch (unlistenStatus) {
case UnlistenStatus.LastListenAndRequireWatchDisconnection:
eventManagerImpl.queries.delete(query);
return eventManagerImpl.onUnlisten!(query, true);
case UnlistenStatus.LastListenAndNotRequireWatchDisconnection:
if (unlistenStatus !== UnlistenStatus.NoActionRequired) {
if (
unlistenStatus !==
UnlistenStatus.NotLastListenButRequireWatchDisconnection
) {
eventManagerImpl.queries.delete(query);
return eventManagerImpl.onUnlisten!(query, false);
case UnlistenStatus.NotLastListenButRequireWatchDisconnection:
return eventManagerImpl.onLastRemoteStoreUnlisten!(query);
default:
return;
}
return eventManagerImpl.onUnlisten!(query, unlistenStatus);
}
}

Expand Down
12 changes: 1 addition & 11 deletions packages/firestore/src/core/firestore_client.ts
Expand Up @@ -86,9 +86,7 @@ import {
syncEngineLoadBundle,
syncEngineRegisterPendingWritesCallback,
syncEngineUnlisten,
syncEngineWrite,
triggerRemoteStoreListen,
triggerRemoteStoreUnlisten
syncEngineWrite
} from './sync_engine_impl';
import { Transaction } from './transaction';
import { TransactionOptions } from './transaction_options';
Expand Down Expand Up @@ -399,14 +397,6 @@ export async function getEventManager(
null,
onlineComponentProvider.syncEngine
);
eventManager.onFirstRemoteStoreListen = triggerRemoteStoreListen.bind(
null,
onlineComponentProvider.syncEngine
);
eventManager.onLastRemoteStoreUnlisten = triggerRemoteStoreUnlisten.bind(
null,
onlineComponentProvider.syncEngine
);
return eventManager;
}

Expand Down
171 changes: 84 additions & 87 deletions packages/firestore/src/core/sync_engine_impl.ts
Expand Up @@ -82,7 +82,9 @@ import {
EventManager,
eventManagerOnOnlineStateChange,
eventManagerOnWatchChange,
eventManagerOnWatchError
eventManagerOnWatchError,
ListenStatus,
UnlistenStatus
} from './event_manager';
import { ListenSequence } from './listen_sequence';
import {
Expand Down Expand Up @@ -293,25 +295,18 @@ export function newSyncEngine(
export async function syncEngineListen(
syncEngine: SyncEngine,
query: Query,
shouldListenToRemote: boolean = true
listenStatus: ListenStatus = ListenStatus.FirstListenAndRequireWatchConnection
): Promise<ViewSnapshot> {
const syncEngineImpl = ensureWatchCallbacks(syncEngine);

let targetId;
let viewSnapshot;

const queryView = syncEngineImpl.queryViewsByQuery.get(query);
if (queryView) {
// PORTING NOTE: With Multi-Tab Web, it is possible that a query view
// already exists when EventManager calls us for the first time. This
// happens when the primary tab is already listening to this query on
// behalf of another tab and the user of the primary also starts listening
// to the query. EventManager will not have an assigned target ID in this
// case and calls `listen` to obtain this ID.
targetId = queryView.targetId;
syncEngineImpl.sharedClientState.addLocalQueryTarget(targetId);
viewSnapshot = queryView.view.computeInitialSnapshot();
} else {

if (
!queryView ||
listenStatus == ListenStatus.NotFirstListenButRequireWatchConnection

Check failure on line 308 in packages/firestore/src/core/sync_engine_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

Expected '===' and instead saw '=='
) {
const targetData = await localStoreAllocateTarget(
syncEngineImpl.localStore,
queryToTarget(query)
Expand All @@ -320,11 +315,12 @@ export async function syncEngineListen(
// PORTING NOTE: When the query is listening to cache only, we skip sending it over to Watch by
// not registering it in shared client state, and directly calculate initial snapshots and
// subsequent updates from cache.
const status: QueryTargetState = shouldListenToRemote
? syncEngineImpl.sharedClientState.addLocalQueryTarget(
targetData.targetId
)
: 'not-current';
const status: QueryTargetState =
listenStatus != ListenStatus.FirstListenAndNotRequireWatchConnection

Check failure on line 319 in packages/firestore/src/core/sync_engine_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

Expected '!==' and instead saw '!='
? syncEngineImpl.sharedClientState.addLocalQueryTarget(
targetData.targetId
)
: 'not-current';

targetId = targetData.targetId;
viewSnapshot = await initializeViewAndComputeSnapshot(
Expand All @@ -335,34 +331,27 @@ export async function syncEngineListen(
targetData.resumeToken
);

if (syncEngineImpl.isPrimaryClient && shouldListenToRemote) {
if (
syncEngineImpl.isPrimaryClient &&
listenStatus != ListenStatus.FirstListenAndNotRequireWatchConnection

Check failure on line 336 in packages/firestore/src/core/sync_engine_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

Expected '!==' and instead saw '!='
) {
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}
} else {
// PORTING NOTE: With Multi-Tab Web, it is possible that a query view
// already exists when EventManager calls us for the first time. This
// happens when the primary tab is already listening to this query on
// behalf of another tab and the user of the primary also starts listening
// to the query. EventManager will not have an assigned target ID in this
// case and calls `listen` to obtain this ID.
targetId = queryView.targetId;
syncEngineImpl.sharedClientState.addLocalQueryTarget(targetId);
viewSnapshot = queryView.view.computeInitialSnapshot();
}

return viewSnapshot;
}

export async function triggerRemoteStoreListen(
syncEngine: SyncEngine,
query: Query
): Promise<void> {
const syncEngineImpl = ensureWatchCallbacks(syncEngine);

const targetData = await localStoreAllocateTarget(
syncEngineImpl.localStore,
queryToTarget(query)
);

// PORTING NOTE: Register the target ID with local Firestore client as active
// watch target.
syncEngineImpl.sharedClientState.addLocalQueryTarget(targetData.targetId);

if (syncEngineImpl.isPrimaryClient) {
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}
}

/**
* Registers a view for a previously unknown query and computes its initial
* snapshot.
Expand Down Expand Up @@ -421,7 +410,7 @@ async function initializeViewAndComputeSnapshot(
export async function syncEngineUnlisten(
syncEngine: SyncEngine,
query: Query,
shouldUnlistenToRemote: boolean
unlistenStatus: UnlistenStatus = UnlistenStatus.LastListenAndRequireWatchDisconnection
): Promise<void> {
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
const queryView = syncEngineImpl.queryViewsByQuery.get(query)!;
Expand All @@ -434,36 +423,65 @@ export async function syncEngineUnlisten(
// to the target.
const queries = syncEngineImpl.queriesByTarget.get(queryView.targetId)!;
if (queries.length > 1) {
syncEngineImpl.queriesByTarget.set(
queryView.targetId,
queries.filter(q => !queryEquals(q, query))
);
syncEngineImpl.queryViewsByQuery.delete(query);
if (
unlistenStatus !==
UnlistenStatus.NotLastListenButRequireWatchDisconnection
) {
syncEngineImpl.queriesByTarget.set(
queryView.targetId,
queries.filter(q => !queryEquals(q, query))
);
syncEngineImpl.queryViewsByQuery.delete(query);
}
return;
}

// No other queries are mapped to the target, clean up the query and the target.
if (syncEngineImpl.isPrimaryClient) {
// We need to remove the local query target first to allow us to verify
// whether any other client is still interested in this target.
syncEngineImpl.sharedClientState.removeLocalQueryTarget(queryView.targetId);
const targetRemainsActive =
syncEngineImpl.sharedClientState.isActiveQueryTarget(queryView.targetId);

if (!targetRemainsActive) {
await localStoreReleaseTarget(
syncEngineImpl.localStore,
queryView.targetId,
/*keepPersistedTargetData=*/ false
)
.then(() => {
syncEngineImpl.sharedClientState.clearQueryState(queryView.targetId);
if (shouldUnlistenToRemote) {
remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId);
}
removeAndCleanupTarget(syncEngineImpl, queryView.targetId);
})
.catch(ignoreIfPrimaryLeaseLoss);
if (
unlistenStatus == UnlistenStatus.NotLastListenButRequireWatchDisconnection

Check failure on line 442 in packages/firestore/src/core/sync_engine_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

Expected '===' and instead saw '=='
) {
// PORTING NOTE: Unregister the target ID with local Firestore client as
// watch target.
syncEngineImpl.sharedClientState.removeLocalQueryTarget(
queryView.targetId
);

remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId);
} else {
// We need to remove the local query target first to allow us to verify
// whether any other client is still interested in this target.
syncEngineImpl.sharedClientState.removeLocalQueryTarget(
queryView.targetId
);
const targetRemainsActive =
syncEngineImpl.sharedClientState.isActiveQueryTarget(
queryView.targetId
);

if (!targetRemainsActive) {
await localStoreReleaseTarget(
syncEngineImpl.localStore,
queryView.targetId,
/*keepPersistedTargetData=*/ false
)
.then(() => {
syncEngineImpl.sharedClientState.clearQueryState(
queryView.targetId
);
if (
unlistenStatus ==

Check failure on line 473 in packages/firestore/src/core/sync_engine_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

Expected '===' and instead saw '=='
UnlistenStatus.LastListenAndRequireWatchDisconnection
) {
remoteStoreUnlisten(
syncEngineImpl.remoteStore,
queryView.targetId
);
}
removeAndCleanupTarget(syncEngineImpl, queryView.targetId);
})
.catch(ignoreIfPrimaryLeaseLoss);
}
}
} else {
removeAndCleanupTarget(syncEngineImpl, queryView.targetId);
Expand All @@ -475,27 +493,6 @@ export async function syncEngineUnlisten(
}
}

export async function triggerRemoteStoreUnlisten(
syncEngine: SyncEngine,
query: Query
): Promise<void> {
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
const queryView = syncEngineImpl.queryViewsByQuery.get(query)!;
debugAssert(
!!queryView,
'Trying to unlisten on query not found:' + stringifyQuery(query)
);
const queries = syncEngineImpl.queriesByTarget.get(queryView.targetId)!;

if (syncEngineImpl.isPrimaryClient && queries.length === 1) {
// PORTING NOTE: Unregister the target ID with local Firestore client as
// watch target.
syncEngineImpl.sharedClientState.removeLocalQueryTarget(queryView.targetId);

remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId);
}
}

/**
* Initiates the write of local mutation batch which involves adding the
* writes to the mutation queue, notifying the remote store about new
Expand Down

0 comments on commit 9e3428d

Please sign in to comment.