Kafka Journal

Stream data from two sources where one is eventually consistent and the other one loses its tail

This library lets you use Kafka as storage for events. Kafka is a good fit when you want streaming capabilities for your events. However, the library also uses Cassandra to keep data access performance at an acceptable level and to work around Kafka's retention policy. Cassandra is the default choice, but you may use any other storage that satisfies the library's storage interfaces.

High-level idea

Writing events flow:

  1. The journal client publishes events to Kafka
  2. The replicator app stores those events in Cassandra

Reading events flow:

  1. The client publishes a special marker to Kafka, so that we know where to stop expecting further events
  2. The client reads events from Cassandra; at this point we cannot yet be sure that all events have been replicated from Kafka to Cassandra
  3. The client reads events from Kafka, starting from the offset of the last event found in Cassandra
  4. We consider recovery finished when the marker is found in Kafka
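The reading flow above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `Log` stands in for a single Kafka partition, `Store` for Cassandra, and every name here (`RecoverySketch`, `Log`, `Store`, `recover`) is hypothetical and not part of the kafka-journal API.

```scala
// Hypothetical, simplified model; none of these names come from kafka-journal.
object RecoverySketch {

  sealed trait Record { def offset: Long }
  final case class Event(offset: Long, key: String, payload: String) extends Record
  final case class Marker(offset: Long, key: String, id: String) extends Record

  /** In-memory stand-in for one Kafka partition: an append-only log. */
  final class Log {
    private var records = Vector.empty[Record]

    def appendEvent(key: String, payload: String): Event = {
      val event = Event(records.size.toLong, key, payload)
      records = records :+ event
      event
    }

    def appendMarker(key: String, id: String): Marker = {
      val marker = Marker(records.size.toLong, key, id)
      records = records :+ marker
      marker
    }

    def readFrom(offset: Long): Vector[Record] = records.drop(offset.toInt)
  }

  /** In-memory stand-in for Cassandra: holds only the events replicated so far. */
  final class Store {
    private var events = Vector.empty[Event]

    /** Copies all events with an offset below `upTo`, simulating replication lag. */
    def replicate(log: Log, upTo: Long): Unit =
      events = log.readFrom(0).collect { case e: Event if e.offset < upTo => e }

    def read(key: String): Vector[Event] = events.filter(_.key == key)
  }

  def recover(key: String, log: Log, store: Store): Vector[String] = {
    // 1. Publish a marker so we know where to stop reading.
    val marker = log.appendMarker(key, java.util.UUID.randomUUID().toString)
    // 2. Read what has been replicated so far; this may lag behind the log.
    val replicated = store.read(key)
    // 3. Continue from the log right after the last replicated event.
    val from = replicated.lastOption.fold(0L)(_.offset + 1)
    val tail = log
      .readFrom(from)
      .takeWhile(_ != marker) // 4. Recovery is finished once our marker is reached.
      .collect { case e: Event if e.key == key => e }
    (replicated ++ tail).map(_.payload)
  }
}
```

In this sketch, the further replication lags behind, the longer the tail that has to be read from the log before the marker is reached.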

Notes

  • A Kafka topic may be used for many different entities
  • We don't need to keep all events in Kafka as long as they are in Cassandra
  • We do not cover snapshots yet
  • The replicator is a separate application
  • It is easy to replace Cassandra here with a relational database

State recovery performance

Read performance depends on finding the closest offset to the marker, as well as on replication latency (the time between the moment an event is written to Kafka and the moment it reaches Cassandra).

We may share the same Kafka consumer across many simultaneous recoveries.

Read & write capabilities:

  • The client is allowed to read and write Kafka, and to read Cassandra
  • The replicator is allowed to read Kafka, and to read and write Cassandra

Hence, we recommend configuring access rights accordingly.

API

trait Journals[F[_]] {

  def apply(key: Key): Journal[F]
}

trait Journal[F[_]] {

  /**
   * @param expireAfter Define expireAfter in order to expire whole journal for given entity
   */
  def append(
    events: Nel[Event],
    expireAfter: Option[ExpireAfter],
    metadata: Option[JsValue],
    headers: Headers
  ): F[PartitionOffset]

  def read(from: SeqNr): Stream[F, EventRecord]

  def pointer: F[Option[SeqNr]]

  /**
   * Deletes events up to the provided SeqNr; a subsequent pointer call will still return the last seen value
   */
  def delete(to: DeleteTo): F[Option[PartitionOffset]]

  /**
   * Deletes all data related to the journal; a subsequent pointer call will return none
   */
  def purge: F[Option[PartitionOffset]]
}
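To make the difference between `delete` and `purge` concrete, here is a minimal, synchronous in-memory model of the behavior described in the comments above. It is a sketch, not the library's implementation: `JournalModel`, `InMemoryJournal` and `Record` are hypothetical names, effects and Kafka offsets are left out, and two details are assumptions: sequence numbers start at 1, and `delete` is inclusive of the given sequence number.

```scala
// Hypothetical, simplified model of the Journal contract; not the library's code.
object JournalModel {

  final case class Record(seqNr: Long, payload: String)

  final class InMemoryJournal {
    private var records = Vector.empty[Record]
    private var lastSeqNr = Option.empty[Long]

    /** Appends events, assigning consecutive sequence numbers (assumed to start at 1). */
    def append(payloads: List[String]): Unit =
      payloads.foreach { payload =>
        val seqNr = lastSeqNr.fold(1L)(_ + 1)
        records = records :+ Record(seqNr, payload)
        lastSeqNr = Some(seqNr)
      }

    /** Returns the remaining events with a sequence number of at least `from`. */
    def read(from: Long): Vector[Record] = records.filter(_.seqNr >= from)

    /** Last seen sequence number, if any. */
    def pointer: Option[Long] = lastSeqNr

    /** Removes events up to and including `to` (assumed inclusive); the pointer is kept. */
    def delete(to: Long): Unit =
      records = records.filter(_.seqNr > to)

    /** Removes everything, including the pointer. */
    def purge(): Unit = {
      records = Vector.empty
      lastSeqNr = None
    }
  }
}
```

After `delete`, the model still reports the last seen sequence number from `pointer`, so numbering can continue from there. After `purge`, `pointer` returns `None`, as if the journal had never existed.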

Troubleshooting

Kafka exceptions in logs

The Kafka client tends to log some exceptions at error level, but they are harmless as long as the operation is retried successfully. Retriable exceptions usually extend RetriableException.

Here is the list of known error logs you may ignore:

  • Offset commit failed on partition .. at offset ..: The request timed out.
  • Offset commit failed on partition .. at offset ..: The coordinator is loading and hence can't process requests.
  • Offset commit failed on partition .. at offset ..: This is not the correct coordinator.
  • Offset commit failed on partition .. at offset ..: This server does not host this topic-partition.

Akka persistence plugin

To use kafka-journal as an Akka persistence plugin, add the following to your *.conf file:

akka.persistence.journal.plugin = "evolutiongaming.kafka-journal.persistence.journal"

Unfortunately, the Akka persistence snapshot plugin is not implemented yet.

Setup

addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")

libraryDependencies += "com.evolutiongaming" %% "kafka-journal" % "0.0.153"

libraryDependencies += "com.evolutiongaming" %% "kafka-journal-persistence" % "0.0.153"

libraryDependencies += "com.evolutiongaming" %% "kafka-journal-replicator" % "0.0.153"

libraryDependencies += "com.evolutiongaming" %% "kafka-journal-eventual-cassandra" % "0.0.153"

Presentations

Development

To run the unit tests, you need a running Docker environment (Docker Desktop, Rancher Desktop, etc.). Some tests expect /var/run/docker.sock to be available. With Rancher Desktop, amend your local setup with:

sudo ln -s $HOME/.rd/docker.sock /var/run/docker.sock