
docs
aaronp committed May 23, 2020
1 parent dd2c019 commit b32340a
Showing 1 changed file, readme.md, with 10 additions and 8 deletions.
```scala
trait AsProducerRecord[-A] {
  type K
  type V
  def asRecord(value: A): ProducerRecord[K, V]
  ...
}
```
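For illustration, an instance of that type class for our own data type might look like the sketch below. The 'SomeData' fields, the topic name, and the assumption that 'asRecord' is the only member we need to implement are all illustrative (the trait above elides further members with '...'):

```scala
import org.apache.kafka.clients.producer.ProducerRecord

case class SomeData(id: String, payload: Array[Byte])

// a hand-rolled AsProducerRecord instance which keys records by id on a fixed topic
implicit object SomeDataAsRecord extends AsProducerRecord[SomeData] {
  type K = String
  type V = Array[Byte]
  def asRecord(value: SomeData): ProducerRecord[String, Array[Byte]] =
    new ProducerRecord("some-topic", value.id, value.payload)
}
```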

Then we can expose a [monix consumer](https://monix.io/docs/2x/reactive/consumer.html) which will be able to write an [Observable](https://monix.io/api/3.0/monix/reactive/Observable.html) to Kafka:
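As a rough sketch of that shape (the 'kafka4m.write' entry point, its config argument, and the 'SomeData' type from above are assumptions here, not the confirmed API), pushing an Observable into Kafka could look like:

```scala
import com.typesafe.config.ConfigFactory
import monix.eval.Task
import monix.reactive.{Consumer, Observable}

// assumed entry point: a Consumer that writes each element to the configured topic
// and yields the number of records written (the real kafka4m signature may differ)
val toKafka: Consumer[SomeData, Long] = kafka4m.write[SomeData](ConfigFactory.load())

val data: Observable[SomeData] = Observable.fromIterable(
  List(SomeData("a", "hello".getBytes), SomeData("b", "world".getBytes))
)

// every element is converted via its AsProducerRecord instance and pushed to Kafka
val written: Task[Long] = data.consumeWith(toKafka)
```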
Can you commit the offset/partition for B? Because if you do and then discover t…

Having to perform some blocking IO on the single Kafka consumer thread, however, would be a performance killer.

For this reason, in addition to exposing what you would expect:

```scala
val fromKafka: Observable[SomeData] = kafka4m.readRecords[SomeData]()
```

There are additional read* variants which wrap the Kafka ConsumerRecords in an 'AckableRecord', a data structure which exposes the underlying ConsumerRecord but also provides a way to commit those messages back to Kafka in a thread-safe way:

### Reading basic byte array values
```scala
val fromKafka: Observable[AckableRecord[ConsumerRecord[String, Array[Byte]]]] = kafka4m.readByteArray()

val keys = fromKafka.zipWithIndex.map {
  case (ackable, i) =>
    // ... (elided) e.g. extract the record key and periodically commit offsets back to Kafka
    ???
}
```

Or just the simpler 'Observable[SomeData]' using 'kafka4m.readRecords':
```scala
val fromKafka: Observable[SomeData] = kafka4m.readRecords[SomeData]()
```


### Performant multi-threaded use

Reading like that is all well and good, but we want to be able to utilize concurrent writes for the data coming from Kafka.

Also, as a software engineer, features like "auto commit" make me nervous. Simply because a process is consuming records doesn't necessarily mean they're safely persisted.

Consider the case when a process consumes ten messages. It should be able to reliably persist those ten messages concurrently, only committing the Kafka offset/partitions when it's safe to do so.
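A sketch of that shape using Monix operators follows. The persistence and commit steps are deliberately left abstract (the exact AckableRecord commit call isn't shown above), and the 'AckBytes' alias is spelled out locally for clarity:

```scala
import monix.eval.Task
import monix.reactive.Observable
import org.apache.kafka.clients.consumer.ConsumerRecord

type AckBytes = AckableRecord[ConsumerRecord[String, Array[Byte]]]

// persist a record somewhere durable; the Task completes only once the data is safely stored
def persist(record: AckBytes): Task[AckBytes] = ???

// commit the record's offset/partition back to Kafka via the AckableRecord (call elided)
def commit(record: AckBytes): Task[Unit] = ???

val fromKafka: Observable[AckBytes] = kafka4m.readByteArray()

// persist up to 10 records concurrently while preserving order, so that by the time a
// record's offset is committed, every earlier record has already been persisted
val pipeline: Observable[Unit] =
  fromKafka
    .mapParallelOrdered(parallelism = 10)(persist)
    .mapEval(commit)
```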

