diff --git a/build.sbt b/build.sbt
index c1ee7f8f7..c46fec176 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,5 +1,7 @@
 import SonatypeKeys._
 
+import scalariform.formatter.preferences.{SpacesAroundMultiImports, CompactControlReadability, PreserveSpaceBeforeArguments, DoubleIndentClassDeclaration}
+
 sonatypeSettings
 
 name := "reactive-kafka"
@@ -52,3 +54,11 @@ pomExtra := (
   )
 
 
+
+scalariformSettings
+
+ScalariformKeys.preferences := ScalariformKeys.preferences.value
+  .setPreference(DoubleIndentClassDeclaration, true)
+  .setPreference(PreserveSpaceBeforeArguments, true)
+  .setPreference(CompactControlReadability, true)
+  .setPreference(SpacesAroundMultiImports, false)
\ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 323f2f19f..4d03c8239 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -3,3 +3,5 @@ resolvers += Classpaths.sbtPluginReleases
 addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1")
 
 addSbtPlugin("com.typesafe.sbt" % "sbt-pgp" % "0.8.3")
+
+addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.4.0")
\ No newline at end of file
diff --git a/src/main/scala/com/softwaremill/react/kafka/KafkaActorPublisher.scala b/src/main/scala/com/softwaremill/react/kafka/KafkaActorPublisher.scala
index ee60ba54c..15e0cdba5 100644
--- a/src/main/scala/com/softwaremill/react/kafka/KafkaActorPublisher.scala
+++ b/src/main/scala/com/softwaremill/react/kafka/KafkaActorPublisher.scala
@@ -33,13 +33,13 @@ private[kafka] class KafkaActorPublisher[T](consumer: KafkaConsumer, decoder: De
 
   @tailrec
   private def readDemandedItems() {
-      tryReadingSingleElement() match {
-        case Success(None) =>
-          if (demand_?) self ! Poll
-        case Success(Some(element)) =>
-          onNext(element)
-          if (demand_?) readDemandedItems()
-        case Failure(ex) => onError(ex)
+    tryReadingSingleElement() match {
+      case Success(None) =>
+        if (demand_?) self ! Poll
+      case Success(Some(element)) =>
+        onNext(element)
+        if (demand_?) readDemandedItems()
+      case Failure(ex) => onError(ex)
     }
   }
 
diff --git a/src/main/scala/com/softwaremill/react/kafka/KafkaActorSubscriber.scala b/src/main/scala/com/softwaremill/react/kafka/KafkaActorSubscriber.scala
index 444c911de..7de7aa48a 100644
--- a/src/main/scala/com/softwaremill/react/kafka/KafkaActorSubscriber.scala
+++ b/src/main/scala/com/softwaremill/react/kafka/KafkaActorSubscriber.scala
@@ -6,10 +6,11 @@ import kafka.producer.KafkaProducer
 import kafka.serializer.Encoder
 
 private[kafka] class KafkaActorSubscriber[T](
-  val producer: KafkaProducer,
-  val encoder: Encoder[T],
-  partitionizer: T => Option[Array[Byte]] = (_: T) => None)
-  extends ActorSubscriber with ActorLogging {
+    val producer: KafkaProducer,
+    val encoder: Encoder[T],
+    partitionizer: T => Option[Array[Byte]] = (_: T) => None
+)
+  extends ActorSubscriber with ActorLogging {
 
   protected def requestStrategy = WatermarkRequestStrategy(10)
 
diff --git a/src/main/scala/com/softwaremill/react/kafka/ReactiveKafka.scala b/src/main/scala/com/softwaremill/react/kafka/ReactiveKafka.scala
index 0357fe0ad..7f9d4f550 100644
--- a/src/main/scala/com/softwaremill/react/kafka/ReactiveKafka.scala
+++ b/src/main/scala/com/softwaremill/react/kafka/ReactiveKafka.scala
@@ -1,11 +1,11 @@
 package com.softwaremill.react.kafka
 
-import akka.actor.{ ActorRef, Props, ActorSystem }
-import akka.stream.actor.{ ActorSubscriber, ActorPublisher }
+import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.stream.actor.{ActorSubscriber, ActorPublisher}
 import kafka.consumer._
 import kafka.producer._
-import kafka.serializer.{ Encoder, Decoder }
-import org.reactivestreams.{ Publisher, Subscriber }
+import kafka.serializer.{Encoder, Decoder}
+import org.reactivestreams.{Publisher, Subscriber}
 
 class ReactiveKafka(val host: String, val zooKeeperHost: String) {
@@ -13,7 +13,8 @@ class ReactiveKafka(val host: String, val zooKeeperHost: String) {
     topic: String,
     groupId: String,
     encoder: Encoder[T],
-    partitionizer: T => Option[Array[Byte]] = (_: T) => None)(implicit actorSystem: ActorSystem): Subscriber[T] = {
+    partitionizer: T => Option[Array[Byte]] = (_: T) => None
+  )(implicit actorSystem: ActorSystem): Subscriber[T] = {
     val props = ProducerProps(host, topic, groupId)
     ActorSubscriber[T](producerActor(props, encoder, partitionizer))
   }
@@ -30,7 +31,8 @@ class ReactiveKafka(val host: String, val zooKeeperHost: String) {
   def producerActor[T](
     props: ProducerProps,
     encoder: Encoder[T],
-    partitionizer: T => Option[Array[Byte]] = (_: T) => None)(implicit actorSystem: ActorSystem): ActorRef = {
+    partitionizer: T => Option[Array[Byte]] = (_: T) => None
+  )(implicit actorSystem: ActorSystem): ActorRef = {
     val producer = new KafkaProducer(props)
     actorSystem.actorOf(Props(new KafkaActorSubscriber(producer, encoder, partitionizer)).withDispatcher("kafka-subscriber-dispatcher"))
   }
diff --git a/src/main/scala/kafka/consumer/ConsumerProps.scala b/src/main/scala/kafka/consumer/ConsumerProps.scala
index f31f2cd7a..073a47259 100644
--- a/src/main/scala/kafka/consumer/ConsumerProps.scala
+++ b/src/main/scala/kafka/consumer/ConsumerProps.scala
@@ -35,20 +35,21 @@ object ConsumerProps {
    */
   def apply(brokerList: String, zooKeeperHost: String, topic: String, groupId: String = UUID.randomUUID().toString): ConsumerProps = {
     val props = Map[String, String](
-      ("metadata.broker.list" -> brokerList),
-      ("group.id" -> groupId),
-      ("zookeeper.connect" -> zooKeeperHost),
+      "metadata.broker.list" -> brokerList,
+      "group.id" -> groupId,
+      "zookeeper.connect" -> zooKeeperHost,
       // defaults
-      ("auto.offset.reset" -> "smallest"),
-      ("consumer.timeout.ms" -> "1500"),
-      ("offsets.storage" -> "zookeeper"))
+      "auto.offset.reset" -> "smallest",
+      "consumer.timeout.ms" -> "1500",
+      "offsets.storage" -> "zookeeper"
+    )
     new ConsumerProps(props, topic, groupId)
   }
 }
 
-case class ConsumerProps(private val params: Map[String, String], val topic: String, val groupId: String) {
+case class ConsumerProps(private val params: Map[String, String], topic: String, groupId: String) {
 
   /**
    * Consumer Timeout
@@ -90,8 +91,9 @@ case class ConsumerProps(private val params: Map[String, String], val topic: Str
    */
   def kafkaOffsetsStorage(dualCommit: Boolean): ConsumerProps = {
     val p = params + (
-      ("offsets.storage" -> "kafka"),
-      ("dual.commit.enabled" -> dualCommit.toString))
+      "offsets.storage" -> "kafka",
+      "dual.commit.enabled" -> dualCommit.toString
+    )
     ConsumerProps(p, topic, groupId)
   }
   /**
diff --git a/src/main/scala/kafka/producer/KafkaProducer.scala b/src/main/scala/kafka/producer/KafkaProducer.scala
index c65d7c4e9..7272d7978 100644
--- a/src/main/scala/kafka/producer/KafkaProducer.scala
+++ b/src/main/scala/kafka/producer/KafkaProducer.scala
@@ -1,9 +1,5 @@
 package kafka.producer
 
-import java.util.{ Properties, UUID }
-
-import kafka.message.{ DefaultCompressionCodec, NoCompressionCodec }
-
 /**
  * Copied from https://github.com/stealthly/scala-kafka, 0.8.2-beta (not released at the moment)
  */
diff --git a/src/main/scala/kafka/producer/ProducerProps.scala b/src/main/scala/kafka/producer/ProducerProps.scala
index e4e8865f4..e169e765c 100644
--- a/src/main/scala/kafka/producer/ProducerProps.scala
+++ b/src/main/scala/kafka/producer/ProducerProps.scala
@@ -31,20 +31,21 @@ object ProducerProps {
    */
   def apply(brokerList: String, topic: String, clientId: String = UUID.randomUUID().toString): ProducerProps = {
     val props = Map[String, String](
-      ("metadata.broker.list" -> brokerList),
+      "metadata.broker.list" -> brokerList,
       // defaults
-      ("compression.codec" -> DefaultCompressionCodec.codec.toString),
-      ("client.id" -> clientId),
-      ("message.send.max.retries" -> 3.toString),
-      ("request.required.acks" -> -1.toString),
-      ("producer.type" -> "sync"))
+      "compression.codec" -> DefaultCompressionCodec.codec.toString,
+      "client.id" -> clientId,
+      "message.send.max.retries" -> 3.toString,
+      "request.required.acks" -> -1.toString,
+      "producer.type" -> "sync"
+    )
     new ProducerProps(props, topic, clientId)
   }
 }
 
-case class ProducerProps(private val params: Map[String, String], val topic: String, val clientId: String) {
+case class ProducerProps(private val params: Map[String, String], topic: String, clientId: String) {
 
   /**
    * Asynchronous Mode
@@ -54,9 +55,10 @@ case class ProducerProps(private val params: Map[String, String], val topic: Str
    */
   def asynchronous(batchSize: Int = 200, bufferMaxMs: Int = 500): ProducerProps = {
     val p = params + (
-      ("producer.type" -> "async"),
-      ("batch.num.messages" -> batchSize.toString),
-      ("queue.buffering.max.ms" -> bufferMaxMs.toString))
+      "producer.type" -> "async",
+      "batch.num.messages" -> batchSize.toString,
+      "queue.buffering.max.ms" -> bufferMaxMs.toString
+    )
     ProducerProps(p, topic, clientId)
   }
 
diff --git a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala
index 6cb88dc23..9b9d9f9cc 100644
--- a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala
+++ b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala
@@ -15,7 +15,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 class ReactiveKafkaIntegrationSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike
-with Matchers with BeforeAndAfterAll {
+  with Matchers with BeforeAndAfterAll {
 
   def this() = this(ActorSystem("ReactiveKafkaIntegrationSpec"))
 
@@ -23,7 +23,7 @@ with Matchers with BeforeAndAfterAll {
   implicit val timeout = Timeout(1 second)
 
   def parititonizer(in: String): Option[Array[Byte]] = Some(in.hashCode().toInt.toString.getBytes)
-
+
   override def afterAll {
     TestKit.shutdownActorSystem(system)
   }
diff --git a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaPublisherSpec.scala b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaPublisherSpec.scala
index 896039400..08af6b446 100644
--- a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaPublisherSpec.scala
+++ b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaPublisherSpec.scala
@@ -13,8 +13,8 @@ import scala.concurrent.duration.{FiniteDuration, _}
 import scala.language.postfixOps
 
 class ReactiveKafkaPublisherSpec(defaultTimeout: FiniteDuration)
-  extends PublisherVerification[String](new TestEnvironment(defaultTimeout.toMillis), defaultTimeout.toMillis)
-  with TestNGSuiteLike with ReactiveStreamsTckVerificationBase with BaseSpec {
+    extends PublisherVerification[String](new TestEnvironment(defaultTimeout.toMillis), defaultTimeout.toMillis)
+    with TestNGSuiteLike with ReactiveStreamsTckVerificationBase with BaseSpec {
 
   def this() = this(1300 millis)
 
@@ -43,7 +43,8 @@ class ReactiveKafkaPublisherSpec(defaultTimeout: FiniteDuration)
   override def createFailedPublisher(): Publisher[String] = {
     new Publisher[String] {
       override def subscribe(subscriber: Subscriber[_ >: String]): Unit = {
-        subscriber.onSubscribe(new Subscription {override def cancel() {}
+        subscriber.onSubscribe(new Subscription {
+          override def cancel() {}
 
           override def request(l: Long) {}
         })
diff --git a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberBlackboxSpec.scala b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberBlackboxSpec.scala
index 2a87e4d99..048e21c90 100644
--- a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberBlackboxSpec.scala
+++ b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberBlackboxSpec.scala
@@ -12,19 +12,19 @@ import scala.concurrent.duration.{FiniteDuration, _}
 import scala.language.postfixOps
 
 class ReactiveKafkaSubscriberBlackboxSpec(defaultTimeout: FiniteDuration)
-  extends SubscriberBlackboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
-  with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {
+    extends SubscriberBlackboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
+    with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {
 
   def this() = this(300 millis)
 
   def partitionizer(in: String): Option[Array[Byte]] = Some(Option(in) getOrElse (UUID.randomUUID().toString) getBytes)
-
+
   override def createSubscriber(): Subscriber[String] = {
     val topic = UUID.randomUUID().toString
     kafka.publish(topic, "group", new StringEncoder(), partitionizer)
   }
 
-  def createHelperSource(elements: Long) : Source[String, _] = elements match {
+  def createHelperSource(elements: Long): Source[String, _] = elements match {
     case 0 => Source.empty
     case Long.MaxValue => Source(initialDelay = 10 millis, interval = 10 millis, tick = message)
     case n if n <= Int.MaxValue => Source(List.fill(n.toInt)(message))
diff --git a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberWhiteboxSpec.scala b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberWhiteboxSpec.scala
index ed615c5d1..8486e4eea 100644
--- a/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberWhiteboxSpec.scala
+++ b/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaSubscriberWhiteboxSpec.scala
@@ -12,8 +12,8 @@ import scala.concurrent.duration.{FiniteDuration, _}
 import scala.language.postfixOps
 
 class ReactiveKafkaSubscriberWhiteboxSpec(defaultTimeout: FiniteDuration)
-  extends SubscriberWhiteboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
-  with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {
+    extends SubscriberWhiteboxVerification[String](new TestEnvironment(defaultTimeout.toMillis))
+    with TestNGSuiteLike with ReactiveStreamsTckVerificationBase {
 
   def this() = this(300 millis)
 
diff --git a/src/test/scala/kafka/producer/ProducerPropsTest.scala b/src/test/scala/kafka/producer/ProducerPropsTest.scala
index 8fd6016a4..110b5ca04 100644
--- a/src/test/scala/kafka/producer/ProducerPropsTest.scala
+++ b/src/test/scala/kafka/producer/ProducerPropsTest.scala
@@ -3,7 +3,7 @@ package kafka.producer
 import org.scalatest._
 import java.util.UUID
 
-class ProducerPropsTest extends WordSpecLike with Matchers { 
+class ProducerPropsTest extends WordSpecLike with Matchers {
 
   def uuid() = UUID.randomUUID().toString
   val brokerList = "localhost:9092"
diff --git a/src/test/scala/ly/stealth/testing/BaseSpec.scala b/src/test/scala/ly/stealth/testing/BaseSpec.scala
index a5c9b063c..9754ad604 100644
--- a/src/test/scala/ly/stealth/testing/BaseSpec.scala
+++ b/src/test/scala/ly/stealth/testing/BaseSpec.scala
@@ -6,25 +6,27 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.{KafkaProducer => NewKafkaProducer}
 
 trait BaseSpec {
-  def createNewKafkaProducer(brokerList: String,
-                             acks: Int = -1,
-                             metadataFetchTimeout: Long = 3000L,
-                             blockOnBufferFull: Boolean = true,
-                             bufferSize: Long = 1024L * 1024L,
-                             retries: Int = 0): NewKafkaProducer[Array[Byte], Array[Byte]] = {
+  def createNewKafkaProducer(
+    brokerList: String,
+    acks: Int = -1,
+    metadataFetchTimeout: Long = 3000L,
+    blockOnBufferFull: Boolean = true,
+    bufferSize: Long = 1024L * 1024L,
+    retries: Int = 0
+  ): NewKafkaProducer[Array[Byte], Array[Byte]] = {
 
-      val producerProps = new Properties()
-      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-      producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
-      producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
-      producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
-      producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
-      producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
-      producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
-      producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
-      producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-      producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
+    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
+    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
+    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
+    producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
 
-      new NewKafkaProducer[Array[Byte], Array[Byte]](producerProps)
-    }
-  }
+    new NewKafkaProducer[Array[Byte], Array[Byte]](producerProps)
+  }
+}
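
For reference, a minimal usage sketch of the `ConsumerProps` and `ProducerProps` builders touched above, using only the signatures visible in this diff; the broker list, ZooKeeper address, topic, group id, and client id are placeholder values:

```scala
import kafka.consumer.ConsumerProps
import kafka.producer.ProducerProps

// Consumer config: broker list, ZooKeeper host, topic, and an explicit group id.
// apply() fills in the defaults shown in the diff (auto.offset.reset=smallest,
// consumer.timeout.ms=1500, offsets.storage=zookeeper); kafkaOffsetsStorage
// switches offsets.storage from "zookeeper" to "kafka".
val consumerProps = ConsumerProps("localhost:9092", "localhost:2181", "some-topic", "some-group")
  .kafkaOffsetsStorage(dualCommit = true)

// Producer config for the same topic; asynchronous() overrides the default
// producer.type=sync with async batching.
val producerProps = ProducerProps("localhost:9092", "some-topic", "some-client")
  .asynchronous(batchSize = 100, bufferMaxMs = 1000)
```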