Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Use Describe Network Interfaces instead of the IMDS
Browse files Browse the repository at this point in the history
This moves from using the IMDS for (relatively) dynamic information
like what interfaces are attached / available, to using the Describe*
(DescribeInstances, DescribeNetworkInterfaces) calls.
  • Loading branch information
sargun committed Mar 13, 2019
1 parent 270b06c commit 8fae8a6
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 134 deletions.
55 changes: 20 additions & 35 deletions vpc/allocate/allocate_network.go
Expand Up @@ -23,7 +23,6 @@ import (
)

var (
errInterfaceNotFoundAtIndex = errors.New("Network interface not found at index")
errSecurityGroupsNotConverged = errors.New("Security groups for interface not converged")
)

Expand Down Expand Up @@ -153,11 +152,14 @@ func allocateNetwork(parentCtx *context.VPCContext) error {
err = json.NewEncoder(os.Stdout).
Encode(
types.Allocation{
IPV4Address: allocation.ip4Address,
IPV6Address: allocation.ip6Address,
DeviceIndex: deviceIdx,
Success: true,
ENI: allocation.eni})
IPV4Address: allocation.ip4Address,
IPV6Address: allocation.ip6Address,
DeviceIndex: deviceIdx,
Success: true,
ENI: allocation.eni,
ENIMACAddress: allocation.macAddress,
SubnetID: allocation.subnetID,
})
if err != nil {
return cli.NewMultiError(cli.NewExitError("Unable to write allocation record", 1), err)
}
Expand Down Expand Up @@ -193,6 +195,8 @@ type allocation struct {
ip4Address string
ip6Address string
eni string
macAddress string
subnetID string
}

func (a *allocation) refresh() error {
Expand All @@ -212,29 +216,23 @@ func (a *allocation) deallocate(ctx *context.VPCContext) {
}

// getDefaultSecurityGroups returns the set of security group IDs attached to
// the instance's primary network interface, as reported by
// parentCtx.EC2metadataClientWrapper. Any error from the wrapper is returned
// unchanged.
func getDefaultSecurityGroups(parentCtx *context.VPCContext) (map[string]struct{}, error) {
// NOTE(review): the statements down to the first successful return resolve the
// primary interface's MAC, fetch that interface, and read its security groups.
// They appear to be the pre-change side of this diff, shown next to the
// single-call replacement at the bottom — confirm against the commit, since
// the +/- markers are not preserved in this view.
primaryInterfaceMac, err := parentCtx.EC2metadataClientWrapper.PrimaryInterfaceMac()
if err != nil {
return nil, err
}
primaryInterface, err := parentCtx.EC2metadataClientWrapper.GetInterface(primaryInterfaceMac)
if err != nil {
return nil, err
}
return primaryInterface.GetSecurityGroupIds(), nil
// Single wrapper call that yields the primary interface's security groups directly.
return parentCtx.EC2metadataClientWrapper.PrimaryInterfaceSecurityGroups()
}

func doAllocateNetwork(parentCtx *context.VPCContext, deviceIdx, batchSize int, securityGroups map[string]struct{}, securityConvergenceTimeout, waitForSgLockTimeout, ipRefreshTimeout time.Duration, allocateIPv6Address bool) (*allocation, error) {
// 1. Ensure security groups are setup
ctx, cancel := parentCtx.WithTimeout(5 * time.Minute)
defer cancel()

networkInterface, err := getInterfaceByIdx(ctx, deviceIdx)
networkInterface, err := parentCtx.EC2metadataClientWrapper.GetInterfaceByIdx(ctx, parentCtx.AWSSession, deviceIdx)
if err != nil {
ctx.Logger.Warning("Unable to get interface by idx: ", err)
return nil, err
}
allocation := &allocation{
eni: networkInterface.GetInterfaceID(),
eni: networkInterface.GetInterfaceID(),
macAddress: networkInterface.GetMAC(),
subnetID: networkInterface.GetSubnetID(),
}
allocation.sharedSGLock, err = setupSecurityGroups(ctx, networkInterface, securityGroups, securityConvergenceTimeout, waitForSgLockTimeout)
if err != nil {
Expand Down Expand Up @@ -334,7 +332,7 @@ func setupSecurityGroups(ctx *context.VPCContext, networkInterface ec2wrapper.Ne
if reflect.DeepEqual(securityGroups, networkInterface.GetSecurityGroupIds()) {
return sgConfigurationLock, nil
}
err = networkInterface.Refresh()
err = networkInterface.Refresh(ctx, ctx.AWSSession)
if err != nil {
sgConfigurationLock.Unlock()
return nil, err
Expand All @@ -354,32 +352,19 @@ func setupSecurityGroups(ctx *context.VPCContext, networkInterface ec2wrapper.Ne

// waitForSecurityGroupToConverge polls networkInterface until the security
// group IDs it reports are deeply equal to securityGroups.
//
// It refreshes the interface, compares, and sleeps one second between
// attempts, for as long as less than securityConvergenceTimeout has elapsed
// since the call started. It returns nil once the sets match, the refresh
// error as soon as a refresh fails, and errSecurityGroupsNotConverged if the
// timeout elapses without a match.
func waitForSecurityGroupToConverge(ctx *context.VPCContext, networkInterface ec2wrapper.NetworkInterface, securityGroups map[string]struct{}, securityConvergenceTimeout time.Duration) error {
now := time.Now()
// i counts Refresh calls; it is reported in the success log as "refreshes".
i := 0
for time.Since(now) < securityConvergenceTimeout {
// NOTE(review): the two Refresh lines below look like the before/after forms
// of one statement from the diff (no-argument vs. ctx + AWS session) —
// confirm against the commit; only one can exist in a compilable file.
err := networkInterface.Refresh()
err := networkInterface.Refresh(ctx, ctx.AWSSession)
// Counted before the error check, so a failed refresh is included in i.
i++
if err != nil {
ctx.Logger.Warning("Unable to refresh interface while waiting for security group change, bailing: ", err)
return err
}
if reflect.DeepEqual(securityGroups, networkInterface.GetSecurityGroupIds()) {
// NOTE(review): likewise, the two log lines appear to be the before/after
// forms; the second adds the "refreshes" field.
ctx.Logger.WithField("duration", time.Since(now).String()).Info("Changed security groups successfully")
ctx.Logger.WithField("duration", time.Since(now).String()).WithField("refreshes", i).Info("Changed security groups successfully")
return nil
}
// Fixed one-second pause between polls.
time.Sleep(time.Second)
}
return errSecurityGroupsNotConverged
}

// getInterfaceByIdx returns the first network interface listed by
// parentCtx.EC2metadataClientWrapper.Interfaces() whose device number equals
// idx. It returns the wrapper's error if the listing fails, and
// errInterfaceNotFoundAtIndex if no listed interface has that device number.
//
// NOTE(review): this helper appears to be on the removed side of the diff
// (callers elsewhere in the commit use EC2metadataClientWrapper.GetInterfaceByIdx
// or the MAC/subnet recorded in the allocation instead) — confirm against the commit.
func getInterfaceByIdx(parentCtx *context.VPCContext, idx int) (ec2wrapper.NetworkInterface, error) {
allInterfaces, err := parentCtx.EC2metadataClientWrapper.Interfaces()
if err != nil {
return nil, err
}

for _, networkInterface := range allInterfaces {
if networkInterface.GetDeviceNumber() == idx {
return networkInterface, nil
}
}

return nil, errInterfaceNotFoundAtIndex
}
33 changes: 18 additions & 15 deletions vpc/allocate/ip_pool_manager.go
Expand Up @@ -39,9 +39,10 @@ func (mgr *IPPoolManager) lockConfiguration(parentCtx *context.VPCContext) (*fsl
return parentCtx.FSLocker.ExclusiveLock(path, &timeout)
}

func assignMoreIPs(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration, networkInterface ec2wrapper.NetworkInterface, ipAssignmentFunction ec2wrapper.IPAssignmentFunction, ipFetchFunction ec2wrapper.IPFetchFunction) error {
// returns number of refreshes done, and error
func assignMoreIPs(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration, networkInterface ec2wrapper.NetworkInterface, ipAssignmentFunction ec2wrapper.IPAssignmentFunction, ipFetchFunction ec2wrapper.IPFetchFunction) (int, error) {
if len(ipFetchFunction()) >= vpc.GetMaxIPAddresses(ctx.InstanceType) {
return errMaxIPAddressesAllocated
return 0, errMaxIPAddressesAllocated
}

if len(ipFetchFunction())+batchSize > vpc.GetMaxIPAddresses(ctx.InstanceType) {
Expand All @@ -54,27 +55,29 @@ func assignMoreIPs(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time

if err := ipAssignmentFunction(ctx, ctx.AWSSession, batchSize); err != nil {
ctx.Logger.Warning("Unable to assign IPs from AWS: ", err)
return err
return 0, err
}

now := time.Now()
i := 0
for time.Since(now) < ipRefreshTimeout {
if err := networkInterface.Refresh(); err != nil {
return err
if err := networkInterface.Refresh(ctx, ctx.AWSSession); err != nil {
return i, err
}
i++

newIPSet := ec2wrapper.GetIPAddressesAsSet(ipFetchFunction)

if len(newIPSet.Difference(originalIPSet).ToSlice()) > 0 {
// Retry allocating an IP Address from the pool, now that the metadata service says that we have at
// least one more IP available from EC2
return nil
return i, nil
}
time.Sleep(time.Second)
}

ctx.Logger.Warning("Refreshed allocations seconds failed")
return errIPRefreshFailed
return i, errIPRefreshFailed
}

func (mgr *IPPoolManager) allocateIPv6(ctx *context.VPCContext, batchSize int, ipRefreshTimeout time.Duration) (string, *fslocker.ExclusiveLock, error) {
Expand All @@ -93,7 +96,7 @@ func (mgr *IPPoolManager) allocateIP(ctx *context.VPCContext, batchSize int, ipR
}
defer configLock.Unlock()

err = mgr.networkInterface.Refresh()
err = mgr.networkInterface.Refresh(ctx, ctx.AWSSession)
if err != nil {
ctx.Logger.Warning("Unable to refresh interface before attempting to do allocate: ", err)
return "", nil, err
Expand All @@ -109,12 +112,12 @@ func (mgr *IPPoolManager) allocateIP(ctx *context.VPCContext, batchSize int, ipR
} else if err == errNoFreeIPAddressFound {
ctx.Logger.Info("No free IP addresses available, trying to assign more")
now := time.Now()
err = assignMoreIPs(ctx, batchSize, ipRefreshTimeout, mgr.networkInterface, ipAssignmentFunction, ipFetchFunction)
n, err := assignMoreIPs(ctx, batchSize, ipRefreshTimeout, mgr.networkInterface, ipAssignmentFunction, ipFetchFunction)
if err != nil {
ctx.Logger.WithField("duration", time.Since(now)).WithError(err).Warning("Unable assign more IPs")
return "", nil, err
}
ctx.Logger.WithField("duration", time.Since(now)).Info("Successfully completed IP allocation")
ctx.Logger.WithField("duration", time.Since(now)).WithField("refreshes", n).Info("Successfully completed IP allocation")
return mgr.doAllocate(ctx, ipFetchFunction)
}

Expand Down Expand Up @@ -288,12 +291,12 @@ func (mgr *IPPoolManager) doFileCleanup(parentCtx *context.VPCContext, deallocat
return nil
}

func (mgr *IPPoolManager) ipsFreed(parentCtx *context.VPCContext, oldIPList, deallocationList []string) bool {
func (mgr *IPPoolManager) ipsFreed(ctx *context.VPCContext, oldIPList, deallocationList []string) bool {
successCount := 0
for i := 0; i < 180; i++ {
err := mgr.networkInterface.Refresh()
err := mgr.networkInterface.Refresh(ctx, ctx.AWSSession)
if err != nil {
parentCtx.Logger.Error("Could not refresh IPs: ", err)
ctx.Logger.Error("Could not refresh IPs: ", err)
} else {
allocMap := make(map[string]struct{})
for _, ip := range mgr.networkInterface.GetIPv4Addresses() {
Expand All @@ -307,13 +310,13 @@ func (mgr *IPPoolManager) ipsFreed(parentCtx *context.VPCContext, oldIPList, dea
}
}
if missingIPs > 0 {
parentCtx.Logger.Infof("%d IPs successfully freed; intended to free: %d", missingIPs, len(deallocationList))
ctx.Logger.Infof("%d IPs successfully freed; intended to free: %d", missingIPs, len(deallocationList))
successCount++
if successCount > 3 {
return true
}
} else {
parentCtx.Logger.Info("Resetting freed success count to 0")
ctx.Logger.Info("Resetting freed success count to 0")
// Reset the success count
successCount = 0
}
Expand Down
2 changes: 1 addition & 1 deletion vpc/allocate/ip_pool_manager_test.go
Expand Up @@ -82,7 +82,7 @@ func (tni *testNetworkInterface) GetIPv4Addresses() []string {
return tni.ipv4Addresses
}

func (tni *testNetworkInterface) Refresh() error {
func (tni *testNetworkInterface) Refresh(ctx context.Context, session client.ConfigProvider) error {
if tni.refresh != nil {
return tni.refresh()
}
Expand Down
24 changes: 8 additions & 16 deletions vpc/allocate/setup_container_linux.go
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/Netflix/titus-executor/vpc"
"github.com/Netflix/titus-executor/vpc/context"
"github.com/Netflix/titus-executor/vpc/ec2wrapper"
"github.com/Netflix/titus-executor/vpc/types"
"github.com/apparentlymart/go-cidr/cidr"
"github.com/vishvananda/netlink"
Expand All @@ -33,16 +32,9 @@ var (
)

func doSetupContainer(parentCtx *context.VPCContext, netnsfd int, bandwidth uint64, burst, jumbo bool, allocation types.Allocation) (netlink.Link, error) {

networkInterface, err := getInterfaceByIdx(parentCtx, allocation.DeviceIndex)
if err != nil {
parentCtx.Logger.Error("Cannot get interface by index: ", err)
return nil, err
}

ip4 := net.ParseIP(allocation.IPV4Address)
ip6 := net.ParseIP(allocation.IPV6Address)
parentLink, err := getLink(networkInterface)
parentLink, err := getLink(allocation.ENIMACAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -85,11 +77,11 @@ func doSetupContainer(parentCtx *context.VPCContext, netnsfd int, bandwidth uint
return nil, err
}

return newLink, configureLink(parentCtx, nsHandle, newLink, bandwidth, mtu, burst, networkInterface, ip4, ip6)
return newLink, configureLink(parentCtx, nsHandle, newLink, bandwidth, mtu, burst, allocation.SubnetID, ip4, ip6)
}

func addIPv4AddressAndRoute(parentCtx *context.VPCContext, networkInterface ec2wrapper.NetworkInterface, nsHandle *netlink.Handle, link netlink.Link, ip net.IP) error {
subnet, err := parentCtx.Cache.DescribeSubnet(parentCtx, networkInterface.GetSubnetID())
func addIPv4AddressAndRoute(parentCtx *context.VPCContext, subnetId string, nsHandle *netlink.Handle, link netlink.Link, ip net.IP) error {
subnet, err := parentCtx.Cache.DescribeSubnet(parentCtx, subnetId)
if err != nil {
return err
}
Expand Down Expand Up @@ -125,7 +117,7 @@ func addIPv4AddressAndRoute(parentCtx *context.VPCContext, networkInterface ec2w
return nil
}

func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link netlink.Link, bandwidth uint64, mtu int, burst bool, networkInterface ec2wrapper.NetworkInterface, ip4, ip6 net.IP) error {
func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link netlink.Link, bandwidth uint64, mtu int, burst bool, subnetId string, ip4, ip6 net.IP) error {
// Rename link
err := nsHandle.LinkSetName(link, "eth0")
if err != nil {
Expand Down Expand Up @@ -156,7 +148,7 @@ func configureLink(parentCtx *context.VPCContext, nsHandle *netlink.Handle, link
}
}

err = addIPv4AddressAndRoute(parentCtx, networkInterface, nsHandle, link, ip4)
err = addIPv4AddressAndRoute(parentCtx, subnetId, nsHandle, link, ip4)
if err != nil {
return err
}
Expand Down Expand Up @@ -290,12 +282,12 @@ func setupIFBClass(parentCtx *context.VPCContext, bandwidth uint64, burst bool,
return nil
}

func getLink(networkInterface ec2wrapper.NetworkInterface) (netlink.Link, error) {
func getLink(macAddress string) (netlink.Link, error) {
links, err := netlink.LinkList()
if err != nil {
return nil, err
}
mac, err := net.ParseMAC(networkInterface.GetMAC())
mac, err := net.ParseMAC(macAddress)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8fae8a6

Please sign in to comment.