
[New Scheduler] Add memory queue for the new scheduler #5110

Merged: 22 commits merged into apache:master on Aug 30, 2021

Conversation

@style95 (Member) commented May 7, 2021

Description

This PR adds one of the core implementations of the new scheduler.
A queue is dynamically created for each action and buffers activations internally.
It provisions more containers according to the number of activations in the queue.
This process is called "scheduling" and is handled by another component, SchedulingDecisionMaker.

The queue communicates with ContainerManager to add more containers.
ContainerManager selects the proper invokers and sends container-creation messages to them.

This queue is designed so that activations are not persisted.
This could be improved in the future by taking an approach similar to what Kafka does.
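As a rough illustration of the provisioning idea described above, here is a minimal sketch; the object and method names (ProvisioningSketch, estimateContainers, the target wait time) are hypothetical and not part of this PR:

    // Illustrative sketch only, not the PR's implementation.
    // Estimate how many containers an action needs so that the buffered
    // activations can drain within a target waiting time.
    object ProvisioningSketch {
      def estimateContainers(queueSize: Int, avgDurationMs: Double, targetWaitMs: Double): Int =
        if (queueSize == 0) 0
        else math.max(1, math.ceil(queueSize * avgDurationMs / targetWaitMs).toInt)
    }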

My changes affect the following components

  • API
  • Controller
  • Message Bus (e.g., Kafka)
  • Loadbalancer
  • Scheduler
  • Invoker
  • Intrinsic actions (e.g., sequences, conductors)
  • Data stores (e.g., CouchDB)
  • Tests
  • Deployment
  • CLI
  • General tooling
  • Documentation

Types of changes

  • Bug fix (generally a non-breaking change which closes an issue).
  • Enhancement or new feature (adds new functionality).
  • Breaking change (a bug fix or enhancement which changes existing behavior).

Checklist:

  • I signed an Apache CLA.
  • I reviewed the style guides and followed the recommendations (Travis CI will check :).
  • I added tests to cover my changes.
  • My changes require further changes to the documentation.
  • I updated the documentation where necessary.

@style95 style95 changed the title Add memory queue for the new scheduler [New Scheduler] Add memory queue for the new scheduler May 7, 2021
@style95 style95 self-assigned this May 7, 2021
@style95 (Member, Author) commented May 7, 2021

According to the order defined in the wiki, this component requires other components to be merged first:
https://cwiki.apache.org/confluence/display/OPENWHISK/Component+Design

But since there is no big difference in the memory queue logic, I opened it for review.

Since this logic is the core of the scheduler and changes to this component are highly likely to have a big impact on the system, I will write a comprehensive design document.

Once the dependent modules are merged into the master branch, I will rebase this PR.

*
* @param maxSize the maximum size of the buffer
*/
class AverageRingBuffer(private val maxSize: Int) {
@style95 (Member, Author) commented on this snippet:

This circular buffer is used to calculate the average execution time of recent N activations for a given action.
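For reference only, a minimal sketch of what such a ring buffer could look like; this is not the PR's actual AverageRingBuffer implementation:

    class AverageRingBufferSketch(maxSize: Int) {
      private val elements = new Array[Double](maxSize)
      private var pointer = 0
      private var count = 0
      private var sum = 0.0

      // Add a new execution time, evicting the oldest one once the buffer is full.
      def add(value: Double): Unit = synchronized {
        if (count == maxSize) sum -= elements(pointer) else count += 1
        elements(pointer) = value
        sum += value
        pointer = (pointer + 1) % maxSize
      }

      def nonEmpty: Boolean = synchronized(count > 0)

      // Average of the stored (at most maxSize) execution times.
      def average: Double = synchronized(if (count == 0) 0.0 else sum / count)
    }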

@bdoyle0182 (Contributor) commented Jun 23, 2021:

What was the reasoning for picking the average as the heuristic? Would the median be a better heuristic here? i.e., if all activations take 100 milliseconds and then one activation has a slow call to a DB that takes 10 seconds, it will heavily skew the average.

@style95 (Member, Author) replied:

Yes, that makes sense.
The ring size in our case is relatively small (10), so we calculate the average of the 10 most recent activations. Even if there is a skew at some point, the average quickly returns to "normal" if the slow activation is a transient issue.
Also, it only affects the timing of adding more containers, and activations are still being processed by the existing containers. So there was no critical impact.

Basically, the median would be better, but the average is a much simpler and cheaper solution, so I chose it.
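To make the trade-off concrete, a worked example using the ring-buffer sketch above with a ring size of 10 (the numbers are purely illustrative): nine activations at 100 ms plus one 10 s outlier raise the average to about 1,090 ms, and after ten more 100 ms activations the outlier has been evicted and the average is back at 100 ms.

    val buf = new AverageRingBufferSketch(10)
    (1 to 9).foreach(_ => buf.add(100))   // nine normal activations (100 ms each)
    buf.add(10000)                        // one slow outlier (10 s)
    println(buf.average)                  // 1090.0 -> skewed upwards
    (1 to 10).foreach(_ => buf.add(100))  // new samples evict the outlier
    println(buf.average)                  // 100.0 -> back to normal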

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class SchedulingDecisionMaker(
@style95 (Member, Author) commented on this snippet:

This component decides whether to add more containers for the given action.
It uses the average execution time of the action.

There are three cases for calculating the average execution time (a rough sketch follows the list).

  1. Initial execution.
    In this case, we simply assume the execution time to be the tick time (100 ms by default).

  2. The queue is newly created.
    In this case, activations exist in the DB, but there is no data in the AverageRingBuffer yet.
    So we fetch the average execution time from the DB.

  3. The action is continuously executed.
    Each container sends the last execution time along with a FetchRequest when it pulls a new activation from the queue.
    The scheduler keeps the N most recent execution times in the AverageRingBuffer,
    so we can easily calculate the average execution time of the last N executions.
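A minimal sketch of how the three cases could be combined into a single estimate; the helper below is hypothetical (it reuses the ring-buffer sketch from above) and is not the actual SchedulingDecisionMaker code:

    def estimateDurationMs(buffer: AverageRingBufferSketch,
                           averageFromDbMs: Option[Double],
                           tickMs: Double = 100.0): Double =
      if (buffer.nonEmpty) buffer.average      // case 3: recent executions are available
      else averageFromDbMs.getOrElse(tickMs)   // case 2: fall back to the DB average
                                               // case 1: no history at all -> tick time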

@codecov-commenter commented May 7, 2021

Codecov Report

Merging #5110 (6661b33) into master (bf62f74) will increase coverage by 30.72%.
The diff coverage is 80.62%.


@@             Coverage Diff             @@
##           master    #5110       +/-   ##
===========================================
+ Coverage   43.88%   74.60%   +30.72%     
===========================================
  Files         231      234        +3     
  Lines       12807    13381      +574     
  Branches      528      513       -15     
===========================================
+ Hits         5620     9983     +4363     
+ Misses       7187     3398     -3789     
Impacted Files Coverage Δ
...rg/apache/openwhisk/common/AverageRingBuffer.scala 27.27% <27.27%> (ø)
...core/scheduler/queue/SchedulingDecisionMaker.scala 73.17% <73.17%> (ø)
...e/openwhisk/core/scheduler/queue/MemoryQueue.scala 83.73% <83.73%> (ø)
...in/scala/org/apache/openwhisk/common/Logging.scala 78.99% <100.00%> (+6.53%) ⬆️
.../scala/org/apache/openwhisk/core/WhiskConfig.scala 95.65% <100.00%> (ø)
.../org/apache/openwhisk/core/entity/CreationId.scala 60.00% <100.00%> (+37.77%) ⬆️
...cala/org/apache/openwhisk/http/ErrorResponse.scala 88.00% <100.00%> (+20.65%) ⬆️
...whisk/connector/kafka/KafkaProducerConnector.scala 55.00% <0.00%> (-32.50%) ⬇️
...whisk/connector/kafka/KafkaConsumerConnector.scala 59.15% <0.00%> (-22.54%) ⬇️
...pache/openwhisk/core/invoker/InvokerReactive.scala 57.75% <0.00%> (-21.56%) ⬇️
... and 142 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@ningyougang (Contributor) commented:

It would be better to add the configuration below to the scheduler's application.conf, e.g.

    queue {
      idle-grace = "20 seconds"
      stop-grace = "20 seconds"
      flush-grace = "60 seconds"
      graceful-shutdown-timeout = "5 seconds"
      max-retention-size = "10000"
      max-retention-ms = "60000"
      throttling-fraction = "0.9"
      duration-buffer-size = "10"
    }

Or, when another new PR that includes ansible/roles/schedulers/tasks/deploy.yml is opened, add it there.
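For illustration, one way such settings could be read with Typesafe Config; the "scheduler.queue" path and field names are assumptions based on the block above, not necessarily the prefix the project actually uses:

    import com.typesafe.config.ConfigFactory

    // Illustrative sketch: load the suggested queue settings.
    val queueConfig = ConfigFactory.load().getConfig("scheduler.queue")

    val idleGrace          = queueConfig.getDuration("idle-grace")        // e.g. 20 seconds
    val maxRetentionSize   = queueConfig.getInt("max-retention-size")     // e.g. 10000
    val throttlingFraction = queueConfig.getDouble("throttling-fraction") // e.g. 0.9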

.travis.yml (outdated)
@@ -27,6 +27,7 @@ env:
global:
- ANSIBLE_CMD="ansible-playbook -i environments/local -e docker_image_prefix=testing"
- GRADLE_PROJS_SKIP=""
- OW_SCALA_VERSION=2.13
A contributor commented:

Great!

@style95 (Member, Author) replied:

The Scala compiler version was not the culprit.

@ningyougang (Contributor) commented:

Needs a rebase.

case Event(msg: DecisionResults, _) =>
  val DecisionResults(result, num) = msg
  result match {
    case AddInitialContainer if num > 0 =>
A contributor commented:

Should this be if num == 0?

@style95 (Member, Author) replied:

AFAIK, there is no case where SchedulingDecisionMaker sends AddInitialContainer with num == 0?

takeUncompletedRequest()
  .map { res =>
    res.trySuccess(Right(msg))
    in -= 1
A contributor commented:

Pretty sure this isn't thread-safe if takeUncompletedRequest returns a promise.
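As an illustration of the concern, a small self-contained sketch of how a shared counter can be updated safely when the callback may run on another thread; the names are hypothetical and this is not the PR's actual code or fix:

    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{ExecutionContext, Future}

    object InFlightCounterSketch {
      // A plain `var in` updated with `in -= 1` from a callback on another
      // thread can lose updates; an AtomicInteger makes the update atomic.
      private val in = new AtomicInteger(0)

      def trackRequest()(implicit ec: ExecutionContext): Future[Unit] = {
        in.incrementAndGet()
        Future {
          // ... hand the activation over to a container ...
          in.decrementAndGet()
          ()
        }
      }

      def inFlight: Int = in.get()
    }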

}
}

private def addServersIfPossible(existing: Int,
A contributor commented:

Should this be called addContainersIfPossible?

@bdoyle0182 (Contributor) commented Jun 23, 2021

This is probably the most complicated PR, but I did my best to understand it at a high level. I think it LGTM apart from my comments.

One comment on the persistence nature of the queue: I think queueing with Kafka doesn't add anything to OpenWhisk if OpenWhisk does not provide an at-least-once guarantee of activation execution. It's unnecessary overhead to queue up through Kafka if the activation can get lost elsewhere anyway.

@style95 (Member, Author) commented Jul 12, 2021

One comment on the persistence nature of the queue: I think queueing with Kafka doesn't add anything to OpenWhisk if OpenWhisk does not provide an at-least-once guarantee of activation execution. It's unnecessary overhead to queue up through Kafka if the activation can get lost elsewhere anyway.

Agree.

I think the immediate next step could be removing Kafka from the critical path completely, as currently container-creation messages and activation messages from the controller to the scheduler are delivered through Kafka.

This queue is designed so that activations are not persisted.
This could be improved in the future by taking an approach similar to what Kafka does.

As I mentioned above, if we really need to support at-least-once semantics, we can consider this approach.

@style95 (Member, Author) commented Jul 16, 2021

After rebasing this PR, it started failing to run the compileScoverageScala task.
It seems to work well with Scala 2.13 but fails with 2.12.
Not sure of the reason yet.

@style95 (Member, Author) commented Aug 21, 2021

It’s ready to merge.

@style95 (Member, Author) commented Aug 27, 2021

I will merge this in 48 hours.

@style95 style95 merged commit cf36299 into apache:master Aug 30, 2021