Add jobs orchestrator to controlapi integration tests
Adds the beginnings of the integration tests between the controlapi and
the jobs orchestrator. These tests don't actually check much more than
creation right now, as update handling for the jobs orchestrator is
pending, but further tests will be able to leverage the groundwork here
to a high degree.

Signed-off-by: Drew Erny <>
dperny committed Oct 15, 2019
package jobs

import (
. ""
. ""



stateutils ""

var _ = Describe("Integration between the controlapi and jobs orchestrator", func() {
// These tests verify that the control api and the jobs orchestrator
// cooperate and work correctly.

var (
// the object store is shared between the controlapi server and the
// orchestrator. makes an end-run around the manager, allowing us to
// avoid having to start the whole manager apparatus to test this one
// integration point
s *store.MemoryStore
o *Orchestrator

client api.ControlClient

// we need all of these variables captured so that we can close them
// out in AfterEach
server *controlapi.Server
grpcServer *grpc.Server
conn *grpc.ClientConn
tempUnixSocket string

// serverDone is a channel that is closed when the goroutine running
// the server exits.
serverDone <-chan struct{}
// orchestratorDone is likewise closed when the orchestrator exits
orchestratorDone <-chan struct{}

BeforeEach(func() {
// By statements are just extra documentation of what's going on, no
// functional difference besides some test log output
By("setting up the controlapi client and server")
// a lot of this setup is taken from manager/controlapi/server_test.go,
// but it's unexported. in any case, re-writing it in ginkgo isn't
// unwarranted.
s = store.NewMemoryStore(&stateutils.MockProposer{})
server = controlapi.NewServer(s, nil, nil, nil, nil)

// we need a temporary unix socket to server on
temp, err := ioutil.TempFile("", "test-socket")
// this is probably to make sure that the socket can be created
// successfully.

tempUnixSocket = temp.Name()
lis, err := net.Listen("unix", tempUnixSocket)

grpcServer = grpc.NewServer()
api.RegisterControlServer(grpcServer, server)

serverDone = testutils.EnsureRuns(func() {
_ = grpcServer.Serve(lis)

// the controlapi tests use the older grpc.Dial with a manually entered
// timeout. avoid that mess by just using the DialTimeout function
// instead.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// cancel after dial has completed is a no-op, but if we don't cancel,
// linters will (probably) complain about a leaked context.
defer cancel()
conn, err = grpc.DialContext(
ctx, "unix:"+tempUnixSocket,
// block on making this connection, to avoid the tests failing for
// funny reasons related to this connection being established async

client = api.NewControlClient(conn)

By("setting up the orchestrator")
o = NewOrchestrator(s)
orchestratorDone = testutils.EnsureRuns(func() {

AfterEach(func() {
// wait for the server to stop


It("should create the requisite tasks for a new replicated job", func() {
spec := &api.ServiceSpec{
Annotations: api.Annotations{
Name: "testService",
Mode: &api.ServiceSpec_ReplicatedJob{
ReplicatedJob: &api.ReplicatedJob{
MaxConcurrent: 3,
TotalCompletions: 5,
Task: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Image: "image",

resp, err := client.CreateService(context.Background(), &api.CreateServiceRequest{
Spec: spec,

// Eventually is like Expect but it does polling, which is what we want
// If the function has two return values, this will additionally assert
// that the second value is nil
Eventually(func() ([]*api.Task, error) {
// make sure that we can read the tasks back out through the
// controlapi, not just through the store
taskListResp, err := client.ListTasks(
Filters: &api.ListTasksRequest_Filters{
ServiceIDs: []string{resp.Service.ID},
DesiredStates: []api.TaskState{
return taskListResp.Tasks, err

