Hi, I get this exception and I can't seem to find a solution. The code is basically the same as the examples hosted here; only the names are different.
Code:
var sb = new StreamsBuilder();
var personProfilesStream = sb.stream("trial-person-profile", Consumed.with(new DataSerde<Long>(), new DataSerde<PersonProfile>()));
var personProfilesTable = personProfilesStream
    .groupBy((key, value) -> value.Id, new DataSerde<>(), new DataSerde<>())
    .reduce((x, y) -> (x.Timestamp > y.Timestamp) ? x : y, Materialized.as("store-person-profiles-table"));
var kStreams = new KafkaStreams(sb.build(), streamsConfiguration);
kStreams.start();
DataSerde is a custom Serde that deals with JSON.
Exception:
Exception in thread "trial-streamer-app-044e9f28-dad2-4cdf-a34e-e6f2d08af950-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.base/java.lang.Long
at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:157)
at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:154)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Kafka version
1.1.0 built with 2.11 Scala
Libraries versions
You need to pass the correct Serdes into groupBy() and reduce(). Given the error you see, I expect it's the groupBy() that needs a LongSerde for the key.
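To illustrate what `[B cannot be cast to java.lang.Long` means, here is a minimal, Kafka-free sketch. It assumes the key arrives as the 8-byte big-endian payload that Kafka's built-in Long serializer writes. The names `SerdeCastDemo`, `passThrough`, `encodeLong`, and `decodeLong` are hypothetical and only stand in for "a deserializer that hands back raw bytes" versus "a deserializer that actually decodes a Long"; this is not the DataSerde from the question.

```java
import java.nio.ByteBuffer;

class SerdeCastDemo {
    // Hypothetical deserializer that cannot see its type parameter at runtime
    // (type erasure), so it returns the raw bytes unchanged.
    @SuppressWarnings("unchecked")
    static <T> T passThrough(byte[] data) {
        return (T) data; // unchecked cast: compiles, and nothing is checked here
    }

    // Encode a long as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode an 8-byte big-endian payload into a Long.
    static Long decodeLong(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] rawKey = encodeLong(42L);

        // Decoding the bytes yields a real Long key.
        Long key = decodeLong(rawKey);
        System.out.println("decoded key = " + key);

        try {
            // The compiler inserts a cast to Long at this assignment,
            // so the byte[] is rejected here, not inside passThrough().
            Long bad = SerdeCastDemo.<Long>passThrough(rawKey);
            System.out.println(bad);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

In the topology above, the equivalent fix would be to give the key a Serde that really produces a `Long`. In Kafka Streams 1.x that should look something like `groupBy(mapper, Serialized.with(Serdes.Long(), valueSerde))`, where `valueSerde` is whatever Serde handles `PersonProfile`. Newer releases use `Grouped.with(...)` in place of `Serialized.with(...)`.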
Libraries versions
![image](https://user-images.githubusercontent.com/11178713/39653148-262844d4-4ff0-11e8-9f9b-63d7e9894aef.png)