Skip to content

Commit

Permalink
77-aws-dynamodb-connector
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmowen authored and brian committed Jan 10, 2017
1 parent f664447 commit 23fe018
Show file tree
Hide file tree
Showing 21 changed files with 956 additions and 0 deletions.
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ lazy val cassandra = project
Dependencies.Cassandra
)

lazy val dynamodb = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-stream-alpakka-dynamodb",
resolvers += "DynamoDBLocal" at "http://dynamodb-local.s3-website-us-west-2.amazonaws.com/release/",
Dependencies.DynamoDB,
parallelExecution in Test := false
)

lazy val files = project // The name file is taken by `sbt.file`!
.in(file("file"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [AMQP Connector](amqp.md)
* [Cassandra Connector](cassandra.md)
* [DynamoDB Connector](dynamodb.md)
* [File Connectors](file.md)
* [HBase Connectors](hbase.md)
* [MQTT Connector](mqtt.md)
Expand Down
61 changes: 61 additions & 0 deletions docs/src/main/paradox/dynamodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# AWS DynamoDB Connector

The AWS DynamoDB connector provides a flow for streaming DynamoDB requests. For more information about DynamoDB please visit the [official documentation](https://aws.amazon.com/dynamodb/).

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.typesafe.akka" %% "akka-stream-alpakka-dynamodb" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-alpakka-dynamodb_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.typesafe.akka", name: "akka-stream-alpakka-dynamodb_$scala.binaryVersion$", version: "$version$"
}
```
@@@

## Usage

This connector uses the [default credential provider chain](http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html) provided by the [DynamoDB Java SDK](http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/basics.html) to retrieve credentials.

Before you can construct the client, you need an @scaladoc[ActorSystem](akka.actor.ActorSystem), @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer), and @scaladoc[ExecutionContext](scala.concurrent.ExecutionContext).

Scala
: @@snip (../../../../dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/ExampleSpec.scala) { #init-client }

Java
: @@snip (../../../../dynamodb/src/test/java/akka/stream/alpakka/dynamodb/ExampleJavaSpec.scala) { #init-client }

You can then create the client with a settings object.

Scala
: @@snip (../../../../dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/ExampleSpec.scala) { #client-construct }

Java
: @@snip (../../../../dynamodb/src/test/java/akka/stream/alpakka/dynamodb/ExampleJavaSpec.scala) { #client-construct }

We can now send requests to DynamoDB across the connection.

Scala
: @@snip (../../../../dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/ExampleSpec.scala) { #simple-request }

Java
: @@snip (../../../../dynamodb/src/test/java/akka/stream/alpakka/dynamodb/ExampleSpec.scala) { #simple-request }

13 changes: 13 additions & 0 deletions dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
akka.stream.alpakka.dynamodb {
# The AWS region
region = ""

# The AWS host
host = ""

# The AWS port
port: -1

# Max number of in flight requests from the AwsClient
parallelism = 10
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.dynamodb.impl

import java.io.{ ByteArrayInputStream, InputStream }
import java.util.concurrent.atomic.AtomicInteger

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http.HostConnectionPool
import akka.http.scaladsl.model.{ ContentType, HttpEntity, _ }
import akka.stream.alpakka.dynamodb.AwsOp
import akka.stream.alpakka.dynamodb.impl.AwsClient.{ AwsConnect, AwsRequestMetadata }
import akka.stream.scaladsl.Flow
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import com.amazonaws.auth.{ AWS4Signer, DefaultAWSCredentialsProviderChain }
import com.amazonaws.http.{ HttpMethodName, HttpResponseHandler, HttpResponse => AWSHttpResponse }
import com.amazonaws.{ DefaultRequest, HttpMethod => _, _ }

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

private[alpakka] object AwsClient {

case class AwsRequestMetadata(id: Long, op: AwsOp)

type AwsConnect =
Flow[(HttpRequest, AwsRequestMetadata), (Try[HttpResponse], AwsRequestMetadata), HostConnectionPool]

}

private[alpakka] trait AwsClient[S <: ClientSettings] {

protected implicit def system: ActorSystem

protected implicit def materializer: ActorMaterializer

protected implicit def ec: ExecutionContext

protected val settings: S
protected val connection: AwsConnect
protected val service: String
protected val defaultContentType: ContentType
protected val errorResponseHandler: HttpResponseHandler[AmazonServiceException]

private val requestId = new AtomicInteger()
private val credentials = new DefaultAWSCredentialsProviderChain()

private lazy val signer = {
val s = new AWS4Signer()
s.setServiceName(service)
s.setRegionName(settings.region)
s
}

private implicit def method(method: HttpMethodName): HttpMethod = method match {
case HttpMethodName.POST => HttpMethods.POST
case HttpMethodName.GET => HttpMethods.GET
case HttpMethodName.PUT => HttpMethods.PUT
case HttpMethodName.DELETE => HttpMethods.DELETE
case HttpMethodName.HEAD => HttpMethods.HEAD
case HttpMethodName.OPTIONS => HttpMethods.OPTIONS
case HttpMethodName.PATCH => HttpMethods.PATCH
}

private val signableUrl = Uri("https://" + settings.host + "/")

private val decider: Supervision.Decider = { case _ => Supervision.Stop }

def flow: Flow[AwsOp, AmazonWebServiceResult[ResponseMetadata], NotUsed] =
Flow[AwsOp]
.map(toAwsRequest)
.via(connection)
.mapAsync(settings.parallelism) {
case (Success(response), i) => toAwsResult(response, i)
case (Failure(ex), i) => Future.failed(ex)
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))

private def toAwsRequest(s: AwsOp): (HttpRequest, AwsRequestMetadata) = {
val original = s.marshaller.marshall(s.request)
original.setEndpoint(new java.net.URI("https://" + settings.host + "/"))
original.getHeaders.remove("Content-Type")
original.getHeaders.remove("Content-Length")
signer.sign(original, credentials.getCredentials)

val amzHeaders = original.getHeaders
val body = read(original.getContent)

val httpr = HttpRequest(uri = signableUrl, method = original.getHttpMethod,
headers = List(
headers.RawHeader("x-amz-date", amzHeaders.get("X-Amz-Date")),
headers.RawHeader("authorization", amzHeaders.get("Authorization")),
headers.RawHeader("x-amz-target", amzHeaders.get("X-Amz-Target"))
),
entity = HttpEntity(defaultContentType, body))

httpr -> AwsRequestMetadata(requestId.getAndIncrement(), s)
}

private def toAwsResult(response: HttpResponse,
metadata: AwsRequestMetadata): Future[AmazonWebServiceResult[ResponseMetadata]] = {
val req = new DefaultRequest(this.service)
val awsResp = new AWSHttpResponse(req, null) //
response.entity.dataBytes.runFold(Array.emptyByteArray)(_ ++ _).flatMap { bytes =>
awsResp.setContent(new ByteArrayInputStream(bytes))
awsResp.setStatusCode(response.status.intValue)
awsResp.setStatusText(response.status.defaultMessage)
if (200 <= awsResp.getStatusCode && awsResp.getStatusCode < 300) {
val handle = metadata.op.handler.handle(awsResp)
val resp = handle.getResult
Future.successful(resp)
} else {
response.headers.foreach { h =>
awsResp.addHeader(h.name, h.value)
}
Future.failed(errorResponseHandler.handle(awsResp))
}
}
}

private def read(in: InputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte).toArray

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.dynamodb.impl

abstract class ClientSettings {
val region: String
val host: String
val port: Int
val parallelism: Int
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.dynamodb.impl

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.MediaType.NotCompressible
import akka.http.scaladsl.model.{ ContentType, MediaType }
import akka.stream.ActorMaterializer
import akka.stream.alpakka.dynamodb.AwsOp
import akka.stream.alpakka.dynamodb.impl.AwsClient.{ AwsConnect, AwsRequestMetadata }
import akka.stream.scaladsl.{ Sink, Source }
import com.amazonaws.AmazonServiceException
import com.amazonaws.http.HttpResponseHandler

class DynamoClientImpl(val settings: DynamoSettings,
val errorResponseHandler: HttpResponseHandler[AmazonServiceException])(
implicit protected val system: ActorSystem,
implicit protected val materializer: ActorMaterializer)
extends AwsClient[DynamoSettings] {

override protected val service = "dynamodb"
override protected val defaultContentType =
ContentType.Binary(MediaType.customBinary("application", "x-amz-json-1.0", NotCompressible))
override protected implicit val ec = system.dispatcher

override protected val connection: AwsConnect =
if (settings.port == 443)
Http().cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host)(materializer)
else
Http().cachedHostConnectionPool[AwsRequestMetadata](settings.host, settings.port)(materializer)

def single(op: AwsOp) = Source.single(op).via(flow).map(_.asInstanceOf[op.B]).runWith(Sink.head)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.dynamodb.impl

import com.amazonaws.AmazonServiceException
import com.amazonaws.http.HttpResponseHandler
import com.amazonaws.protocol.json._
import com.amazonaws.services.dynamodbv2.model._
import com.amazonaws.services.dynamodbv2.model.transform._

private[alpakka] trait DynamoProtocol {

val meta = new JsonOperationMetadata().withPayloadJson(true)

val protocol: SdkJsonProtocolFactory = new SdkJsonProtocolFactory(
new JsonClientMetadata()
.addAllErrorMetadata(
new JsonErrorShapeMetadata()
.withErrorCode("ItemCollectionSizeLimitExceededException")
.withModeledClass(classOf[ItemCollectionSizeLimitExceededException]),
new JsonErrorShapeMetadata()
.withErrorCode("ResourceInUseException")
.withModeledClass(classOf[ResourceInUseException]),
new JsonErrorShapeMetadata()
.withErrorCode("ResourceNotFoundException")
.withModeledClass(classOf[ResourceNotFoundException]),
new JsonErrorShapeMetadata()
.withErrorCode("ProvisionedThroughputExceededException")
.withModeledClass(classOf[ProvisionedThroughputExceededException]),
new JsonErrorShapeMetadata()
.withErrorCode("ConditionalCheckFailedException")
.withModeledClass(classOf[ConditionalCheckFailedException]),
new JsonErrorShapeMetadata()
.withErrorCode("InternalServerError")
.withModeledClass(classOf[InternalServerErrorException]),
new JsonErrorShapeMetadata()
.withErrorCode("LimitExceededException")
.withModeledClass(classOf[LimitExceededException]))
.withBaseServiceExceptionClass(classOf[com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException]))

val errorResponseHandler: HttpResponseHandler[AmazonServiceException] =
protocol.createErrorResponseHandler(new JsonErrorResponseMetadata())

protected val batchGetItemM = new BatchGetItemRequestMarshaller(protocol)
protected val batchWriteItemM = new BatchWriteItemRequestMarshaller(protocol)
protected val createTableM = new CreateTableRequestMarshaller(protocol)
protected val deleteItemM = new DeleteItemRequestMarshaller(protocol)
protected val deleteTableM = new DeleteTableRequestMarshaller(protocol)
protected val describeLimitsM = new DescribeLimitsRequestMarshaller(protocol)
protected val describeTableM = new DescribeTableRequestMarshaller(protocol)
protected val getItemM = new GetItemRequestMarshaller(protocol)
protected val listTablesM = new ListTablesRequestMarshaller(protocol)
protected val putItemM = new PutItemRequestMarshaller(protocol)
protected val queryM = new QueryRequestMarshaller(protocol)
protected val scanM = new ScanRequestMarshaller(protocol)
protected val updateItemM = new UpdateItemRequestMarshaller(protocol)
protected val updateTableM = new UpdateTableRequestMarshaller(protocol)

protected val batchGetItemU = protocol.createResponseHandler(meta, new BatchGetItemResultJsonUnmarshaller)
protected val batchWriteItemU = protocol.createResponseHandler(meta, new BatchWriteItemResultJsonUnmarshaller)
protected val createTableU = protocol.createResponseHandler(meta, new CreateTableResultJsonUnmarshaller)
protected val deleteItemU = protocol.createResponseHandler(meta, new DeleteItemResultJsonUnmarshaller)
protected val deleteTableU = protocol.createResponseHandler(meta, new DeleteTableResultJsonUnmarshaller)
protected val describeLimitsU = protocol.createResponseHandler(meta, new DescribeLimitsResultJsonUnmarshaller)
protected val describeTableU = protocol.createResponseHandler(meta, new DescribeTableResultJsonUnmarshaller)
protected val getItemU = protocol.createResponseHandler(meta, new GetItemResultJsonUnmarshaller)
protected val listTablesU = protocol.createResponseHandler(meta, new ListTablesResultJsonUnmarshaller)
protected val putItemU = protocol.createResponseHandler(meta, new PutItemResultJsonUnmarshaller)
protected val queryU = protocol.createResponseHandler(meta, new QueryResultJsonUnmarshaller)
protected val scanU = protocol.createResponseHandler(meta, new ScanResultJsonUnmarshaller)
protected val updateItemU = protocol.createResponseHandler(meta, new UpdateItemResultJsonUnmarshaller)
protected val updateTableU = protocol.createResponseHandler(meta, new UpdateTableResultJsonUnmarshaller)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.dynamodb.impl

import akka.actor.ActorSystem
import com.typesafe.config.Config

object DynamoSettings {
def apply(system: ActorSystem): DynamoSettings = {
val config = system.settings.config.getConfig("akka.stream.alpakka.dynamodb")
DynamoSettings(
region = config getString "region",
host = config getString "host",
port = config getInt "port",
parallelism = config getInt "parallelism"
)
}
}

case class DynamoSettings(region: String, host: String, port: Int, parallelism: Int) extends ClientSettings {
require(host.nonEmpty, "A host name must be provided.")
require(port > -1, "A port number must be provided.")
}
Loading

0 comments on commit 23fe018

Please sign in to comment.