Introduce Scalariform and reformat code
kciesielski committed Jun 30, 2015
1 parent 78b6593 commit 3ffcf77
Showing 14 changed files with 90 additions and 72 deletions.
10 changes: 10 additions & 0 deletions build.sbt
@@ -1,5 +1,7 @@
import SonatypeKeys._

import scalariform.formatter.preferences.{SpacesAroundMultiImports, CompactControlReadability, PreserveSpaceBeforeArguments, DoubleIndentClassDeclaration}

sonatypeSettings

name := "reactive-kafka"
@@ -52,3 +54,11 @@ pomExtra := (
</developer>
</developers>
)

scalariformSettings

ScalariformKeys.preferences := ScalariformKeys.preferences.value
.setPreference(DoubleIndentClassDeclaration, true)
.setPreference(PreserveSpaceBeforeArguments, true)
.setPreference(CompactControlReadability, true)
.setPreference(SpacesAroundMultiImports, false)
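
For reference, the same four preferences can also be exercised through Scalariform's library API outside sbt. This is a minimal sketch, not part of the commit; it assumes the scalariform artifact is on the classpath, and the object name and sample source string are made up for illustration. With the sbt plugin's default wiring, formatting is expected to run as part of compilation, so this is only a way to see the effect of the preferences in isolation.

    // Minimal sketch (not part of this commit): applying the same Scalariform
    // preferences programmatically. Assumes the "scalariform" library is on the
    // classpath; FormatExample and the sample source are hypothetical.
    import scalariform.formatter.ScalaFormatter
    import scalariform.formatter.preferences._

    object FormatExample extends App {
      val prefs = FormattingPreferences()
        .setPreference(DoubleIndentClassDeclaration, true)
        .setPreference(PreserveSpaceBeforeArguments, true)
        .setPreference(CompactControlReadability, true)
        .setPreference(SpacesAroundMultiImports, false)

      // A small, badly formatted snippet to run through the formatter.
      val source = "class Foo( val a :Int ,val b:String ) { def sum = a+1 }"

      println(ScalaFormatter.format(source, prefs))
    }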
2 changes: 2 additions & 0 deletions project/plugins.sbt
@@ -3,3 +3,5 @@ resolvers += Classpaths.sbtPluginReleases
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-pgp" % "0.8.3")

addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.4.0")
@@ -33,13 +33,13 @@ private[kafka] class KafkaActorPublisher[T](consumer: KafkaConsumer, decoder: De

@tailrec
private def readDemandedItems() {
tryReadingSingleElement() match {
case Success(None) =>
if (demand_?) self ! Poll
case Success(Some(element)) =>
onNext(element)
if (demand_?) readDemandedItems()
case Failure(ex) => onError(ex)
tryReadingSingleElement() match {
case Success(None) =>
if (demand_?) self ! Poll
case Success(Some(element)) =>
onNext(element)
if (demand_?) readDemandedItems()
case Failure(ex) => onError(ex)
}
}

@@ -6,10 +6,11 @@ import kafka.producer.KafkaProducer
import kafka.serializer.Encoder

private[kafka] class KafkaActorSubscriber[T](
val producer: KafkaProducer,
val encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]] = (_: T) => None)
extends ActorSubscriber with ActorLogging {
val producer: KafkaProducer,
val encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]] = (_: T) => None
)
extends ActorSubscriber with ActorLogging {

protected def requestStrategy = WatermarkRequestStrategy(10)

14 changes: 8 additions & 6 deletions src/main/scala/com/softwaremill/react/kafka/ReactiveKafka.scala
@@ -1,19 +1,20 @@
package com.softwaremill.react.kafka

import akka.actor.{ ActorRef, Props, ActorSystem }
import akka.stream.actor.{ ActorSubscriber, ActorPublisher }
import akka.actor.{ActorRef, Props, ActorSystem}
import akka.stream.actor.{ActorSubscriber, ActorPublisher}
import kafka.consumer._
import kafka.producer._
import kafka.serializer.{ Encoder, Decoder }
import org.reactivestreams.{ Publisher, Subscriber }
import kafka.serializer.{Encoder, Decoder}
import org.reactivestreams.{Publisher, Subscriber}

class ReactiveKafka(val host: String, val zooKeeperHost: String) {

def publish[T](
topic: String,
groupId: String,
encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]] = (_: T) => None)(implicit actorSystem: ActorSystem): Subscriber[T] = {
partitionizer: T => Option[Array[Byte]] = (_: T) => None
)(implicit actorSystem: ActorSystem): Subscriber[T] = {
val props = ProducerProps(host, topic, groupId)
ActorSubscriber[T](producerActor(props, encoder, partitionizer))
}
@@ -30,7 +31,8 @@ class ReactiveKafka(val host: String, val zooKeeperHost: String) {
def producerActor[T](
props: ProducerProps,
encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]] = (_: T) => None)(implicit actorSystem: ActorSystem): ActorRef = {
partitionizer: T => Option[Array[Byte]] = (_: T) => None
)(implicit actorSystem: ActorSystem): ActorRef = {
val producer = new KafkaProducer(props)
actorSystem.actorOf(Props(new KafkaActorSubscriber(producer, encoder, partitionizer)).withDispatcher("kafka-subscriber-dispatcher"))
}
20 changes: 11 additions & 9 deletions src/main/scala/kafka/consumer/ConsumerProps.scala
@@ -35,20 +35,21 @@ object ConsumerProps {
*/
def apply(brokerList: String, zooKeeperHost: String, topic: String, groupId: String = UUID.randomUUID().toString): ConsumerProps = {
val props = Map[String, String](
("metadata.broker.list" -> brokerList),
("group.id" -> groupId),
("zookeeper.connect" -> zooKeeperHost),
"metadata.broker.list" -> brokerList,
"group.id" -> groupId,
"zookeeper.connect" -> zooKeeperHost,

// defaults
("auto.offset.reset" -> "smallest"),
("consumer.timeout.ms" -> "1500"),
("offsets.storage" -> "zookeeper"))
"auto.offset.reset" -> "smallest",
"consumer.timeout.ms" -> "1500",
"offsets.storage" -> "zookeeper"
)

new ConsumerProps(props, topic, groupId)
}
}

case class ConsumerProps(private val params: Map[String, String], val topic: String, val groupId: String) {
case class ConsumerProps(private val params: Map[String, String], topic: String, groupId: String) {

/**
* Consumer Timeout
@@ -90,8 +91,9 @@ case class ConsumerProps(private val params: Map[String, String], val topic: Str
*/
def kafkaOffsetsStorage(dualCommit: Boolean): ConsumerProps = {
val p = params + (
("offsets.storage" -> "kafka"),
("dual.commit.enabled" -> dualCommit.toString))
"offsets.storage" -> "kafka",
"dual.commit.enabled" -> dualCommit.toString
)
ConsumerProps(p, topic, groupId)
}
/**
4 changes: 0 additions & 4 deletions src/main/scala/kafka/producer/KafkaProducer.scala
@@ -1,9 +1,5 @@
package kafka.producer

import java.util.{ Properties, UUID }

import kafka.message.{ DefaultCompressionCodec, NoCompressionCodec }

/**
* Copied from https://github.com/stealthly/scala-kafka, 0.8.2-beta (not released at the moment)
*/
22 changes: 12 additions & 10 deletions src/main/scala/kafka/producer/ProducerProps.scala
@@ -31,20 +31,21 @@ object ProducerProps {
*/
def apply(brokerList: String, topic: String, clientId: String = UUID.randomUUID().toString): ProducerProps = {
val props = Map[String, String](
("metadata.broker.list" -> brokerList),
"metadata.broker.list" -> brokerList,

// defaults
("compression.codec" -> DefaultCompressionCodec.codec.toString),
("client.id" -> clientId),
("message.send.max.retries" -> 3.toString),
("request.required.acks" -> -1.toString),
("producer.type" -> "sync"))
"compression.codec" -> DefaultCompressionCodec.codec.toString,
"client.id" -> clientId,
"message.send.max.retries" -> 3.toString,
"request.required.acks" -> -1.toString,
"producer.type" -> "sync"
)

new ProducerProps(props, topic, clientId)
}
}

case class ProducerProps(private val params: Map[String, String], val topic: String, val clientId: String) {
case class ProducerProps(private val params: Map[String, String], topic: String, clientId: String) {

/**
* Asynchronous Mode
@@ -54,9 +55,10 @@ case class ProducerProps(private val params: Map[String, String], val topic: Str
*/
def asynchronous(batchSize: Int = 200, bufferMaxMs: Int = 500): ProducerProps = {
val p = params + (
("producer.type" -> "async"),
("batch.num.messages" -> batchSize.toString),
("queue.buffering.max.ms" -> bufferMaxMs.toString))
"producer.type" -> "async",
"batch.num.messages" -> batchSize.toString,
"queue.buffering.max.ms" -> bufferMaxMs.toString
)
ProducerProps(p, topic, clientId)
}

@@ -15,15 +15,15 @@ import scala.concurrent.duration._
import scala.language.postfixOps

class ReactiveKafkaIntegrationSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike
with Matchers with BeforeAndAfterAll {
with Matchers with BeforeAndAfterAll {

def this() = this(ActorSystem("ReactiveKafkaIntegrationSpec"))

def uuid() = UUID.randomUUID().toString
implicit val timeout = Timeout(1 second)

def parititonizer(in: String): Option[Array[Byte]] = Some(in.hashCode().toInt.toString.getBytes)

override def afterAll {
TestKit.shutdownActorSystem(system)
}
@@ -13,8 +13,8 @@ import scala.concurrent.duration.{FiniteDuration, _}
import scala.language.postfixOps

class ReactiveKafkaPublisherSpec(defaultTimeout: FiniteDuration)
extends PublisherVerification[String](new TestEnvironment(defaultTimeout.toMillis), defaultTimeout.toMillis)
with TestNGSuiteLike with ReactiveStreamsTckVerificationBase with BaseSpec {
extends PublisherVerification[String](new TestEnvironment(defaultTimeout.toMillis), defaultTimeout.toMillis)
with TestNGSuiteLike with ReactiveStreamsTckVerificationBase with BaseSpec {

def this() = this(1300 millis)

@@ -43,7 +43,8 @@ class ReactiveKafkaPublisherSpec(defaultTimeout: FiniteDuration)
override def createFailedPublisher(): Publisher[String] = {
new Publisher[String] {
override def subscribe(subscriber: Subscriber[_ >: String]): Unit = {
subscriber.onSubscribe(new Subscription {override def cancel() {}
subscriber.onSubscribe(new Subscription {
override def cancel() {}

override def request(l: Long) {}
})
@@ -12,19 +12,19 @@ import scala.concurrent.duration.{FiniteDuration, _}
import scala.language.postfixOps

class ReactiveKafkaSubscriberBlackboxSpec(defaultTimeout: FiniteDuration)
extends SubscriberBlackboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {
extends SubscriberBlackboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {

def this() = this(300 millis)

def partitionizer(in: String): Option[Array[Byte]] = Some(Option(in) getOrElse (UUID.randomUUID().toString) getBytes)

override def createSubscriber(): Subscriber[String] = {
val topic = UUID.randomUUID().toString
kafka.publish(topic, "group", new StringEncoder(), partitionizer)
}

def createHelperSource(elements: Long) : Source[String, _] = elements match {
def createHelperSource(elements: Long): Source[String, _] = elements match {
case 0 => Source.empty
case Long.MaxValue => Source(initialDelay = 10 millis, interval = 10 millis, tick = message)
case n if n <= Int.MaxValue => Source(List.fill(n.toInt)(message))
@@ -12,8 +12,8 @@ import scala.concurrent.duration.{FiniteDuration, _}
import scala.language.postfixOps

class ReactiveKafkaSubscriberWhiteboxSpec(defaultTimeout: FiniteDuration)
extends SubscriberWhiteboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {
extends SubscriberWhiteboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {

def this() = this(300 millis)

2 changes: 1 addition & 1 deletion src/test/scala/kafka/producer/ProducerPropsTest.scala
@@ -3,7 +3,7 @@ package kafka.producer
import org.scalatest._
import java.util.UUID

class ProducerPropsTest extends WordSpecLike with Matchers {
class ProducerPropsTest extends WordSpecLike with Matchers {

def uuid() = UUID.randomUUID().toString
val brokerList = "localhost:9092"
42 changes: 22 additions & 20 deletions src/test/scala/ly/stealth/testing/BaseSpec.scala
@@ -6,25 +6,27 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.{KafkaProducer => NewKafkaProducer}

trait BaseSpec {
def createNewKafkaProducer(brokerList: String,
acks: Int = -1,
metadataFetchTimeout: Long = 3000L,
blockOnBufferFull: Boolean = true,
bufferSize: Long = 1024L * 1024L,
retries: Int = 0): NewKafkaProducer[Array[Byte], Array[Byte]] = {
def createNewKafkaProducer(
brokerList: String,
acks: Int = -1,
metadataFetchTimeout: Long = 3000L,
blockOnBufferFull: Boolean = true,
bufferSize: Long = 1024L * 1024L,
retries: Int = 0
): NewKafkaProducer[Array[Byte], Array[Byte]] = {

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")

new NewKafkaProducer[Array[Byte], Array[Byte]](producerProps)
}
}
new NewKafkaProducer[Array[Byte], Array[Byte]](producerProps)
}
}
