Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kstreams #143

Merged
merged 1 commit into from Jun 14, 2019
Merged

Fix kstreams #143

merged 1 commit into from Jun 14, 2019

Commits on Jun 12, 2019

  1. Fix kstreams

    Before this change, trying to use `kstreams` would fail because it was
    missing the Consumed argument.
    The newly added test would throw an exception like this:
    
    ```
    Error in KStream
    Uncaught exception, not in assertion
       error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=topic-a, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: clojure.lang.Symbol / value type: clojure.lang.Symbol). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
        at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:406)
        at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
        at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
        at jackdaw.streams.mock$producer$produce_BANG___8206.invoke(mock.clj:74)
        at jackdaw.streams.mock$publish.invokeStatic(mock.clj:80)
        at jackdaw.streams.mock$publish.invoke(mock.clj:78)
        at clojure.core$partial$fn__5561.invoke(core.clj:2618)
        at jackdaw.streams_test$fn__11189.invokeStatic(streams_test.clj:482)
        at jackdaw.streams_test$fn__11189.invoke(streams_test.clj:60)
        at cider.nrepl.middleware.test$test_var$fn__9569.invoke(test.clj:202)
        at cider.nrepl.middleware.test$test_var.invokeStatic(test.clj:202)
        at cider.nrepl.middleware.test$test_var.invoke(test.clj:194)
        at cider.nrepl.middleware.test$test_vars$fn__9573$fn__9578.invoke(test.clj:217)
        at clojure.test$default_fixture.invokeStatic(test.clj:686)
        at clojure.test$default_fixture.invoke(test.clj:682)
        at cider.nrepl.middleware.test$test_vars$fn__9573.invoke(test.clj:217)
        at clojure.test$default_fixture.invokeStatic(test.clj:686)
        at clojure.test$default_fixture.invoke(test.clj:682)
        at cider.nrepl.middleware.test$test_vars.invokeStatic(test.clj:214)
        at cider.nrepl.middleware.test$test_vars.invoke(test.clj:208)
        at cider.nrepl.middleware.test$test_ns.invokeStatic(test.clj:230)
        at cider.nrepl.middleware.test$test_ns.invoke(test.clj:221)
        at cider.nrepl.middleware.test$test_var_query.invokeStatic(test.clj:241)
        at cider.nrepl.middleware.test$test_var_query.invoke(test.clj:234)
        at cider.nrepl.middleware.test$handle_test_var_query_op$fn__9617$fn__9618.invoke(test.clj:279)
        at clojure.lang.AFn.applyToHelper(AFn.java:152)
        at clojure.lang.AFn.applyTo(AFn.java:144)
        at clojure.core$apply.invokeStatic(core.clj:657)
        at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965)
        at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965)
        at clojure.lang.RestFn.invoke(RestFn.java:425)
        at cider.nrepl.middleware.test$handle_test_var_query_op$fn__9617.invoke(test.clj:271)
        at clojure.lang.AFn.run(AFn.java:22)
        at nrepl.middleware.session$session_exec$main_loop__1031$fn__1035.invoke(session.clj:171)
        at nrepl.middleware.session$session_exec$main_loop__1031.invoke(session.clj:170)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassCastException: clojure.lang.Symbol cannot be cast to java.lang.Long
        at org.apache.kafka.common.serialization.LongSerializer.serialize(LongSerializer.java:21)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
        ... 45 more
    ```
    
    Signed-off-by: Sam Brauer <sam.brauer@gmail.com>
    sbrauer committed Jun 12, 2019
    Configuration menu
    Copy the full SHA
    f36e401 View commit details
    Browse the repository at this point in the history