Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kstreams #143

Merged
merged 1 commit into from Jun 14, 2019
Merged

Fix kstreams #143

merged 1 commit into from Jun 14, 2019

Conversation

sbrauer
Copy link
Contributor

@sbrauer sbrauer commented Jun 12, 2019

Before this change, trying to use kstreams would fail because it was
missing the Consumed argument.
The newly added test would throw an exception like this:

Error in KStream
Uncaught exception, not in assertion
   error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=topic-a, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: clojure.lang.Symbol / value type: clojure.lang.Symbol). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:406)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
    at jackdaw.streams.mock$producer$produce_BANG___8206.invoke(mock.clj:74)
    at jackdaw.streams.mock$publish.invokeStatic(mock.clj:80)
    at jackdaw.streams.mock$publish.invoke(mock.clj:78)
    at clojure.core$partial$fn__5561.invoke(core.clj:2618)
    at jackdaw.streams_test$fn__11189.invokeStatic(streams_test.clj:482)
    at jackdaw.streams_test$fn__11189.invoke(streams_test.clj:60)
    at cider.nrepl.middleware.test$test_var$fn__9569.invoke(test.clj:202)
    at cider.nrepl.middleware.test$test_var.invokeStatic(test.clj:202)
    at cider.nrepl.middleware.test$test_var.invoke(test.clj:194)
    at cider.nrepl.middleware.test$test_vars$fn__9573$fn__9578.invoke(test.clj:217)
    at clojure.test$default_fixture.invokeStatic(test.clj:686)
    at clojure.test$default_fixture.invoke(test.clj:682)
    at cider.nrepl.middleware.test$test_vars$fn__9573.invoke(test.clj:217)
    at clojure.test$default_fixture.invokeStatic(test.clj:686)
    at clojure.test$default_fixture.invoke(test.clj:682)
    at cider.nrepl.middleware.test$test_vars.invokeStatic(test.clj:214)
    at cider.nrepl.middleware.test$test_vars.invoke(test.clj:208)
    at cider.nrepl.middleware.test$test_ns.invokeStatic(test.clj:230)
    at cider.nrepl.middleware.test$test_ns.invoke(test.clj:221)
    at cider.nrepl.middleware.test$test_var_query.invokeStatic(test.clj:241)
    at cider.nrepl.middleware.test$test_var_query.invoke(test.clj:234)
    at cider.nrepl.middleware.test$handle_test_var_query_op$fn__9617$fn__9618.invoke(test.clj:279)
    at clojure.lang.AFn.applyToHelper(AFn.java:152)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invokeStatic(core.clj:657)
    at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965)
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965)
    at clojure.lang.RestFn.invoke(RestFn.java:425)
    at cider.nrepl.middleware.test$handle_test_var_query_op$fn__9617.invoke(test.clj:271)
    at clojure.lang.AFn.run(AFn.java:22)
    at nrepl.middleware.session$session_exec$main_loop__1031$fn__1035.invoke(session.clj:171)
    at nrepl.middleware.session$session_exec$main_loop__1031.invoke(session.clj:170)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: clojure.lang.Symbol cannot be cast to java.lang.Long
    at org.apache.kafka.common.serialization.LongSerializer.serialize(LongSerializer.java:21)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 45 more

Signed-off-by: Sam Brauer sam.brauer@gmail.com

Before this change, trying to use `kstreams` would fail because it was
missing the Consumed argument.
The newly added test would throw an exception like this:

```
Error in KStream
Uncaught exception, not in assertion
   error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=topic-a, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: clojure.lang.Symbol / value type: clojure.lang.Symbol). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:406)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
    at jackdaw.streams.mock$producer$produce_BANG___8206.invoke(mock.clj:74)
    at jackdaw.streams.mock$publish.invokeStatic(mock.clj:80)
    at jackdaw.streams.mock$publish.invoke(mock.clj:78)
    at clojure.core$partial$fn__5561.invoke(core.clj:2618)
    at jackdaw.streams_test$fn__11189.invokeStatic(streams_test.clj:482)
    at jackdaw.streams_test$fn__11189.invoke(streams_test.clj:60)
    at cider.nrepl.middleware.test$test_var$fn__9569.invoke(test.clj:202)
    at cider.nrepl.middleware.test$test_var.invokeStatic(test.clj:202)
    at cider.nrepl.middleware.test$test_var.invoke(test.clj:194)
    at cider.nrepl.middleware.test$test_vars$fn__9573$fn__9578.invoke(test.clj:217)
    at clojure.test$default_fixture.invokeStatic(test.clj:686)
    at clojure.test$default_fixture.invoke(test.clj:682)
    at cider.nrepl.middleware.test$test_vars$fn__9573.invoke(test.clj:217)
    at clojure.test$default_fixture.invokeStatic(test.clj:686)
    at clojure.test$default_fixture.invoke(test.clj:682)
    at cider.nrepl.middleware.test$test_vars.invokeStatic(test.clj:214)
    at cider.nrepl.middleware.test$test_vars.invoke(test.clj:208)
    at cider.nrepl.middleware.test$test_ns.invokeStatic(test.clj:230)
    at cider.nrepl.middleware.test$test_ns.invoke(test.clj:221)
    at cider.nrepl.middleware.test$test_var_query.invokeStatic(test.clj:241)
    at cider.nrepl.middleware.test$test_var_query.invoke(test.clj:234)
    at cider.nrepl.middleware.test$handle_test_var_query_op$fn__9617$fn__9618.invoke(test.clj:279)
    at clojure.lang.AFn.applyToHelper(AFn.java:152)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invokeStatic(core.clj:657)
    at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965)
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965)
    at clojure.lang.RestFn.invoke(RestFn.java:425)
    at cider.nrepl.middleware.test$handle_test_var_query_op$fn__9617.invoke(test.clj:271)
    at clojure.lang.AFn.run(AFn.java:22)
    at nrepl.middleware.session$session_exec$main_loop__1031$fn__1035.invoke(session.clj:171)
    at nrepl.middleware.session$session_exec$main_loop__1031.invoke(session.clj:170)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: clojure.lang.Symbol cannot be cast to java.lang.Long
    at org.apache.kafka.common.serialization.LongSerializer.serialize(LongSerializer.java:21)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 45 more
```

Signed-off-by: Sam Brauer <sam.brauer@gmail.com>
@sbrauer sbrauer requested a review from a team as a code owner June 12, 2019 20:01
@sbrauer
Copy link
Contributor Author

sbrauer commented Jun 12, 2019

I see ci/circlci: test failed with one TimeoutException. My hunch is that's an unrelated nondeterministic error since I don't get it running lein test locally. I'd re-run the test if I had permission.

@cddr
Copy link
Contributor

cddr commented Jun 12, 2019

You're correct about the unrelated failure. Just triggered a rebuild and it passes fine. There are a few tests that remain stubbornly flakey so sorry about that.

The deploy_snapshots failure is because it tries to publish a snapshot build to Clojars but because you're not in the funding circle org, the credentials are not made available to your build. If you really wanted the snapshot, you might be able to make a tweak to the project.clj and publish it to your own group in clojars.

Anyway, this looks good to me. I think it must have already been assuming all the topics had the same serde and this was a regression introduced when we upgraded kafka-streams. Thanks for adding the test.

@sbrauer
Copy link
Contributor Author

sbrauer commented Jun 13, 2019

Thanks for the feedback @cddr.
Please let me know if there's anything else I can do to get an official approval (so that github will enable the merge button).

@cddr cddr requested review from creese and 99-not-out June 14, 2019 08:58
@cddr cddr added the bug Something isn't working label Jun 14, 2019
@cddr cddr merged commit d9d0cbc into FundingCircle:master Jun 14, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants