Skip to content

Commit

Permalink
Merge pull request #401 from eo-cqrs/331
Browse files Browse the repository at this point in the history
flexible message producing
  • Loading branch information
Aliaksei Bialiauski committed Jun 29, 2023
2 parents 3480a0a + a964e93 commit 42859f7
Show file tree
Hide file tree
Showing 26 changed files with 713 additions and 103 deletions.
91 changes: 72 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,45 @@ dependencies {
```

## Messages API
To create Kafka Message:
To create Kafka Message with **Topic**, **Key** and **Value**:
```java
Data<String> string =
new KfData<>(
"string-data", //data
"strings", //topic
1 //partition
final Message<String, String> msg = new Tkv<>("test.topic", "test-k", "test-v");
```

Creation Kafka Message with **Partition**:
```java
final Message<String, String> msg =
new WithPartition<>(
0,
new Tkv<>(
"test.topic",
"test-k",
"test-v"
)
);
```

Creation Kafka Message with **Timestamp**:
```java
final Message<String, String> msg =
new Timestamped<>(
tmstmp,
new WithPartition<>(
partition,
new Tkv<>(
topic,
key,
value
)
)
);
```

## Producer API
To create Kafka Producer you can wrap original [KafkaProducer](https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html):
```java
KafkaProducer origin = ...;
Producer<String, String> producer = new KfProducer<>(origin);
final KafkaProducer origin = ...;
final Producer<String, String> producer = new KfProducer<>(origin);
```
Or construct it with [KfFlexible](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/KfFlexible.java):
```java
Expand Down Expand Up @@ -115,13 +139,15 @@ To send a [message](#messages-api):
```java
try (final Producer<String, String> producer = ...) {
producer.send(
"key2012",
new KfData<>(
"newRest28",
"orders",
1
new WithPartition<>(
0,
new Tkv<>(
"xyz.topic",
"key",
"message"
)
);
)
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
Expand Down Expand Up @@ -151,8 +177,8 @@ final Producer<String, String> producer =
## Consumer API
To create Kafka Consumer you can wrap original [KafkaConsumer](https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html):
```java
KafkaConsumer origin = ...;
Consumer<String, String> producer = new KfConsumer<>(origin);
final KafkaConsumer origin = ...;
final Consumer<String, String> producer = new KfConsumer<>(origin);
```
Using [KfFlexible](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/KfFlexible.java):
```java
Expand Down Expand Up @@ -311,9 +337,36 @@ final Consumer<Object, String> consumer =
);
final Producer<String, String> producer =
new FkProducer<>(UUID.randomUUID(), this.broker);
producer.send("test1", new KfData<>("test-data-1", topic, 0));
producer.send("test2", new KfData<>("test-data-2", topic, 0));
producer.send("test3", new KfData<>("test-data-3", topic, 0));
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test1",
"test-data-1"
)
)
);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test2",
"test-data-2"
)
)
);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test-data-3",
"test3"
)
)
);
final ConsumerRecords<Object, String> records =
consumer.records(topic, Duration.ofSeconds(1L));
final List<String> datasets = new ListOf<>();
Expand Down
6 changes: 6 additions & 0 deletions src/it/producer-consumer-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ SOFTWARE.
<version>${mockito-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.eo-cqrs</groupId>
<artifactId>eo-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
33 changes: 28 additions & 5 deletions src/it/producer-consumer-api/src/test/java/EntryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.github.eocqrs.kafka.consumer.KfConsumer;
import io.github.eocqrs.kafka.consumer.settings.KfConsumerParams;
import io.github.eocqrs.kafka.data.KfData;
import io.github.eocqrs.kafka.data.Tkv;
import io.github.eocqrs.kafka.data.WithPartition;
import io.github.eocqrs.kafka.parameters.AutoOffsetReset;
import io.github.eocqrs.kafka.parameters.BootstrapServers;
import io.github.eocqrs.kafka.parameters.ClientId;
Expand Down Expand Up @@ -175,8 +177,14 @@ void createsProducerAndSendsData() throws IOException {
) {
Assertions.assertDoesNotThrow(
() -> producer.send(
"fake-key",
new KfData<>("fake-data", "FAKE-TOPIC", 1)
new WithPartition<>(
1,
new Tkv<>(
"FAKE-TOPIC",
"fake-key",
"fake-data"
)
)
)
);
}
Expand Down Expand Up @@ -212,7 +220,16 @@ void createsProducerAndSendsMessage() throws Exception {
)
)
);
producer.send("testcontainers", new KfData<>("rulezzz", "TEST-TOPIC", 0));
producer.send(
new WithPartition<>(
0,
new Tkv<>(
"TEST-TOPIC",
"testcontainers",
"rulezzz"
)
)
);
Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
Expand Down Expand Up @@ -256,8 +273,14 @@ void createsProducerWithCallback() throws Exception {
)
) {
producer.send(
"test-key",
new KfData<>("test-data", "TEST-CALLBACK", 1)
new WithPartition<>(
1,
new Tkv<>(
"TEST-CALLBACK",
"test-key",
"test-data"
)
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.github.eocqrs.kafka.Producer;
import io.github.eocqrs.kafka.consumer.KfConsumer;
import io.github.eocqrs.kafka.consumer.settings.KfConsumerParams;
import io.github.eocqrs.kafka.data.KfData;
import io.github.eocqrs.kafka.parameters.AutoOffsetReset;
import io.github.eocqrs.kafka.parameters.BootstrapServers;
import io.github.eocqrs.kafka.parameters.ClientId;
Expand All @@ -38,6 +37,8 @@
import io.github.eocqrs.kafka.parameters.ValueDeserializer;
import io.github.eocqrs.kafka.parameters.ValueSerializer;
import io.github.eocqrs.kafka.producer.KfProducer;
import io.github.eocqrs.kafka.data.Tkv;
import io.github.eocqrs.kafka.data.WithPartition;
import io.github.eocqrs.kafka.producer.settings.KfProducerParams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -93,7 +94,16 @@ void createsProducerAndSendsMessage() throws Exception {
)
)
);
producer.send(key, new KfData<>(value, topic, 0));
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
key,
value
)
)
);
Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
Expand Down
29 changes: 23 additions & 6 deletions src/it/producer-consumer-api/src/test/java/ProducerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
*/

import io.github.eocqrs.kafka.Producer;
import io.github.eocqrs.kafka.data.KfData;
import io.github.eocqrs.kafka.parameters.BootstrapServers;
import io.github.eocqrs.kafka.parameters.KeySerializer;
import io.github.eocqrs.kafka.parameters.KfFlexible;
import io.github.eocqrs.kafka.parameters.KfParams;
import io.github.eocqrs.kafka.parameters.ValueSerializer;
import io.github.eocqrs.kafka.producer.KfCallback;
import io.github.eocqrs.kafka.producer.KfProducer;
import io.github.eocqrs.kafka.data.Tkv;
import io.github.eocqrs.kafka.data.WithPartition;
import io.github.eocqrs.kafka.producer.settings.KfProducerParams;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
Expand All @@ -41,6 +42,7 @@

/**
* Kafka Producer IT Case
*
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.2.3
*/
Expand All @@ -64,15 +66,24 @@ void createsProducerAndSendsData() throws IOException {
) {
Assertions.assertDoesNotThrow(
() -> producer.send(
"fake-key",
new KfData<>("fake-data", "FAKE-TOPIC", 1)
new WithPartition<>(
1,
new Tkv<>(
"FAKE-TOPIC",
"fake-key",
"fake-data"
)
)
)
);
}
}

@Test
void createsProducerAndSendsDataWithCallback() throws Exception {
final String topic = "TEST-CALLBACK";
final String key = "test-key";
final String value = "test-data";
try (
final Producer<String, String> producer =
new KfCallback<>(
Expand All @@ -88,13 +99,19 @@ void createsProducerAndSendsDataWithCallback() throws Exception {
(recordMetadata, e) ->
MatcherAssert.assertThat(
recordMetadata.topic(),
Matchers.equalTo("TEST-CALLBACK")
Matchers.equalTo(topic)
)
)
) {
producer.send(
"test-key",
new KfData<>("test-data", "TEST-CALLBACK", 1)
new WithPartition<>(
1,
new Tkv<>(
topic,
key,
value
)
)
);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/Data.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
* @param <X> The value
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.0.0
* @deprecated since 0.3.6, use {@link Message} instead.
*/
@Deprecated(since = "0.3.6")
public interface Data<X> {

/**
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/io/github/eocqrs/kafka/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.cactoos.Scalar;

/**
* Kafka Message.
*
* @param <K> The key
* @param <X> The value
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.3.6
*/
public interface Message<K, X> extends Scalar<ProducerRecord<K, X>> {

@Override
ProducerRecord<K, X> value() throws Exception;
}
10 changes: 3 additions & 7 deletions src/main/java/io/github/eocqrs/kafka/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @todo #287:30m/DEV Producer send is not flexible enough
*/

/**
* Producer.
Expand All @@ -41,12 +38,11 @@
public interface Producer<K, X> extends Closeable {

/**
* Send data.
* Send message.
*
* @param key message key
* @param data data wrapper to process
* @param message Message
* @return Future with RecordMetadata.
* @throws Exception When something went wrong.
*/
Future<RecordMetadata> send(K key, Data<X> data) throws Exception;
Future<RecordMetadata> send(Message<K, X> message) throws Exception;
}
1 change: 1 addition & 0 deletions src/main/java/io/github/eocqrs/kafka/data/KfData.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* @since 0.0.0
*/
@RequiredArgsConstructor
@Deprecated(since = "0.3.6")
public final class KfData<X> implements Data<X> {

/**
Expand Down
Loading

1 comment on commit 42859f7

@0pdd
Copy link
Collaborator

@0pdd 0pdd commented on 42859f7 Jun 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Puzzle 287-f527de84 disappeared from src/main/java/io/github/eocqrs/kafka/Producer.java), that's why I closed #304. Please, remember that the puzzle was not necessarily removed in this particular commit. Maybe it happened earlier, but we discovered this fact only now.

Please sign in to comment.