-
Notifications
You must be signed in to change notification settings - Fork 0
/
awsvpc.go
247 lines (216 loc) · 7.13 KB
/
awsvpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package tracker
// The AWSVPC tracker tracks the IPAM ring changes and accordingly updates
// the AWS VPC route table and, if the announced change is local
// (denoted by local=true) to the host, the host route table.
//
// During the initialization, the tracker detects AWS VPC route table id which
// is associated with the host's subnet on the AWS network. If such a table does
// not exist, the default VPC route table is used.
//
// When a host A donates a range to a host B, the necessary route table
// updates (removal) happen on the host A first, and afterwards on
// the host B (installation).
//
// NB: there is a hard limit of 50 routes within any VPC route table
// (practically, it is 48, because one route is used by the AWS Internet GW and
// one by the AWS host subnet), therefore it is suggested to avoid
// excessive fragmentation within the IPAM ring, which might happen due to
// claim operations or an uneven distribution of containers across the hosts.
import (
"fmt"
"net"
"syscall"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/vishvananda/netlink"
"github.com/weaveworks/weave/common"
"github.com/weaveworks/weave/net/address"
)
// AWSVPCTracker holds the state used to update the AWS VPC route table and
// the host route table: an EC2 API client, the ID of the local instance, the
// ID of the VPC route table to modify and the link index of the weave bridge.
type AWSVPCTracker struct {
// EC2 API client used for all route table and instance queries.
ec2 *ec2.EC2
instanceID string // EC2 Instance ID
routeTableID string // VPC Route Table ID
linkIndex int // The weave bridge link index
}
// NewAWSVPCTracker creates and initialises AWS VPC based tracker.
//
// It reads the instance ID and the region from the EC2 metadata service,
// creates an EC2 client for that region, detects the VPC route table to use
// and looks up the link index of the bridgeName interface. Any failure along
// the way is returned and no tracker is created.
func NewAWSVPCTracker(bridgeName string) (*AWSVPCTracker, error) {
	sess := session.New()
	metadata := ec2metadata.New(sess)

	// Detect region and instance id
	instanceID, err := metadata.GetMetadata("instance-id")
	if err != nil {
		return nil, fmt.Errorf("cannot detect instance-id: %s", err)
	}
	region, err := metadata.Region()
	if err != nil {
		return nil, fmt.Errorf("cannot detect region: %s", err)
	}

	tracker := &AWSVPCTracker{
		instanceID: instanceID,
		ec2:        ec2.New(sess, aws.NewConfig().WithRegion(region)),
	}

	tableID, err := tracker.detectRouteTableID()
	if err != nil {
		return nil, err
	}
	tracker.routeTableID = *tableID

	// Detect Weave bridge link index
	bridge, err := netlink.LinkByName(bridgeName)
	if err != nil {
		return nil, fmt.Errorf("cannot find \"%s\" interface: %s", bridgeName, err)
	}
	tracker.linkIndex = bridge.Attrs().Index

	tracker.infof("AWSVPC has been initialized on %s instance for %s route table at %s region",
		tracker.instanceID, tracker.routeTableID, region)

	return tracker, nil
}
// HandleUpdate method updates the AWS VPC and the host route tables.
//
// prevRanges and currRanges are merged, converted to CIDRs and passed through
// RemoveCommon (presumably dropping the CIDRs present in both lists - confirm
// against its definition). A VPC route is then created for every remaining
// current CIDR and the VPC route of every remaining previous CIDR is deleted.
// When local is true, the same additions and removals are also applied to the
// host route table.
//
// The first failing operation aborts the update and its error is returned;
// changes made before the failure are not rolled back.
func (t *AWSVPCTracker) HandleUpdate(prevRanges, currRanges []address.Range, local bool) error {
	t.debugf("replacing %q by %q; local(%t)", prevRanges, currRanges, local)

	prev, curr := RemoveCommon(
		address.NewCIDRs(Merge(prevRanges)),
		address.NewCIDRs(Merge(currRanges)))

	// It might make sense to do the removal first and then add entries
	// because of the 50 routes limit. However, in such case a container might
	// not be reachable for a short period of time which is not a desired behavior.

	// Add new entries
	for _, cidr := range curr {
		cidrStr := cidr.String()
		t.debugf("adding route %s to %s", cidrStr, t.instanceID)
		if _, err := t.createVPCRoute(cidrStr); err != nil {
			return fmt.Errorf("createVPCRoute failed: %s", err)
		}
		if !local {
			continue
		}
		if err := t.createHostRoute(cidrStr); err != nil {
			// A host route that already exists is not treated as a failure.
			if errno, ok := err.(syscall.Errno); !ok || errno != syscall.EEXIST {
				return fmt.Errorf("createHostRoute failed: %s", err)
			}
		}
	}

	// Remove obsolete entries
	for _, cidr := range prev {
		cidrStr := cidr.String()
		t.debugf("removing %s route", cidrStr)
		if _, err := t.deleteVPCRoute(cidrStr); err != nil {
			return fmt.Errorf("deleteVPCRoute failed: %s", err)
		}
		if !local {
			continue
		}
		if err := t.deleteHostRoute(cidrStr); err != nil {
			return fmt.Errorf("deleteHostRoute failed: %s", err)
		}
	}

	return nil
}
// String returns the fixed name of this tracker implementation.
func (t *AWSVPCTracker) String() string {
	const trackerName = "awsvpc"
	return trackerName
}
// createVPCRoute asks EC2 to create a route for the cidr destination in the
// tracker's VPC route table, with the tracker's instance as the target.
func (t *AWSVPCTracker) createVPCRoute(cidr string) (*ec2.CreateRouteOutput, error) {
	return t.ec2.CreateRoute(&ec2.CreateRouteInput{
		DestinationCidrBlock: aws.String(cidr),
		InstanceId:           aws.String(t.instanceID),
		RouteTableId:         aws.String(t.routeTableID),
	})
}
// createHostRoute adds a link-scoped route for cidr on the weave bridge to
// the host route table. It returns the parse error if cidr is malformed, or
// whatever error netlink reports while adding the route.
func (t *AWSVPCTracker) createHostRoute(cidr string) error {
	network, parseErr := parseCIDR(cidr)
	if parseErr != nil {
		return parseErr
	}
	hostRoute := netlink.Route{
		Dst:       network,
		LinkIndex: t.linkIndex,
		Scope:     netlink.SCOPE_LINK,
	}
	return netlink.RouteAdd(&hostRoute)
}
// deleteVPCRoute asks EC2 to delete the route for the cidr destination from
// the tracker's VPC route table.
func (t *AWSVPCTracker) deleteVPCRoute(cidr string) (*ec2.DeleteRouteOutput, error) {
	return t.ec2.DeleteRoute(&ec2.DeleteRouteInput{
		DestinationCidrBlock: aws.String(cidr),
		RouteTableId:         aws.String(t.routeTableID),
	})
}
// deleteHostRoute removes the link-scoped route for cidr on the weave bridge
// from the host route table. It returns the parse error if cidr is malformed,
// or whatever error netlink reports while deleting the route.
func (t *AWSVPCTracker) deleteHostRoute(cidr string) error {
	network, parseErr := parseCIDR(cidr)
	if parseErr != nil {
		return parseErr
	}
	hostRoute := netlink.Route{
		Dst:       network,
		LinkIndex: t.linkIndex,
		Scope:     netlink.SCOPE_LINK,
	}
	return netlink.RouteDel(&hostRoute)
}
// detectRouteTableID detects AWS VPC Route Table ID of the given tracker instance.
//
// The instance is looked up by its ID to learn its subnet and VPC. The route
// table associated with that subnet is preferred; if there is none, the main
// route table of the VPC is used. An error is returned when an API call fails,
// when the instance cannot be found, or when neither lookup yields a table.
func (t *AWSVPCTracker) detectRouteTableID() (*string, error) {
	described, err := t.ec2.DescribeInstances(&ec2.DescribeInstancesInput{
		InstanceIds: []*string{aws.String(t.instanceID)},
	})
	if err != nil {
		return nil, fmt.Errorf("DescribeInstances failed: %s", err)
	}
	reservations := described.Reservations
	if len(reservations) == 0 || len(reservations[0].Instances) == 0 {
		return nil, fmt.Errorf("cannot find %s instance within reservations", t.instanceID)
	}
	instance := reservations[0].Instances[0]

	// Filter sets are tried in order; the first one matching a table wins.
	lookups := [][]*ec2.Filter{
		// First try to find a routing table associated with the subnet of the instance
		{
			{
				Name:   aws.String("association.subnet-id"),
				Values: []*string{instance.SubnetId},
			},
		},
		// Fallback to the default routing table
		{
			{
				Name:   aws.String("association.main"),
				Values: []*string{aws.String("true")},
			},
			{
				Name:   aws.String("vpc-id"),
				Values: []*string{instance.VpcId},
			},
		},
	}

	for _, filters := range lookups {
		tables, err := t.ec2.DescribeRouteTables(&ec2.DescribeRouteTablesInput{
			Filters: filters,
		})
		if err != nil {
			return nil, fmt.Errorf("DescribeRouteTables failed: %s", err)
		}
		if len(tables.RouteTables) > 0 {
			return tables.RouteTables[0].RouteTableId, nil
		}
	}

	return nil, fmt.Errorf("cannot find routetable for %s instance", t.instanceID)
}
// debugf logs a debug-level message through common.Log, prefixing the format
// string with "[tracker] ".
func (t *AWSVPCTracker) debugf(format string, args ...interface{}) {
	const prefix = "[tracker] "
	common.Log.Debugf(prefix+format, args...)
}
// infof logs an info-level message through common.Log, prefixing the format
// string with "[tracker] ".
func (t *AWSVPCTracker) infof(format string, args ...interface{}) {
	const prefix = "[tracker] "
	common.Log.Infof(prefix+format, args...)
}
// Helpers

// parseCIDR parses cidr with net.ParseCIDR and returns a network whose IP is
// the address exactly as written in cidr (host bits are kept, not masked off)
// and whose mask is the parsed prefix mask. A malformed cidr yields a nil
// network and the parse error.
func parseCIDR(cidr string) (*net.IPNet, error) {
	addr, network, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}
	return &net.IPNet{IP: addr, Mask: network.Mask}, nil
}