Skip to content
This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

Commit

Permalink
Ignore current seek when a new seek occurs in RandomAccessPlayer (#2231)
Browse files Browse the repository at this point in the history
In RandomAccessPlayer when a seek is already loading messages, new seek requests are queued (queue size of 1). Prior to this change, when a seek request arrived while an existing seek request was in-flight, the existing request would finish, emit state, and then the new request would start.

Prior to emitting state, the in-flight seek request would set the nextReadStartTime which would override the nextReadStartTime of the latest seek request. Even if the seek requests were for the same time, the first request would alter the next read time of the subsequent request.

This change updates the RandomAccessPlayer logic to detect if a seek request occured while another was in-flight and avoid emitting a state update for the now-stale seek request.

Fixes: #1712
  • Loading branch information
defunctzombie committed Dec 1, 2021
1 parent dd72e37 commit e5712a3
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 23 deletions.
5 changes: 2 additions & 3 deletions packages/studio-base/src/components/PlayerManager.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
PropsWithChildren,
useCallback,
useContext,
useEffect,
useLayoutEffect,
useMemo,
useRef,
Expand Down Expand Up @@ -119,10 +118,10 @@ export default function PlayerManager(props: PropsWithChildren<PlayerManagerProp
return headerStampPlayer;
}, [basePlayer, initialMessageOrder, userNodeActions]);

useEffect(() => {
useLayoutEffect(() => {
player?.setMessageOrder(messageOrder ?? DEFAULT_MESSAGE_ORDER);
}, [player, messageOrder]);
useEffect(() => {
useLayoutEffect(() => {
player?.setUserNodes(userNodes ?? EMPTY_USER_NODES);
}, [player, userNodes]);

Expand Down
43 changes: 42 additions & 1 deletion packages/studio-base/src/players/RandomAccessPlayer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,46 @@ describe("RandomAccessPlayer", () => {
source.close();
});

// Seeking while loading messages for a previous seek cancels the previous seek
it("a new seek during existing seek cancels existing seek", async () => {
const provider = new TestProvider();
const source = new RandomAccessPlayer(
{ name: "TestProvider", args: { provider }, children: [] },
{ ...playerOptions, seekToTime: getSeekToTime() },
);
const store = new MessageStore(2);
source.setListener(store.add);

await store.done;

const emptyMessages = async (): Promise<GetMessagesResult> => {
return {};
};

provider.getMessages = emptyMessages;

store.reset(1);
source.setSubscriptions([{ topic: "/foo/bar" }]);
source.requestBackfill();

await store.done;

// This replicates behavior of seeking during a processing sync
provider.getMessages = async (): Promise<GetMessagesResult> => {
provider.getMessages = emptyMessages;
source.seekPlayback({ sec: 11, nsec: 0 });
return {};
};

store.reset(1);
source.seekPlayback({ sec: 11, nsec: 0 });

const messages: any = await store.done;
expect(messages[0]!.activeData.currentTime).toEqual({ sec: 11, nsec: 0 });

source.close();
});

it("calls listener with player state changes on play/pause", async () => {
const provider = new TestProvider();
const source = new RandomAccessPlayer(
Expand Down Expand Up @@ -540,6 +580,7 @@ describe("RandomAccessPlayer", () => {
expect(end).toEqual({ sec: 10, nsec: 0 });
return getMessagesResult;
case 2: {
// from starting playback (tick)
expect(start).toEqual({ sec: 10, nsec: 0 });
expect(end).toEqual({ sec: 10, nsec: 4000000 });
const parsedMessages: MessageEvent<unknown>[] = [
Expand Down Expand Up @@ -1008,7 +1049,7 @@ describe("RandomAccessPlayer", () => {
lastGetMessagesCall.resolve(getMessagesResult);
expect(lastGetMessagesCall).toEqual({
start: { sec: 10, nsec: 0 }, // Clamped to start
end: { sec: 10, nsec: 1 }, // Clamped to start
end: { sec: 10, nsec: 0 }, // Clamped to start
topics: { parsedMessages: ["/foo/bar"] },
resolve: expect.any(Function),
});
Expand Down
49 changes: 30 additions & 19 deletions packages/studio-base/src/players/RandomAccessPlayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ export default class RandomAccessPlayer implements Player {
return;
}

// our read finished and we didn't seed during the read, prepare for the next tick
// our read finished and we didn't seek during the read, prepare for the next tick
// we need to do this after checking for seek changes since seek time may have changed
this._nextReadStartTime = add(end, { sec: 0, nsec: 1 });

Expand Down Expand Up @@ -559,12 +559,21 @@ export default class RandomAccessPlayer implements Player {
}

private _seekPlaybackInternal = debouncePromise(async (backfillDuration?: Time) => {
// track seek time so _tick can know if a seek happened while reading messages in a tick
const seekTime = Date.now();
this._lastSeekStartTime = seekTime;

this._cancelSeekBackfill = false;
// cancel any queued _emitState that might later emit messages from before we seeked
this._messages = [];

// If a backfill duration is provided, we always perform the backfill even if playing
// If a backfill duration is not provided, we only perform the backfill when paused
if (this._isPlaying && !backfillDuration) {
this._lastSeekEmitTime = seekTime;
return;
}

// backfill includes the current time we've seek'd to
// playback after backfill will load messages after the seek time
const backfillEnd = clampTime(this._nextReadStartTime, this._start, this._end);
Expand All @@ -583,25 +592,26 @@ export default class RandomAccessPlayer implements Player {
this._end,
);

// Only getMessages if we have some messages to get.
if (backfillDuration || !this._isPlaying) {
const { parsedMessages: messages } = await this._getMessages(backfillStart, backfillEnd);
// Only emit the messages if we haven't seeked again / emitted messages since we
// started loading them. Note that for the latter part just checking for `isPlaying`
// is not enough because the user might have started playback and then paused again!
// Therefore we really need something like `this._cancelSeekBackfill`.
if (this._lastSeekStartTime === seekTime && !this._cancelSeekBackfill) {
// similar to _tick(), we set the next start time past where we have read
// this happens after reading and confirming that playback or other seeking hasn't happened
this._nextReadStartTime = add(backfillEnd, { sec: 0, nsec: 1 });

this._messages = messages;
this._lastSeekEmitTime = seekTime;
this._emitState();
}
} else {
// If we are playing, make sure we set this emit time so that consumers will know that we seeked.
const prevNextReadStartTime = this._nextReadStartTime;
const { parsedMessages: messages } = await this._getMessages(backfillStart, backfillEnd);

// If the read time was altered (another seek request), then ignore messages from this seek
if (prevNextReadStartTime !== this._nextReadStartTime) {
return;
}

// Only emit the messages if we haven't seeked again / emitted messages since we
// started loading them. Note that for the latter part just checking for `isPlaying`
// is not enough because the user might have started playback and then paused again!
// Therefore we really need something like `this._cancelSeekBackfill`.
if (!this._cancelSeekBackfill) {
// similar to _tick(), we set the next start time past where we have read
// this happens after reading and confirming that playback or other seeking hasn't happened
this._nextReadStartTime = add(backfillEnd, { sec: 0, nsec: 1 });

this._messages = messages;
this._lastSeekEmitTime = seekTime;
this._emitState();
}
});

Expand All @@ -610,6 +620,7 @@ export default class RandomAccessPlayer implements Player {
if (!this._initialized) {
return;
}

this._metricsCollector.seek(time);
this._setNextReadStartTime(time);
this._seekPlaybackInternal(backfillDuration);
Expand Down

0 comments on commit e5712a3

Please sign in to comment.