Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Wait For Titus Isolate to Isolate Workload Before Starting Workload
Browse files Browse the repository at this point in the history
Titus isolate waits for Docker events, and such. This sometimes
can take a moment. In order to avoid racing, and have the application
read the CPU state, and then configure itself based on that CPU
state, we wait for titus-isolate up to some timeout.
  • Loading branch information
sargun committed Feb 8, 2019
1 parent a48e49e commit 6a21352
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 22 deletions.
85 changes: 63 additions & 22 deletions executor/runtime/docker/docker.go
Expand Up @@ -83,6 +83,9 @@ type Config struct { // nolint: maligned
bumpTiniSchedPriority bool
waitForSecurityGroupLockTimeout time.Duration
ipRefreshTimeout time.Duration

titusIsolateBlockTime time.Duration
enableTitusIsolateBlock bool
}

// NewConfig generates a configuration, with a set of flags tied to it for the docker runtime
Expand Down Expand Up @@ -138,6 +141,19 @@ func NewConfig() (*Config, []cli.Flag) {
Destination: &cfg.ipRefreshTimeout,
Value: time.Second * 10,
},
cli.DurationFlag{
Name: "titus.executor.titusIsolateBlockTime",
EnvVar: "TITUS_EXECUTOR_TITUS_ISOLATE_BLOCK_TIME",
// The default value inside of the Titus Isolate code is 10 seconds.
// we can wait longer than it
Value: 30 * time.Second,
Destination: &cfg.titusIsolateBlockTime,
},
cli.BoolFlag{
Name: "titus.executor.enableTitusIsolateBlock",
EnvVar: "ENABLE_TITUS_ISOLATE_BLOCK",
Destination: &cfg.enableTitusIsolateBlock,
},
// Allow the usage of a realtime scheduling policy to be optional on systems that don't have it properly configured
// by default, i.e.: docker-for-mac.
cli.BoolTFlag{
Expand Down Expand Up @@ -801,7 +817,7 @@ func (r *DockerRuntime) doSetupSSHdContainer(ctx context.Context, containerName

// createVolumeContainer creates a container to be used as a source for volumes to be mounted via VolumesFrom
func (r *DockerRuntime) createVolumeContainer(ctx context.Context, containerName *string, cfg *container.Config, hostConfig *container.HostConfig) error { // nolint: gocyclo
var image string = cfg.Image
image := cfg.Image
tmpImageInfo, err := imageExists(ctx, r.client, image)
if err != nil {
return err
Expand Down Expand Up @@ -1265,6 +1281,27 @@ func (r *DockerRuntime) processEFSMounts(c *runtimeTypes.Container) ([]efsMountI
return efsMountInfos, nil
}

func (r *DockerRuntime) waitForTini(ctx context.Context, listener *net.UnixListener, efsMountInfos []efsMountInfo, c *runtimeTypes.Container) (string, error) {
// This can block for up to the full ctx timeout
logDir, containerCred, rootFile, unixConn, err := r.setupPostStartLogDirTini(ctx, listener, c)
if err != nil {
return logDir, err
}

if len(efsMountInfos) > 0 {
err = r.setupEFSMounts(ctx, c, rootFile, containerCred, efsMountInfos)
if err != nil {
return logDir, err
}
}

err = launchTini(unixConn)
if err != nil {
shouldClose(unixConn)
}
return logDir, err
}

// Start runs an already created container. A watcher is created that monitors container state. The Status Message Channel is ONLY
// valid if err == nil, otherwise it will block indefinitely.
func (r *DockerRuntime) Start(parentCtx context.Context, c *runtimeTypes.Container) (string, *runtimeTypes.Details, <-chan runtimeTypes.StatusMessage, error) {
Expand All @@ -1289,6 +1326,9 @@ func (r *DockerRuntime) Start(parentCtx context.Context, c *runtimeTypes.Contain
return "", nil, statusMessageChan, err
}
} else {
if len(efsMountInfos) > 0 {
entry.Fatal("Cannot perform EFS mounts without Tini")
}
entry.Warning("Starting Without Tini, no logging (globally disabled)")
}

Expand Down Expand Up @@ -1333,27 +1373,15 @@ func (r *DockerRuntime) Start(parentCtx context.Context, c *runtimeTypes.Contain
}

if r.tiniEnabled {
// This can block for up the the full ctx timeout
logDir, containerCred, rootFile, unixConn, err := r.setupPostStartLogDirTini(ctx, listener, c)
if err != nil {
eventCancel()
return "", nil, statusMessageChan, err
}
err = r.setupEFSMounts(ctx, c, rootFile, containerCred, efsMountInfos)
if err != nil {
eventCancel()
return "", nil, statusMessageChan, err
}

err = launchTini(unixConn)
logDir, err := r.waitForTini(ctx, listener, efsMountInfos, c)
if err != nil {
shouldClose(unixConn)
eventCancel()
return "", nil, statusMessageChan, err
} else {
go r.statusMonitor(eventCancel, c, eventChan, eventErrChan, statusMessageChan)
}
go r.statusMonitor(eventCancel, c, eventChan, eventErrChan, statusMessageChan)
return logDir, details, statusMessageChan, nil
return logDir, details, statusMessageChan, err
}

go r.statusMonitor(eventCancel, c, eventChan, eventErrChan, statusMessageChan)
// We already logged above that we aren't using Tini
// This means that the log watcher is not started
Expand Down Expand Up @@ -1620,11 +1648,24 @@ func (r *DockerRuntime) setupPostStartLogDirTiniHandleConnection(parentCtx conte
return r.logDir(c), &cred, rootFile, err
}

func (r *DockerRuntime) setupPostStartLogDirTiniHandleConnection2(parentCtx context.Context, c *runtimeTypes.Container, cred ucred, rootFile *os.File) error {
func (r *DockerRuntime) setupPostStartLogDirTiniHandleConnection2(parentCtx context.Context, c *runtimeTypes.Container, cred ucred, rootFile *os.File) error { // nolint: gocyclo
group, errGroupCtx := errgroup.WithContext(parentCtx)

if r.cfg.UseNewNetworkDriver && c.Allocation.IPV4Address != "" {
if err := setupNetworking(r.dockerCfg.burst, c, cred); err != nil {
return err
}
group.Go(func() error {
return setupNetworking(r.dockerCfg.burst, c, cred)
})
}

if r.dockerCfg.enableTitusIsolateBlock {
group.Go(func() error {
waitForTitusIsolate(errGroupCtx, c.TaskID, r.dockerCfg.titusIsolateBlockTime)
return nil
})
}

if err := group.Wait(); err != nil {
return err
}

if r.dockerCfg.bumpTiniSchedPriority {
Expand Down
58 changes: 58 additions & 0 deletions executor/runtime/docker/docker_isolate.go
@@ -0,0 +1,58 @@
package docker

import (
"context"
"fmt"
"net/http"
"net/url"
"time"

"github.com/sirupsen/logrus"
)

const (
titusIsolateHost = "localhost:7500"
)

func waitForTitusIsolate(parentCtx context.Context, taskID string, timeout time.Duration) {
waitForTitusIsolateWithHost(parentCtx, taskID, titusIsolateHost, timeout)
}

func waitForTitusIsolateWithHost(parentCtx context.Context, taskID, host string, timeout time.Duration) {
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()
requestURL := &url.URL{
Scheme: "http",
Host: host,
Path: fmt.Sprintf("/isolate/%s", taskID),
}

rq, err := http.NewRequest("GET", requestURL.String(), http.NoBody)
if err != nil {
logrus.WithError(err).Warn("Could not form HTTP Request to Isolate")
return
}
rq = rq.WithContext(ctx)
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
}
resp, err := client.Do(rq)
// Check if context timed out
if err != nil {
logrus.WithError(err).Warn("Error calling titus isolate")
return
}

defer shouldClose(resp.Body)

if resp.StatusCode == 200 {
return
}

logrus.WithFields(map[string]interface{}{
"statusCode": resp.StatusCode,
"status": resp.Status,
}).Warn("Titus Isolate did not return code 200")
}
65 changes: 65 additions & 0 deletions executor/runtime/docker/docker_isolate_test.go
@@ -0,0 +1,65 @@
package docker

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTitusIsolateTimeout(t *testing.T) {
t.Parallel()
handler := func(w http.ResponseWriter, r *http.Request) {
// This function will not return quickly.
t := time.NewTimer(30 * time.Second)
defer t.Stop()
select {
case <-t.C:
case <-r.Context().Done():
}
w.WriteHeader(200)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
now := time.Now()
waitForTitusIsolateWithHost(ctx, "foo", server.Listener.Addr().String(), 1*time.Second)
assert.True(t, time.Since(now) < 15*time.Second)
}

func TestTitusIsolateSuccess(t *testing.T) {
t.Parallel()
handler := func(w http.ResponseWriter, r *http.Request) {
// This function will return quickly.
w.WriteHeader(200)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
now := time.Now()
waitForTitusIsolateWithHost(ctx, "foo", server.Listener.Addr().String(), 1*time.Second)
assert.True(t, time.Since(now) < 5*time.Second)
}

func TestTitusIsolate404(t *testing.T) {
t.Parallel()
handler := func(w http.ResponseWriter, r *http.Request) {
// This function will return quickly.
w.WriteHeader(404)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
now := time.Now()
waitForTitusIsolateWithHost(ctx, "foo", server.Listener.Addr().String(), 1*time.Second)
assert.True(t, time.Since(now) < 5*time.Second)
}

0 comments on commit 6a21352

Please sign in to comment.