Import source from alpakka
aserrallerios committed Jan 21, 2018
1 parent 65f5050 commit 4267c60
Showing 24 changed files with 1,278 additions and 1 deletion.
20 changes: 19 additions & 1 deletion .gitignore
@@ -1,2 +1,20 @@
*.class
.idea*
*.env
*.log
*.iml
target/
.target/
.DS_Store
.cache*
.classpath
.project
.settings
.tmpBin/
*.sublime-project
/bin/
ext-lib-src/
.ensime
.ensime_cache/
moquette_store.mapdb
moquette_store.mapdb.p
moquette_store.mapdb.t
49 changes: 49 additions & 0 deletions .travis.yml
@@ -0,0 +1,49 @@
language: scala

sudo: false

scala:
  - "2.11.11"
  - "2.12.2"

jdk:
  - oraclejdk8

script:
  - sbt -J-XX:ReservedCodeCacheSize=128m ++$TRAVIS_SCALA_VERSION ";test:compile"
  # make 'git branch' work again
  - git branch -f "$TRAVIS_BRANCH" && git checkout "$TRAVIS_BRANCH"
  # make comparing to origin/master work
  - git remote set-branches --add origin master
  - git fetch
  # check unformatted code
  - git diff --exit-code --color || { echo "[error] Unformatted code found. Please run 'test:compile' and commit the reformatted code."; false; }

before_cache:
  - find $HOME/.ivy2 -name "ivydata-*.properties" -print -delete
  - find $HOME/.sbt -name "*.lock" -print -delete

cache:
  directories:
    - $HOME/.ivy2/cache
    - $HOME/.sbt/boot

deploy:
  provider: script
  skip_cleanup: true
  script:
    - sbt -J-XX:ReservedCodeCacheSize=128m ++$TRAVIS_SCALA_VERSION publish
  on:
    tags: true
    repo: akka/alpakka
    condition: $AKKA_SERIES = 2.4

env:
  matrix:
    - AKKA_SERIES=2.4
    - AKKA_SERIES=2.5
  global:
    # encrypt with: travis encrypt BINTRAY_USER=...
    - secure: "foo"
    # encrypt with: travis encrypt BINTRAY_PASS=...
    - secure: "foo"
15 changes: 15 additions & 0 deletions LICENSE
@@ -0,0 +1,15 @@
This software is licensed under the Apache 2 license, quoted below.

Copyright 2018 Albert Serrallé

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at

[http://www.apache.org/licenses/LICENSE-2.0]

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
83 changes: 83 additions & 0 deletions README.md
@@ -0,0 +1,83 @@
# Akka Stream Source for Kinesis Client Library [![travis-badge][]][travis]

[travis]: https://travis-ci.org/aserrallerios/kcl-akka-stream
[travis-badge]: https://travis-ci.org/aserrallerios/kcl-akka-stream.svg?branch=master

For more information about Kinesis please visit the [official documentation](https://aws.amazon.com/documentation/kinesis/).

The KCL Source can read from several shards and rebalance automatically when other Workers are started or stopped. It also handles record sequence checkpoints.

For more information about KCL please visit the [official documentation](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

## Installation

TODO bintray publish

## Usage

### AWS KCL Worker Source & checkpointer

The KCL Worker Source needs to create and manage Worker instances in order to consume records from Kinesis Streams.

To use it, provide a Worker builder and the Source settings:

```scala
val workerSourceSettings = KinesisWorkerSourceSettings(
  bufferSize = 1000,
  checkWorkerPeriodicity = 1 minute)
val builder: IRecordProcessorFactory => Worker = { recordProcessorFactory =>
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(
      new KinesisClientLibConfiguration(
        "myApp",
        "myStreamName",
        DefaultAWSCredentialsProviderChain.getInstance(),
        s"${
          import scala.sys.process._
          "hostname".!!.trim()
        }:${java.util.UUID.randomUUID()}"
      )
    )
    .build()
}
```

The Source also needs an `ExecutionContext` to run the Worker's thread and to execute record checkpoints. Then the Source can be created as usual:

```scala
implicit val ec =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1000))
KinesisWorker(builder, workerSourceSettings).to(Sink.ignore)
```
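
Once the graph is defined, it still has to be materialized and run. The following is a minimal sketch (not taken from this project's docs) assuming the standard Akka Streams setup; the `ActorSystem` name is illustrative:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

implicit val system = ActorSystem("kcl-example") // name is illustrative
implicit val materializer = ActorMaterializer()

// Materializes the graph built above and starts the KCL Worker consuming.
KinesisWorker(builder, workerSourceSettings).to(Sink.ignore).run()
```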

### Committing records

The KCL Worker Source publishes messages downstream that can be committed to mark each consumer's progress per shard. Checkpointing can be done manually or with the provided checkpointer Flow (or Sink).

To use the Flow or Sink, provide the checkpoint settings:

```scala
val checkpointSettings = KinesisWorkerCheckpointSettings(100, 30 seconds)
KinesisWorker(builder, workerSourceSettings)
  .via(KinesisWorker.checkpointRecordsFlow(checkpointSettings))
  .to(Sink.ignore)
KinesisWorker(builder, workerSourceSettings).to(
  KinesisWorker.checkpointRecordsSink(checkpointSettings))
```
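
Checkpointing by hand means acting on the elements the Source emits yourself. The sketch below is hypothetical: `record.checkpoint()` is an assumed method name, so check the library's committable record type for the actual API before relying on it:

```scala
import scala.concurrent.Future

// Hypothetical manual checkpointing; `record.checkpoint()` is an assumed
// method name, not confirmed by this README.
KinesisWorker(builder, workerSourceSettings)
  .mapAsync(parallelism = 1) { record =>
    Future {
      record.checkpoint()
      record
    }
  }
  .to(Sink.ignore)
```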

## License

Copyright (c) 2018 Albert Serrallé

This version of *kcl-akka-stream* is released under the Apache License, Version 2.0 (see [LICENSE](LICENSE)).
By downloading and using this software you agree to the
[End-User License Agreement (EULA)](LICENSE).

We build on a number of third-party software tools, with the following licenses:

#### Java Libraries

Third-Party software | License
----------------------------|-----------------------
amazon-kinesis-client | Amazon Software License
21 changes: 21 additions & 0 deletions build.sbt
@@ -0,0 +1,21 @@
lazy val `kcl-akka-stream` =
  Project(id = "kcl-akka-stream", base = file("."))
    .enablePlugins(AutomateHeaderPlugin)
    .settings(
      name := "kcl-akka-stream",
      // By default scalatest futures time out in 150 ms, dilate that to 600ms.
      // This should not impact the total test time as we don't expect to hit this
      // timeout.
      testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest,
                                            "-F",
                                            "4")
    )
    .settings(
      Dependencies.Kinesis,
      // For mockito
      parallelExecution in Test := false,
      onLoadMessage :=
        """
          |** Welcome to the sbt build definition for kcl-akka-stream! **
        """.stripMargin
    )
52 changes: 52 additions & 0 deletions project/Common.scala
@@ -0,0 +1,52 @@
import sbt._
import sbt.Keys._
import sbt.plugins.JvmPlugin
import com.lucidchart.sbt.scalafmt.ScalafmtCorePlugin.autoImport._
import de.heikoseeberger.sbtheader._
import de.heikoseeberger.sbtheader.HeaderPlugin.autoImport._
import sbtunidoc.BaseUnidocPlugin.autoImport._

object Common extends AutoPlugin {

  override def trigger = allRequirements

  override def requires = JvmPlugin && HeaderPlugin

  override lazy val projectSettings =
    Dependencies.Common ++ Seq(
      // organization := "com.lightbend.akka",
      // organizationName := "Lightbend Inc.",
      homepage := Some(url("https://github.com/aserrallerios/kcl-akka-stream")),
      scmInfo := Some(ScmInfo(url("https://github.com/aserrallerios/kcl-akka-stream"), "git@github.com:aserrallerios/kcl-akka-stream.git")),
      developers += Developer("aserralle",
                              "Albert Serrallé",
                              "aserrallerios@gmail.com",
                              url("https://github.com/aserrallerios")),
      licenses := Seq(("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))),
      crossVersion := CrossVersion.binary,
      scalacOptions ++= Seq(
        "-encoding",
        "UTF-8",
        "-feature",
        "-unchecked",
        "-deprecation",
        //"-Xfatal-warnings",
        "-Xlint",
        "-Yno-adapted-args",
        "-Ywarn-dead-code",
        "-Xfuture"
      ),
      javacOptions in compile ++= Seq(
        "-Xlint:unchecked"
      ),
      // autoAPIMappings := true,
      // apiURL := Some(url()),
      // show full stack traces and test case durations
      testOptions in Test += Tests.Argument("-oDF"),
      // -v Log "test run started" / "test started" / "test run finished" events on log level "info" instead of "debug".
      // -a Show stack traces and exception class name for AssertionErrors.
      testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
      scalafmtOnCompile := true,
      headerLicense := Some(HeaderLicense.Custom("Copyright (C) 2018 Albert Serrallé"))
    )
}
29 changes: 29 additions & 0 deletions project/Dependencies.scala
@@ -0,0 +1,29 @@
import sbt._, Keys._

object Dependencies {

  val AkkaVersion = sys.env.get("AKKA_SERIES") match {
    case Some("2.5") => "2.5.6"
    case _ => "2.4.19"
  }

  val AwsSdkVersion = "1.11.226"

  val Common = Seq(
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
      "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
      "org.scalatest" %% "scalatest" % "3.0.1" % Test, // ApacheV2
      "com.novocode" % "junit-interface" % "0.11" % Test, // BSD-style
      "junit" % "junit" % "4.12" % Test // Eclipse Public License 1.0
    )
  )

  val Kinesis = Seq(
    libraryDependencies ++= Seq(
      "com.amazonaws" % "aws-java-sdk-kinesis" % AwsSdkVersion, // ApacheV2
      "com.amazonaws" % "amazon-kinesis-client" % "1.8.8", // Amazon Software License
      "org.mockito" % "mockito-core" % "2.7.11" % Test // MIT
    )
  )
}
47 changes: 47 additions & 0 deletions project/Publish.scala
@@ -0,0 +1,47 @@
import sbt._, Keys._

/**
 * For projects that are not to be published.
 */
object NoPublish extends AutoPlugin {
  override def requires = plugins.JvmPlugin

  override def projectSettings = Seq(
    publishArtifact := false,
    publish := {},
    publishLocal := {}
  )
}

object Publish extends AutoPlugin {
  import bintray.BintrayPlugin
  import bintray.BintrayPlugin.autoImport._

  override def trigger = allRequirements
  override def requires = BintrayPlugin

  override def projectSettings = Seq(
    bintrayOrganization := Some("aserralle"),
    bintrayPackage := "kcl-akka-stream"
  )
}

object PublishUnidoc extends AutoPlugin {
  import sbtunidoc.BaseUnidocPlugin._
  import sbtunidoc.BaseUnidocPlugin.autoImport._
  import sbtunidoc.ScalaUnidocPlugin.autoImport.ScalaUnidoc

  override def requires = sbtunidoc.ScalaUnidocPlugin

  def publishOnly(artifactType: String)(config: PublishConfiguration) = {
    val newArts = config.artifacts.filter(_._1.`type` == artifactType)
    config.withArtifacts(newArts)
  }

  override def projectSettings = Seq(
    doc in Compile := (doc in ScalaUnidoc).value,
    target in unidoc in ScalaUnidoc := crossTarget.value / "api",
    publishConfiguration ~= publishOnly(Artifact.DocType),
    publishLocalConfiguration ~= publishOnly(Artifact.DocType)
  )
}
55 changes: 55 additions & 0 deletions project/TestChanged.scala
@@ -0,0 +1,55 @@
package akka.stream.alpakka

import scala.collection.immutable
import scala.sys.process._

import sbt._
import sbt.Keys._

object TestChanged extends AutoPlugin {
  override def trigger = allRequirements
  override def requires = plugins.JvmPlugin

  val changedDirectories = taskKey[immutable.Set[String]]("List of touched modules in this PR branch")
  val testChanged = taskKey[Unit]("Test all subprojects with changes compared to master")

  override lazy val buildSettings = Seq(
    changedDirectories := {
      val log = streams.value.log
      val target = "origin/master"

      // TODO could use jgit
      val diffOutput = s"git diff $target --name-only".!!.split("\n")
      val changedDirectories =
        diffOutput
          .map(l => l.trim)
          .map(l => l.takeWhile(_ != '/'))
          .map(new File(_))
          .map(file => if (file.isDirectory) file.toString else "")
          .toSet

      log.info("Detected changes in directories: " + changedDirectories.mkString("[", ", ", "]"))
      changedDirectories
    }
  )

  override lazy val projectSettings = Seq(
    testChanged := Def.taskDyn {
      val skip = Def.setting { task(()) }
      if (shouldBuild(name.value, changedDirectories.value)) test in Test
      else skip
    }.value
  )

  implicit class RegexHelper(val sc: StringContext) extends AnyVal {
    def re: scala.util.matching.Regex = sc.parts.mkString.r
  }

  private def shouldBuild(projectName: String, changedDirectories: Set[String]) = projectName match {
    case "alpakka" => false
    case re"akka-stream-alpakka-(.+)$subproject" =>
      changedDirectories.contains(subproject) || changedDirectories.contains("") || changedDirectories.contains(
        "project"
      )
  }
}