SAMZA-1312: Add Control Messages and Intermediate Stream Serde #207

Closed
wants to merge 6 commits into from

xinyuiscool
Contributor

This patch adds the control message types, which include:

  • EndOfStreamMessage
  • WatermarkMessage

To support in-band data and control messages, we provide a wrapper serde (IntermediateMessageSerde) that serializes and deserializes data and control messages based on the message type byte (the first byte of each intermediate stream message). The message format is defined in SAMZA-1312. The patch integrates this serde with SerdeManager.
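For readers unfamiliar with type-byte framing, here is a minimal sketch of the idea, not the code in this patch. The class name, method names, and the type codes 0, 1, 2 are assumptions made for illustration; the actual format is the one defined in SAMZA-1312.

```java
import java.util.Arrays;

// Illustrative only: hypothetical names, not the IntermediateMessageSerde from the patch.
final class TypeByteFramingSketch {
    // Assumed type codes; the real values are defined in SAMZA-1312.
    static final byte USER_MESSAGE = 0;
    static final byte WATERMARK = 1;
    static final byte END_OF_STREAM = 2;

    /** Prepends the type byte to an already-serialized payload. */
    static byte[] wrap(byte type, byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = type;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    /** Reads the type byte, which is the first byte of the framed message. */
    static byte typeOf(byte[] framed) {
        if (framed.length == 0) {
            throw new IllegalArgumentException("Empty message has no type byte");
        }
        return framed[0];
    }

    /** Returns everything after the type byte, to be handed to the matching serde. */
    static byte[] payloadOf(byte[] framed) {
        return Arrays.copyOfRange(framed, 1, framed.length);
    }
}
```

On the read side, the wrapper would call typeOf first and then pass payloadOf to either the user-provided serde or the control message serde.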

Tested with example jobs deployed locally; everything works as expected.

Contributor

vjagadish1989 left a comment

Looks really good!

@@ -124,6 +126,9 @@ public JobConfig generateConfig(String executionPlanJson) {

configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);

// write input/output streams to configs
inEdges.stream().filter(StreamEdge::isIntermeidate).forEach(e -> addStreamConfig(e, configs));
Contributor

s/isIntermeidate/isIntermediate

Contributor Author

Thanks for the catch! Covfefe!

this(streamSpec, false);
}

StreamEdge(StreamSpec streamSpec, boolean isIntermeidate) {
Contributor

s/Intermeidate/Intermediate

@@ -133,6 +133,7 @@ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
StreamEdge edge = getOrCreateStreamEdge(streamSpec);
edge.addSourceNode(from);
edge.addTargetNode(to);
edge.setIsIntermediate(true);
Contributor

Consider wrapping this into the constructor and getting rid of the setter?
StreamEdge(isIntermediate)

Contributor Author

Good suggestion. Fixed it.
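A minimal sketch of what the constructor-only version could look like. This is a simplified stand-in, not the real StreamEdge: the class name is hypothetical and a plain String replaces StreamSpec so the example is self-contained.

```java
// Illustrative stand-in for StreamEdge; a String replaces StreamSpec (assumption).
final class StreamEdgeSketch {
    private final String streamId;
    private final boolean isIntermediate;

    /** Input/output edges default to non-intermediate. */
    StreamEdgeSketch(String streamId) {
        this(streamId, false);
    }

    /** The flag is fixed at construction time, so no setter is needed. */
    StreamEdgeSketch(String streamId, boolean isIntermediate) {
        this.streamId = streamId;
        this.isIntermediate = isIntermediate;
    }

    String getStreamId() {
        return streamId;
    }

    boolean isIntermediate() {
        return isIntermediate;
    }
}
```

Making the field final means an edge cannot change from regular to intermediate after it has been added to the graph.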

@@ -124,6 +126,9 @@ public JobConfig generateConfig(String executionPlanJson) {

Contributor

The log line should move after the newly appended configs. Where it is now, the configs related to intermediate streams are not reflected when printing.

Contributor Author

Right, moved it.

@@ -124,6 +126,9 @@ public JobConfig generateConfig(String executionPlanJson) {

configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);

// write input/output streams to configs
Contributor

It would be slightly cleaner to return a map from addStreamConfig instead of mutating the passed-in configs map.

Contributor Author

This function creates the configs map, so I think it's OK to set fields on it directly in addStreamConfig. It's a private method, so it won't have any impact outside this scope.
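For comparison, a small sketch of the two styles discussed in this thread. The class name, the streamConfig method, and the config key format are hypothetical and chosen only for illustration; they are not the keys Samza writes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the key format below is made up, not a real Samza config key.
final class StreamConfigSketch {
    static String key(String streamId) {
        return "sketch.streams." + streamId + ".intermediate";
    }

    /** Style kept in the patch: the helper mutates the map owned by its caller. */
    static void addStreamConfig(String streamId, Map<String, String> configs) {
        configs.put(key(streamId), "true");
    }

    /** Reviewer's alternative: the helper returns entries and the caller merges them. */
    static Map<String, String> streamConfig(String streamId) {
        Map<String, String> entries = new HashMap<>();
        entries.put(key(streamId), "true");
        return entries;
    }
}
```

Both end up with the same entries; the second keeps the helper free of side effects at the cost of an extra putAll in the caller.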

return taskCount;
}

@JsonProperty("version")
Contributor

Consider decoupling serialization (Jackson annotations, etc.) from the actual ControlMessage itself.

Contributor Author

I could use getters/setters to avoid the @JsonProperty annotations, or do you have other ideas? I guess it doesn't matter much even if we want to use other kinds of serde. The annotations will just be ignored, right?



/**
* This class provides serialization/deserialziation of the intermediate messages.
Contributor

Minor: deserialization

Contributor Author

Covfefe!

.map(serde => (systemStream, new StringSerde("UTF-8")))
}).toMap

val intermedidateStreamMessageSerdes = intermediateStreams
Contributor

Minor: s/intermedidate/intermediate

Contributor Author

Thanks! Convfefe!


} catch (Exception e) {
// For backward compatibility, we fall back to user-provided serde
return userMessageSerde.fromBytes(bytes);
Contributor

It's probably safer to fail early instead of attempting to de-serialize with the user-provided serde?

Contributor Author

Oh, this is for backward compatibility. I added more comments here so it is easier to understand. The logic here is a little bit cumbersome.

break;
default:
object = null;
break;
Contributor

It seems that we are handling backward compatibility if (and only if) the first sentinel byte is 0, 1, or 2. In all other cases, wouldn't we simply return null?

Contributor Author

I think it's safer to throw an exception instead of silently ignoring those values. I set the object to null here for the case where the type exists but is not handled, which is not great either. I updated the patch to throw an exception here too.
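A minimal sketch of the fail-fast behavior described in this thread, not the updated patch itself. The class name, the enum, and the mapping of 0, 1, 2 to specific message kinds are assumptions; the thread only says that those three byte values are recognized.

```java
// Illustrative only: hypothetical names and an assumed byte-to-kind mapping.
final class MessageTypeSketch {
    enum Kind { USER_MESSAGE, WATERMARK, END_OF_STREAM }

    /** Maps the leading type byte to a message kind, throwing on anything unknown. */
    static Kind kindOf(byte typeByte) {
        switch (typeByte) {
            case 0:
                return Kind.USER_MESSAGE;
            case 1:
                return Kind.WATERMARK;
            case 2:
                return Kind.END_OF_STREAM;
            default:
                // Fail fast instead of returning null for an unhandled type.
                throw new IllegalArgumentException("Unknown message type byte: " + typeByte);
        }
    }
}
```

Throwing in the default branch surfaces a corrupted or unsupported message at the point of deserialization rather than as a null further downstream.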

Contributor

vjagadish1989 left a comment

Looks good to me.

Approved.

One piece of feedback:

We appear to embed the following kinds of information in our Config object:

  1. Configs specified to us by the user
  2. Configs that we clobber/ rewrite in the ExecutionPlanner
  3. Configs that are generated and are purely internal to the Samza run-time. For instance, StreamConfig.IS_INTERMEDIATE_FROM_STREAM_ID
  4. Misc. stuff like our generated JSON plan

Items 2 and 3 correspond to the serialization of the graph itself. It would be nice to have a plan to separate these (not necessarily in this PR).

import org.apache.samza.message.WatermarkMessage;
import org.codehaus.jackson.type.TypeReference;


Contributor

Minor: no new line
