-
Notifications
You must be signed in to change notification settings - Fork 171
APEXCORE-525 fail validation if statefull streamcodec does not implement newInstance method. #387
Conversation
|
Can we change |
| statefulSerde = ((StatefulStreamCodec<Object>)StreamContext.CODEC.defaultValue).newInstance(); | ||
| } else if (codec instanceof StatefulStreamCodec) { | ||
| statefulSerde = ((StatefulStreamCodec<Object>)codec).newInstance(); | ||
| if (!codec.getClass().isAssignableFrom(statefulSerde.getClass())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the check necessary if validation already checks for the correct implementation of newInstance()?
|
|
8ab11fa to
9028a76
Compare
|
@tushargosavi Please fix checkstyle, add unit test and comment on BufferServerPublisher changes. |
|
don't merge this pull request yet. I will do some tests early next week, and fix the checkstyle errors and comment on BufferServerPublisher changes |
9028a76 to
bd860bc
Compare
|
@vrozov I have updated the pull request. |
|
@tushargosavi Please add unit test. |
bd860bc to
5746b44
Compare
|
Added a unit test checking for type of instance returned by newInstance method. |
vrozov
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good.
| Object inner = outer.new InnerClass(); | ||
|
|
||
| for (Object o: new Object[] {outer, inner}) { | ||
| for (Object o : new Object[]{outer, inner}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the change necessary? Does it impact checkstyle?
| } | ||
| } | ||
|
|
||
| static class MyStreamCodec extends DefaultStatefulStreamCodec<String> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can it be private? Can the name be changed to StatefulStreamCodec to avoid My...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked, it can't be private as it will lead to instantiation error. For the name I suggest:
static class NoNewInstanceStatefulStreamCodec<T> extends DefaultStatefulStreamCodec<T>
{
}
| { | ||
| MyStreamCodec codec = new MyStreamCodec(); | ||
| DefaultStatefulStreamCodec newCodec = codec.newInstance(); | ||
| Assert.assertEquals("The class type is correct", newCodec.getClass(), codec.getClass()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add codec != newCodec assertion and checks for DefaultStatefulStreamCodec?
…mCodec.newInstance
5746b44 to
0514b46
Compare
|
@vrozov I have done the suggested changes |
|
@tushargosavi commit is merged to apache master. @asfgit did not close PR due to forced push. Please close it manually. |
|
closing the pull request, as it did not got closed because of forced push |
If user implements a streamcodec extending DefaultStatefulStreamCodec they need to also remember to implement newInstance method as platform use object returned by newInstance as codec. If use forget to override newInstance then platform keep on using default codec without any warning to user and
such error are caught very late mostly resulting incorrect data distribution and incorrect result at final
operator.