diff --git a/executor/runtime/docker/docker.go b/executor/runtime/docker/docker.go index 2bf5be6cd..57503ef3a 100644 --- a/executor/runtime/docker/docker.go +++ b/executor/runtime/docker/docker.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net" "os" "os/exec" @@ -388,6 +389,19 @@ func setShares(logEntry *log.Entry, c *runtimeTypes.Container, hostCfg *containe hostCfg.CPUShares = shares } +func stableSecret() string { + ipBuf := make([]byte, 16) + // We can use math/rand here because this doesn't have to be cryptographically secure + n, err := rand.Read(ipBuf) // nolint: gas + if err != nil { + panic(err) + } + if n != len(ipBuf) { + panic(fmt.Sprintf("rand.Read only read %d bytes, not %d bytes", n, len(ipBuf))) + } + return net.IP(ipBuf).String() +} + func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string, imageSize int64) (*container.Config, *container.HostConfig, error) { // Extract the entrypoint from the proto. If the proto is empty, pass // an empty entrypoint and let Docker extract it from the image. @@ -423,10 +437,11 @@ func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string, Binds: binds, ExtraHosts: []string{fmt.Sprintf("%s:%s", hostname, c.Allocation.IPV4Address)}, Sysctls: map[string]string{ - "net.ipv4.tcp_ecn": "1", - "net.ipv6.conf.all.disable_ipv6": "0", - "net.ipv6.conf.default.disable_ipv6": "0", - "net.ipv6.conf.lo.disable_ipv6": "0", + "net.ipv4.tcp_ecn": "1", + "net.ipv6.conf.all.disable_ipv6": "0", + "net.ipv6.conf.default.disable_ipv6": "0", + "net.ipv6.conf.lo.disable_ipv6": "0", + "net.ipv6.conf.default.stable_secret": stableSecret(), // This is to ensure each container sets their addresses differently }, Init: &useInit, } @@ -652,6 +667,14 @@ func prepareNetworkDriver(parentCtx context.Context, cfg Config, c *runtimeTypes "--batch-size", strconv.Itoa(cfg.batchSize), } + assignIPv6Address, err := c.AssignIPv6Address() + if err != nil { + return err + } + if assignIPv6Address { + args = append(args, "--allocate-ipv6-address=true") + } + // This blocks, and ignores kills. 
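// Hypothetical companion test for stableSecret (not part of this diff; it would sit
// next to docker.go in the docker package and needs only "net" and "testing"). The
// value is written to net.ipv6.conf.default.stable_secret, which the kernel only
// accepts in IPv6 textual form, so the string must parse as an IP. Note that the
// package-level math/rand source is deterministic unless it is seeded elsewhere in
// the process, so the sequence of secrets repeats across executor restarts;
// containers launched by one running executor still receive distinct values.
func TestStableSecretParses(t *testing.T) {
	secret := stableSecret()
	if ip := net.ParseIP(secret); ip == nil {
		t.Fatalf("stable_secret %q does not parse as an IP address", secret)
	}
}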
if !c.TitusInfo.GetIgnoreLaunchGuard() { args = append(args, "--wait-for-sg-lock-timeout", cfg.waitForSecurityGroupLockTimeout.String()) diff --git a/executor/runtime/types/types.go b/executor/runtime/types/types.go index a5401d314..77fa8cae8 100644 --- a/executor/runtime/types/types.go +++ b/executor/runtime/types/types.go @@ -24,7 +24,8 @@ import ( const ( hostnameStyleParam = "titusParameter.agent.hostnameStyle" // FuseEnabledParam is a container atttribute set to enable FUSE - FuseEnabledParam = "titusParameter.agent.fuseEnabled" + FuseEnabledParam = "titusParameter.agent.fuseEnabled" + assignIPv6AddressParam = "titusParameter.agent.assignIPv6Address" ) // ErrMissingIAMRole indicates that the Titus job was submitted without an IAM role @@ -263,6 +264,20 @@ func (c *Container) GetFuseEnabled() (bool, error) { return val, nil } +// AssignIPv6Address determines whether the container should be assigned an IPv6 address +func (c *Container) AssignIPv6Address() (bool, error) { + assignIPv6AddressStr, ok := c.TitusInfo.GetPassthroughAttributes()[assignIPv6AddressParam] + if !ok { + return false, nil + } + val, err := strconv.ParseBool(assignIPv6AddressStr) + if err != nil { + return false, err + } + + return val, nil +} + // Resources specify constraints to be applied to a Container type Resources struct { Mem int64 // in MiB diff --git a/executor/runtime/types/types_test.go b/executor/runtime/types/types_test.go index e52d31b2d..606a516f5 100644 --- a/executor/runtime/types/types_test.go +++ b/executor/runtime/types/types_test.go @@ -134,3 +134,23 @@ func TestInvalidHostnameStyle(t *testing.T) { assert.NotNil(t, err) assert.IsType(t, &InvalidConfigurationError{}, err) } + +func TestDefaultIPv6AddressAssignment(t *testing.T) { + c := Container{ + TitusInfo: &titus.ContainerInfo{}, + } + assignIPv6Address, err := c.AssignIPv6Address() + assert.NoError(t, err) + assert.False(t, assignIPv6Address) +} + +func TestIPv6AddressAssignment(t *testing.T) { + c := Container{ + TitusInfo: &titus.ContainerInfo{ + PassthroughAttributes: map[string]string{assignIPv6AddressParam: "true"}, + }, + } + assignIPv6Address, err := c.AssignIPv6Address() + assert.NoError(t, err) + assert.True(t, assignIPv6Address) +} diff --git a/vpc/allocate/allocate_network.go b/vpc/allocate/allocate_network.go index c46717d23..e9f887e34 100644 --- a/vpc/allocate/allocate_network.go +++ b/vpc/allocate/allocate_network.go @@ -61,10 +61,14 @@ var AllocateNetwork = cli.Command{ // nolint: golint Usage: "How long to wait for AWS to give us IP addresses", Value: 10 * time.Second, }, + cli.BoolFlag{ + Name: "allocate-ipv6-address", + Usage: "Allocate IPv6 Address for container", + }, }, } -func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]struct{}, batchSize, deviceIdx int, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, retErr error) { +func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]struct{}, batchSize, deviceIdx int, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, allocateIPv6Address bool, retErr error) { var err error deviceIdx = parentCtx.CLIContext.Int("device-idx") @@ -109,6 +113,7 @@ func getCommandLine(parentCtx *context.VPCContext) (securityGroups map[string]st retErr = cli.NewExitError("IP Refresh timeout must be at least 1 second", 1) return } + allocateIPv6Address = parentCtx.CLIContext.Bool("allocate-ipv6-address") return } @@ -116,7 +121,7 @@ func getCommandLine(parentCtx *context.VPCContext) 
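// Hypothetical third test case for the new passthrough attribute (not in this diff):
// strconv.ParseBool only accepts values such as "true", "false", "1" or "0", so any
// other value should surface an error to the caller rather than silently defaulting
// to false.
func TestInvalidIPv6AddressAssignment(t *testing.T) {
	c := Container{
		TitusInfo: &titus.ContainerInfo{
			PassthroughAttributes: map[string]string{assignIPv6AddressParam: "banana"},
		},
	}
	_, err := c.AssignIPv6Address()
	assert.Error(t, err)
}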
(securityGroups map[string]st func allocateNetwork(parentCtx *context.VPCContext) error { var err error - securityGroups, batchSize, deviceIdx, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, err := getCommandLine(parentCtx) + securityGroups, batchSize, deviceIdx, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, allocateIPv6Address, err := getCommandLine(parentCtx) if err != nil { return err } @@ -128,9 +133,10 @@ func allocateNetwork(parentCtx *context.VPCContext) error { "securityConvergenceTimeout": securityConvergenceTimeout, "waitForSgLockTimeout": waitForSgLockTimeout, "ipRefreshTimeout": ipRefreshTimeout, + "allocateIPv6Address": allocateIPv6Address, }).Debug() - allocation, err := doAllocateNetwork(parentCtx, deviceIdx, batchSize, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout) + allocation, err := doAllocateNetwork(parentCtx, deviceIdx, batchSize, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout, allocateIPv6Address) if err != nil { errors := []error{cli.NewExitError("Unable to setup networking", 1), err} err = json.NewEncoder(os.Stdout).Encode(types.Allocation{Success: false, Error: err.Error()}) @@ -139,10 +145,20 @@ func allocateNetwork(parentCtx *context.VPCContext) error { } return cli.NewMultiError(errors...) } - ctx := parentCtx.WithField("ip", allocation.ipAddress) + ctx := parentCtx.WithField("ip4", allocation.ip4Address) + if allocateIPv6Address { + ctx = ctx.WithField("ip6", allocation.ip6Address) + } ctx.Logger.Info("Network setup") // TODO: Output JSON as to new network settings - err = json.NewEncoder(os.Stdout).Encode(types.Allocation{IPV4Address: allocation.ipAddress, DeviceIndex: deviceIdx, Success: true, ENI: allocation.eni}) + err = json.NewEncoder(os.Stdout). + Encode( + types.Allocation{ + IPV4Address: allocation.ip4Address, + IPV6Address: allocation.ip6Address, + DeviceIndex: deviceIdx, + Success: true, + ENI: allocation.eni}) if err != nil { return cli.NewMultiError(cli.NewExitError("Unable to write allocation record", 1), err) } @@ -172,19 +188,27 @@ exit: } type allocation struct { - sharedSGLock *fslocker.SharedLock - exclusiveIPLock *fslocker.ExclusiveLock - ipAddress string - eni string + sharedSGLock *fslocker.SharedLock + exclusiveIP4Lock *fslocker.ExclusiveLock + exclusiveIP6Lock *fslocker.ExclusiveLock + ip4Address string + ip6Address string + eni string } func (a *allocation) refresh() error { - a.exclusiveIPLock.Bump() + a.exclusiveIP4Lock.Bump() + if a.exclusiveIP6Lock != nil { + a.exclusiveIP6Lock.Bump() + } return nil } func (a *allocation) deallocate(ctx *context.VPCContext) { - a.exclusiveIPLock.Unlock() + a.exclusiveIP4Lock.Unlock() + if a.exclusiveIP6Lock != nil { + a.exclusiveIP6Lock.Unlock() + } a.sharedSGLock.Unlock() } @@ -200,7 +224,7 @@ func getDefaultSecurityGroups(parentCtx *context.VPCContext) (map[string]struct{ return primaryInterface.SecurityGroupIds, nil } -func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, securityGroups map[string]struct{}, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration) (*allocation, error) { +func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, securityGroups map[string]struct{}, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, allocateIPv6Address bool) (*allocation, error) { // 1.
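// Illustrative only: what a successful allocate-network run now reports on stdout.
// The addresses and ENI ID below are made up, and the "eni" JSON key is assumed from
// the pre-existing struct tag, which this diff does not show.
_ = types.Allocation{
	IPV4Address: "192.0.2.24",
	IPV6Address: "2001:db8::a3",
	DeviceIndex: 1,
	Success:     true,
	ENI:         "eni-0123456789abcdef0",
}
// => roughly {"ipv4Address":"192.0.2.24","ipv6Address":"2001:db8::a3","deviceIndex":1,"success":true,"error":"","eni":"eni-0123456789abcdef0"}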
Ensure security groups are setup ctx, cancel := parentCtx.WithTimeout(5 * time.Minute) defer cancel() @@ -210,21 +234,28 @@ func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, ctx.Logger.Warning("Unable to get interface by idx: ", err) return nil, err } - sharedSGLock, err := setupSecurityGroups(ctx, networkInterface, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout) + allocation := &allocation{ + eni: networkInterface.InterfaceID, + } + allocation.sharedSGLock, err = setupSecurityGroups(ctx, networkInterface, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout) if err != nil { ctx.Logger.Warning("Unable to setup security groups: ", err) return nil, err } // 2. Get a (free) IP - ip, ipLock, err := NewIPPoolManager(networkInterface).allocate(ctx, batchSize, ipRefreshTimeout) + ipPoolManager := NewIPPoolManager(networkInterface) + allocation.ip4Address, allocation.exclusiveIP4Lock, err = ipPoolManager.allocateIPv4(ctx, batchSize, ipRefreshTimeout) if err != nil { return nil, err } - allocation := &allocation{ - sharedSGLock: sharedSGLock, - exclusiveIPLock: ipLock, - ipAddress: ip, - eni: networkInterface.InterfaceID, + + // Optionally, get an IPv6 address + if allocateIPv6Address { + allocation.ip6Address, allocation.exclusiveIP6Lock, err = ipPoolManager.allocateIPv6(ctx, networkInterface) + if err != nil { + allocation.deallocate(ctx) + return nil, err + } } return allocation, nil diff --git a/vpc/allocate/ip_pool_manager.go b/vpc/allocate/ip_pool_manager.go index 8e31d2558..ca2f28e7b 100644 --- a/vpc/allocate/ip_pool_manager.go +++ b/vpc/allocate/ip_pool_manager.go @@ -2,6 +2,7 @@ package allocate import ( "errors" + "math/rand" "path/filepath" "time" @@ -17,6 +18,7 @@ import ( var ( errIPRefreshFailed = errors.New("IP refresh failed") errMaxIPAddressesAllocated = errors.New("Maximum number of ip addresses allocated") + errNoFreeIPAddressFound = errors.New("No free IP address found") ) // IPPoolManager encapsulates all management, and locking for a given interface. 
It must be constructed with NewIPPoolManager @@ -81,7 +83,37 @@ func (mgr *IPPoolManager) assignMoreIPs(ctx *context.VPCContext, batchSize int, return errIPRefreshFailed } -func (mgr *IPPoolManager) allocate(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration) (string, *fslocker.ExclusiveLock, error) { +func (mgr *IPPoolManager) allocateIPv6(ctx *context.VPCContext, networkinterface *ec2wrapper.EC2NetworkInterface) (string, *fslocker.ExclusiveLock, error) { + configLock, err := mgr.lockConfiguration(ctx) + if err != nil { + ctx.Logger.Warning("Unable to get lock during allocation: ", err) + return "", nil, err + } + defer configLock.Unlock() + + iface, err := ctx.Cache.DescribeInterface(ctx, networkinterface.InterfaceID) + if err != nil { + return "", nil, err + } + ipv6Addresses := iface.Ipv6Addresses + rand.Shuffle(len(ipv6Addresses), func(i, j int) { + ipv6Addresses[i], ipv6Addresses[j] = ipv6Addresses[j], ipv6Addresses[i] + }) + for _, ipAddress := range iface.Ipv6Addresses { + lock, err := mgr.tryAllocate(ctx, *ipAddress.Ipv6Address) + if err != nil { + ctx.Logger.Warning("Unable to do allocation: ", err) + return "", nil, err + } + if lock != nil { + lock.Bump() + return *ipAddress.Ipv6Address, lock, nil + } + } + return "", nil, errNoFreeIPAddressFound +} + +func (mgr *IPPoolManager) allocateIPv4(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration) (string, *fslocker.ExclusiveLock, error) { configLock, err := mgr.lockConfiguration(ctx) if err != nil { ctx.Logger.Warning("Unable to get lock during allocation: ", err) @@ -96,11 +128,12 @@ func (mgr *IPPoolManager) allocate(ctx *context.VPCContext, batchSize int, ipRef } ip, lock, err := mgr.doAllocate(ctx) - // Did we successfully get an IP, or was there an error? 
- if err != nil || lock != nil { - if err != nil { - ctx.Logger.Warning("Unable to allocate IP: ", err) - } + if err == errNoFreeIPAddressFound { + + } else if err != nil { + ctx.Logger.WithError(err).Warning("Unable to allocate IP") + return ip, lock, err + } else if lock != nil { // We assume we only get a non-nil lock when we get a non-nil IP address return ip, lock, err } @@ -128,7 +161,7 @@ func (mgr *IPPoolManager) doAllocate(ctx *context.VPCContext) (string, *fslocker return ipAddress, lock, nil } } - return "", nil, nil + return "", nil, errNoFreeIPAddressFound } func (mgr *IPPoolManager) ipAddressesPath() string { diff --git a/vpc/allocate/setup_container_linux.go b/vpc/allocate/setup_container_linux.go index 314031746..4da2663f3 100644 --- a/vpc/allocate/setup_container_linux.go +++ b/vpc/allocate/setup_container_linux.go @@ -38,8 +38,8 @@ func doSetupContainer(parentCtx *context.VPCContext, netnsfd int, bandwidth uint return nil, err } - ip := net.ParseIP(allocation.IPV4Address) - + ip4 := net.ParseIP(allocation.IPV4Address) + ip6 := net.ParseIP(allocation.IPV6Address) parentLink, err := getLink(networkInterface) if err != nil { return nil, err @@ -82,10 +82,10 @@ func doSetupContainer(parentCtx *context.VPCContext, netnsfd int, bandwidth uint return nil, err } - return newLink, configureLink(parentCtx, nsHandle, newLink, bandwidth, mtu, burst, networkInterface, ip) + return newLink, configureLink(parentCtx, nsHandle, newLink, bandwidth, mtu, burst, networkInterface, ip4, ip6) } -func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link netlink.Link, bandwidth uint64, mtu int, burst bool, networkInterface *ec2wrapper.EC2NetworkInterface, ip net.IP) error { +func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link netlink.Link, bandwidth uint64, mtu int, burst bool, networkInterface *ec2wrapper.EC2NetworkInterface, ip4, ip6 net.IP) error { // Rename link err := nsHandle.LinkSetName(link, "eth0") if err != nil { @@ -104,7 +104,7 @@ func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link return err } - subnet, err := parentCtx.SubnetCache.DescribeSubnet(parentCtx, networkInterface.SubnetID) + subnet, err := parentCtx.Cache.DescribeSubnet(parentCtx, networkInterface.SubnetID) if err != nil { return err } @@ -116,19 +116,30 @@ func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link } // The netlink package appears to automatically calculate broadcast - newAddr := netlink.Addr{ - IPNet: &net.IPNet{IP: ip, Mask: ipnet.Mask}, + new4Addr := netlink.Addr{ + IPNet: &net.IPNet{IP: ip4, Mask: ipnet.Mask}, } - err = nsHandle.AddrAdd(link, &newAddr) + err = nsHandle.AddrAdd(link, &new4Addr) if err != nil { - parentCtx.Logger.Error("Unable to add addr to link: ", err) + parentCtx.Logger.Error("Unable to add IPv4 addr to link: ", err) return err } + if ip6 != nil { + // Amazon only gives out /128s + new6Addr := netlink.Addr{ + IPNet: &net.IPNet{IP: ip6, Mask: net.CIDRMask(128, 128)}, + } + err = nsHandle.AddrAdd(link, &new6Addr) + if err != nil { + parentCtx.Logger.Error("Unable to add IPv6 addr to link: ", err) + return err + } + } gateway := cidr.Inc(ipnet.IP) newRoute := netlink.Route{ Gw: gateway, - Src: ip, + Src: ip4, LinkIndex: link.Attrs().Index, } err = nsHandle.RouteAdd(&newRoute) @@ -137,7 +148,8 @@ func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link return err } - return setupIFBClasses(parentCtx, bandwidth, burst, ip) + // TODO: Wire up IFB / BPF / Bandwidth 
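// Equivalent restatement of the allocateIPv4 rework in ip_pool_manager.go above (a
// sketch, not the literal code): errNoFreeIPAddressFound deliberately falls through
// so the unchanged remainder of the function can grow the pool via assignMoreIPs and
// retry doAllocate; any other error aborts, and a non-nil lock means a usable
// address was found.
ip, lock, err := mgr.doAllocate(ctx)
switch {
case err == errNoFreeIPAddressFound:
	// fall through: assign more addresses, then try doAllocate again
case err != nil:
	ctx.Logger.WithError(err).Warning("Unable to allocate IP")
	return ip, lock, err
case lock != nil:
	// a non-nil lock implies a non-empty IP address
	return ip, lock, nil
}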
limits for IPv6 + return setupIFBClasses(parentCtx, bandwidth, burst, ip4) } func setupIFBClasses(parentCtx *context.VPCContext, bandwidth uint64, burst bool, ip net.IP) error { diff --git a/vpc/context/cache.go b/vpc/context/cache.go new file mode 100644 index 000000000..ac289b9b2 --- /dev/null +++ b/vpc/context/cache.go @@ -0,0 +1,231 @@ +package context + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/Netflix/titus-executor/fslocker" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" +) + +var ( + // ErrorSubnetNotFound indicates EC2 didn't return exactly one subnet + ErrorSubnetNotFound = errors.New("Subnet not found") + // ErrorSubnetCorrupted indicates the data that EC2 returned to us was invalid + ErrorSubnetCorrupted = errors.New("Subnet data corrupted") + // ErrorInterfaceCorrupted indicates the data that EC2 returned to us was invalid + ErrorInterfaceCorrupted = errors.New("Interface data corrupted") +) + +type cacheType string + +const ( + interfaceKey cacheType = "interfaces" + subnetKey = "subnets" +) + +// Cache is a state dir (/run, memory) backed cache +type Cache struct { + fslocker *fslocker.FSLocker + stateDir string +} + +func newCache(fslocker *fslocker.FSLocker, statePath string) (*Cache, error) { + sc := &Cache{ + fslocker: fslocker, + stateDir: statePath, + } + err := os.MkdirAll(sc.getPersistedPath(interfaceKey, ""), 0700) + if err != nil { + return nil, err + } + err = os.MkdirAll(sc.getPersistedPath(subnetKey, ""), 0700) + if err != nil { + return nil, err + } + return sc, nil +} + +// DescribeSubnet fetches the subnet from cache, or EC2, and automatically persists it to the persistent cache +func (sc *Cache) DescribeSubnet(parentCtx *VPCContext, subnetid string) (*ec2.Subnet, error) { + timeout := 30 * time.Second + ctx, cancel := parentCtx.WithField("subnetid", subnetid).WithTimeout(timeout) + defer cancel() + lockPath := fmt.Sprintf("subnets/%s", subnetid) + exclusiveLock, err := sc.fslocker.ExclusiveLock(lockPath, &timeout) + if err != nil { + ctx.Logger.Warning("Subnet cache unable to retrieve subnet information") + return nil, err + } + defer exclusiveLock.Unlock() + + subnet, err := sc.fetchSubnetFromCache(ctx, subnetid) + if err != nil { + return nil, err + } + if subnet != nil { + ctx.Logger.Info("Subnet successfully loaded from cache") + return subnet, err + } + subnet, err = sc.fetchSubnetFromEC2(ctx, subnetid) + if err != nil { + ctx.Logger.Warning("Unable to fetch subnet from EC2: ", err) + return nil, err + } + sc.persistSubnetToCache(ctx, *subnet) + + return subnet, nil +} + +// DescribeInterface fetches the interface from cache, or EC2, and automatically persists it to the persistent cache.
+// since interfaces are mutable IT SHOULD BE USED WITH CARE, and should not be used to access mutable interface +// attributes +func (sc *Cache) DescribeInterface(parentCtx *VPCContext, eniID string) (*ec2.NetworkInterface, error) { + timeout := 30 * time.Second + ctx, cancel := parentCtx.WithField("eniID", eniID).WithTimeout(timeout) + defer cancel() + lockPath := fmt.Sprintf("interfaces/%s", eniID) + exclusiveLock, err := sc.fslocker.ExclusiveLock(lockPath, &timeout) + if err != nil { + ctx.Logger.Warning("Interface cache unable to retrieve interface information") + return nil, err + } + defer exclusiveLock.Unlock() + + iface, err := sc.fetchInterfaceFromCache(ctx, eniID) + if err != nil { + return nil, err + } + if iface != nil { + ctx.Logger.Info("Interface successfully loaded from cache") + return iface, err + } + iface, err = sc.fetchInterfaceFromEC2(ctx, eniID) + if err != nil { + ctx.Logger.Warning("Unable to fetch interface from EC2: ", err) + return nil, err + } + sc.persistInterfaceToCache(ctx, *iface) + + return iface, nil +} + +func (sc *Cache) fetchInterfaceFromCache(ctx *VPCContext, eniID string) (*ec2.NetworkInterface, error) { + path := sc.getPersistedPath(interfaceKey, eniID) + bytes, err := ioutil.ReadFile(path) + if os.IsNotExist(err) { + return nil, nil + } + + if err != nil { + return nil, err + } + + var iface ec2.NetworkInterface + err = json.Unmarshal(bytes, &iface) + if err != nil { + return nil, err + } + return &iface, nil +} + +func (sc *Cache) fetchSubnetFromCache(ctx *VPCContext, subnetid string) (*ec2.Subnet, error) { + path := sc.getPersistedPath(subnetKey, subnetid) + bytes, err := ioutil.ReadFile(path) + if os.IsNotExist(err) { + return nil, nil + } + + if err != nil { + return nil, err + } + + var subnet ec2.Subnet + err = json.Unmarshal(bytes, &subnet) + if err != nil { + return nil, err + } + return &subnet, nil +} + +func (sc *Cache) fetchSubnetFromEC2(ctx *VPCContext, subnetid string) (*ec2.Subnet, error) { + describeSubnetsInput := &ec2.DescribeSubnetsInput{ + SubnetIds: []*string{aws.String(subnetid)}, + } + subnetOutput, err := ec2.New(ctx.AWSSession).DescribeSubnetsWithContext(ctx, describeSubnetsInput) + if err != nil { + return nil, err + } + if len(subnetOutput.Subnets) != 1 { + return nil, ErrorSubnetNotFound + } + subnet := subnetOutput.Subnets[0] + if (*subnet.SubnetId) != subnetid || (*subnet.CidrBlock) == "" { + return nil, ErrorSubnetCorrupted + } + + return subnet, nil +} + +func (sc *Cache) fetchInterfaceFromEC2(ctx *VPCContext, eniID string) (*ec2.NetworkInterface, error) { + describeNetworkInterfacesInput := &ec2.DescribeNetworkInterfacesInput{ + NetworkInterfaceIds: []*string{aws.String(eniID)}, + } + describeNetworkInterfacesOutput, err := ec2.New(ctx.AWSSession).DescribeNetworkInterfacesWithContext(ctx, describeNetworkInterfacesInput) + if err != nil { + return nil, err + } + if len(describeNetworkInterfacesOutput.NetworkInterfaces) != 1 { + return nil, ErrorSubnetNotFound + } + iface := describeNetworkInterfacesOutput.NetworkInterfaces[0] + if (*iface.NetworkInterfaceId) != eniID || (*iface.MacAddress) == "" { + return nil, ErrorInterfaceCorrupted + } + + return iface, nil +} + +func (sc *Cache) persistInterfaceToCache(ctx *VPCContext, iface ec2.NetworkInterface) { + sc.persistToCache(ctx, interfaceKey, *iface.NetworkInterfaceId, iface) +} + +func (sc *Cache) persistSubnetToCache(ctx *VPCContext, subnet ec2.Subnet) { + if *subnet.State != "available" { + ctx.Logger.Warning("Not persisting subnet because not available") + return + } +
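// Sketch of how the consolidated cache is consumed elsewhere in this diff (ctx
// stands in for a *context.VPCContext). Both helpers fall back to EC2 on a miss and
// persist the answer under <stateDir>/cache/subnets/<id> or
// <stateDir>/cache/interfaces/<id>. Per the DescribeInterface comment above, the
// cached interface is only trusted for attributes that are fixed when the ENI is
// created, such as its IPv6 address pool.
subnet, err := ctx.Cache.DescribeSubnet(ctx, networkInterface.SubnetID)
if err != nil {
	return err
}
iface, err := ctx.Cache.DescribeInterface(ctx, networkInterface.InterfaceID)
if err != nil {
	return err
}
_ = subnet.CidrBlock    // drives the container's IPv4 netmask and gateway
_ = iface.Ipv6Addresses // drives IPv6 address selection in allocateIPv6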
sc.persistToCache(ctx, subnetKey, *subnet.SubnetId, subnet) +} + +func (sc *Cache) persistToCache(ctx *VPCContext, itemType cacheType, id string, item interface{}) { + // We should be holding an exclusive lock on the subnet ID at this point + path := sc.getPersistedPath(itemType, id) + bytes, err := json.Marshal(item) + if err != nil { + ctx.Logger.Error("Unable to marshal subnet for caching: ", err) + return + } + err = atomicWriteOnce(path, bytes) + if err != nil { + ctx.Logger.Error("Unable to write subnet data: ", err) + return + } + ctx.Logger.Info("Subnet successfully persisted to cache") +} + +func shouldClose(closer io.Closer) { + _ = closer.Close() +} + +func (sc *Cache) getPersistedPath(itemType cacheType, id string) string { + return filepath.Join(sc.stateDir, string(itemType), id) + +} diff --git a/vpc/context/subnet_cache_linux.go b/vpc/context/cache_linux.go similarity index 100% rename from vpc/context/subnet_cache_linux.go rename to vpc/context/cache_linux.go diff --git a/vpc/context/cache_test.go b/vpc/context/cache_test.go new file mode 100644 index 000000000..c23f47f2d --- /dev/null +++ b/vpc/context/cache_test.go @@ -0,0 +1,99 @@ +package context + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/Netflix/titus-executor/fslocker" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testSubnetName1 = "testsubnet" + testSubnetName2 = "not-available-subnet" + testInterfaceID1 = "test-id1" +) + +func TestSubnetCache(t *testing.T) { + testContext := newTestContext() + tempdir, err := ioutil.TempDir("", "subnet-cache") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(tempdir)) + }() + fsLockerDir := filepath.Join(tempdir, "fslocker") + cacheDir := filepath.Join(tempdir, "cache") + fsl, err := fslocker.NewFSLocker(fsLockerDir) + require.NoError(t, err) + require.NoError(t, os.MkdirAll(cacheDir, 0700)) + subnetCache, err := newCache(fsl, cacheDir) + require.NoError(t, err) + subnet1, err := subnetCache.fetchSubnetFromCache(testContext, testSubnetName1) + assert.NoError(t, err) + assert.Nil(t, subnet1) + + // Persist and retrieve subnet + fakeSubnet1 := ec2.Subnet{ + AvailabilityZone: aws.String("us-east-1a"), + CidrBlock: aws.String("1.2.3.0/24"), + SubnetId: aws.String(testSubnetName1), + AvailableIpAddressCount: aws.Int64(32), + State: aws.String("available"), + } + subnetCache.persistSubnetToCache(testContext, fakeSubnet1) + + subnet2, err := subnetCache.fetchSubnetFromCache(testContext, testSubnetName1) + assert.NoError(t, err) + assert.EqualValues(t, fakeSubnet1.AvailabilityZone, subnet2.AvailabilityZone) + assert.EqualValues(t, fakeSubnet1.CidrBlock, subnet2.CidrBlock) + assert.EqualValues(t, fakeSubnet1.SubnetId, subnet2.SubnetId) + assert.EqualValues(t, fakeSubnet1.AvailableIpAddressCount, subnet2.AvailableIpAddressCount) + assert.EqualValues(t, fakeSubnet1.State, subnet2.State) + + // Make sure we don't persist non-available subnets + fakeSubnet2 := ec2.Subnet{ + State: aws.String("not-available"), + } + subnetCache.persistSubnetToCache(testContext, fakeSubnet2) + + subnet3, err := subnetCache.fetchSubnetFromCache(testContext, testSubnetName2) + assert.NoError(t, err) + assert.Nil(t, subnet3) +} + +func TestInterfaceCache(t *testing.T) { + testContext := newTestContext() + tempdir, err := ioutil.TempDir("", "interface-cache") + require.NoError(t, err) + defer func() { + require.NoError(t, 
os.RemoveAll(tempdir)) + }() + fsLockerDir := filepath.Join(tempdir, "fslocker") + cacheDir := filepath.Join(tempdir, "cache") + fsl, err := fslocker.NewFSLocker(fsLockerDir) + require.NoError(t, err) + require.NoError(t, os.MkdirAll(cacheDir, 0700)) + subnetCache, err := newCache(fsl, cacheDir) + require.NoError(t, err) + iface1, err := subnetCache.fetchSubnetFromCache(testContext, testInterfaceID1) + assert.NoError(t, err) + assert.Nil(t, iface1) + + // Persist and retrieve subnet + fakeInterface1 := ec2.NetworkInterface{ + MacAddress: aws.String("abc"), + NetworkInterfaceId: aws.String(testInterfaceID1), + } + subnetCache.persistInterfaceToCache(testContext, fakeInterface1) + + iface, err := subnetCache.fetchInterfaceFromCache(testContext, testInterfaceID1) + assert.NoError(t, err) + assert.EqualValues(t, iface.MacAddress, fakeInterface1.MacAddress) + assert.EqualValues(t, iface.NetworkInterfaceId, fakeInterface1.NetworkInterfaceId) + +} diff --git a/vpc/context/subnet_cache_unsupported.go b/vpc/context/cache_unsupported.go similarity index 100% rename from vpc/context/subnet_cache_unsupported.go rename to vpc/context/cache_unsupported.go diff --git a/vpc/context/context.go b/vpc/context/context.go index 6749a7390..e6da49cf5 100644 --- a/vpc/context/context.go +++ b/vpc/context/context.go @@ -36,7 +36,7 @@ type VPCContext struct { Logger *logrus.Entry InstanceType string InstanceID string - SubnetCache *SubnetCache + Cache *Cache } func newVPCContext(cliContext *cli.Context) (*VPCContext, error) { @@ -84,13 +84,15 @@ func newVPCContext(cliContext *cli.Context) (*VPCContext, error) { } ret.FSLocker = locker - subnetCachingDirectory := filepath.Join(stateDir, "subnets") - err = os.MkdirAll(subnetCachingDirectory, 0700) + cachingDirectory := filepath.Join(stateDir, "cache") + err = os.MkdirAll(cachingDirectory, 0700) + if err != nil { + return nil, err + } + ret.Cache, err = newCache(locker, cachingDirectory) if err != nil { return nil, err } - ret.SubnetCache = newSubnetCache(locker, subnetCachingDirectory) - return ret, nil } diff --git a/vpc/context/subnet_cache.go b/vpc/context/subnet_cache.go deleted file mode 100644 index 285f1668b..000000000 --- a/vpc/context/subnet_cache.go +++ /dev/null @@ -1,130 +0,0 @@ -package context - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "time" - - "github.com/Netflix/titus-executor/fslocker" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/ec2" -) - -var ( - // ErrorSubnetNotFound indicates EC2 didn't return exactly one subnet - ErrorSubnetNotFound = errors.New("Subnet not found") - // ErrorSubnetCorrupted indicates the data that EC2 returned to us was invalid - ErrorSubnetCorrupted = errors.New("Subnet data corrupted") -) - -// SubnetCache is a state dir (/run, memory) backed cache -type SubnetCache struct { - fslocker *fslocker.FSLocker - stateDir string -} - -func newSubnetCache(fslocker *fslocker.FSLocker, statePath string) *SubnetCache { - return &SubnetCache{ - fslocker: fslocker, - stateDir: statePath, - } -} - -// DescribeSubnet fetches the subnet from cache, or EC2, and automatically persists it to the persistent cache -func (sc *SubnetCache) DescribeSubnet(parentCtx *VPCContext, subnetid string) (*ec2.Subnet, error) { - timeout := 30 * time.Second - ctx, cancel := parentCtx.WithField("subnetid", subnetid).WithTimeout(30 * time.Second) - defer cancel() - lockPath := fmt.Sprintf("subnets/%s", subnetid) - exclusiveLock, err := sc.fslocker.ExclusiveLock(lockPath, &timeout) 
- if err != nil { - ctx.Logger.Warning("Subnet cache unable to retrieve subnet information") - return nil, err - } - defer exclusiveLock.Unlock() - - subnet, err := sc.fetchFromCache(ctx, subnetid) - if err != nil { - return nil, err - } - if subnet != nil { - ctx.Logger.Info("Subnet successfully loaded from cache") - return subnet, err - } - subnet, err = sc.fetchFromEC2(ctx, subnetid) - if err != nil { - ctx.Logger.Info("Subnet successfully loaded from EC2") - return nil, err - } - sc.persistToCache(ctx, *subnet) - - return subnet, nil -} - -func (sc *SubnetCache) fetchFromCache(ctx *VPCContext, subnetid string) (*ec2.Subnet, error) { - path := filepath.Join(sc.stateDir, subnetid) - bytes, err := ioutil.ReadFile(path) - if os.IsNotExist(err) { - return nil, nil - } - - if err != nil { - return nil, err - } - - var subnet ec2.Subnet - err = json.Unmarshal(bytes, &subnet) - if err != nil { - return nil, err - } - return &subnet, nil -} - -func (sc *SubnetCache) fetchFromEC2(ctx *VPCContext, subnetid string) (*ec2.Subnet, error) { - describeSubnetsInput := &ec2.DescribeSubnetsInput{ - SubnetIds: []*string{aws.String(subnetid)}, - } - subnetOutput, err := ec2.New(ctx.AWSSession).DescribeSubnetsWithContext(ctx, describeSubnetsInput) - if err != nil { - return nil, err - } - if len(subnetOutput.Subnets) != 1 { - return nil, ErrorSubnetNotFound - } - subnet := subnetOutput.Subnets[0] - if (*subnet.SubnetId) != subnetid || (*subnet.CidrBlock) == "" { - return nil, ErrorSubnetCorrupted - } - - return subnet, nil -} - -func (sc *SubnetCache) persistToCache(ctx *VPCContext, subnet ec2.Subnet) { - if *subnet.State != "available" { - ctx.Logger.Warning("Not persisting subnet because not available") - return - } - - // We should be holding an exclusive lock on the subnet ID at this point - path := filepath.Join(sc.stateDir, *subnet.SubnetId) - bytes, err := json.Marshal(subnet) - if err != nil { - ctx.Logger.Error("Unable to marshal subnet for caching: ", err) - return - } - err = atomicWriteOnce(path, bytes) - if err != nil { - ctx.Logger.Error("Unable to write subnet data: ", err) - return - } - ctx.Logger.Info("Subnet successfully persisted to cache") -} - -func shouldClose(closer io.Closer) { - _ = closer.Close() -} diff --git a/vpc/context/subnet_cache_test.go b/vpc/context/subnet_cache_test.go deleted file mode 100644 index d544eef5b..000000000 --- a/vpc/context/subnet_cache_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package context - -import ( - "io/ioutil" - "os" - "path/filepath" - "testing" - - "github.com/Netflix/titus-executor/fslocker" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/ec2" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const testSubnetName1 = "testsubnet" -const testSubnetName2 = "not-available-subnet" - -func TestSubnetCache(t *testing.T) { - testContext := newTestContext() - tempdir, err := ioutil.TempDir("", "subnet-cache") - require.NoError(t, err) - defer func() { - require.NoError(t, os.RemoveAll(tempdir)) - }() - fsLockerDir := filepath.Join(tempdir, "fslocker") - cacheDir := filepath.Join(tempdir, "cache") - fsl, err := fslocker.NewFSLocker(fsLockerDir) - require.NoError(t, err) - require.NoError(t, os.MkdirAll(cacheDir, 0700)) - subnetCache := newSubnetCache(fsl, cacheDir) - subnet1, err := subnetCache.fetchFromCache(testContext, testSubnetName1) - assert.NoError(t, err) - assert.Nil(t, subnet1) - - // Persist and retrieve subnet - fakeSubnet1 := ec2.Subnet{ - AvailabilityZone: aws.String("us-east-1a"), - 
CidrBlock: aws.String("1.2.3.0/24"), - SubnetId: aws.String(testSubnetName1), - AvailableIpAddressCount: aws.Int64(32), - State: aws.String("available"), - } - subnetCache.persistToCache(testContext, fakeSubnet1) - - subnet2, err := subnetCache.fetchFromCache(testContext, testSubnetName1) - assert.NoError(t, err) - assert.EqualValues(t, fakeSubnet1.AvailabilityZone, subnet2.AvailabilityZone) - assert.EqualValues(t, fakeSubnet1.CidrBlock, subnet2.CidrBlock) - assert.EqualValues(t, fakeSubnet1.SubnetId, subnet2.SubnetId) - assert.EqualValues(t, fakeSubnet1.AvailableIpAddressCount, subnet2.AvailableIpAddressCount) - assert.EqualValues(t, fakeSubnet1.State, subnet2.State) - - // Make sure we don't persist non-available subnets - fakeSubnet2 := ec2.Subnet{ - State: aws.String("not-available"), - } - subnetCache.persistToCache(testContext, fakeSubnet2) - - subnet3, err := subnetCache.fetchFromCache(testContext, testSubnetName2) - assert.NoError(t, err) - assert.Nil(t, subnet3) -} diff --git a/vpc/limits.go b/vpc/limits.go index f6a2e3dad..478b1158d 100644 --- a/vpc/limits.go +++ b/vpc/limits.go @@ -220,6 +220,11 @@ func GetMaxIPv4Addresses(instanceType string) int { return getLimits(instanceType).ipAddressesPerInterface } +// GetMaxIPv6Addresses returns the maximum number of IPv6 addresses that this instance type can handle per interface +func GetMaxIPv6Addresses(instanceType string) int { + return getLimits(instanceType).ip6AddressesPerInterface +} + // GetMaxNetworkMbps returns the maximum network throughput in Megabits per second that this instance type can handle func GetMaxNetworkMbps(instanceType string) int { return getLimits(instanceType).networkThroughput diff --git a/vpc/setup/setup.go b/vpc/setup/setup.go index 6cb1f5d38..abef639c7 100644 --- a/vpc/setup/setup.go +++ b/vpc/setup/setup.go @@ -111,9 +111,9 @@ func attachInterfaceAtIdx(ctx *context.VPCContext, instanceID, subnetID string, svc := ec2.New(ctx.AWSSession) createNetworkInterfaceInput := &ec2.CreateNetworkInterfaceInput{ - Description: aws.String(NetworkInterfaceDescription), - SubnetId: aws.String(subnetID), - // Ipv6AddressCount: aws.Int64(int64(getLimits(instanceType).ip6AddressesPerInterface)), + Description: aws.String(NetworkInterfaceDescription), + SubnetId: aws.String(subnetID), + Ipv6AddressCount: aws.Int64(int64(vpc.GetMaxIPv6Addresses(ctx.InstanceType))), } createNetworkInterfaceResult, err := svc.CreateNetworkInterfaceWithContext(ctx, createNetworkInterfaceInput) if err != nil { diff --git a/vpc/types/types.go b/vpc/types/types.go index e7368320c..b4db29bc3 100644 --- a/vpc/types/types.go +++ b/vpc/types/types.go @@ -5,6 +5,7 @@ import "errors" // Allocation is the public interface exposed when we allocate a namespace type Allocation struct { IPV4Address string `json:"ipv4Address"` + IPV6Address string `json:"ipv6Address"` DeviceIndex int `json:"deviceIndex"` Success bool `json:"success"` Error string `json:"error"`
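// Hedged sketch of the consumer side (the executor's real wiring is outside this
// diff): allocate-network keeps running to hold its locks, so a caller reads a
// single JSON Allocation record from the tool's stdout stream and then leaves the
// process alone. `stdout` below is assumed to be that stream (an io.Reader), and
// logrus is the logger already used by this repository.
var alloc types.Allocation
if err := json.NewDecoder(stdout).Decode(&alloc); err != nil {
	return err
}
if !alloc.Success {
	return fmt.Errorf("network allocation failed: %s", alloc.Error)
}
logrus.Infof("allocated ipv4=%s ipv6=%s on %s (device index %d)",
	alloc.IPV4Address, alloc.IPV6Address, alloc.ENI, alloc.DeviceIndex)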