New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: plan serialization jackson annotations #3520
feat: plan serialization jackson annotations #3520
Conversation
Hey @rodesai . I am curious about the general approach here. I notice the fields of the objects that are serialized are specified using Jackson annotations. Does that not mean the serialized form will change even if there is a single change to any of the fields that are annotated? How do we then maintain compatibility between versions of ksql? E.g. if node 1 is running a slightly different version of ksql to node 2 then the serialized form shared between the nodes won't be compatible unless there have no changes to any of the annotated objects. If we are defining a serialization format that we want to be usable between different versions of ksql, I suspect we want to define it elsewhere and lock it down. |
So we're on the same page - the current design is for these plans to be backwards-compatible - new versions of KSQL should be able to read and execute old plans, and old versions should be able to detect new incompatible plans (and then halt command topic execution). To make sure this happens, we'll modify the query translation tests to write out and run against the execution plans KSQL generates. We will also extend these to fail when the plan changes (even in a compatible way). If the plan changes in an incompatible way the tests will fail to deserialize or fail test cases, and the developer should fix their change. If the plan changes in a compatible way, we'll have a tool for generating and writing out a new set of plans (for the cases that need it, alongside the old plans).
I (maybe?) agree here. I'll be moving these into their own module (ksql-models) in a follow-up. The testing described above should be enough to keep them compatible. Not sure if this goes as far as what you had in mind. |
I think, quite possibly, that I don't understand well enough the overall plan. But I kind of hear alarm bells going off, if we're exposing a part of the implementation as something that is set in stone, and we can't evolve over time. In general any approach where we tie the implementation down to a specific form, sounds like a set of ankle shackles for the future, and that scares me. Reminds me a bit of the horror of Java serialization, where users would serialize parts of their internal object model, which was deserialized by other instances. For that to work across versions it meant that the internal objects that were serialized could never change (or at least they could only change in a compatible way), but either way, was a bit of a nightmare. Let's talk about this in person. As I said, it's quite possible I haven't fully understood the proposal yet :) |
638bac2
to
bd73651
Compare
50c93ba
to
21bae3f
Compare
Some quick thoughts on the serialized form: Stream-Stream Join:
I think the issue here is that you're serializing the physical model and ddl command as-is, rather than only serializing the minimum data we need to rebuild that physical plan. If you got rid of all the duplication and unnecessary/implicit data, then the serialized form would be much cleaner. Of course, this means you can't just deserialize each type directly. You'll need something that can pull the necessary information together, e.g. to build the ddl command you'd need the statement text from the top level object and the details within the ddlCommand node. This comes back to my previous comment on a previous PR on the same topic: we need to be careful how we expose this internal state, so that we're as loosely coupled as we can be to it. (As Tim alludes to). |
I agree, and plan to remove this in a follow-up as discussed on #3722
I've got a follow up to clean this up. I'll add a json ignore to it for now.
Yes, but I think this is actually useful - we might want to use a different serialization format when repartitioning or materializing a stream/table. That said, maybe there's some information we can clean up from this (e.g. windowing info)
Hold-over for compatibility reasons from before the major version bump. will clean this up. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai
General comments:
- we should be marking public factory methods as
@JsonCreator
, not private constructors. - we should avoid marking private fields as @JsonProperty where ever possible. It breaks encapsulation.
- we should make
@JsonProperty
asrequired
where they are not optional.
Having said that, I'm still going to block this PR for two reasons:
- There a no tests that any of these types serialize to / from JSON correctly. If we're going this route it's crucial to have such tests.
- I'm still have reservations about the approach. Details below.
So.. design wize, I understand that the idea of serializing the JSON representation of our query plan to the command topic is to allow us to:
a) Change our SQL syntax in non-backwards compatible ways.
b) Change the query plan a specific SQL query is built as, .e.g. avoiding an unnecessary repartition step.
However, the current approach still feels like we're strongly coupling our impl with our serialized form. @purplefox had reservations about this too. At the moment, it feels like too much of our internal implementation is being exposed in the JSON, because the JSON is being built from our internal model. This makes it much harder to change that internal model. For example, I plan on enhancing LogicalSchema to know if the key is windowed or not, (after all this is part of the schema!). This will mean we no longer need KeyFormat
, but KeyFormat
is part of our JSON model.
So, what's the solution? I'm not 100% sure. My gut is that we need to decouple the JSON from many of the types that have JSON annotations added in this PR. Maybe we should limit what classes get serialized to JSON to those in the ksql-execution module and io.confluent.ksql.execution
package, and maybe the *Name
types, or something like that? Adding new Pojo types to this package to hold the info we need for a specific part of the process, and then later use this pojo to build the actual types we need. This might be more work, but provide better design, less coupling and more flexibility. Another alternative would be to use custom serde classes. Though this tends to lead to less readable code. Another suggestion would be to define the schema as a JSON Schema and code gen classes from this.
@purplefox might how some ideas too. Maybe we can have a three-way discussion? I'm sure we can come up with something!
As I said, blocking the PR for this reason. Feel free to reach out on Slack. I sometimes leave slack messages unread if I'm concentrating on code. If its urgent, just call me on Slack.
@@ -51,6 +52,7 @@ public static ColumnName of(final String name) { | |||
return new ColumnName(name); | |||
} | |||
|
|||
@JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to the public factory method.
@@ -27,6 +28,7 @@ public static FunctionName of(final String name) { | |||
return new FunctionName(name); | |||
} | |||
|
|||
@JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to the public factory method.
@@ -28,7 +29,7 @@ | |||
*/ | |||
@Immutable | |||
public abstract class Name<T extends Name<?>> { | |||
|
|||
@JsonValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to the public name
getter. (Though you may need to rename the method to getName
).
@@ -27,6 +28,7 @@ public static SourceName of(final String name) { | |||
return new SourceName(name); | |||
} | |||
|
|||
@JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to the public factory method.
@@ -41,10 +42,11 @@ public static FormatInfo of( | |||
return new FormatInfo(format, avroFullSchemaName, valueDelimiter); | |||
} | |||
|
|||
@JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to the public factory method.
} | ||
|
||
public List<String> getContext() { | ||
return context; | ||
} | ||
|
||
@JsonValue | ||
public String formatContext() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public String formatContext() { | |
public String toString() { |
???
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^^^ Outstanding suggestion from last review
|
||
@JsonCreator | ||
private QueryContext(final String context) { | ||
this(Arrays.asList(context.split(DELIMITER))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this(Arrays.asList(context.split(DELIMITER))); | |
this(ImmutableList.copyOf(context.split(DELIMITER))); |
ksql-execution/src/main/java/io/confluent/ksql/execution/context/QueryContext.java
Show resolved
Hide resolved
@JsonProperty("sourceName") final SourceName sourceName, | ||
@JsonProperty("schema") final LogicalSchema schema, | ||
@JsonProperty("keyField") final Optional<ColumnName> keyField, | ||
@JsonProperty("timestampExtractionPolicy") final TimestampExtractionPolicy extractionPolicy, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JsonProperty("timestampExtractionPolicy") final TimestampExtractionPolicy extractionPolicy, | |
@JsonProperty("timestampExtractor") final TimestampExtractionPolicy timestampExtractor, |
Summarizing our discussion from last week:
|
fa55570
to
0aa3b17
Compare
@@ -113,6 +113,20 @@ | |||
<scope>test</scope> | |||
</dependency> | |||
|
|||
<dependency> | |||
<groupId>com.kjetland</groupId> | |||
<artifactId>mbknor-jackson-jsonschema_2.12</artifactId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the JSON schema generation library. I played with a few, and this was the only one that correctly handles interfaces and Jackson annotations. It uses the MIT license.
0aa3b17
to
e851f35
Compare
<dependency> | ||
<groupId>javax.validation</groupId> | ||
<artifactId>validation-api</artifactId> | ||
<version>${javax-validation.version}</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the schema generator relies on a newer version of javax-validation than we pull in by default.
@@ -0,0 +1,1062 @@ | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, the schema includes:
Plan Nodes:
KsqlPlanV1
QueryPlan
PhysicalPlan[Object]
StreamAggregate
DefaultExecutionStepProperties
StreamFilter
StreamFlatMap
StreamGroupBy
StreamGroupByKey
StreamMapValues
StreamSelectKey
StreamSink
StreamSource
WindowedStreamSource
StreamStreamJoin
StreamTableJoin
StreamToTable
StreamWindowedAggregate
TableAggregate
TableFilter
TableGroupBy
TableMapValues
TableSink
TableTableJoin
ExecutionStep
DDL commands:
CreateStreamCommand
KsqlTopic
CreateTableCommand
RegisterTypeCommand
DropSourceCommand
DropTypeCommand
Timestamp Extractors:
MetadataTimestampExtractionPolicy
StringTimestampExtractionPolicy
LongColumnTimestampExtractionPolicy
Formats:
KeyFormat
FormatInfo
WindowInfo
I'm working on follow-ups to clean up the timestamp extractors and format types from this schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove KsqlTopic
from this list for sure. That's pure implementation detail.
e851f35
to
7683410
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai
The fact that there are no tests to test that these types can be serialized to/from JSON seems very wrong to me. The PR is all about adding the annotations to serialize to/from JSON, but we don't know if the code is wright. IMHO, each annotated class should have suitable unit tests to ensure correct serialization and deserialization.
Aside from that, the PR LGTM.
@@ -23,7 +26,6 @@ | |||
*/ | |||
@Immutable | |||
public final class ValueFormat { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^^^ Outstanding question/nit from last review.
@@ -41,14 +43,16 @@ public static KeyFormat windowed( | |||
return new KeyFormat(format, Optional.of(windowInfo)); | |||
} | |||
|
|||
@JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^^^^ Outstanding question / issue from the late review.
private final WindowType type; | ||
private final Optional<Duration> size; | ||
|
||
public static WindowInfo of(final WindowType type, final Optional<Duration> size) { | ||
return new WindowInfo(type, size); | ||
} | ||
|
||
private WindowInfo(final WindowType type, final Optional<Duration> size) { | ||
@JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move annotations to the public factory method please
|
||
private final ColumnRef timestampField; | ||
@JsonProperty(FORMAT) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^^^^ Outstanding issue from last review
@JsonProperty(value = "physicalPlan", required = true) final ExecutionStep<T> physicalPlan, | ||
@JsonProperty(value = "planSummary", required = true) final String planSummary | ||
) { | ||
this(queryId, physicalPlan, planSummary, Optional.empty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How come we're no serializing the key field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not needed - its already in the ddl. Cleaning this up in an upcoming pr.
"$ref" : "#/definitions/FormatInfo" | ||
}, | ||
"ksqlSink" : { | ||
"type" : "boolean" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
definitely shouldn't be exposing this. (follow up PR to fix)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"$ref" : "#/definitions/FormatInfo" | ||
}, | ||
"windowInfo" : { | ||
"$ref" : "#/definitions/WindowInfo" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
meh... this is going to make it hard to move window info to the shema...
"delimiter" : { | ||
"type" : "string" | ||
}, | ||
"avroFullSchemaName" : { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should make this just fullSchemaName
so that its not Avro specific. (Follow up PR)
"type" : "string" | ||
} | ||
}, | ||
"required" : [ "format", "delimiter" ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delimiter is NOT required. Only required for DELIMITER format.
}, | ||
"required" : [ "sources", "sink", "physicalPlan" ] | ||
}, | ||
"PhysicalPlan[Object]" : { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove the generics on PhysicalPlan so that we drop the [Object]
here. The generics aren't actually of any use.
Collections.emptySet(), | ||
// the schema generator doesn't play nice with custom serializers, so we add a | ||
// config to remap the custom-serialized types to their underlying primitive | ||
new ImmutableMap.Builder<Class<?>, Class<?>>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this only a temporary thing? What does this mean in terms of:
a) the schema generated: are we losing schema validation on these types?
b) the serialized form: how do these types look when serialized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a) the schema generated: are we losing schema validation on these types?
Yes - from a jsonschema POV we can't do much better than defining these schemas as strings. I think we can add a contentMediaType
that describes what the string contains (e.g. text/sqlexpression
, text/sqlschema
, etc). Let me give that a go.
b) the serialized form: how do these types look when serialized?
These are all serialized as text parseable by the ksql parser.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm approving purely to unblock future work. Personally, I still feel strongly that each annotated type should have associated unit tests to test serialization to/from JSON.
Please also get approval from @agavra or @purplefox
I definitely agree we need these tests. I'm just proposing we put them in when the final format is settled, but before actually writing these models out anywhere. |
This patch adds jackson annotations needed to serialize execution plans: - query context / query ID - DDLs - execution steps - name types It also adds annotations for a few types that the above types depend on: - timestamp extractors - format pojos needed for serdes These will be cleaned up in a later change. Finally, this patch includes a tool for generating a JsonSchema spec from the annotated types, a first cut of this schema, and a test for checking that the schema isn't changed.
7683410
to
3fc73ba
Compare
This patch adds jackson annotations needed to serialize execution plans:
- query context
- DDLs
- execution steps
- name types
It also adds annotations for a few types that the above types depend on:
- timestamp extractors
- format pojos needed for serdes
These will be cleaned up in a later change.
Finally, this patch includes a tool for generating a JsonSchema spec from the
annotated types, a first cut of this schema, and a test for checking that the
schema isn't changed.