Skip to content

Commit

Permalink
Merge branch '__rultor'
Browse files Browse the repository at this point in the history
  • Loading branch information
rultor committed Mar 28, 2023
2 parents 77f0886 + f7a5911 commit f4f7b23
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 97 deletions.
167 changes: 74 additions & 93 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,41 +58,29 @@ To create Kafka Producer you can wrap original [KafkaProducer](https://kafka.apa
KafkaProducer origin = ...;
Producer<String, String> producer = new KfProducer<>(origin);
```
Or construct it with [ProducerSettings](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/ProducerSettings.java) or even with XML file:
Or construct it with [KfFlexible](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/KfFlexible.java):
```java
ProducerSettings<String, User> settings = ...;
Producer<String, User> producer = new KfProducer<>(settings);
Producer<String, User> xml =
new KfProducer(
new XMLDocument(
new File(
"producer.xml"
)
)
);
```
To create Kafka Producer Settings (Config):
Using objects:
```java
ProducerSettings<String, String> settings =
new KfProducerSettings<>(
new KfProducerParams(
new KfParams(
new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer"),
new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
new BootstrapServers("localhost:9092")
final Producer<String, String> producer =
new KfProducer<>(
new KfFlexible<>(
new KfProducerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
)
)
)
);
);
```
Or using XML file:
Or create it with XML file:
```java
ProducerSettings<String, String> settings =
new KfProducerSettings<>(
"producer.xml"
final Producer<String, String> producer =
new KfProducer<>(
new KfXmlFlexible<String, String>("producer.xml") // file with producer config
.producer()
);
```

btw, your [XML](https://en.wikipedia.org/wiki/XML#:~:text=Extensible%20Markup%20Language%20(XML)%20is,%2Dreadable%20and%20machine%2Dreadable.) file should be in the ```resources``` look like:
```xml
<producer>
Expand All @@ -104,29 +92,18 @@ btw, your [XML](https://en.wikipedia.org/wiki/XML#:~:text=Extensible%20Markup%20

To send a [message](#messages-api):
```java
try (
final Producer<String, String> producer =
new KfProducer<>(
new KfProducerSettings<String, String>(
new XMLDocument(
new File(
"params.xml"
)
)
).producer()
)
) {
producer.send(
"key2012",
new KfData<>(
"newRest28",
"orders",
1
)
);
} catch (Exception e) {
try (final Producer<String, String> producer = ...) {
producer.send(
"key2012",
new KfData<>(
"newRest28",
"orders",
1
)
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
```

Expand All @@ -136,47 +113,29 @@ To create Kafka Consumer you can wrap original [KafkaConsumer](https://kafka.apa
KafkaConsumer origin = ...;
Consumer<String, String> producer = new KfConsumer<>(origin);
```

Using XML:
Using [KfFlexible](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/KfFlexible.java):
```java
Consumer<String, String> consumer =
new KfConsumer<>(
new XMLDocument(
new File("consumer.xml")
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new GroupId("1"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
)
);

```
Also, can be created with [ConsumerSettings](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/ConsumerSettings.java):
```java
ConsumerSettings<String, User> settings = ...;
Consumer<String, User> consumer = new KfConsumer<>(settings);
```

To create ConsumerSettings:
```java
ConsumerSettings<String, String> settings =
new KfConsumerSettings<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new GroupId("1"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
);
);
```

XML File approach:
And XML File approach:
```java
final ConsumerSettings<String, String> settings =
new KfConsumerSettings<>(
new XMLDocument(
new File(
"consumer.xml"
)
)
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfXmlFlexible<String, String>("consumer.xml")
.consumer()
);
```

Expand All @@ -191,17 +150,39 @@ Again, [XML](https://en.wikipedia.org/wiki/XML#:~:text=Extensible%20Markup%20Lan
```

Consuming messages:
TBD
Firstly, you need to be subscribed on a particular topic and only then iterate over data in the topic:
```java
try (
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers(this.severs),
new GroupId("1"),
new AutoOffsetResetConfig("earliest"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
)
)
) {
consumer.subscribe(new ListOf<>("orders-saga-init")));
List<Dataized<String>> result = consumer.iterate("orders-saga-init", Duration.ofSeconds(5L));
}
}
```

## Config API
| Kafka Property | eo-kafka API |
|----------------------|-------------------------------------------------------------------------------------------------------------------------------------------|
| `bootstrap.servers` | [BootstrapServers](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/settings/BootstrapServers.java) |
| `key.serializer` | [KeySerializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/settings/KeySerializer.java) |
| `value.serializer` | [ValueSerializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/settings/ValueSerializer.java) |
| `key.deserializer` | [KeyDeserializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/settings/KeyDeserializer.java) |
| `value.deserializer` | [ValueDeserializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/settings/ValueDeserializer.java) |
| `group.id` | [GroupId](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/settings/GroupId.java) |
| `bootstrap.servers` | [BootstrapServers](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/BootstrapServers.java) |
| `key.serializer` | [KeySerializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/KeySerializer.java) |
| `value.serializer` | [ValueSerializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/ValueSerializer.java) |
| `key.deserializer` | [KeyDeserializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/KeyDeserializer.java) |
| `value.deserializer` | [ValueDeserializer](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/ValueDeserializer.java) |
| `group.id` | [GroupId](https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/parameters/GroupId.java) |

## How to Contribute

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

/**
* @todo #147:30m/DEV Update documentation
* Add use-cases to README for KfFlexible.
*/
/**
* KfFlexibleSettings allow you to add custom settings.
*
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/github/eocqrs/kafka/consumer/KfConsumerTest.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
/*
* Copyright (c) 2022 Aliaksei Bialiauski, EO-CQRS
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.github.eocqrs.kafka.consumer;

import io.github.eocqrs.kafka.Consumer;
Expand Down

1 comment on commit f4f7b23

@0pdd
Copy link
Collaborator

@0pdd 0pdd commented on f4f7b23 Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Puzzle 147-c2522b36 disappeared from src/main/java/io/github/eocqrs/kafka/parameters/KfFlexible.java), that's why I closed #155. Please, remember that the puzzle was not necessarily removed in this particular commit. Maybe it happened earlier, but we discovered this fact only now.

Please sign in to comment.