Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AGPUSH-2163] Use Kafka streams for push message processing #900

Merged
merged 10 commits into from Aug 23, 2017

Conversation

Projects
None yet
3 participants
@dimitraz
Copy link
Member

commented Aug 18, 2017

Description:
Refactor notification routing to use a Kafka Streams based approach

Implementation:

  • Use streams to process push messages streamed from the PushNotificationSenderEndpoint input topic (see AGPUSH-2159) and branch out to various output topics, based on variant type.
  • Add custom serdes for input topic objects (PushApplication and InternalUnifiedPushMessage)
  • Delete NotificationRouter class

Ticket: AGPUSH-2163

@dimitraz

This comment has been minimized.

Copy link
Member Author

commented Aug 18, 2017

@@ -0,0 +1,37 @@
package org.jboss.aerogear.unifiedpush.kafka.serdes;

This comment has been minimized.

Copy link
@polinankoleva

polinankoleva Aug 18, 2017

Member

Add license - valid for all new classes.

This comment has been minimized.

Copy link
@dimitraz

dimitraz Aug 18, 2017

Author Member

done 👍

import org.jboss.aerogear.unifiedpush.message.InternalUnifiedPushMessage;

import java.util.Map;

This comment has been minimized.

Copy link
@polinankoleva

polinankoleva Aug 18, 2017

Member

Add comments - valid for all committed classes.

This comment has been minimized.

Copy link
@dimitraz

dimitraz Aug 18, 2017

Author Member

done 👍

@matzew

This comment has been minimized.

Copy link
Member

commented Aug 19, 2017

@dimitraz I think you need to rebase it, afgter I landed the JMS removal

@dimitraz dimitraz force-pushed the dimitraz:router-streams branch from 4cf1da5 to 8902bde Aug 19, 2017

@dimitraz

This comment has been minimized.

Copy link
Member Author

commented Aug 19, 2017

@matzew thanks Matthias! rebased, but still need to sort out conflicts with the JMS removal PR

@matzew

This comment has been minimized.

Copy link
Member

commented Aug 21, 2017

I get this, on deployment:

11:35:21,140 ERROR [stderr] (StreamThread-1) Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=agpush_pushMessageProcessing, partition=0, offset=0
11:35:21,140 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
11:35:21,140 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
11:35:21,140 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
11:35:21,140 ERROR [stderr] (StreamThread-1) Caused by: org.apache.kafka.common.errors.SerializationException: Unable to serialize object
11:35:21,141 ERROR [stderr] (StreamThread-1) Caused by: com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: org.jboss.aerogear.unifiedpush.api.iOSVariant["certificate"])
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:378)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:338)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:569)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3022)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at net.wessendorf.kafka.serialization.GenericSerializer.serialize(GenericSerializer.java:48)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
11:35:21,141 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:97)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
11:35:21,142 ERROR [stderr] (StreamThread-1) 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
11:35:21,143 ERROR [stderr] (StreamThread-1) Caused by: java.lang.NullPointerException
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at java.util.Base64$Decoder.decode(Base64.java:549)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at org.jboss.aerogear.unifiedpush.api.iOSVariant.getCertificate(iOSVariant.java:80)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at java.lang.reflect.Method.invoke(Method.java:498)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:633)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
11:35:21,143 ERROR [stderr] (StreamThread-1) 	... 28 more
@dimitraz

This comment has been minimized.

Copy link
Member Author

commented Aug 21, 2017

@matzew oh no, jackson again

@dimitraz

This comment has been minimized.

Copy link
Member Author

commented Aug 21, 2017

I'm going to do some tests with the mock data loader and get serialisation working for each variant

@dimitraz dimitraz force-pushed the dimitraz:router-streams branch from 16aaa46 to 2501dcc Aug 21, 2017

@@ -86,6 +86,12 @@
</dependency>

<dependency>
<groupId>org.jboss.aerogear.unifiedpush</groupId>
<artifactId>unifiedpush-push-sender</artifactId>
<version>${project.version}</version>

This comment has been minimized.

Copy link
@matzew

matzew Aug 21, 2017

Member

we do need the sender here ?

@matzew

This comment has been minimized.

Copy link
Member

commented Aug 21, 2017

@dimitraz

This comment has been minimized.

Copy link
Member Author

commented Aug 21, 2017

@matzew do you mind testing with a different variant type for now (preferably android)? I'm still working on serialisation issues w/ iOS (and possibly others)

* and streamed to an output topic based on its variant type.
*/
@PostConstruct
private void startup() {

This comment has been minimized.

Copy link
@matzew

matzew Aug 21, 2017

Member

Can we use something like: public void init(@Observes @Initialized(ApplicationScoped.class) Object init) instead of PostConstruct ?

*/
@Singleton
@Startup
public class NotificationRouterStreamsHook {

This comment has been minimized.

Copy link
@matzew

matzew Aug 21, 2017

Member

and here, let's use CDI

@javax.enterprise.context.ApplicationScoped annotation

@matzew

This comment has been minimized.

Copy link
Member

commented Aug 21, 2017

I got it working, by using a different topic (which is OK), I guess I had some other stuff (due to bug) in that topic.

One thing I am now wondering... is it really smart to use the streaming APIs for the push processing? versus normal producer/consumer ? Is the routing the reason ?

Or should streaming be used for the metrics stuff that's coming allong ?

@dimitraz

This comment has been minimized.

Copy link
Member Author

commented Aug 21, 2017

@matzew, good news! Hopefully tomorrow I'll get testing with the mock loader and we can test all variant types. I guess now there isn't much of an advantage, but probably because we haven't completely leveraged all the features of streams.. We can try and improve on this, and if it doesn't work out just use normal producers and consumers. In that case I think it was still good to play around with the concept, getting us used to the idea of streams (it was just a poc anyway)

@matzew

This comment has been minimized.

Copy link
Member

commented Aug 22, 2017

I think w/ the streaming API, eventually, this gives us a bit more flexiblity - also I like the compactness of the routing... I guess

@matzew matzew force-pushed the aerogear:GSOC_2017_kafka branch from c3a9cf5 to 5b268fc Aug 22, 2017

@matzew

This comment has been minimized.

Copy link
Member

commented Aug 22, 2017

@dimitraz please rebase this :-)

@dimitraz dimitraz force-pushed the dimitraz:router-streams branch from 3789144 to bac18d1 Aug 22, 2017

@dimitraz

This comment has been minimized.

Copy link
Member Author

commented Aug 22, 2017

@matzew done

matzew and others added some commits Aug 22, 2017

Merge pull request #2 from matzew/router-streams_tweaks
Using CDI bean and some clean ups
@matzew

This comment has been minimized.

Copy link
Member

commented Aug 22, 2017

I really like to have CDI events for a "domain level" eventing, where than the actual IMPL takes care of dealing w/ the technology, e.g. HTTP or JMS or Kafka or Amazon-kinesis etc :)

@matzew matzew merged commit ff3f368 into aerogear:GSOC_2017_kafka Aug 23, 2017

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
@matzew

This comment has been minimized.

Copy link
Member

commented Aug 24, 2017

@dimitraz @polinankoleva So, for me the push message are no longer submitted to the 3rd parties.

What mobile service did you use for testing ? APNS? or FCM? or none?

@matzew

This comment has been minimized.

Copy link
Member

commented Aug 24, 2017

restarting the WF server fixed this... something wrong w/ the config - will test

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.