Skip to content

Commit

Permalink
Added support for sqs
Browse files Browse the repository at this point in the history
  • Loading branch information
gkatzioura committed May 28, 2018
0 parents commit 0358ff0
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 0 deletions.
70 changes: 70 additions & 0 deletions .gitignore
@@ -0,0 +1,70 @@
# Created by .ignore support plugin (hsz.mobi)
### Scala template
*.class
*.log
### SBT template
# Simple Build Tool
# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control

dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
.history
.cache
.lib/
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839

# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/dictionaries
.idea/**/shelf

# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml

# Gradle
.idea/**/gradle.xml
.idea/**/libraries

# CMake
cmake-build-debug/
cmake-build-release/

# Mongo Explorer plugin
.idea/**/mongoSettings.xml

# File-based project format
*.iws

# IntelliJ
out/

# mpeltonen/sbt-idea plugin
.idea_modules/

# JIRA plugin
atlassian-ide-plugin.xml

# Cursive Clojure plugin
.idea/replstate.xml

# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties

# Editor-based Rest Client
.idea/httpRequests
.idea/*
32 changes: 32 additions & 0 deletions README.md
@@ -0,0 +1,32 @@
# container-queue-worker

The container-queue-worker projects aims to provide an easy way to migrate your elastic beanstalk workers to a docker orchestration system.

The projects works by creating a configuration file and mounting it to your container on the '/etc/worker/worker.conf' path.

```
worker {
type = sqs
server-endpoint = http://{docker-service}
aws {
queue-endpoint = http://{amazon queue endpoint}
region = {aws region}
accessKey = {aws access key}
secretKey = {aws secret key}
}
}
```

Another way is to configure the queue worker using environmental variables

```
WORKER_TYPE=sqs
WORKER_SERVER_ENDPOINT=http://{docker-service}
WORKER_AWS_QUEUE_ENDPOINT=http://{amazon queue endpoint}
WORKER_AWS_REGION={aws region}
WORKER_AWS_ACCESS_KEY={aws access key}
WORKER_AWS_SECRET_KEY={aws secret key}
```


Licensed under Apache, Version 2.0
39 changes: 39 additions & 0 deletions build.sbt
@@ -0,0 +1,39 @@
/*
* Copyright 2018 Emmanouil Gkatziouras
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

name := "queue-worker"

version := "0.1"

scalaVersion := "2.12.5"

lazy val akkaVersion = "2.5.3"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.18",
"org.scalactic" %% "scalactic" % "3.0.5",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.typesafe.akka" %% "akka-http" % "10.1.1",
"com.typesafe.akka" %% "akka-stream" % "2.5.11",
"com.google.code.gson" % "gson" % "2.8.5"
)

mainClass in Compile := Some("com.gkatzioura.queue.worker.WorkerQueueApp")

enablePlugins(JavaAppPackaging)
enablePlugins(DockerPlugin)
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version = 1.1.4
19 changes: 19 additions & 0 deletions project/plugins.sbt
@@ -0,0 +1,19 @@
/*
* Copyright 2018 Emmanouil Gkatziouras
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

logLevel := Level.Warn

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.4")
12 changes: 12 additions & 0 deletions src/main/resources/worker.conf
@@ -0,0 +1,12 @@
worker {
type = sqs
type = ${?WORKER_TYPE}
server-endpoint = ${?WORKER_SERVER_ENDPOINT}
aws {
queue-endpoint = ${?WORKER_AWS_QUEUE_ENDPOINT}
region = "eu-west-1"
region = ${?WORKER_AWS_REGION}
accessKey = ${?WORKER_AWS_ACCESS_KEY}
secretKey = ${?WORKER_AWS_SECRET_KEY}
}
}
34 changes: 34 additions & 0 deletions src/main/scala/com/gkatzioura/queue/worker/QueueWorker.scala
@@ -0,0 +1,34 @@
/*
* Copyright 2018 Emmanouil Gkatziouras
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.gkatzioura.queue.worker

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.gkatzioura.queue.worker.WorkerType.WorkerType
import com.typesafe.config.Config

trait QueueWorker {
}

object QueueWorker {

def apply(workerType: WorkerType,config: Config) (implicit actorSystem: ActorSystem, actorMaterializer: ActorMaterializer): QueueWorker = {

if(workerType==WorkerType.AMAZON_SQS) new SQSWorker(config)
else throw new IllegalArgumentException("Provide a valid worker");
}
}
105 changes: 105 additions & 0 deletions src/main/scala/com/gkatzioura/queue/worker/SQSWorker.scala
@@ -0,0 +1,105 @@
/*
* Copyright 2018 Emmanouil Gkatziouras
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.gkatzioura.queue.worker

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream._
import akka.stream.alpakka.sqs.MessageAction
import akka.stream.alpakka.sqs.scaladsl.{SqsAckFlow, SqsAckSink, SqsSource}
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RestartSource, RunnableGraph, Sink}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.model.Message
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}
import com.google.gson.Gson
import com.typesafe.config.Config

import scala.concurrent.Future
import scala.concurrent.duration._

class SQSWorker(configuration: Config) (implicit actorSystem: ActorSystem,actorMaterializer: ActorMaterializer) extends QueueWorker {

implicit val executionContext = actorSystem.dispatcher

val endpoint = configuration.getString("worker.aws.queue-endpoint")
val region = configuration.getString("worker.aws.region")
val serverEndpoint = configuration.getString("worker.server-endpoint")

implicit val sqs : AmazonSQSAsync = createSQS()

val source =
RestartSource.withBackoff(minBackoff = 1 second,maxBackoff = 30 seconds,randomFactor = 0.2) { () =>
SqsSource(endpoint)
.log("before-map")
.withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel))
.mapAsync(parallelism = 2) (
request => {
Http().singleRequest((messageToRequest(serverEndpoint, request)))
.map(response => {requestToMessage(request,response)})
.recoverWith {
case e => {
Logging.getLogger(actorSystem,this).error("Could not process message {} ignoring due to error {}",request.getMessageId,e);
Future.successful((request,false))
}
}
}
)
}

val g = GraphDSL.create() { implicit b =>

import GraphDSL.Implicits._
val partition = b.add(Partition[(Message,Boolean)](2, mex => if (mex._2) 0 else 1))
source ~> partition.in
partition.out(0) ~> Flow[(Message,Boolean)].map { me => (me._1, MessageAction.Delete)} ~> SqsAckSink(endpoint)
partition.out(1) ~> Flow[(Message,Boolean)].map { me => (me._1, MessageAction.Ignore)} ~> SqsAckFlow(endpoint) ~> Sink.ignore

ClosedShape
}

RunnableGraph.fromGraph(g).run

def createSQS() : AmazonSQSAsync = {

AmazonSQSAsyncClientBuilder
.standard()
.withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
.build()
}

def messageToRequest(serverEndpoint: String, message: Message) : HttpRequest = {

val httpEntity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, new Gson().toJson(message) )
HttpRequest(HttpMethods.POST,serverEndpoint,List(),httpEntity)
}

def requestToMessage(message: Message,httpResponse: HttpResponse): (Message,Boolean) = {

val logger = Logging.getLogger(actorSystem,this)

if(httpResponse.status==StatusCodes.OK) {
logger.info("Successfully processed message {}",message.getMessageId)
(message,true)
} else {
logger.error("Could not process message {} status {}",message.getMessageId,httpResponse.status)
(message,false)
}
}

}
48 changes: 48 additions & 0 deletions src/main/scala/com/gkatzioura/queue/worker/WorkerQueueApp.scala
@@ -0,0 +1,48 @@
/*
* Copyright 2018 Emmanouil Gkatziouras
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.gkatzioura.queue.worker

import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}

object WorkerQueueApp extends App {


val configuration = getConfiguration()
val workerTypeVal = configuration.getString("worker.type")
val workerType = WorkerType.withName(workerTypeVal)

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

QueueWorker(workerType,configuration)

def getConfiguration(): Config = {

val workerFile = new File("/etc/worker/worker.conf");

if(workerFile.exists) {
ConfigFactory.parseFile(workerFile)
} else {
ConfigFactory.parseResources("worker.conf").resolve()
}
}

}
24 changes: 24 additions & 0 deletions src/main/scala/com/gkatzioura/queue/worker/WorkerType.scala
@@ -0,0 +1,24 @@
/*
* Copyright 2018 Emmanouil Gkatziouras
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.gkatzioura.queue.worker

object WorkerType extends Enumeration {
type WorkerType = Value
val AMAZON_SQS = Value("sqs")
val AZURE_STORAGE_QUEUE = Value("azure_storage_queue")
val GOOGLE_PUB_SUB = Value("google_pub_sub")
}

0 comments on commit 0358ff0

Please sign in to comment.