Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first stage of support for dynamic invoker id assignment #2689

Merged
merged 1 commit into from
Oct 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
-e PORT='8080'
-e KAFKA_HOST='{{ groups['kafka']|first }}'
-e KAFKA_HOST_PORT='{{ kafka.port }}'
-e REDIS_HOST='{{ groups['redis'] | first }}'
-e REDIS_HOST_PORT='{{ redis.port }}'
-e DB_PROTOCOL='{{ db_protocol }}'
-e DB_PROVIDER='{{ db_provider }}'
-e DB_HOST='{{ db_host }}'
Expand All @@ -140,6 +142,7 @@
-e INVOKER_NUMCORE='{{ invoker.numcore }}'
-e INVOKER_CORESHARE='{{ invoker.coreshare }}'
-e INVOKER_USE_RUNC='{{ invoker.useRunc }}'
-e INVOKER_NAME='{{ groups['invokers'].index(inventory_hostname) }}'
-e WHISK_LOGS_DIR='{{ whisk_logs_dir }}'
-v /sys/fs/cgroup:/sys/fs/cgroup
-v /run/runc:/run/runc
Expand Down
2 changes: 2 additions & 0 deletions ansible/templates/whisk.properties.j2
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }}

edge.host={{ groups["edge"]|first }}
kafka.host={{ groups["kafka"]|first }}
redis.host={{ groups["redis"]|first }}
router.host={{ groups["edge"]|first }}
zookeeper.host={{ groups["kafka"]|first }}
invoker.hosts={{ groups["invokers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
Expand All @@ -51,6 +52,7 @@ edge.host.apiport=443
zookeeper.host.port={{ zookeeper.port }}
kafka.host.port={{ kafka.port }}
kafkaras.host.port={{ kafka.ras.port }}
redis.host.port={{ redis.port }}
invoker.hosts.baseport={{ invoker.port }}

controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
Expand Down
1 change: 1 addition & 0 deletions common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
compile 'ch.qos.logback:logback-classic:1.2.3'
compile 'org.slf4j:jcl-over-slf4j:1.7.25'
compile 'org.slf4j:log4j-over-slf4j:1.7.25'
compile 'net.debasishg:redisclient_2.11:3.4'
compile 'commons-codec:commons-codec:1.9'
compile 'commons-io:commons-io:2.4'
compile 'commons-collections:commons-collections:3.2.2'
Expand Down
7 changes: 7 additions & 0 deletions common/scala/src/main/scala/whisk/core/WhiskConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
val invokerNumCore = this(WhiskConfig.invokerNumCore)
val invokerCoreShare = this(WhiskConfig.invokerCoreShare)
val invokerUseRunc = this.getAsBoolean(WhiskConfig.invokerUseRunc, true)
val invokerName = this(WhiskConfig.invokerName)

val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this(
WhiskConfig.wskApiPort)
Expand All @@ -74,6 +75,8 @@ class WhiskConfig(requiredProperties: Map[String, String],

val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort)
val redisHostName = this(WhiskConfig.redisHostName)
val redisHostPort = this(WhiskConfig.redisHostPort)

val edgeHostName = this(WhiskConfig.edgeHostName)

Expand Down Expand Up @@ -190,6 +193,7 @@ object WhiskConfig {
val invokerNumCore = "invoker.numcore"
val invokerCoreShare = "invoker.coreshare"
val invokerUseRunc = "invoker.use.runc"
val invokerName = "invoker.name"

val wskApiProtocol = "whisk.api.host.proto"
val wskApiPort = "whisk.api.host.port"
Expand All @@ -205,16 +209,19 @@ object WhiskConfig {

val kafkaHostName = "kafka.host"
private val zookeeperHostName = "zookeeper.host"
val redisHostName = "redis.host"

private val edgeHostApiPort = "edge.host.apiport"
val kafkaHostPort = "kafka.host.port"
val redisHostPort = "redis.host.port"
private val zookeeperHostPort = "zookeeper.host.port"

val invokerHostsList = "invoker.hosts"

val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
val invokerHosts = Map(invokerHostsList -> null)
val kafkaHost = Map(kafkaHostName -> null, kafkaHostPort -> null)
val redisHost = Map(redisHostName -> null, redisHostPort -> null)

val runtimesManifest = "runtimes.manifest"

Expand Down
44 changes: 40 additions & 4 deletions core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure

import com.redis.RedisClient

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import whisk.common.AkkaLogging
Expand Down Expand Up @@ -48,19 +50,18 @@ object Invoker {
WhiskEntityStore.requiredProperties ++
WhiskActivationStore.requiredProperties ++
kafkaHost ++
redisHost ++
wskApiHost ++ Map(
dockerImageTag -> "latest",
invokerNumCore -> "4",
invokerCoreShare -> "2",
invokerContainerPolicy -> "",
invokerContainerDns -> "",
invokerContainerNetwork -> null,
invokerUseRunc -> "true")
invokerUseRunc -> "true") ++
Map(invokerName -> null)

def main(args: Array[String]): Unit = {
require(args.length == 1, "invoker instance required")
val invokerInstance = InstanceId(args(0).toInt)

implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
Expand All @@ -86,6 +87,41 @@ object Invoker {
abort()
}

val proposedInvokerId: Option[Int] = args.headOption.map(_.toInt)
val assignedInvokerId = proposedInvokerId
.map { id =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just a foreach?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it gets `getOrElse`d below. Unfortunately, Option has no method to just apply a side effect while passing the original value through (like andThen on Futures).

logger.info(this, s"invokerReg: using proposedInvokerId ${id}")
id
}
.getOrElse {
val invokerName = config.invokerName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can invokerName come from a separate environment variable, arg, or system property? I think it would be possible with Typesafe Config, but I don't think it is with WhiskConfig. In our case, since the ID is an arg, we can generate it with a script, but it cannot be the IP address directly since that is not an Int, nor easily translated to an Int that is "small". It would be better to use the invokerName and leverage your approach, but AFAIK we cannot do that with config.invokerName without designating a name unique to each instance.

This may be more of a problem of using IndexedSeq[Int] in the controller with padding where a large value for proposedInvokerId will cause havoc due to padding of invoker registration for each id lower than the proposed.

It would be great to document this treatment of the instance id (I couldn't find any documentation) to let devs know that using "any Int" is not acceptable; or else it would be good to have a workaround. I may be doing it wrong - we hoped to use an Int derived from the IP number, but the result is that, for example, the controller/invokers endpoint displays a massive quantity of invokers, all of which are offline, due to the padding. I'm not sure if it would be possible to change this type from Int to String in the controller's invoker registration logic, but that would be one way to solve it, if using a referenced env var (e.g. INVOKER_NAME=${SOME_VAR}) for the invoker name is not possible.

Copy link
Member

@rabbah rabbah Sep 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am reading the pr correctly the name and ordinal assigned as the id for load balancing are completely decoupled. The name could be any value/string. The increments through redis make sure you have a monotonically increasing value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question is about whether config.invokerName has to be statically assigned as an environment variable INVOKER_NAME=MyInvokerName, or if it can be derived from either a system property or other environment variable, e.g. INVOKER_NAME=${SOME_VALUE} (where SOME_VALUE is another variable). Or, if it was possible to pass it as an arg, that would also be a way to use $SOME_VALUE, but currently it isn't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Thanks for the clarification. My understanding is as long as you can generate a stable name (across restarts) it should no longer matter. The reason the name has to be stable is for the redis negotiation. Every new name would bump the count otherwise. So I think the naming could be generalized accordingly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if a separate redis client to get the Invoker Id is overkill. Isn't just keeping a circular buffer of "InvokerIds & Names" in the Controller sufficient to assign a new id to the Invoker? The same circular buffer can log data from the health stats and registration status. "Number of Invokers" per Controller can be set in Limits, with higher-level scaling done through HA replicated controllers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tysonnorris re config.invokerName, this is being read from the environment variable INVOKER_NAME (@rabbah had asked me to not read sys.env directly, but instead access the container's environment through the WhiskConfig object). Does this work for your scenario?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ServoKvd, I think I failed to actually post something about possible follow-ons to this PR. As Rodric said, this PR was meant as a small step to allow us to decouple the invoker name from the kafka topic that the invoker is assigned to service. This is useful for some of the kube work and gives some flexibility in deployment. In the future, the decoupling could allow a fancier load balancer that could more dynamically assign invokers to topics and also support an elastic pool of invokers that could grow and shrink depending on overall system load. All details still to be worked out 😁

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dgrove-oss unfortunately setting INVOKER_NAME doesn't work in our case, since the env value will be the same for each instance (when running via mesos/marathon). If it was possible to reference another env value as the value for INVOKER_NAME, it would work, e.g. INVOKER_NAME=${SOME_OTHER_ENV}, since there will be a few of these values set per instance automatically (but cannot be used to explicitly set some var unique per instance).

I think a bigger change would be required to support this for our case, like either use TypeSafe Config (instead of whisk Config), or arg flags (e.g. you can specify either id OR name as an arg), or some extension for looking up the name that may be appropriate for particular environments (e.g. in mesos we could lookup a name from some other env vars; other envs may have other ways of associating names, etc).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tysonnorris thanks; I understand the mesos/marathon constraints now. They are different than kube 😞 Would it be acceptable for me to generalize in a followup PR? I have to admit I'd like to get this one merged without more iterations so we can unblock the DaemonSet PR on the kube deploy project.

I don't think it would be that hard for me to do a followup PR to allow a combination of arg flags and environment variables to be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dgrove-oss thanks, I understand. We can work with the current requirement to assign instance id, it is just very clunky :) If you get to a followup for the other combinations I'd be happy to test it.

val redisClient = new RedisClient(config.redisHostName, config.redisHostPort.toInt)
val assignedId = redisClient
.hget("controller:registar:idAssignments", invokerName)
.map { oldId =>
logger.info(this, s"invokerReg: invoker ${invokerName} was assigned its previous invokerId ${oldId}")
oldId.toInt
}
.getOrElse {
// If key not present, incr initializes to 0 before applying increment.
// Convert from 1-based to 0-based invokerIds by subtracting 1 from incr's result
val newId = redisClient
.incr("controller:registrar:nextInvokerId")
.map { id =>
id.toInt - 1
}
.getOrElse {
logger.error(this, "Failed to increment invokerId")
abort()
}
redisClient.hset("controller:registar:idAssignments", invokerName, newId)
logger.info(this, s"invokerReg: invoker ${invokerName} was assigned invokerId ${newId}")
newId
}
redisClient.quit
assignedId
}
val invokerInstance = InstanceId(assignedInvokerId);
val msgProvider = SpiLoader.get[MessagingProvider]
val producer = msgProvider.getProducer(config, ec)
val invoker = new InvokerReactive(config, invokerInstance, producer)
Expand Down