Finatra integration with Kafka Streams to easily build Kafka Streams applications on top of a TwitterServer.
Note: Versions of finatra-kafka and finatra-kafka-streams published against Scala 2.12 use Kafka 2.2, while versions published against Scala 2.13 use Kafka 2.5. This simplified cross-version support is temporary and will remain only until we can drop Kafka 2.2.
Finatra-Kafka was migrated out of the Finatra core library as a standalone project in 2022. Currently, Twitter has no plans to develop, maintain, or support Finatra-Kafka in any form in the future. If your organization is interested in maintaining this framework, please file an issue or open a discussion to engage the community.
The main branch on GitHub tracks the latest code. If you want to contribute a patch or fix, please use this branch as the basis of your pull request.
- Intuitive DSL for topology creation, compatible with the Kafka Streams DSL
- Full Kafka Streams metric integration, exposed as TwitterServer Metrics
- RocksDB integration
- Queryable State
- Rich testing functionality
With KafkaStreamsTwitterServer, a fully functional service can be written by configuring the Kafka Streams StreamsBuilder in the configureKafkaStreams() lifecycle method. See the examples section.
Implement custom transformers using FinatraTransformer.
The following aggregating transformers are included and may be used when configuring a StreamsBuilder:
- aggregate
- sample
- sum
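The exact semantics of these transformers are defined by Finatra and are not reproduced here. As a rough mental model of what a windowed aggregation such as sum computes, the plain-Scala sketch below buckets events into fixed-size tumbling windows and sums values per key and window. The Event type and the WindowedSum helper are hypothetical, not Finatra APIs; the real transformers work against state stores and may also handle concerns such as late records and when results are emitted, which this sketch ignores.

```scala
// Hypothetical event type, used only for this illustration (not a Finatra type).
final case class Event(key: String, timestampMs: Long, value: Long)

// Conceptual model of a tumbling-window sum; not Finatra's `sum` transformer.
object WindowedSum {
  // Start of the tumbling window that contains `timestampMs`.
  // Assumes non-negative timestamps and a positive window size.
  def windowStart(timestampMs: Long, windowSizeMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  // Sums values per (key, windowStart) pair.
  def sumByWindow(events: Seq[Event], windowSizeMs: Long): Map[(String, Long), Long] =
    events
      .groupBy(e => (e.key, windowStart(e.timestampMs, windowSizeMs)))
      .map { case (keyAndWindow, es) => keyAndWindow -> es.map(_.value).sum }
}
```

For example, with a 1000 ms window, events for key "a" at 0 ms and 999 ms land in the same window, while an event at 1000 ms starts a new one.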
In addition to using the standard state stores, you may also use a RocksDB-backed store, which brings the advantages of RocksDB, including efficient range scans.
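Range scans are efficient because RocksDB keeps keys sorted by their serialized bytes, so keys that share a prefix are stored next to each other and can be read as one contiguous slice. The sketch below uses Scala's immutable TreeMap as a stand-in for that sorted layout; the "<userId>|<date>" key scheme and the RangeScanSketch helper are hypothetical and are not the Finatra store API. For ASCII keys, String ordering matches byte ordering, which is what makes the stand-in reasonable.

```scala
import scala.collection.immutable.TreeMap

// Stand-in for a sorted key-value store (RocksDB orders keys by their bytes;
// for ASCII keys that is the same as the String ordering TreeMap uses here).
object RangeScanSketch {
  // Hypothetical composite key layout: "<userId>|<date>".
  def key(userId: String, date: String): String = s"$userId|$date"

  // Exclusive upper bound for a prefix scan: bump the last character by one.
  // Assumes a non-empty prefix whose last character is not '\uffff'.
  private def prefixUpperBound(prefix: String): String =
    prefix.init + (prefix.last + 1).toChar

  // Returns the entries whose key starts with `prefix`, in key order,
  // by reading only the contiguous [prefix, upperBound) slice.
  def prefixScan(store: TreeMap[String, Long], prefix: String): List[(String, Long)] =
    store.range(prefix, prefixUpperBound(prefix)).toList
}
```

Scanning with the prefix "u1|" returns only user u1's entries: including the separator in the prefix is what keeps keys such as "u10|2024-01-01" out of the result.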
Finatra Kafka Streams supports directly querying the state from a store. This can be useful for creating a service that serves data aggregated within a local Topology. With static partitioning, you can deterministically identify, and then query, the instance that holds a given key.
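Locating the instance that holds a key starts with finding the key's partition. The sketch below ports the murmur2 hash that Kafka's default partitioner applies to serialized keys and computes toPositive(murmur2(keyBytes)) % numPartitions, assuming keys are serialized as UTF-8 strings (as with Serdes.String) and that numPartitions matches the topic's partition count. How partitions map to instances under Finatra's static partitioning is not described here, so instanceFor uses a hypothetical p % numInstances assignment purely for illustration.

```scala
import java.nio.charset.StandardCharsets

object KeyLocator {
  // Port of the murmur2 hash used by Kafka's default partitioner.
  def murmur2(data: Array[Byte]): Int = {
    val length = data.length
    val seed = 0x9747b28c
    val m = 0x5bd1e995
    val r = 24
    var h = seed ^ length
    val length4 = length / 4
    for (i <- 0 until length4) {
      val i4 = i * 4
      var k = (data(i4) & 0xff) + ((data(i4 + 1) & 0xff) << 8) +
        ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
      k *= m
      k ^= k >>> r
      k *= m
      h *= m
      h ^= k
    }
    // Mix in the trailing 1-3 bytes (the Java original uses switch fall-through).
    val tail = length & ~3
    val rem = length % 4
    if (rem >= 3) h ^= (data(tail + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
    if (rem >= 1) {
      h ^= data(tail) & 0xff
      h *= m
    }
    h ^= h >>> 13
    h *= m
    h ^= h >>> 15
    h
  }

  // Partition for a String key serialized as UTF-8: toPositive(murmur2) % numPartitions.
  def partitionFor(key: String, numPartitions: Int): Int =
    (murmur2(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff) % numPartitions

  // HYPOTHETICAL static assignment: partition p is owned by instance p % numInstances.
  def instanceFor(key: String, numPartitions: Int, numInstances: Int): Int =
    partitionFor(key, numPartitions) % numInstances
}
```

Because the hash depends only on the key bytes and the partition count, every client computes the same partition for a key, which is what lets a query be sent straight to the owning instance.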
See how queryable state is used in the word-count example below.
The integration tests serve as a good collection of example Finatra Kafka Streams servers.
We can build a lightweight server that counts the occurrences of each word in an input topic, storing the results in RocksDB.
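```scala
import com.twitter.finatra.kafka.serde.ScalaSerdes
import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, Grouped, Materialized, Produced}

class WordCountRocksDbServer extends KafkaStreamsTwitterServer {
  override val name = "wordcount"
  private val countStoreName = "CountsStore"

  override protected def configureKafkaStreams(builder: StreamsBuilder): Unit = {
    builder.asScala
      .stream[Bytes, String]("TextLinesTopic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
      // Split each line into words on single spaces.
      .flatMapValues(_.split(' '))
      // Re-key each record by its word so counts are grouped per word.
      .groupBy((_, word) => word)(Grouped.`with`(Serdes.String, Serdes.String))
      // Materialize the per-word counts in the store named by countStoreName.
      .count()(Materialized.as(countStoreName))
      .toStream
      .to("WordsWithCountsTopic")(Produced.`with`(Serdes.String, ScalaSerdes.Long))
  }
}
```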
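The count written to WordsWithCountsTopic is a running total rather than a one-off result: toStream turns the KTable of counts into a stream of updates. The plain-Scala model below (a hypothetical WordCountModel helper, not part of Finatra or Kafka Streams) mirrors the topology's split-and-count logic and emits one (word, count) update per word, which corresponds to the output with record caching disabled. With caching enabled, Kafka Streams may coalesce intermediate updates for a key, but the most recent value for each word eventually agrees.

```scala
// Hypothetical model of the word-count topology; not a Kafka Streams API.
object WordCountModel {
  // Returns the (word, runningCount) update produced for every word, in input order.
  def updates(lines: Seq[String]): Seq[(String, Long)] = {
    var counts = Map.empty[String, Long]
    for {
      line <- lines
      word <- line.split(' ').toSeq // same single-space split as flatMapValues above
    } yield {
      val updated = counts.getOrElse(word, 0L) + 1L
      counts += word -> updated
      word -> updated
    }
  }
}
```

For the input lines "a b a" and "b", the model produces ("a", 1), ("b", 1), ("a", 2), ("b", 2); keeping only the last update per word gives the final counts a = 2 and b = 2.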
We can then expose a Thrift endpoint enabling clients to directly query the state via interactive queries.
```scala
class WordCountRocksDbServer extends KafkaStreamsTwitterServer with QueryableState {
  ...
  final override def configureThrift(router: ThriftRouter): Unit = {
    router
      .add(
        new WordCountQueryService(
          queryableFinatraKeyValueStore[String, Long](
            storeName = countStoreName,
            primaryKeySerde = Serdes.String
          )
        )
      )
  }
}
```
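In this example, WordCountQueryService is the application's Thrift service implementation (not shown), which answers client queries using the queryable store passed to its constructor.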
Finatra Kafka Streams includes tooling that simplifies the process of writing highly testable services. See TopologyFeatureTest, which includes a FinatraTopologyTester that integrates Kafka Streams' TopologyTestDriver with a KafkaStreamsTwitterServer.
Licensed under the Apache License, Version 2.0: https://www.apache.org/licenses/LICENSE-2.0