Skip to content
This repository has been archived by the owner on Apr 15, 2018. It is now read-only.

Consul support as coordination service #12

Merged
merged 1 commit into from
Nov 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ final class Settings private (system: ExtendedActorSystem) extends Extension {

val backend: Coordination.Backend = {
config.getString("coordination.backend").toLowerCase match {
case "etcd" => Coordination.Backend.Etcd
case other => throw new IllegalArgumentException(s"Unknown coordination backend $other!")
case "etcd" => Coordination.Backend.Etcd
case "consul" => Coordination.Backend.Consul
case other => throw new IllegalArgumentException(s"Unknown coordination backend $other!")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2015 Heiko Seeberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package de.heikoseeberger.constructr.akka

import akka.actor.ActorDSL.{ Act, actor }
import akka.cluster.{ Cluster, ClusterEvent }
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.StatusCodes.{ NotFound, OK }
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.stream.ActorMaterializer
import akka.testkit.TestDuration
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import de.heikoseeberger.constructr.coordination.Coordination.AddressSerialization
import de.heikoseeberger.constructr.coordination._
import org.scalatest.{ BeforeAndAfterAll, FreeSpecLike, Matchers }
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.sys.process.{ ProcessLogger, stringToProcess }

object ConstructrMultiNodeConfigConsul extends MultiNodeConfig {

val host = "docker-machine ip default".!!.trim

val nodes = 1.to(5).to[List].map(n => node(2550 + n))

private def node(port: Int) = {
commonConfig(ConfigFactory.load())
val node = role(port.toString)
nodeConfig(node)(ConfigFactory.parseString(
s"""|akka.actor.provider = akka.cluster.ClusterActorRefProvider
|akka.loggers = ["de.heikoseeberger.akkalog4j.Log4jLogger"]
|akka.loglevel = "DEBUG"
|akka.remote.netty.tcp.hostname = "127.0.0.1"
|akka.remote.netty.tcp.port = $port
|constructr.akka.coordination.backend = "consul"
|constructr.akka.coordination.port = 8500
|constructr.akka.coordination.host = $host""".stripMargin
))
node
}
}

class MultiNodeConstructrSpecConsulMultiJvmNode1 extends MultiNodeConstructrSpecConsul
class MultiNodeConstructrSpecConsulMultiJvmNode2 extends MultiNodeConstructrSpecConsul
class MultiNodeConstructrSpecConsulMultiJvmNode3 extends MultiNodeConstructrSpecConsul
class MultiNodeConstructrSpecConsulMultiJvmNode4 extends MultiNodeConstructrSpecConsul
class MultiNodeConstructrSpecConsulMultiJvmNode5 extends MultiNodeConstructrSpecConsul

abstract class MultiNodeConstructrSpecConsul[A: AddressSerialization] extends MultiNodeSpec(ConstructrMultiNodeConfigConsul)
with FreeSpecLike with Matchers with BeforeAndAfterAll {
import ConstructrMultiNodeConfig._
import RequestBuilding._
import system.dispatcher

implicit val mat = ActorMaterializer()

"Constructr should manage an Akka cluster" in {
runOn(nodes.head) {
"docker rm -f constructr-consul".!(ProcessLogger(_ => ()))
s"""docker run -d -p 8400:8400 -p 8500:8500 -p 8600:53/udp --name constructr-consul progrium/consul -server -bootstrap""".!

within(20.seconds.dilated) {
awaitAssert {
val consulStatus = Await.result(
Http().singleRequest(Delete(s"http://$host:8500/v1/kv/constructr/akka?recurse")).map(_.status),
5.seconds.dilated
)
consulStatus should (be(OK) or be(NotFound))
}
}
}

enterBarrier("consul-started")

ConstructrExtension(system)
val listener = actor(new Act {
var isMember = false
Cluster(context.system).subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.MemberUp])
become {
case ClusterEvent.MemberUp(member) if member.address == Cluster(context.system).selfAddress => isMember = true
case _ => sender() ! isMember
}
})
within(20.seconds.dilated) {
awaitAssert {
implicit val timeout = Timeout(1.second.dilated)
val isMember = Await.result((listener ? "isMember").mapTo[Boolean], 1.second.dilated)
isMember shouldBe true
}
}

enterBarrier("cluster-formed")

within(5.seconds.dilated) {
awaitAssert {
val constructrNodes = Await.result(
Http()
.singleRequest(Get(s"http://$host:8500/v1/kv/constructr/akka/MultiNodeConstructrSpecConsul/nodes/?recurse"))
.flatMap(resp => Unmarshal(resp).to[String].map(toNodes)),
1.second.dilated
)
nodes.to[Set].map(_.name).foreach(node => constructrNodes.toString should include(node))
}
}

enterBarrier("done")

runOn(nodes.head) {
"docker rm -f constructr-consul".!
}
}

private def toNodes(s: String) = {
import rapture.json._
import rapture.json.jsonBackends.spray._
def jsonToNode(json: Json) = {
implicitly[AddressSerialization[A]].fromBytes(decode(json.Key.as[String].substring("constructr/akka/MultiNodeConstructrSpecConsul/nodes/".length)))
}
Json.parse(s).as[List[Json]].map(jsonToNode)
}

override def initialParticipants = roles.size

override protected def beforeAll() = {
super.beforeAll()
multiNodeSpecBeforeAll()
}

override protected def afterAll() = {
multiNodeSpecAfterAll()
super.afterAll()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2015 Heiko Seeberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package de.heikoseeberger.constructr.coordination

import akka.http.scaladsl.client.RequestBuilding.{ Get, Put }
import akka.http.scaladsl.model.StatusCodes.{ NotFound, OK }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, ResponseEntity, Uri }
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Future }

final class ConsulCoordination(prefix: String, clusterName: String, host: String, port: Int, send: HttpRequest => Future[HttpResponse])
extends Coordination[Coordination.Backend.Consul.type](prefix, clusterName, host, port, send) {
import Coordination._

private val v1Uri = Uri(s"http://$host:$port/v1")

private val kvUri = v1Uri.withPath(v1Uri.path / "kv")

private val baseUri = kvUri.withPath(kvUri.path / "constructr" / prefix / clusterName)

private val nodesUri = baseUri.withPath(baseUri.path / "nodes")

override def getNodes[A: AddressSerialization]()(implicit ec: ExecutionContext, mat: Materializer): Future[List[A]] = {
def unmarshalNodes(entity: ResponseEntity) = {
def toNodes(s: String) = {
import rapture.json._
import rapture.json.jsonBackends.spray._
def jsonToNode(json: Json) = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More nitpicks ;-)
These curly braces aren't needed. Move the next line up, just after the equals sign.

val init = nodesUri.path.toString.stripPrefix(kvUri.path.toString)
val key = json.Key.as[String].substring(init.length)
implicitly[AddressSerialization[A]].fromBytes(decode(key))
}
Json.parse(s).as[List[Json]].map(jsonToNode)
}
Unmarshal(entity).to[String].map(toNodes)
}
val nodesUriRecursiveUri = nodesUri.withQuery(Uri.Query("recurse"))
send(Get(nodesUriRecursiveUri)).flatMap {
case HttpResponse(OK, _, entity, _) => unmarshalNodes(entity)
case HttpResponse(NotFound, _, entity, _) => ignore(entity).map(_ => Nil)
case HttpResponse(other, _, entity, _) => ignore(entity).map(_ => throw UnexpectedStatusCode(other))
}
}

override def lock(ttl: Duration)(implicit ec: ExecutionContext, mat: Materializer): Future[LockResult] = {
val responseAndSession = for {
sessionId <- createSession(ttl)
response <- send(Put(lockUri(sessionId)))
} yield (response, sessionId)
responseAndSession.flatMap {
case (HttpResponse(OK, _, entity, _), _) => {
Unmarshal(entity).to[String]
.map(_.toBoolean)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you try to directly unmarshal to[Boolean]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that but at a price. We need a compatible Unmarshallerin scope. I'd define this at the beginning of the class:

import akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers._
...
implicit def booleanUnmarshaller: FromEntityUnmarshaller[Boolean] = stringUnmarshaller.map(_.toBoolean)

What do you prefer?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I was just hoping that there were a default FromEntityUnmarshaller[Boolean] in scope. Forget it ;-)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, I didn't read the first line. So there is a predfined unmarshaller for Boolean? Then use it (via import).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, there isn't. The import brings an stringUnmarshaller. We'd have to add a booleanUnmarshaller mapping over the previous one. IMHO, it's not worth it.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I misunderstood. I agree, keep the existing code.

.map(locked => if (locked) LockResult.Success else LockResult.Failure)
}
case (HttpResponse(other, _, entity, _), _) =>
ignore(entity).map(_ => throw UnexpectedStatusCode(other))
}
}

override def addSelf[A: AddressSerialization](self: A, ttl: Duration)(implicit ec: ExecutionContext, mat: Materializer) = {
val responseAndSession = for {
sessionId <- createSession(ttl)
response <- send(Put(addOrRefreshUri(sessionId, self)))
} yield (response, sessionId)
responseAndSession.flatMap {
case (HttpResponse(OK, _, entity, _), sessionId) => ignore(entity).map(_ => SelfAdded[Coordination.Backend.Consul.type](sessionId))
case (HttpResponse(other, _, entity, _), _) => ignore(entity).map(_ => throw UnexpectedStatusCode(other))
}
}

override def refresh[A: AddressSerialization](self: A, ttl: Duration, context: String)(implicit ec: ExecutionContext, mat: Materializer) = {
send(Put(renewSessionUri(context))).flatMap {
case HttpResponse(OK, _, entity, _) => ignore(entity).map(_ => Refreshed[Coordination.Backend.Consul.type](context))
case HttpResponse(other, _, entity, _) => ignore(entity).map(_ => throw UnexpectedStatusCode(other))
}
}

override def initialBackendContext = ""

private def lockUri(sessionId: String) = baseUri
.withPath(baseUri.path / "lock")
.withQuery(Uri.Query("acquire" -> sessionId))

private def addOrRefreshUri[A: AddressSerialization](sessionId: String, self: A) = nodesUri
.withPath(nodesUri.path / encode(implicitly[AddressSerialization[A]].toBytes(self)))
.withQuery(Uri.Query("acquire" -> sessionId))

private def renewSessionUri(sessionId: String) = kvUri.withPath(kvUri.path / "session" / "renew" / sessionId)

private def unmarshalSession(entity: ResponseEntity)(implicit ec: ExecutionContext, mat: Materializer) = {
def toSession(s: String) = {
import rapture.json._
import rapture.json.jsonBackends.spray._
Json.parse(s).ID.as[String]
}
Unmarshal(entity).to[String].map(toSession)
}

private def createSession(ttl: Duration)(implicit ec: ExecutionContext, mat: Materializer) = {
val createSessionUri = v1Uri
.withPath(v1Uri.path / "session" / "create")
.withQuery(Uri.Query("Behaviour" -> "delete", "ttl" -> toSeconds(ttl)))
send(Put(createSessionUri)).flatMap {
case HttpResponse(OK, _, entity, _) =>
unmarshalSession(entity)
case HttpResponse(other, _, entity, _) =>
ignore(entity).map(_ => throw UnexpectedStatusCode(other))
}
}

private def toSeconds(ttl: Duration) = (ttl.toSeconds + 1).toString
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ object Coordination {
case object Etcd extends Backend {
override type Context = None.type
}
case object Consul extends Backend {
type SessionId = String
override type Context = SessionId
}
}

trait AddressSerialization[A] {
Expand All @@ -52,7 +56,8 @@ object Coordination {

def apply[B <: Coordination.Backend](backend: Backend)(prefix: String, clusterName: String, host: String, port: Int, send: HttpRequest => Future[HttpResponse]): Coordination[B] =
backend match {
case Backend.Etcd => new EtcdCoordination(prefix, clusterName, host, port, send).asInstanceOf[Coordination[B]]
case Backend.Etcd => new EtcdCoordination(prefix, clusterName, host, port, send).asInstanceOf[Coordination[B]]
case Backend.Consul => new ConsulCoordination(prefix, clusterName, host, port, send).asInstanceOf[Coordination[B]]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ final class ConstructrMachine[A: Coordination.AddressSerialization, B <: Coordin
}

when(State.AddingSelf, coordinationTimeout) {
case Event(Coordination.SelfAdded(_), _) => goto(State.RefreshScheduled).using(stateData.copy(coordinationRetriesLeft = coordinationRetries))
case Event(Coordination.SelfAdded(context: B#Context @unchecked), _) =>
goto(State.RefreshScheduled).using(stateData.copy(
coordinationRetriesLeft = coordinationRetries,
context = context
))
}

// RefreshScheduled
Expand All @@ -158,6 +162,10 @@ final class ConstructrMachine[A: Coordination.AddressSerialization, B <: Coordin

// Refreshing

onTransition {
case _ -> State.Refreshing => coordination.refresh(selfAddress, addOrRefreshTtl, stateData.context).pipeTo(self)
}

when(State.Refreshing, coordinationTimeout) {
case Event(Coordination.Refreshed(_), _) => goto(State.RefreshScheduled).using(stateData.copy(coordinationRetriesLeft = coordinationRetries))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We think that this part is necessary in order to deal with the refreshing stage taking into account the backend context.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, was that really missing? Thanks for catching! We need ways better tests to avoid such mistakes.

Expand Down