This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Do not hold on to block data in the plot worker #7235

Merged
merged 26 commits into main from caleb/13-12-23/feat/dont-keep-block-data on Dec 23, 2023

Conversation

@cfoust (Contributor) commented Dec 15, 2023

User-Facing Changes
Drastically reduce the amount of memory required for plotting.

Description
In the beginning, I moved the plot pipeline to a Web Worker in order to reduce the burden that dataset creation and downsampling put on the main thread. This was a big change, so to reduce the number of variables I kept the logic for aggregating messages the same. As a result, we had two copies of the message data used for plotting: one in the main thread and another in the worker. This approach cost a lot of memory, but interacting with plots was much faster.

This PR finally closes the loop and makes it so that the main thread sends the messages that plots need on demand; the worker only holds on to plot datasets and not the messages necessary to produce them. This greatly reduces the memory usage of plots, with the downside that the relevant raw messages must be transmitted to the worker in full when plot parameters change.

This adds a fair bit of complexity, so to make it easier to test and reason about, I broke out the logic for sending messages to clients into a standalone state machine that is used by useDatasets, and added a suite of tests.
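
For orientation, here is a rough sketch of the shape this describes, with purely illustrative names and types (the actual source is more involved): the main thread keeps the raw messages and the subscriptions, and re-sends only what a client needs when its parameters change.

type Topic = string;
type PlotMessage = { topic: Topic; receiveTime: number; message: unknown };
type SendToWorker = (clientId: string, events: PlotMessage[]) => void;

interface MainThreadState {
  // Raw messages stay on the main thread, keyed by topic.
  current: Record<Topic, PlotMessage[]>;
  // Which topics each plot client is subscribed to.
  subscriptions: Record<string, Topic[]>;
}

// When a client's plot parameters change, re-send only the raw messages that
// client needs; the worker rebuilds its dataset from them and then discards them.
function resendForClient(state: MainThreadState, clientId: string, send: SendToWorker): void {
  const topics = new Set(state.subscriptions[clientId] ?? []);
  const events = Object.values(state.current)
    .flat()
    .filter(({ topic }) => topics.has(topic));
  send(clientId, events);
}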

Known issues
There is still a timing issue affecting plot stories that I am investigating. Update: fixed!

@cfoust (Contributor, Author) commented Dec 19, 2023

I fixed the issue with current data. We now retransmit it to a specific client when plot params change. Right now we transmit all current data subscribed to by plots, not just what the client actually needs, so it's a little wasteful but I'm still polishing.

@defunctzombie (Contributor) commented, quoting the description:
relevant raw messages must be transmitted to the worker in full when plot parameters change.

What plot parameter changes cause this to happen?

let blockState = initBlockState();
let clients: Record<string, Client> = {};
let datasetsState: DatasetsState = initDatasets();
let callbacks: Record<string, (topics: SubscribePayload[]) => void> = {};

Contributor:

Not a fan of all these globals - why do we have globals here?

cfoust (author):

It's module-level state that exists outside of the React tree. I prefer not to connect state to React that doesn't need to be there (and, for example, does not on its own trigger component updates.)
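
As a sketch of the pattern being described (illustrative types and function bodies, not the actual source): the maps above are plain module-level singletons that ordinary functions mutate, so touching them never schedules a React render on its own.

type Client = { id: string };
type SubscribePayload = { topic: string };

let clients: Record<string, Client> = {};
let callbacks: Record<string, (topics: SubscribePayload[]) => void> = {};

// Called from outside the React tree (e.g. by the worker bridge); mutating
// these records does not trigger any component update by itself.
export function registerClient(id: string, onSubscribe: (topics: SubscribePayload[]) => void): void {
  clients[id] = { id };
  callbacks[id] = onSubscribe;
}

export function unregisterClient(id: string): void {
  delete clients[id];
  delete callbacks[id];
}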

@cfoust (Contributor, Author) commented Dec 20, 2023

What plot parameter changes cause this to happen?

Anything that affects the output points--so, basically, all plot parameter changes. Theoretically there are some kinds of mutations for which we could just modify the existing dataset, but without having the messages used to generate that dataset it's pretty tricky.

@cfoust (Contributor, Author) commented Dec 20, 2023

Just to recap:

  1. I realized that we also needed to retransmit current data to the worker when plot parameters changed, though fixing this is much simpler than the approach necessary for transmitting blocks.
  2. I made it so that when a client is reset, we only send the current data it needs rather than all of the accumulated current messages.

},
// eslint-disable-next-line @foxglove/no-boolean-parameters
setLive(value: boolean): void {
state = setLive(value, state);
},
unregister(id: string): void {
unregisterCleint(id: string): void {

Contributor:

Typo: Cleint

Comment on lines +82 to +88
// Only send the current events that the client can actually use. This also
// saves us from having to `structuredClone` unused data
const topics = new Set(R.uniq(getClientPayloads(client).map(({ topic }) => topic)));
void service?.addCurrentData(
  current.filter(({ topic }) => topics.has(topic)),
  id,
);

Contributor:

If all plot clients have a rolling data window (e.g. only the last 10 seconds are plotted), could we also filter out any current data that is not within that window?

cfoust (author):

That's a bit tricky and even potentially undesirable--as a user, I would not expect that to happen. For example, users seem to rely on the fact that data stays around (even if it's not visible) for the CSV export functionality

 * client specified by `clientId`.
 */
export function addCurrentData(
  events: readonly MessageEvent[],

Contributor:

Is this guaranteed to be at most one message per topic? Because otherwise we might drop some messages with R.map((messages) => messages.slice(-1), current) below.

cfoust (author):

That's only for a single-message plot--which necessarily will ignore anything but the most recent message
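
To make that concrete, here is a small, self-contained illustration of the R.map((messages) => messages.slice(-1), current) call referenced above (the Msg type is made up): only the most recent message per topic survives.

import * as R from "ramda";

type Msg = { topic: string; value: number };

const current: Record<string, Msg[]> = {
  "/temp": [
    { topic: "/temp", value: 1 },
    { topic: "/temp", value: 2 },
  ],
};

// => { "/temp": [{ topic: "/temp", value: 2 }] }
const latestOnly = R.map((messages) => messages.slice(-1), current);
console.log(latestOnly);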


if (isSingleMessage(params)) {
const plotData = buildPlot(
current: accumulate(

Contributor:

If all of these new current messages are already preloaded, do we still have to recompute the plot data and send it to the client? I would assume that the plot data wouldn't/shouldn't change in that case?

cfoust (author):

Theoretically yes, but it's a pretty negligible amount of compute, worth fixing later only if we really think it's a problem

Member:

Just for my understanding: in the case where we do have both blocks & current data, where is the logic that chooses to use the block data instead of the current data?

cfoust (author):

For generating the final dataset? That happens here

concatEffects((newState: State): StateAndEffects => {
const { pending, blocks } = newState;
const { pending } = newState;

Member:

What changed in this section? I am getting a bit lost in the details and might be missing the big picture. Is it that we now use applyBlockUpdate instead of splatting the pending data into blocks?

cfoust (author):

Yup, exactly. It was weird and complicated to have two different code paths for ingesting block data, one that used messages and one that used BlockUpdates.

Comment on lines +128 to +148
const allUpdates = pending.map(
(update: BlockUpdate): [next: BlockUpdate, applied: BlockUpdate] => {
const { updates } = update;
const [used, unused] = R.partition(
({ id: clientId }) => clientIds.includes(clientId),
updates,
);
return [
{ ...update, updates: unused },
{ ...update, updates: used },
];
},
);

const allNewTopics = getAllTopics(newState);
const newData = R.pick(allNewTopics, pending);
if (R.isEmpty(newData)) {
return [newState, []];
}
const newPending: BlockUpdate[] = allUpdates
.filter(([unused]) => unused.updates.length > 0)
.map(([unused]) => unused);

const newTopics = Object.keys(newData);
const updatesToApply: BlockUpdate[] = allUpdates
.filter(([, used]) => used.updates.length > 0)
.map(([, used]) => used);

Member:

I think I've understood what this is doing (though maybe I am missing something). There seems to be a lot of dancing around the difficulty of partitioning each item in a list and then extracting, or sort of "transposing", the results to end up with two lists.

I think it would be shorter, clearer, and more efficient if structured more like below – unless there is some subtlety that I have lost – but if there is, I think it is too subtle because I can't see it after several minutes of reading the code.

const newPending: BlockUpdate[] = [];
const updatesToApply: BlockUpdate[] = [];
for (const update of pending) {
  for (const { id: clientId } of update.updates) {
    if (clientIds.includes(clientId)) {
      updatesToApply.push(update);
    } else {
      newPending.push(update);
    }
  }
}

(I'm also not quite sure I understand why we have two levels of nested collections – pending updates where each update has its own array of updates?)

cfoust (author):

While this section could use some improvement--no denying that--your code snippet copies the BlockUpdate (including all of its updates) on every iteration. The subtlety is that ClientUpdates refer to data inside of the messages field on the BlockUpdate; you cannot just combine BlockUpdates as it seems you're suggesting in the parenthetical at the end of your comment.

I can think of ways to clean this up, however! It's a good call-out that this is hard to understand.

Member:

What do you mean by copy? I don’t see anything in my snippet that would be creating new objects, just organizing them into new arrays right?

cfoust (author):

“copy” here meaning “duplicate”, not in terms of memory layout. You are appending the entire BlockUpdate to the list for every ClientUpdate

cfoust (author):

Which, though it’s clearly just copying the reference, is a little counterintuitive since then you have to reaggregate them when you apply all the pending updates for a client that joins later

Member:

I see, I think I get your point now. Would this be more accurate?

const newPending: BlockUpdate[] = [];
const updatesToApply: BlockUpdate[] = [];
for (const blockUpdate of pending) {
  const [used, unused] = R.partition(
    ({ id: clientId }) => clientIds.includes(clientId),
    blockUpdate.updates,
  );
  if (used.length > 0) {
    updatesToApply.push({ ...blockUpdate, updates: used });
  }
  if (unused.length > 0) {
    newPending.push({ ...blockUpdate, updates: unused });
  }
}
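
For readers following this thread, a rough sketch of the shapes being discussed, inferred from the conversation; the exact field names and types in the source may differ.

type Messages = Record<string, unknown[]>; // per-topic message arrays

type ClientUpdate = {
  id: string; // the client this slice of the update is destined for
  update: {
    topic: string;
    shouldReset: boolean;
    // ...points into data held by the enclosing BlockUpdate's `messages`
  };
};

type BlockUpdate = {
  // One "step" forward in the block state machine, fanned out to its clients.
  updates: ClientUpdate[];
  messages: Messages;
};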

Comment on lines +50 to +53
// We aggregate all of the updates for each client and then apply them as a
// group. This is because we don't want the `shouldReset` field, which can
// reset the plot data, to throw away data we aggregated from an update we
// just applied.

Member:

It sounds like this means we don't necessarily want to apply the updates in order – why is that? I could imagine that if there is a list of updates with a reset=true in the middle, then it should theoretically reset things that were aggregated before it but not after it.

cfoust (author):

Maybe I could state this more clearly, but a single BlockUpdate contains a single "step" forward in the block state machine. In other words, this code cannot result in out-of-order application. The scenario it's avoiding is where you apply ClientUpdates from the updates field one at a time. If one update contains shouldReset=true, all of them will for a given client, but if you apply them one by one, you might reset the client's data entirely (with shouldReset) several times.

e.g., this goes from:

  shouldReset, update, shouldReset, update

to

  shouldReset, update, update
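
In code terms, a minimal self-contained sketch of that aggregation (names are illustrative): apply a client's pending updates as one group, so shouldReset clears the dataset at most once rather than once per update.

type Dataset = { points: number[] };
type PendingUpdate = { shouldReset: boolean; points: number[] };

function applyUpdatesAsGroup(dataset: Dataset, updates: PendingUpdate[]): Dataset {
  // Reset once if any update in the group asks for it...
  const base: Dataset = updates.some((u) => u.shouldReset) ? { points: [] } : dataset;
  // ...then append every update's data exactly once.
  return { points: [...base.points, ...updates.flatMap((u) => u.points)] };
}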

// If we get updates for clients that haven't registered yet, we've got to
// keep that data around and use it when they register
const clientIds = state.clients.map(({ id }) => id);
const unused = updates.filter(({ id }) => !clientIds.includes(id));

Member:

Random drive-by thought as I'm reading this – if we are usually accessing updates by specific client ids, would it make sense to store them grouped by id instead of in a single array?

cfoust (author):

While this is totally subjective and I'm not going to die on this hill, I have a mild preference in favor of lists over associative arrays, mostly because it makes mapping and filtering more ergonomic. In this case, however, it may make sense given that we just groupBy in applyBlockUpdate anyway, but I will think about it
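
For reference, the grouping in question is a one-liner either way, so the choice is mostly about where it lives; a small illustration with a made-up Update type:

import * as R from "ramda";

type Update = { id: string; topic: string };

const updates: Update[] = [
  { id: "a", topic: "/x" },
  { id: "b", topic: "/y" },
  { id: "a", topic: "/z" },
];

// Flat list in, per-client view out:
// => { a: [{ id: "a", ... }, { id: "a", ... }], b: [{ id: "b", ... }] }
const byClient = R.groupBy(({ id }) => id, updates);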

packages/studio-base/src/panels/Plot/processor/messages.ts (outdated comments, resolved)
packages/studio-base/src/panels/Plot/blocks.ts (outdated comments, resolved)
packages/studio-base/src/panels/Plot/processor/messages.ts (outdated comments, resolved)
Comment on lines 91 to 92
return {
...a,

Member:

I'm mildly concerned that a proliferation of reduce with object spreads will lead to a lot more CPU work being done than it would be with a loop more like const clientMessages = {}; for (const clientUpdate of updates) { clientMessages[clientUpdate.update.topic] = ... }. In less-hot code paths or when N is small I'm sure it's not a big deal but I am gathering that this will be happening rapidly many times in a row for each topic during preloading?

@cfoust (author) commented Dec 22, 2023

In the worst case this code block runs N*M times per client while preloading data, where N is the number of blocks and M is the number of topics, so usually this is pretty small. In relative terms, the compute cost of this is dwarfed by dataset generation and downsampling, where runtime complexity is a function of the number of messages.
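
To spell out the trade-off being discussed (illustrative types, not the actual source): the spread-based reduce copies the accumulator's keys on every step, while the in-place loop mutates a single object.

type TopicUpdate = { topic: string; messages: unknown[] };

// Quadratic-ish: each iteration spreads all previously accumulated keys.
function withReduce(updates: TopicUpdate[]): Record<string, unknown[]> {
  return updates.reduce<Record<string, unknown[]>>(
    (acc, { topic, messages }) => ({ ...acc, [topic]: messages }),
    {},
  );
}

// Linear: one accumulator object mutated in place.
function withLoop(updates: TopicUpdate[]): Record<string, unknown[]> {
  const clientMessages: Record<string, unknown[]> = {};
  for (const { topic, messages } of updates) {
    clientMessages[topic] = messages;
  }
  return clientMessages;
}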


@defunctzombie merged commit dcf781b into main on Dec 23, 2023
14 checks passed
@defunctzombie deleted the caleb/13-12-23/feat/dont-keep-block-data branch on December 23, 2023 at 20:24
@@ -24,6 +25,9 @@ import { BlockUpdate, ClientUpdate } from "../blocks";
import { Messages } from "../internalTypes";
import { isSingleMessage } from "../params";

// Maximum number of accumulated current messages before triggering a cull
const ACCUMULATED_CURRENT_MESSAGE_CULL_THRESHOLD = 50_000;

Contributor:

Should we also have an upper limit based on memory size? Depending on the message size, 50k messages might be way more than the allowed JS heap size, so holding on to the full message objects should be avoided.

Contributor:

Possibly - though I think we use typed arrays for storing this data, so it won't have the same kind of impact on the heap?

Contributor:

Ah sorry, you are right. I was referring to the wrong code section; I actually meant

datasetsState = updateCurrent(messages, datasetsState);
where we concatenate new current message events to the existing ones, and there seems to be no culling in place like there is here. I'm pretty sure this is one of the main reasons for OOM crashes.
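
A hypothetical sketch of the kind of guard being asked for, reusing the threshold added in this PR (the helper itself is not in the PR; the follow-up issue below tracks the real fix):

const ACCUMULATED_CURRENT_MESSAGE_CULL_THRESHOLD = 50_000;

function appendCurrent<T>(existing: T[], incoming: T[]): T[] {
  const combined = existing.concat(incoming);
  // Keep only the most recent messages once the threshold is exceeded.
  return combined.length > ACCUMULATED_CURRENT_MESSAGE_CULL_THRESHOLD
    ? combined.slice(combined.length - ACCUMULATED_CURRENT_MESSAGE_CULL_THRESHOLD)
    : combined;
}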

Contributor:

-> #7298
