Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

IPv6 #167

Merged
merged 6 commits into from
Aug 17, 2018
Merged

IPv6 #167

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions executor/runtime/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -388,6 +389,19 @@ func setShares(logEntry *log.Entry, c *runtimeTypes.Container, hostCfg *containe
hostCfg.CPUShares = shares
}

func stableSecret() string {
ipBuf := make([]byte, 16)
// We can use math/rand here because this doesn't have to be cryptographically secure
n, err := rand.Read(ipBuf) // nolint: gas
if err != nil {
panic(err)
}
if n != len(ipBuf) {
panic(fmt.Sprintf("rand.Read only read %d bytes, not %d bytes", n, len(ipBuf)))
}
return net.IP(ipBuf).String()
}

func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string, imageSize int64) (*container.Config, *container.HostConfig, error) {
// Extract the entrypoint from the proto. If the proto is empty, pass
// an empty entrypoint and let Docker extract it from the image.
Expand Down Expand Up @@ -423,10 +437,11 @@ func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string,
Binds: binds,
ExtraHosts: []string{fmt.Sprintf("%s:%s", hostname, c.Allocation.IPV4Address)},
Sysctls: map[string]string{
"net.ipv4.tcp_ecn": "1",
"net.ipv6.conf.all.disable_ipv6": "0",
"net.ipv6.conf.default.disable_ipv6": "0",
"net.ipv6.conf.lo.disable_ipv6": "0",
"net.ipv4.tcp_ecn": "1",
"net.ipv6.conf.all.disable_ipv6": "0",
"net.ipv6.conf.default.disable_ipv6": "0",
"net.ipv6.conf.lo.disable_ipv6": "0",
"net.ipv6.conf.default.stable_secret": stableSecret(), // This is to ensure each container sets their addresses differently
},
Init: &useInit,
}
Expand Down Expand Up @@ -652,6 +667,14 @@ func prepareNetworkDriver(parentCtx context.Context, cfg Config, c *runtimeTypes
"--batch-size", strconv.Itoa(cfg.batchSize),
}

assignIPv6Address, err := c.AssignIPv6Address()
if err != nil {
return err
}
if assignIPv6Address {
args = append(args, "--allocate-ipv6-address=true")
}

// This blocks, and ignores kills.
if !c.TitusInfo.GetIgnoreLaunchGuard() {
args = append(args, "--wait-for-sg-lock-timeout", cfg.waitForSecurityGroupLockTimeout.String())
Expand Down
17 changes: 16 additions & 1 deletion executor/runtime/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
const (
hostnameStyleParam = "titusParameter.agent.hostnameStyle"
// FuseEnabledParam is a container atttribute set to enable FUSE
FuseEnabledParam = "titusParameter.agent.fuseEnabled"
FuseEnabledParam = "titusParameter.agent.fuseEnabled"
assignIPv6AddressParam = "titusParameter.agent.assignIPv6Address"
)

// ErrMissingIAMRole indicates that the Titus job was submitted without an IAM role
Expand Down Expand Up @@ -263,6 +264,20 @@ func (c *Container) GetFuseEnabled() (bool, error) {
return val, nil
}

// AssignIPv6Address determines whether the container should be assigned an IPv6 address
func (c *Container) AssignIPv6Address() (bool, error) {
assignIPv6AddressStr, ok := c.TitusInfo.GetPassthroughAttributes()[assignIPv6AddressParam]
if !ok {
return false, nil
}
val, err := strconv.ParseBool(assignIPv6AddressStr)
if err != nil {
return false, err
}

return val, nil
}

// Resources specify constraints to be applied to a Container
type Resources struct {
Mem int64 // in MiB
Expand Down
20 changes: 20 additions & 0 deletions executor/runtime/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,23 @@ func TestInvalidHostnameStyle(t *testing.T) {
assert.NotNil(t, err)
assert.IsType(t, &InvalidConfigurationError{}, err)
}

func TestDefaultIPv6AddressAssignment(t *testing.T) {
c := Container{
TitusInfo: &titus.ContainerInfo{},
}
assignIPv6Address, err := c.AssignIPv6Address()
assert.NoError(t, err)
assert.False(t, assignIPv6Address)
}

func TestIPv6AddressAssignment(t *testing.T) {
c := Container{
TitusInfo: &titus.ContainerInfo{
PassthroughAttributes: map[string]string{assignIPv6AddressParam: "true"},
},
}
assignIPv6Address, err := c.AssignIPv6Address()
assert.NoError(t, err)
assert.True(t, assignIPv6Address)
}
69 changes: 50 additions & 19 deletions vpc/allocate/allocate_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ var AllocateNetwork = cli.Command{ // nolint: golint
Usage: "How long to wait for AWS to give us IP addresses",
Value: 10 * time.Second,
},
cli.BoolFlag{
Name: "allocate-ipv6-address",
Usage: "Allocate IPv6 Address for container",
},
},
}

func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]struct{}, batchSize, deviceIdx int, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, retErr error) {
func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]struct{}, batchSize, deviceIdx int, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, allocateIPv6Address bool, retErr error) {
var err error

deviceIdx = parentCtx.CLIContext.Int("device-idx")
Expand Down Expand Up @@ -109,14 +113,15 @@ func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]st
retErr = cli.NewExitError("IP Refresh timeout must be at least 1 second", 1)
return
}
allocateIPv6Address = parentCtx.CLIContext.Bool("allocate-ipv6-address")

return
}

func allocateNetwork(parentCtx *context.VPCContext) error {
var err error

securityGroups, batchSize, deviceIdx, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, err := getCommandLine(parentCtx)
securityGroups, batchSize, deviceIdx, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, allocateIPv6Address, err := getCommandLine(parentCtx)
if err != nil {
return err
}
Expand All @@ -128,9 +133,10 @@ func allocateNetwork(parentCtx *context.VPCContext) error {
"securityConvergenceTimeout": securityConvergenceTimeout,
"waitForSgLockTimeout": waitForSgLockTimeout,
"ipRefreshTimeout": ipRefreshTimeout,
"allocateIPv6Address": allocateIPv6Address,
}).Debug()

allocation, err := doAllocateNetwork(parentCtx, deviceIdx, batchSize, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout)
allocation, err := doAllocateNetwork(parentCtx, deviceIdx, batchSize, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, allocateIPv6Address)
if err != nil {
errors := []error{cli.NewExitError("Unable to setup networking", 1), err}
err = json.NewEncoder(os.Stdout).Encode(types.Allocation{Success: false, Error: err.Error()})
Expand All @@ -139,10 +145,20 @@ func allocateNetwork(parentCtx *context.VPCContext) error {
}
return cli.NewMultiError(errors...)
}
ctx := parentCtx.WithField("ip", allocation.ipAddress)
ctx := parentCtx.WithField("ip4", allocation.ip4Address)
if allocateIPv6Address {
ctx = ctx.WithField("ip6", allocation.ip6Address)
}
ctx.Logger.Info("Network setup")
// TODO: Output JSON as to new network settings
err = json.NewEncoder(os.Stdout).Encode(types.Allocation{IPV4Address: allocation.ipAddress, DeviceIndex: deviceIdx, Success: true, ENI: allocation.eni})
err = json.NewEncoder(os.Stdout).
Encode(
types.Allocation{
IPV4Address: allocation.ip4Address,
IPV6Address: allocation.ip6Address,
DeviceIndex: deviceIdx,
Success: true,
ENI: allocation.eni})
if err != nil {
return cli.NewMultiError(cli.NewExitError("Unable to write allocation record", 1), err)
}
Expand Down Expand Up @@ -172,19 +188,27 @@ exit:
}

type allocation struct {
sharedSGLock *fslocker.SharedLock
exclusiveIPLock *fslocker.ExclusiveLock
ipAddress string
eni string
sharedSGLock *fslocker.SharedLock
exclusiveIP4Lock *fslocker.ExclusiveLock
exclusiveIP6Lock *fslocker.ExclusiveLock
ip4Address string
ip6Address string
eni string
}

func (a *allocation) refresh() error {
a.exclusiveIPLock.Bump()
a.exclusiveIP4Lock.Bump()
if a.exclusiveIP6Lock != nil {
a.exclusiveIP6Lock.Bump()
}
return nil
}

func (a *allocation) deallocate(ctx *context.VPCContext) {
a.exclusiveIPLock.Unlock()
a.exclusiveIP4Lock.Unlock()
if a.exclusiveIP6Lock != nil {
a.exclusiveIP6Lock.Bump()
}
a.sharedSGLock.Unlock()
}

Expand All @@ -200,7 +224,7 @@ func getDefaultSecurityGroups(parentCtx *context.VPCContext) (map[string]struct{
return primaryInterface.SecurityGroupIds, nil
}

func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, securityGroups map[string]struct{}, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration) (*allocation, error) {
func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, securityGroups map[string]struct{}, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, allocateIPv6Address bool) (*allocation, error) {
// 1. Ensure security groups are setup
ctx, cancel := parentCtx.WithTimeout(5 * time.Minute)
defer cancel()
Expand All @@ -210,21 +234,28 @@ func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int,
ctx.Logger.Warning("Unable to get interface by idx: ", err)
return nil, err
}
sharedSGLock, err := setupSecurityGroups(ctx, networkInterface, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout)
allocation := &allocation{
eni: networkInterface.InterfaceID,
}
allocation.sharedSGLock, err = setupSecurityGroups(ctx, networkInterface, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout)
if err != nil {
ctx.Logger.Warning("Unable to setup security groups: ", err)
return nil, err
}
// 2. Get a (free) IP
ip, ipLock, err := NewIPPoolManager(networkInterface).allocate(ctx, batchSize, ipRefreshTimeout)
ipPoolManager := NewIPPoolManager(networkInterface)
allocation.ip4Address, allocation.exclusiveIP4Lock, err = ipPoolManager.allocateIPv4(ctx, batchSize, ipRefreshTimeout)
if err != nil {
return nil, err
}
allocation := &allocation{
sharedSGLock: sharedSGLock,
exclusiveIPLock: ipLock,
ipAddress: ip,
eni: networkInterface.InterfaceID,

// Optionally, get an IPv6 address
if allocateIPv6Address {
allocation.ip6Address, allocation.exclusiveIP6Lock, err = ipPoolManager.allocateIPv6(ctx, networkInterface)
if err != nil {
allocation.deallocate(ctx)
return nil, err
}
}

return allocation, nil
Expand Down
47 changes: 40 additions & 7 deletions vpc/allocate/ip_pool_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package allocate

import (
"errors"
"math/rand"
"path/filepath"
"time"

Expand All @@ -17,6 +18,7 @@ import (
var (
errIPRefreshFailed = errors.New("IP refresh failed")
errMaxIPAddressesAllocated = errors.New("Maximum number of ip addresses allocated")
errNoFreeIPAddressFound = errors.New("No free IP address found")
)

// IPPoolManager encapsulates all management, and locking for a given interface. It must be constructed with NewIPPoolManager
Expand Down Expand Up @@ -81,7 +83,37 @@ func (mgr *IPPoolManager) assignMoreIPs(ctx *context.VPCContext, batchSize int,
return errIPRefreshFailed
}

func (mgr *IPPoolManager) allocate(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration) (string, *fslocker.ExclusiveLock, error) {
func (mgr *IPPoolManager) allocateIPv6(ctx *context.VPCContext, networkinterface *ec2wrapper.EC2NetworkInterface) (string, *fslocker.ExclusiveLock, error) {
configLock, err := mgr.lockConfiguration(ctx)
if err != nil {
ctx.Logger.Warning("Unable to get lock during allocation: ", err)
return "", nil, err
}
defer configLock.Unlock()

iface, err := ctx.Cache.DescribeInterface(ctx, networkinterface.InterfaceID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we assume each interface will always have enough IPv6s? A short comment about how we expect to allocate IPs to each ENI would be good here

if err != nil {
return "", nil, err
}
ipv6Addresses := iface.Ipv6Addresses
rand.Shuffle(len(ipv6Addresses), func(i, j int) {
ipv6Addresses[i], ipv6Addresses[j] = ipv6Addresses[j], ipv6Addresses[i]
})
for _, ipAddress := range iface.Ipv6Addresses {
lock, err := mgr.tryAllocate(ctx, *ipAddress.Ipv6Address)
if err != nil {
ctx.Logger.Warning("Unable to do allocation: ", err)
return "", nil, err
}
if lock != nil {
lock.Bump()
return *ipAddress.Ipv6Address, lock, nil
}
}
return "", nil, errNoFreeIPAddressFound
}

func (mgr *IPPoolManager) allocateIPv4(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration) (string, *fslocker.ExclusiveLock, error) {
configLock, err := mgr.lockConfiguration(ctx)
if err != nil {
ctx.Logger.Warning("Unable to get lock during allocation: ", err)
Expand All @@ -96,11 +128,12 @@ func (mgr *IPPoolManager) allocate(ctx *context.VPCContext, batchSize int, ipRef
}

ip, lock, err := mgr.doAllocate(ctx)
// Did we successfully get an IP, or was there an error?
if err != nil || lock != nil {
if err != nil {
ctx.Logger.Warning("Unable to allocate IP: ", err)
}
if err == errNoFreeIPAddressFound {

} else if err != nil {
ctx.Logger.WithError(err).Warning("Unable to allocate IP")
return ip, lock, err
} else if lock != nil { // We assume we only get a non-nil lock when we get a non-nil IP address
return ip, lock, err
}

Expand Down Expand Up @@ -128,7 +161,7 @@ func (mgr *IPPoolManager) doAllocate(ctx *context.VPCContext) (string, *fslocker
return ipAddress, lock, nil
}
}
return "", nil, nil
return "", nil, errNoFreeIPAddressFound
}

func (mgr *IPPoolManager) ipAddressesPath() string {
Expand Down
Loading