usage readme
aaronp committed Sep 16, 2019
1 parent f473a92 commit cf2fc96
Showing 6 changed files with 92 additions and 23 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -6,6 +6,8 @@ script:
jdk:
- oraclejdk11

services:
- docker
after_success: "sbt coverageReport coveralls"

sudo: false
32 changes: 25 additions & 7 deletions src/main/paradox/usage.md
@@ -5,14 +5,32 @@ to what Kafka Streams gives you (but w/o relying on the kafka streams API)

## Copy A Kafka Topic:
```scala
val config = ConfigFactory.load()
// write data to kafka (assumes a configuration akin to kafka4m.producer.topic = someNewTopic)
val kafkaWriter: Consumer[(String, Array[Byte]), Long] = kafka4m.writeKeyAndBytes(config)
// read data from kafka (assumes a configuration akin to kafka4m.consumer.topic = originalTopic)
val kafkaData: Observable[ConsumerRecord[String, Array[Byte]]] = kafka4m.read(config)
// then we'd write it back into kafka like this:
val task: Task[Long] = kafkaData.map(r => (r.key, r.value)).consumeWith(kafkaWriter)
```

That provides the base primitives -- getting data into and out of Kafka.
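The snippet above only describes the pipeline; nothing moves until the `Task` is run. A minimal sketch of executing it, assuming Monix's global scheduler (the `CopyTopic` name is illustrative, not part of kafka4m):

```scala
import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler.Implicits.global

// hypothetical runner for the copy-a-topic pipeline above
object CopyTopic extends App {
  val config = ConfigFactory.load()
  val writer = kafka4m.writeKeyAndBytes(config)
  val copy   = kafka4m.read(config).map(r => (r.key, r.value)).consumeWith(writer)

  // runToFuture starts consuming; a live Kafka consumer is unbounded,
  // so keep the CancelableFuture around in order to stop the stream
  val running = copy.runToFuture
  sys.addShutdownHook(running.cancel())
}
```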

## ETL
On top of that, kafka4m provides some basic conveniences for getting data into Kafka from files in a directory, and for writing data from Kafka back out to the local filesystem.

Writing data into Kafka can be useful for performance testing, loading test data, or simply the normal ingest of application data.

Reading data from Kafka can be a convenient way to inspect data locally, or an interim step before sftp-ing or otherwise uploading the data elsewhere.

The Observables provided can just as easily supply data to a multi-part request, a websocket, etc.
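kafka4m's own directory-upload helpers aren't shown in this diff, so here is a sketch of the idea using only the primitives above (the `FilesToKafka` name and the file-per-record layout are assumptions): stream the files in a directory into `writeKeyAndBytes`, keyed by file name.

```scala
import java.nio.file.{Files, Path}
import com.typesafe.config.ConfigFactory
import monix.eval.Task
import monix.reactive.Observable

// hypothetical helper: one kafka record per file, keyed by the file name
object FilesToKafka {
  def upload(dir: Path): Task[Long] = {
    val config = ConfigFactory.load()
    val files  = Observable.fromIterable(dir.toFile.listFiles().toVector)
    files
      .map(file => (file.getName, Files.readAllBytes(file.toPath)))
      .consumeWith(kafka4m.writeKeyAndBytes(config))
  }
}
```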

### Kafka4mApp
The 'Kafka4mApp' serves as the entry-point for the ETL jobs and uses the [args4c](https://porpoiseltd.co.uk/args4c/index.html) library.
That simply means the first argument should be either 'read' or 'write' (as in read data from Kafka or write data to Kafka), and any
subsequent arguments are either key=value pairs or the location of a configuration file.
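For example, a hedged sketch of a launch (the topic key comes from the snippet above; 'etl.conf' is an illustrative file name):

```scala
// read data from kafka, overriding the topic on the command line and
// taking any remaining settings from the given configuration file
Kafka4mApp.main(Array("read", "kafka4m.consumer.topic=originalTopic", "etl.conf"))
```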

39 changes: 24 additions & 15 deletions src/main/scala/kafka4m/package.scala
@@ -1,4 +1,4 @@
import com.typesafe.config.{Config, ConfigFactory}
import kafka4m.admin.RichKafkaAdmin
import kafka4m.consumer.RichKafkaConsumer
import kafka4m.producer.AsProducerRecord._
@@ -19,6 +19,29 @@ package object kafka4m {
type Bytes = Array[Byte]
type KeyValue = (Key, Bytes)

/** @param config the kafka4m configuration
* @return a consumer which will consume raw text data and write it with null keys
*/
def writeText(config: Config = ConfigFactory.load()): Consumer[String, Long] = write[String](config)(FromString(Props.topic(config, "producer")))

/** @param config the kafka4m configuration
* @return a consumer which will consume a stream of key/byte-array values into kafka and return the number written
*/
def writeKeyAndBytes(config: Config = ConfigFactory.load()): Consumer[(String, Array[Byte]), Long] = {
write[(String, Array[Byte])](config)(FromKeyAndBytes(Props.topic(config, "producer")))
}

/** @param config the kafka4m configuration
* @tparam A any type A which can be converted into a kafka ProducerRecord
* @return a consumer of 'A' values which produces the number written
*/
def write[A: AsProducerRecord](config: Config): Consumer[A, Long] = {
val apr = AsProducerRecord[A]
val rkp: RichKafkaProducer[apr.K, apr.V] = RichKafkaProducer[apr.K, apr.V](config, null, null)
val fireAndForget = config.getBoolean("kafka4m.producer.fireAndForget")
rkp.asConsumer(fireAndForget)(apr)
}

/**
* @param config the kafka4m configuration which contains the 'kafka4m.consumer' values
* @return an Observable of data coming from kafka. The offsets, etc will be controlled by the kafka4m.consumer configuration, which includes default offset strategy, etc.
@@ -49,18 +72,4 @@ package object kafka4m {
def ensureTopicBlocking(config: Config)(implicit ec: ExecutionContext): Option[String] = {
RichKafkaAdmin.ensureTopicBlocking(config)
}

}
2 changes: 1 addition & 1 deletion src/main/scala/kafka4m/producer/AsProducerRecord.scala
Expand Up @@ -35,7 +35,7 @@ object AsProducerRecord {
override type V = Bytes

override def asRecord(value: String) = {
new ProducerRecord[K, V](topic, value.getBytes("UTF-8"))
}
}

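The `write[A: AsProducerRecord]` function in package.scala is driven by this typeclass, so publishing a custom type means supplying an instance. A sketch modelled on the instance above (`Event` is hypothetical, and the trait's exact members are inferred from this diff, so treat it as illustrative):

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// a hypothetical domain type and an AsProducerRecord instance for it
case class Event(id: String, payload: Array[Byte])

def fromEvent(topic: String): AsProducerRecord[Event] =
  new AsProducerRecord[Event] {
    override type K = String
    override type V = Array[Byte]
    override def asRecord(event: Event) =
      new ProducerRecord[K, V](topic, event.id, event.payload)
  }
```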
21 changes: 21 additions & 0 deletions src/test/resources/example/docker-compose.yml
@@ -0,0 +1,21 @@
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181"
    hostname: zookeeper
  kafka:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    ports:
      - "9092"
    hostname: kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka # docker-machine ip
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - "zookeeper"
19 changes: 19 additions & 0 deletions src/test/scala/Example.scala
@@ -0,0 +1,19 @@
import com.typesafe.config.ConfigFactory
import monix.eval.Task
import monix.reactive.{Consumer, Observable}
import org.apache.kafka.clients.consumer.ConsumerRecord

class Example {

val config = ConfigFactory.load()

// write data to kafka (assumes a configuration akin to kafka4m.producer.topic = someNewTopic)
val kafkaWriter: Consumer[(String, Array[Byte]), Long] = kafka4m.writeKeyAndBytes(config)

// read data from kafka (assumes a configuration akin to kafka4m.consumer.topic = originalTopic)
val kafkaData: Observable[ConsumerRecord[String, Array[Byte]]] = kafka4m.read(config)

// then we'd write it back into kafka like this.
val task: Task[Long] = kafkaData.map(r => (r.key, r.value)).consumeWith(kafkaWriter)

}
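`Example` builds the pipeline but never runs it. One possible way to exercise it against the docker-compose broker above, bounding the stream so the task actually completes (`take(100)` and the timeout are arbitrary choices here):

```scala
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object ExampleMain extends App {
  val example = new Example

  // bound the copy to 100 records so consumeWith completes with a count
  val bounded = example.kafkaData
    .take(100)
    .map(r => (r.key, r.value))
    .consumeWith(example.kafkaWriter)

  val written: Long = Await.result(bounded.runToFuture, 30.seconds)
  println(s"wrote $written records")
}
```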
