Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Merge pull request #167 from Netflix/ipv6
Browse files Browse the repository at this point in the history
IPv6
  • Loading branch information
sargun committed Aug 17, 2018
2 parents 10c1b85 + 194fae9 commit b993c46
Show file tree
Hide file tree
Showing 16 changed files with 522 additions and 243 deletions.
31 changes: 27 additions & 4 deletions executor/runtime/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -388,6 +389,19 @@ func setShares(logEntry *log.Entry, c *runtimeTypes.Container, hostCfg *containe
hostCfg.CPUShares = shares
}

func stableSecret() string {
ipBuf := make([]byte, 16)
// We can use math/rand here because this doesn't have to be cryptographically secure
n, err := rand.Read(ipBuf) // nolint: gas
if err != nil {
panic(err)
}
if n != len(ipBuf) {
panic(fmt.Sprintf("rand.Read only read %d bytes, not %d bytes", n, len(ipBuf)))
}
return net.IP(ipBuf).String()
}

func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string, imageSize int64) (*container.Config, *container.HostConfig, error) {
// Extract the entrypoint from the proto. If the proto is empty, pass
// an empty entrypoint and let Docker extract it from the image.
Expand Down Expand Up @@ -423,10 +437,11 @@ func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string,
Binds: binds,
ExtraHosts: []string{fmt.Sprintf("%s:%s", hostname, c.Allocation.IPV4Address)},
Sysctls: map[string]string{
"net.ipv4.tcp_ecn": "1",
"net.ipv6.conf.all.disable_ipv6": "0",
"net.ipv6.conf.default.disable_ipv6": "0",
"net.ipv6.conf.lo.disable_ipv6": "0",
"net.ipv4.tcp_ecn": "1",
"net.ipv6.conf.all.disable_ipv6": "0",
"net.ipv6.conf.default.disable_ipv6": "0",
"net.ipv6.conf.lo.disable_ipv6": "0",
"net.ipv6.conf.default.stable_secret": stableSecret(), // This is to ensure each container sets their addresses differently
},
Init: &useInit,
}
Expand Down Expand Up @@ -652,6 +667,14 @@ func prepareNetworkDriver(parentCtx context.Context, cfg Config, c *runtimeTypes
"--batch-size", strconv.Itoa(cfg.batchSize),
}

assignIPv6Address, err := c.AssignIPv6Address()
if err != nil {
return err
}
if assignIPv6Address {
args = append(args, "--allocate-ipv6-address=true")
}

// This blocks, and ignores kills.
if !c.TitusInfo.GetIgnoreLaunchGuard() {
args = append(args, "--wait-for-sg-lock-timeout", cfg.waitForSecurityGroupLockTimeout.String())
Expand Down
17 changes: 16 additions & 1 deletion executor/runtime/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
const (
hostnameStyleParam = "titusParameter.agent.hostnameStyle"
// FuseEnabledParam is a container atttribute set to enable FUSE
FuseEnabledParam = "titusParameter.agent.fuseEnabled"
FuseEnabledParam = "titusParameter.agent.fuseEnabled"
assignIPv6AddressParam = "titusParameter.agent.assignIPv6Address"
)

// ErrMissingIAMRole indicates that the Titus job was submitted without an IAM role
Expand Down Expand Up @@ -263,6 +264,20 @@ func (c *Container) GetFuseEnabled() (bool, error) {
return val, nil
}

// AssignIPv6Address determines whether the container should be assigned an IPv6 address
func (c *Container) AssignIPv6Address() (bool, error) {
assignIPv6AddressStr, ok := c.TitusInfo.GetPassthroughAttributes()[assignIPv6AddressParam]
if !ok {
return false, nil
}
val, err := strconv.ParseBool(assignIPv6AddressStr)
if err != nil {
return false, err
}

return val, nil
}

// Resources specify constraints to be applied to a Container
type Resources struct {
Mem int64 // in MiB
Expand Down
20 changes: 20 additions & 0 deletions executor/runtime/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,23 @@ func TestInvalidHostnameStyle(t *testing.T) {
assert.NotNil(t, err)
assert.IsType(t, &InvalidConfigurationError{}, err)
}

func TestDefaultIPv6AddressAssignment(t *testing.T) {
c := Container{
TitusInfo: &titus.ContainerInfo{},
}
assignIPv6Address, err := c.AssignIPv6Address()
assert.NoError(t, err)
assert.False(t, assignIPv6Address)
}

func TestIPv6AddressAssignment(t *testing.T) {
c := Container{
TitusInfo: &titus.ContainerInfo{
PassthroughAttributes: map[string]string{assignIPv6AddressParam: "true"},
},
}
assignIPv6Address, err := c.AssignIPv6Address()
assert.NoError(t, err)
assert.True(t, assignIPv6Address)
}
69 changes: 50 additions & 19 deletions vpc/allocate/allocate_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ var AllocateNetwork = cli.Command{ // nolint: golint
Usage: "How long to wait for AWS to give us IP addresses",
Value: 10 * time.Second,
},
cli.BoolFlag{
Name: "allocate-ipv6-address",
Usage: "Allocate IPv6 Address for container",
},
},
}

func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]struct{}, batchSize, deviceIdx int, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, retErr error) {
func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]struct{}, batchSize, deviceIdx int, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, allocateIPv6Address bool, retErr error) {
var err error

deviceIdx = parentCtx.CLIContext.Int("device-idx")
Expand Down Expand Up @@ -109,14 +113,15 @@ func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]st
retErr = cli.NewExitError("IP Refresh timeout must be at least 1 second", 1)
return
}
allocateIPv6Address = parentCtx.CLIContext.Bool("allocate-ipv6-address")

return
}

func allocateNetwork(parentCtx *context.VPCContext) error {
var err error

securityGroups, batchSize, deviceIdx, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, err := getCommandLine(parentCtx)
securityGroups, batchSize, deviceIdx, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, allocateIPv6Address, err := getCommandLine(parentCtx)
if err != nil {
return err
}
Expand All @@ -128,9 +133,10 @@ func allocateNetwork(parentCtx *context.VPCContext) error {
"securityConvergenceTimeout": securityConvergenceTimeout,
"waitForSgLockTimeout": waitForSgLockTimeout,
"ipRefreshTimeout": ipRefreshTimeout,
"allocateIPv6Address": allocateIPv6Address,
}).Debug()

allocation, err := doAllocateNetwork(parentCtx, deviceIdx, batchSize, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout)
allocation, err := doAllocateNetwork(parentCtx, deviceIdx, batchSize, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, allocateIPv6Address)
if err != nil {
errors := []error{cli.NewExitError("Unable to setup networking", 1), err}
err = json.NewEncoder(os.Stdout).Encode(types.Allocation{Success: false, Error: err.Error()})
Expand All @@ -139,10 +145,20 @@ func allocateNetwork(parentCtx *context.VPCContext) error {
}
return cli.NewMultiError(errors...)
}
ctx := parentCtx.WithField("ip", allocation.ipAddress)
ctx := parentCtx.WithField("ip4", allocation.ip4Address)
if allocateIPv6Address {
ctx = ctx.WithField("ip6", allocation.ip6Address)
}
ctx.Logger.Info("Network setup")
// TODO: Output JSON as to new network settings
err = json.NewEncoder(os.Stdout).Encode(types.Allocation{IPV4Address: allocation.ipAddress, DeviceIndex: deviceIdx, Success: true, ENI: allocation.eni})
err = json.NewEncoder(os.Stdout).
Encode(
types.Allocation{
IPV4Address: allocation.ip4Address,
IPV6Address: allocation.ip6Address,
DeviceIndex: deviceIdx,
Success: true,
ENI: allocation.eni})
if err != nil {
return cli.NewMultiError(cli.NewExitError("Unable to write allocation record", 1), err)
}
Expand Down Expand Up @@ -172,19 +188,27 @@ exit:
}

type allocation struct {
sharedSGLock *fslocker.SharedLock
exclusiveIPLock *fslocker.ExclusiveLock
ipAddress string
eni string
sharedSGLock *fslocker.SharedLock
exclusiveIP4Lock *fslocker.ExclusiveLock
exclusiveIP6Lock *fslocker.ExclusiveLock
ip4Address string
ip6Address string
eni string
}

func (a *allocation) refresh() error {
a.exclusiveIPLock.Bump()
a.exclusiveIP4Lock.Bump()
if a.exclusiveIP6Lock != nil {
a.exclusiveIP6Lock.Bump()
}
return nil
}

func (a *allocation) deallocate(ctx *context.VPCContext) {
a.exclusiveIPLock.Unlock()
a.exclusiveIP4Lock.Unlock()
if a.exclusiveIP6Lock != nil {
a.exclusiveIP6Lock.Bump()
}
a.sharedSGLock.Unlock()
}

Expand All @@ -200,7 +224,7 @@ func getDefaultSecurityGroups(parentCtx *context.VPCContext) (map[string]struct{
return primaryInterface.SecurityGroupIds, nil
}

func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, securityGroups map[string]struct{}, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration) (*allocation, error) {
func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, securityGroups map[string]struct{}, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, allocateIPv6Address bool) (*allocation, error) {
// 1. Ensure security groups are setup
ctx, cancel := parentCtx.WithTimeout(5 * time.Minute)
defer cancel()
Expand All @@ -210,21 +234,28 @@ func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int,
ctx.Logger.Warning("Unable to get interface by idx: ", err)
return nil, err
}
sharedSGLock, err := setupSecurityGroups(ctx, networkInterface, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout)
allocation := &allocation{
eni: networkInterface.InterfaceID,
}
allocation.sharedSGLock, err = setupSecurityGroups(ctx, networkInterface, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout)
if err != nil {
ctx.Logger.Warning("Unable to setup security groups: ", err)
return nil, err
}
// 2. Get a (free) IP
ip, ipLock, err := NewIPPoolManager(networkInterface).allocate(ctx, batchSize, ipRefreshTimeout)
ipPoolManager := NewIPPoolManager(networkInterface)
allocation.ip4Address, allocation.exclusiveIP4Lock, err = ipPoolManager.allocateIPv4(ctx, batchSize, ipRefreshTimeout)
if err != nil {
return nil, err
}
allocation := &allocation{
sharedSGLock: sharedSGLock,
exclusiveIPLock: ipLock,
ipAddress: ip,
eni: networkInterface.InterfaceID,

// Optionally, get an IPv6 address
if allocateIPv6Address {
allocation.ip6Address, allocation.exclusiveIP6Lock, err = ipPoolManager.allocateIPv6(ctx, networkInterface)
if err != nil {
allocation.deallocate(ctx)
return nil, err
}
}

return allocation, nil
Expand Down
47 changes: 40 additions & 7 deletions vpc/allocate/ip_pool_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package allocate

import (
"errors"
"math/rand"
"path/filepath"
"time"

Expand All @@ -17,6 +18,7 @@ import (
var (
errIPRefreshFailed = errors.New("IP refresh failed")
errMaxIPAddressesAllocated = errors.New("Maximum number of ip addresses allocated")
errNoFreeIPAddressFound = errors.New("No free IP address found")
)

// IPPoolManager encapsulates all management, and locking for a given interface. It must be constructed with NewIPPoolManager
Expand Down Expand Up @@ -81,7 +83,37 @@ func (mgr *IPPoolManager) assignMoreIPs(ctx *context.VPCContext, batchSize int,
return errIPRefreshFailed
}

func (mgr *IPPoolManager) allocate(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration) (string, *fslocker.ExclusiveLock, error) {
func (mgr *IPPoolManager) allocateIPv6(ctx *context.VPCContext, networkinterface *ec2wrapper.EC2NetworkInterface) (string, *fslocker.ExclusiveLock, error) {
configLock, err := mgr.lockConfiguration(ctx)
if err != nil {
ctx.Logger.Warning("Unable to get lock during allocation: ", err)
return "", nil, err
}
defer configLock.Unlock()

iface, err := ctx.Cache.DescribeInterface(ctx, networkinterface.InterfaceID)
if err != nil {
return "", nil, err
}
ipv6Addresses := iface.Ipv6Addresses
rand.Shuffle(len(ipv6Addresses), func(i, j int) {
ipv6Addresses[i], ipv6Addresses[j] = ipv6Addresses[j], ipv6Addresses[i]
})
for _, ipAddress := range iface.Ipv6Addresses {
lock, err := mgr.tryAllocate(ctx, *ipAddress.Ipv6Address)
if err != nil {
ctx.Logger.Warning("Unable to do allocation: ", err)
return "", nil, err
}
if lock != nil {
lock.Bump()
return *ipAddress.Ipv6Address, lock, nil
}
}
return "", nil, errNoFreeIPAddressFound
}

func (mgr *IPPoolManager) allocateIPv4(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration) (string, *fslocker.ExclusiveLock, error) {
configLock, err := mgr.lockConfiguration(ctx)
if err != nil {
ctx.Logger.Warning("Unable to get lock during allocation: ", err)
Expand All @@ -96,11 +128,12 @@ func (mgr *IPPoolManager) allocate(ctx *context.VPCContext, batchSize int, ipRef
}

ip, lock, err := mgr.doAllocate(ctx)
// Did we successfully get an IP, or was there an error?
if err != nil || lock != nil {
if err != nil {
ctx.Logger.Warning("Unable to allocate IP: ", err)
}
if err == errNoFreeIPAddressFound {

} else if err != nil {
ctx.Logger.WithError(err).Warning("Unable to allocate IP")
return ip, lock, err
} else if lock != nil { // We assume we only get a non-nil lock when we get a non-nil IP address
return ip, lock, err
}

Expand Down Expand Up @@ -128,7 +161,7 @@ func (mgr *IPPoolManager) doAllocate(ctx *context.VPCContext) (string, *fslocker
return ipAddress, lock, nil
}
}
return "", nil, nil
return "", nil, errNoFreeIPAddressFound
}

func (mgr *IPPoolManager) ipAddressesPath() string {
Expand Down
Loading

0 comments on commit b993c46

Please sign in to comment.