Skip to content

Commit

Permalink
Fix "savedOp" metadata property propagation for grouped ops (microsof…
Browse files Browse the repository at this point in the history
…t#20837)

Container class will set savedOp property on op before processing stashed ops.
Container Runtime uses this information to differentiate ops.

I do not see us propagating savedOp metadata property from grouped ops (as visible in Container) to ungrouped ops. OpGroupingManager.ungroupOp() does not do it as far as I can see. I think we are getting lucky with ID compression ops as they are usually not grouped (there is usually a single ID compression op, as it's part of its own batch). When/if any of those assumptions change, this code would stop working.

Propagate properly this property.
  • Loading branch information
vladsud authored and Tony Murphy committed Apr 29, 2024
1 parent ee2267c commit c7f07b5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
49 changes: 26 additions & 23 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ import {
type OutboundContainerRuntimeMessage,
type UnknownContainerRuntimeMessage,
} from "./messageTypes.js";
import { IBatchMetadata, IIdAllocationMetadata } from "./metadata.js";
import { IBatchMetadata, ISavedOpMetadata } from "./metadata.js";
import {
BatchMessage,
IBatch,
Expand Down Expand Up @@ -671,11 +671,13 @@ type MessageWithContext =
message: InboundSequencedContainerRuntimeMessage;
modernRuntimeMessage: true;
local: boolean;
savedOp?: boolean;
}
| {
message: InboundSequencedContainerRuntimeMessageOrSystemMessage;
modernRuntimeMessage: false;
local: boolean;
savedOp?: boolean;
};

const summarizerRequestUrl = "_summarizer";
Expand Down Expand Up @@ -1128,7 +1130,7 @@ export class ContainerRuntime
// Id Compressor serializes final state (see getPendingLocalState()). As result, it needs to skip all ops that preceeded that state
// (such ops will be marked by Loader layer as savedOp === true)
// That said, in "delayed" mode it's possible that Id Compressor was never initialized before getPendingLocalState() is called.
// In such case we have to process all ops, including those marked with saveOp === true.
// In such case we have to process all ops, including those marked with savedOp === true.
private readonly skipSavedCompressorOps: boolean;

public get idCompressorMode() {
Expand Down Expand Up @@ -2540,21 +2542,28 @@ export class ContainerRuntime
// We do not need to make a deep copy. Each layer will just replace message.contents itself,
// but will not modify the contents object (likely it will replace it on the message).
const messageCopy = { ...messageArg };
const savedOp = (messageCopy.metadata as ISavedOpMetadata)?.savedOp;
for (const message of this.remoteMessageProcessor.process(messageCopy)) {
if (modernRuntimeMessage) {
this.processCore({
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
// There is nothing really ensuring that anytime original message.type is Operation that
// the result messages will be so. In the end modern bool being true only directs to
// throw error if ultimately unrecognized without compat details saying otherwise.
message: message as InboundSequencedContainerRuntimeMessage,
local,
modernRuntimeMessage,
});
} else {
// Unrecognized message will be ignored.
this.processCore({ message, local, modernRuntimeMessage });
}
const msg: MessageWithContext = modernRuntimeMessage
? {
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
// There is nothing really ensuring that anytime original message.type is Operation that
// the result messages will be so. In the end modern bool being true only directs to
// throw error if ultimately unrecognized without compat details saying otherwise.
message: message as InboundSequencedContainerRuntimeMessage,
local,
modernRuntimeMessage,
}
: // Unrecognized message will be ignored.
{
message,
local,
modernRuntimeMessage,
};
msg.savedOp = savedOp;

// ensure that we observe any re-entrancy, and if needed, rebase ops
this.ensureNoDataModelChanges(() => this.processCore(msg));
}
}

Expand Down Expand Up @@ -2642,13 +2651,7 @@ export class ContainerRuntime
// stashed ops flow. The compressor is stashed with these ops already processed.
// That said, in idCompressorMode === "delayed", we might not serialize ID compressor, and
// thus we need to process all the ops.
if (
!(
this.skipSavedCompressorOps &&
(messageWithContext.message.metadata as IIdAllocationMetadata)?.savedOp ===
true
)
) {
if (!(this.skipSavedCompressorOps && messageWithContext.savedOp === true)) {
const range = messageWithContext.message.contents;
// Some other client turned on the id compressor. If we have not turned it on,
// put it in a pending queue and delay finalization.
Expand Down
4 changes: 2 additions & 2 deletions packages/runtime/container-runtime/src/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ export interface IBlobMetadata {
}

/**
* The IdCompressor needs to know if this is a replayed savedOp as those need to be skipped in stashed ops scenarios.
* ContainerRuntime needs to know if this is a replayed savedOp as those need to be skipped in stashed ops scenarios.
*/
export interface IIdAllocationMetadata {
export interface ISavedOpMetadata {
savedOp?: boolean;
}

0 comments on commit c7f07b5

Please sign in to comment.