Skip to content

Commit

Permalink
First stage of adding support for dynamic invoker id assignment
Browse files Browse the repository at this point in the history
Add a handshake between the invoker and controller during invoker
startup to allow the controller an opportunity to dynamically assign
the invokerId (the topic to which the invoker listens to accept
work).  The invoker provides a UUID and optionally a suggested
invokerId to the controller via the invokerReg topic of the messaging
provider. The controller responds with an assigned invokerID on a
topic named by the UUID.  After receiving its invokerId, the rest of
invoker startup proceeds as before.

To support different deployment scenarios, both the UUID and the
suggested invokerId may be provided externally to the invoker. This
can supports options such as the current completely static assignment
of invokerIds (which is the default for this PR), as well as dynamic,
but stable, invokerId assignments via a stable UUID per invoker VM,
and purely dynamic assignments via randomUUIDs per dynamic invoker
instance.
  • Loading branch information
dgrove-oss committed Sep 11, 2017
1 parent 5f84bb9 commit 20f11b0
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 5 deletions.
4 changes: 4 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ kafka:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}"
retentionMS: 172800000
invokerReg:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_invokerReg_retentionBytes | default(1073741824) }}"
retentionMS: 3600000

zookeeper:
version: 3.4
Expand Down
1 change: 1 addition & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
-e INVOKER_CONTAINER_DNS='{{ invoker_container_network_dns_servers | default()}}'
-e INVOKER_NUMCORE='{{ invoker.numcore }}'
-e INVOKER_CORESHARE='{{ invoker.coreshare }}'
-e INVOKER_ASSIGNED_UUID='{{ groups['invokers'].index(inventory_hostname) }}'
-e WHISK_LOGS_DIR='{{ whisk_logs_dir }}'
-v /sys/fs/cgroup:/sys/fs/cgroup
-v /run/runc:/run/runc
Expand Down
6 changes: 6 additions & 0 deletions ansible/roles/kafka/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@
- health
- cacheInvalidation

- name: create the invoker reg topic
shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic invokerReg --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.invokerReg.retentionBytes }} --config retention.ms={{ kafka.topics.invokerReg.retentionMS }} --config segment.bytes={{ kafka.topics.invokerReg.segmentBytes }}'"
register: command_result
failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
changed_when: "'Created topic' in command_result.stdout"

- name: create the active-ack topics
shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic completed{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.completed.retentionBytes }} --config retention.ms={{ kafka.topics.completed.retentionMS }} --config segment.bytes={{ kafka.topics.completed.segmentBytes }}'"
with_indexed_items: "{{ groups['controllers'] }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ object TransactionId {
val loadbalancer = TransactionId(-120) // Loadbalancer thread
val invokerHealth = TransactionId(-121) // Invoker supervision
val controller = TransactionId(-130) // Controller startup
val registrar = TransactionId(-140) // Invoker registration

def apply(tid: BigDecimal): TransactionId = {
Try {
Expand Down
18 changes: 18 additions & 0 deletions common/scala/src/main/scala/whisk/core/connector/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,21 @@ object PingMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(PingMessage.apply _, "name")
}

case class InvokerIdRequestMessage(uuid: String, proposedId: Int) extends Message {
override def serialize = InvokerIdRequestMessage.serdes.write(this).compactPrint
}

object InvokerIdRequestMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(InvokerIdRequestMessage.apply _, "uuid", "proposedId")
}

case class InvokerIdResponseMessage(uuid: String, assignedId: Int) extends Message {
override def serialize = InvokerIdResponseMessage.serdes.write(this).compactPrint
}

object InvokerIdResponseMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(InvokerIdResponseMessage.apply _, "uuid", "assignedId")
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,70 @@ object InvokerActor {

val timerName = "testActionTimer"
}

/*
* Actor to handle invoker registration
*/
class Registrar(sendIdToInvoker: (InvokerIdResponseMessage) => Future[RecordMetadata],
registerConsumer: MessageConsumer)
extends Actor {

implicit val transid = TransactionId.registrar
implicit val logging = new AkkaLogging(context.system.log)
implicit val timeout = Timeout(5.seconds)
implicit val ec = context.dispatcher

// State of the actor. It's important not to close over these
// references directly, so they don't escape the Actor.
var nextInvokerId = 0
val invokerUUIDToId = mutable.Map[String, Int]()

def receive = {
case m: InvokerIdRequestMessage => {
val assignedId = invokerUUIDToId.getOrElseUpdate(
m.uuid,
// haven't seen this invoker UUID yet; need to assign it an invokerID
if (m.proposedId == -1) {
// dynamic id assignment protocol
val id = nextInvokerId
nextInvokerId += 1
id
} else {
// static id assignment prototcol -- always accept proposedId
m.proposedId
})
val response = InvokerIdResponseMessage(m.uuid, assignedId)
sendIdToInvoker(response)
}
}

/** Receive invokerReg messages from invokers. */
val regPollDuration = 1.second
val regFeed = context.system.actorOf(Props {
new MessageFeed(
"invokerReg",
logging,
registerConsumer,
registerConsumer.maxPeek,
regPollDuration,
processInvokerReg,
logHandoff = false)
})

def processInvokerReg(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
InvokerIdRequestMessage.parse(raw) match {
case Success(r: InvokerIdRequestMessage) =>
self ! r
regFeed ! MessageFeed.Processed

case Failure(t) =>
regFeed ! MessageFeed.Processed
logging.error(this, s"failed processing message: $raw with $t")
}
}
}

object Registrar {
def props(p: (InvokerIdResponseMessage) => Future[RecordMetadata], c: MessageConsumer) = Props(new Registrar(p, c))
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
import whisk.core.connector.{ActivationMessage, CompletionMessage}
import whisk.core.connector.MessagingProvider
import whisk.core.connector.{ActivationMessage, CompletionMessage, InvokerIdResponseMessage}
import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
import whisk.core.connector.MessagingProvider
Expand Down Expand Up @@ -266,6 +267,28 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
}
}

/**
* Subscribes to invokerReg (registration requests from newly started invokers),
* and handles requests by sending back the assigned invokerId to the waiting invoker.
*/
private val invokerRegConsumer =
messagingProvider.getConsumer(config, s"invokerReg${instance.toInt}", "invokerReg", maxPeek = 128)
val registrar = actorSystem.actorOf(Registrar.props((m) => sendIdToInvoker(messageProducer, m), invokerRegConsumer))

private def sendIdToInvoker(producer: MessageProducer, msg: InvokerIdResponseMessage): Future[RecordMetadata] = {
implicit val transid = msg.transid

val topic = s"invokerReg-${msg.uuid}"
val start =
transid.started(this, LoggingMarkers.CONTROLLER_KAFKA, s"posting topic '$topic' with id '${msg.assignedId}'")

producer.send(topic, msg).andThen {
case Success(status) =>
transid.finished(this, start, s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]")
case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
}
}

/** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */
private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt)

Expand Down
42 changes: 38 additions & 4 deletions core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package whisk.core.invoker

import java.nio.charset.StandardCharsets

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.{Failure, Success}

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
Expand All @@ -28,14 +30,17 @@ import whisk.common.Scheduler
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
import whisk.core.connector.MessagingProvider
import whisk.core.connector.InvokerIdResponseMessage
import whisk.core.connector.PingMessage
import whisk.core.entity.ExecManifest
import whisk.core.entity.InstanceId
import whisk.core.entity.UUID
import whisk.core.entity.WhiskActivationStore
import whisk.core.entity.WhiskEntityStore
import whisk.http.BasicHttpService
import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory
import whisk.core.connector.InvokerIdRequestMessage

object Invoker {

Expand All @@ -57,9 +62,6 @@ object Invoker {
invokerContainerNetwork -> null)

def main(args: Array[String]): Unit = {
require(args.length == 1, "invoker instance required")
val invokerInstance = InstanceId(args(0).toInt)

implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
Expand Down Expand Up @@ -87,6 +89,38 @@ object Invoker {

val msgProvider = SpiLoader.get[MessagingProvider]
val producer = msgProvider.getProducer(config, ec)

val invokerUUID = sys.env.getOrElse("INVOKER_ASSIGNED_UUID", UUID().toString)
val proposedInvokerId = if (args.length == 1) { args(0).toInt } else { -1 }
val assignedInvokerId = {
producer.send("invokerReg", InvokerIdRequestMessage(invokerUUID, proposedInvokerId)).andThen {
case Success(_) => {
logger.info(this, s"sent invokerReg message with UUID ${invokerUUID} and proposedId ${proposedInvokerId}")
}
case Failure(t) => {
logger.error(this, s"failed to send invokerReg message: $t")
abort()
}
}
val topic = s"invokerReg-${invokerUUID}"
val consumer = msgProvider.getConsumer(config, topic, topic)
val records = consumer.peek(30.second).toSeq
if (records.length < 1) {
logger.error(this, s"Failed to get a response from controller on expected topic: ${topic}")
abort()
}
val (_, _, _, bytes) = records(0)
val raw = new String(bytes, StandardCharsets.UTF_8)
InvokerIdResponseMessage.parse(raw) match {
case Success(r: InvokerIdResponseMessage) =>
logger.info(this, s"controller assigned id ${r.assignedId}")
r.assignedId
case Failure(t) =>
logger.error(this, s"failed processing message: $raw with $t")
abort()
}
}
val invokerInstance = InstanceId(assignedInvokerId);
val invoker = new InvokerReactive(config, invokerInstance, producer)

Scheduler.scheduleWaitAtMost(1.seconds)(() => {
Expand Down

0 comments on commit 20f11b0

Please sign in to comment.