
DeduplicationTransformer example only supports 1 partition #396

Closed
jomach opened this issue Jul 29, 2021 · 2 comments

Comments


jomach commented Jul 29, 2021

As a developer, I would like an example of an event deduplication pipeline that is robust.

The current example, EventDeduplicationLambdaIntegrationTest, relies on a simple Transformer that stores its state locally.
This will not work if we have multiple partitions. How should we proceed here? Several people have run into this issue, but there is no proper way of solving it. Could someone update this example?

mjsax (Member) commented Aug 3, 2021

You can only do event deduplication if you ensure that duplicate events are written to the same partition. In our example, we assume that the eventId is used to partition the data, which meets this property. Thus, the example also works with multiple partitions.
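The co-partitioning requirement can be illustrated with a small in-memory model. This is a sketch, not Kafka Streams code: each partition gets its own local set of seen event IDs, standing in for a task's partition-local state store, and records are routed by a hypothetical `partitionFor` hash of the eventId (Kafka's default producer partitioner uses murmur2 over the serialized key instead). Because every copy of an event hashes to the same partition, that partition's local set catches all duplicates.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model: one "task" per partition, each with its own local set of
// seen event ids standing in for a state store. Not Kafka Streams code.
class KeyedDedupDemo {

    // Hypothetical partitioner for illustration only; Kafka's default
    // partitioner hashes the serialized key with murmur2, not String.hashCode().
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Routes each record by its eventId and drops any id that the owning
    // partition's local store has already seen.
    static List<String> deduplicate(List<String> eventIds, int numPartitions) {
        List<Set<String>> localStores = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            localStores.add(new HashSet<>());
        }
        List<String> output = new ArrayList<>();
        for (String eventId : eventIds) {
            Set<String> store = localStores.get(partitionFor(eventId, numPartitions));
            if (store.add(eventId)) { // add() returns false for a duplicate
                output.add(eventId);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = List.of("e1", "e2", "e1", "e3", "e2", "e1");
        System.out.println(deduplicate(input, 4)); // [e1, e2, e3]
    }
}
```

The number of partitions does not change the result here, because partitioning by eventId guarantees that all copies of an event meet the same local store.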

If your original input data is not partitioned by eventId and you cannot change the upstream application, you can repartition the data by eventId using Kafka Streams: `stream.selectKey((k, v) -> v.eventId()).repartition().transform(...)`.
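To see why the re-keying step matters, the following plain-Java model compares input spread round-robin across partitions with input routed by event ID, which is roughly what `selectKey(...)` followed by `repartition()` achieves. It is an assumption-laden sketch: `countOutput`, `roundRobin`, and `byEventId` are hypothetical helpers, not Kafka Streams APIs, and each string stands for the eventId of one record.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of per-partition deduplication; not Kafka Streams code.
class RepartitionDedupDemo {

    // Counts records that survive deduplication when record i was written to
    // partition partitionOf[i] and each partition keeps its own "seen" set.
    static int countOutput(List<String> eventIds, int[] partitionOf, int numPartitions) {
        List<Set<String>> localStores = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            localStores.add(new HashSet<>());
        }
        int emitted = 0;
        for (int i = 0; i < eventIds.size(); i++) {
            // Set.add returns false if this partition already saw the id
            if (localStores.get(partitionOf[i]).add(eventIds.get(i))) {
                emitted++;
            }
        }
        return emitted;
    }

    // Hypothetical placement for records without a useful key: round-robin.
    static int[] roundRobin(int numRecords, int numPartitions) {
        int[] parts = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            parts[i] = i % numPartitions;
        }
        return parts;
    }

    // Rough analogue of selectKey(eventId) + repartition(): route by the id's hash.
    static int[] byEventId(List<String> eventIds, int numPartitions) {
        int[] parts = new int[eventIds.size()];
        for (int i = 0; i < eventIds.size(); i++) {
            parts[i] = Math.floorMod(eventIds.get(i).hashCode(), numPartitions);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> eventIds = List.of("e1", "e1", "e2", "e2");
        int partitions = 2;
        System.out.println(countOutput(eventIds, roundRobin(eventIds.size(), partitions), partitions)); // 4
        System.out.println(countOutput(eventIds, byEventId(eventIds, partitions), partitions)); // 2
    }
}
```

With round-robin placement, the two copies of each event land in different partitions, so each partition-local store sees every ID only once and all four records pass. Routing by event ID puts both copies in the same partition, and the local stores drop the duplicates.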

Closing this question. Feel free to follow up with more questions if necessary.

mjsax closed this as completed Aug 3, 2021
jomach (Author) commented Aug 4, 2021

For completeness, this is what I did:


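```java
// Re-key the compositions by material id and repartition so that every
// record for a given id lands in the same partition, then read them as a table.
var compositionWithKey = compositions
        .map((k, v) -> new KeyValue<>(v.getEDMATERIALID(), v))
        .repartition(Repartitioned.with(new Serdes.LongSerde(), materialCompositionSerde))
        .toTable();

// Re-key and repartition the material stream the same way, so the join
// and the stateful transformValues step see co-partitioned data.
var materialWithKey = material
        .map((k, v) -> new KeyValue<>(v.getEDMATERIALID(), v))
        .repartition(Repartitioned.with(new Serdes.LongSerde(), materialSerde));

return materialWithKey
        // keySerde must serialize the Long material id used as the key above
        .join(compositionWithKey, valueJoiner, Joined.with(keySerde, materialSerde, materialCompositionSerde))
        .peek((k, v) -> System.out.println("After join: " + k))
        // transformerSupplier and MATERIAL_STORE_NAME are defined elsewhere in the author's code
        .transformValues(transformerSupplier, MATERIAL_STORE_NAME);
```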
