Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
WIP discrete etcd cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
colhom committed Sep 8, 2016
1 parent de727bc commit bbe383e
Show file tree
Hide file tree
Showing 13 changed files with 676 additions and 80 deletions.
1 change: 1 addition & 0 deletions multi-node/aws/cmd/kube-aws/command_render.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func runCmdRender(cmd *cobra.Command, args []string) error {
{"credentials/.gitignore", []byte("*"), 0644},
{"userdata/cloud-config-controller", config.CloudConfigController, 0644},
{"userdata/cloud-config-worker", config.CloudConfigWorker, 0644},
{"userdata/cloud-config-etcd", config.CloudConfigEtcd, 0644},
{"stack-template.json", config.StackTemplateTemplate, 0644},
{"kubeconfig", kubeconfig.Bytes(), 0600},
}
Expand Down
1 change: 1 addition & 0 deletions multi-node/aws/cmd/kube-aws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var stackTemplateOptions = config.StackTemplateOptions{
TLSAssetsDir: "credentials",
ControllerTmplFile: "userdata/cloud-config-controller",
WorkerTmplFile: "userdata/cloud-config-worker",
EtcdTmplFile: "userdata/cloud-config-etcd",
StackTemplateTmplFile: "stack-template.json",
}

Expand Down
70 changes: 69 additions & 1 deletion multi-node/aws/pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package cluster

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"text/tabwriter"
"time"
Expand Down Expand Up @@ -232,12 +234,78 @@ func (c *Cluster) createStack(cfSvc cloudformationService, stackBody string) (*c
return cfSvc.CreateStack(creq)
}

/*
Makes sure that etcd resource definitions are not upgraded by a cloudformation stack update.
Fetches resource definitions from the existing stack and splices them into the updated resource definitions.
TODO(chom): etcd controller + dynamic cluster management will obviate need for this function
*/
// cfStackResources decodes only the top-level "Resources" object of a
// CloudFormation template. The outer map is keyed by resource name; each value
// is that resource's definition kept as a generic JSON object so it can be
// copied between templates without interpreting its contents.
type cfStackResources struct {
Resources map[string]map[string]interface{} `json:"Resources"`
}

// lockEtcdResources rewrites stackBody so that a stack update leaves the etcd
// instances untouched.
//
// Every resource in stackBody whose name matches ^InstanceEtcd[0-9]+$ is
// dropped and replaced by the identically-named definitions found in the
// template of the currently deployed stack (looked up by c.ClusterName via
// cfSvc.GetTemplate). All other top-level template keys are passed through
// unchanged.
//
// It returns the re-marshaled template, or an error if either template cannot
// be decoded, the existing template cannot be fetched or has no body, or the
// result cannot be encoded.
func (c *Cluster) lockEtcdResources(cfSvc *cloudformation.CloudFormation, stackBody string) (string, error) {

	// Unmarshal incoming stack resource definitions
	var newStack cfStackResources
	if err := json.Unmarshal([]byte(stackBody), &newStack); err != nil {
		return "", fmt.Errorf("error unmarshaling new stack json: %v", err)
	}

	instanceEtcdExpr := regexp.MustCompile("^InstanceEtcd[0-9]+$")
	// Remove all etcdInstance resource definitions from incoming stack.
	// Deleting the current key while ranging over a map is safe in Go.
	for name := range newStack.Resources {
		if instanceEtcdExpr.MatchString(name) {
			fmt.Printf("[lockEtcdResources: REMOVE %s\n", name)
			delete(newStack.Resources, name)
		}
	}

	// Fetch and unmarshal existing stack resource definitions
	res, err := cfSvc.GetTemplate(&cloudformation.GetTemplateInput{
		StackName: aws.String(c.ClusterName),
	})
	if err != nil {
		return "", fmt.Errorf("error getting stack template: %v", err)
	}
	// TemplateBody is a *string; guard against a response without a body
	// instead of panicking on the dereference below.
	if res == nil || res.TemplateBody == nil {
		return "", fmt.Errorf("error getting stack template: empty template body for stack %s", c.ClusterName)
	}
	var existingStack cfStackResources
	if err := json.Unmarshal([]byte(*res.TemplateBody), &existingStack); err != nil {
		return "", fmt.Errorf("error unmarshaling existing stack json: %v", err)
	}

	// Splice in existing resource definitions for etcd into new stack
	for name, definition := range existingStack.Resources {
		if !instanceEtcdExpr.MatchString(name) {
			continue
		}
		// The incoming template may have had no "Resources" object at all,
		// which leaves the map nil; writing to a nil map panics.
		if newStack.Resources == nil {
			newStack.Resources = make(map[string]map[string]interface{})
		}
		fmt.Printf("[lockEtcdResources: ADD %s\n", name)
		newStack.Resources[name] = definition
	}

	// Decode the full incoming template generically so every non-Resources
	// key (Parameters, Outputs, ...) is preserved verbatim in the output.
	var outgoingStack map[string]interface{}
	if err := json.Unmarshal([]byte(stackBody), &outgoingStack); err != nil {
		return "", fmt.Errorf("error unmarshaling outgoing stack json: %v", err)
	}
	// A body of JSON `null` decodes without error but leaves the map nil.
	if outgoingStack == nil {
		return "", fmt.Errorf("error unmarshaling outgoing stack json: template is not a json object")
	}
	outgoingStack["Resources"] = newStack.Resources

	// Ship off new stack to cloudformation api for an update
	out, err := json.MarshalIndent(&outgoingStack, "", "")
	if err != nil {
		return "", fmt.Errorf("error marshaling stack json: %v", err)
	}

	return string(out), nil
}

func (c *Cluster) Update(stackBody string) (string, error) {

cfSvc := cloudformation.New(c.session)
var err error
if stackBody, err = c.lockEtcdResources(cfSvc, stackBody); err != nil {
return "", err
}
input := &cloudformation.UpdateStackInput{
Capabilities: []*string{aws.String(cloudformation.CapabilityCapabilityIam)},
StackName: aws.String(c.ClusterName),
TemplateBody: &stackBody,
TemplateBody: aws.String(stackBody),
}

updateOutput, err := cfSvc.UpdateStack(input)
Expand Down
113 changes: 106 additions & 7 deletions multi-node/aws/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ func newDefaultCluster() *Cluster {
WorkerRootVolumeType: "gp2",
WorkerRootVolumeIOPS: 0,
WorkerRootVolumeSize: 30,
EtcdCount: 1,
EtcdInstanceType: "m3.medium",
EtcdRootVolumeSize: 30,
EtcdDataVolumeSize: 30,
CreateRecordSet: false,
RecordSetTTL: 300,
Subnets: []Subnet{},
Subnets: []*Subnet{},
}
}

Expand Down Expand Up @@ -94,7 +98,7 @@ func ClusterFromBytes(data []byte) (*Cluster, error) {

// For backward-compatibility
if len(c.Subnets) == 0 {
c.Subnets = []Subnet{
c.Subnets = []*Subnet{
{
AvailabilityZone: c.AvailabilityZone,
InstanceCIDR: c.InstanceCIDR,
Expand Down Expand Up @@ -122,6 +126,11 @@ type Cluster struct {
WorkerRootVolumeIOPS int `yaml:"workerRootVolumeIOPS,omitempty"`
WorkerRootVolumeSize int `yaml:"workerRootVolumeSize,omitempty"`
WorkerSpotPrice string `yaml:"workerSpotPrice,omitempty"`
EtcdCount int `yaml:"etcdCount"`
EtcdInstanceType string `yaml:"etcdInstanceType,omitempty"`
EtcdRootVolumeSize int `yaml:"etcdRootVolumeSize,omitempty"`
EtcdDataVolumeSize int `yaml:"etcdDataVolumeSize,omitempty"`
EtcdDataVolumeEphemeral bool `yaml:"etcdDataVolumEphemeral,omitempty"`
VPCID string `yaml:"vpcId,omitempty"`
RouteTableID string `yaml:"routeTableId,omitempty"`
VPCCIDR string `yaml:"vpcCIDR,omitempty"`
Expand All @@ -140,12 +149,13 @@ type Cluster struct {
HostedZoneID string `yaml:"hostedZoneId,omitempty"`
StackTags map[string]string `yaml:"stackTags,omitempty"`
UseCalico bool `yaml:"useCalico,omitempty"`
Subnets []Subnet `yaml:"subnets,omitempty"`
Subnets []*Subnet `yaml:"subnets,omitempty"`
}

type Subnet struct {
AvailabilityZone string `yaml:"availabilityZone,omitempty"`
InstanceCIDR string `yaml:"instanceCIDR,omitempty"`
AvailabilityZone string `yaml:"availabilityZone,omitempty"`
InstanceCIDR string `yaml:"instanceCIDR,omitempty"`
lastAllocatedAddr *net.IP
}

const (
Expand All @@ -160,7 +170,7 @@ var supportedReleaseChannels = map[string]bool{

func (c Cluster) Config() (*Config, error) {
config := Config{Cluster: c}
config.ETCDEndpoints = fmt.Sprintf("http://%s:2379", c.ControllerIP)

config.APIServers = fmt.Sprintf("http://%s:8080", c.ControllerIP)
config.SecureAPIServers = fmt.Sprintf("https://%s:443", c.ControllerIP)
config.APIServerEndpoint = fmt.Sprintf("https://%s", c.ExternalDNSName)
Expand Down Expand Up @@ -199,6 +209,77 @@ func (c Cluster) Config() (*Config, error) {
config.VPCRef = fmt.Sprintf("%q", config.VPCID)
}

config.EtcdInstances = make([]etcdInstance, config.EtcdCount)
var etcdEndpoints, etcdInitialCluster bytes.Buffer
for etcdIndex := 0; etcdIndex < config.EtcdCount; etcdIndex++ {

//Round-robin etcd instances across all available subnets
subnetIndex := etcdIndex % len(config.Subnets)
subnet := config.Subnets[subnetIndex]

_, subnetCIDR, err := net.ParseCIDR(subnet.InstanceCIDR)
if err != nil {
return nil, fmt.Errorf("error parsing subnet instance cidr %s: %v", subnet.InstanceCIDR, err)
}

if subnet.lastAllocatedAddr == nil {
ip := subnetCIDR.IP
//TODO:(chom) this is sloppy, but "soon-ish" etcd will be self-hosted so we'll leave this be
for i := 0; i < 3; i++ {
ip = incrementIP(ip)
}
subnet.lastAllocatedAddr = &ip
}

nextAddr := incrementIP(*subnet.lastAllocatedAddr)
subnet.lastAllocatedAddr = &nextAddr
instance := etcdInstance{
IPAddress: *subnet.lastAllocatedAddr,
SubnetIndex: subnetIndex,
}

//TODO(chom): validate we're not overflowing the address space
//This is complicated, must also factor in DHCP addresses
//for ASG components

//Punt on this- we're going to have an answer for dynamic etcd clusters at some point. Then we can either throw
//the instances in an ASG and use DHCP like all other instances, or simply self-host on cluster

config.EtcdInstances[etcdIndex] = instance

//TODO: ipv6 support
if len(instance.IPAddress) != 4 {
return nil, fmt.Errorf("Non ipv4 address for etcd node: %v", instance.IPAddress)
}

//http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-instance-addressing.html#concepts-private-addresses

var dnsSuffix string
if config.Region == "us-east-1" {
// a special DNS suffix for the original AWS region!
dnsSuffix = "ec2.internal"
} else {
dnsSuffix = fmt.Sprintf("%s.compute.internal", config.Region)
}

hostname := fmt.Sprintf("ip-%d-%d-%d-%d.%s",
instance.IPAddress[0],
instance.IPAddress[1],
instance.IPAddress[2],
instance.IPAddress[3],
dnsSuffix,
)

fmt.Fprintf(&etcdEndpoints, "https://%s:2379", hostname)
fmt.Fprintf(&etcdInitialCluster, "%s=https://%s:2380", hostname, hostname)
if etcdIndex < config.EtcdCount-1 {
fmt.Fprintf(&etcdEndpoints, ",")
fmt.Fprintf(&etcdInitialCluster, ",")
}
}
config.EtcdEndpoints = etcdEndpoints.String()
config.EtcdInitialCluster = etcdInitialCluster.String()

return &config, nil
}

Expand Down Expand Up @@ -232,13 +313,15 @@ type StackTemplateOptions struct {
TLSAssetsDir string
ControllerTmplFile string
WorkerTmplFile string
EtcdTmplFile string
StackTemplateTmplFile string
}

// stackConfig bundles the cluster Config with the rendered per-role userdata
// so it can be handed to the stack template as a single value.
type stackConfig struct {
// Embedded so all Config fields are promoted onto stackConfig.
*Config
// Rendered worker cloud-config.
// NOTE(review): presumably produced from opts.WorkerTmplFile like the other roles — confirm.
UserDataWorker string
// Rendered controller cloud-config, produced by executing opts.ControllerTmplFile.
UserDataController string
// Rendered etcd cloud-config, produced by executing opts.EtcdTmplFile.
UserDataEtcd string
// NOTE(review): looks like the index into Subnets of the subnet containing ControllerIP — confirm against stackConfig().
ControllerSubnetIndex int
}

Expand Down Expand Up @@ -289,6 +372,7 @@ func (c Cluster) stackConfig(opts StackTemplateOptions, compressUserData bool) (
if controllerIPAddr == nil {
return nil, fmt.Errorf("invalid controllerIP: %s", stackConfig.ControllerIP)
}

controllerSubnetFound := false
for i, subnet := range stackConfig.Subnets {
_, instanceCIDR, err := net.ParseCIDR(subnet.InstanceCIDR)
Expand All @@ -310,6 +394,9 @@ func (c Cluster) stackConfig(opts StackTemplateOptions, compressUserData bool) (
if stackConfig.UserDataController, err = execute(opts.ControllerTmplFile, stackConfig.Config, compressUserData); err != nil {
return nil, fmt.Errorf("failed to render controller cloud config: %v", err)
}
if stackConfig.UserDataEtcd, err = execute(opts.EtcdTmplFile, stackConfig.Config, compressUserData); err != nil {
return nil, fmt.Errorf("failed to render etcd cloud config: %v", err)
}

return &stackConfig, nil
}
Expand All @@ -334,6 +421,10 @@ func (c Cluster) ValidateUserData(opts StackTemplateOptions) error {
Content: stackConfig.UserDataController,
Name: "UserDataController",
},
{
Content: stackConfig.UserDataEtcd,
Name: "UserDataEtcd",
},
} {
report, err := validate.Validate([]byte(userData.Content))

Expand Down Expand Up @@ -413,10 +504,18 @@ func getContextString(buf []byte, offset, lineCount int) string {
return string(buf[leftLimit:rightLimit])
}

// etcdInstance describes one statically-addressed etcd node computed by
// Cluster.Config().
type etcdInstance struct {
// Address assigned to this node; Config() derives it by incrementing the
// last address handed out in the node's subnet.
IPAddress net.IP
// Index into the cluster's Subnets slice of the subnet this node is placed in.
SubnetIndex int
}

type Config struct {
Cluster

ETCDEndpoints string
EtcdEndpoints string
EtcdInitialCluster string
EtcdInstances []etcdInstance

APIServers string
SecureAPIServers string
APIServerEndpoint string
Expand Down
12 changes: 6 additions & 6 deletions multi-node/aws/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestMultipleSubnets(t *testing.T) {

validConfigs := []struct {
conf string
subnets []Subnet
subnets []*Subnet
}{
{
conf: `
Expand All @@ -320,7 +320,7 @@ subnets:
- availabilityZone: ap-northeast-1c
instanceCIDR: 10.4.4.0/24
`,
subnets: []Subnet{
subnets: []*Subnet{
{
InstanceCIDR: "10.4.3.0/24",
AvailabilityZone: "ap-northeast-1a",
Expand All @@ -339,7 +339,7 @@ controllerIP: 10.4.3.50
availabilityZone: ap-northeast-1a
instanceCIDR: 10.4.3.0/24
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.4.3.0/24",
Expand All @@ -355,7 +355,7 @@ availabilityZone: ap-northeast-1a
instanceCIDR: 10.4.3.0/24
subnets: []
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.4.3.0/24",
Expand All @@ -368,7 +368,7 @@ subnets: []
availabilityZone: "ap-northeast-1a"
subnets: []
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.0.0.0/24",
Expand All @@ -380,7 +380,7 @@ subnets: []
# Missing subnets field fall-backs to the single subnet with the default az/cidr.
availabilityZone: "ap-northeast-1a"
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.0.0.0/24",
Expand Down
Loading

0 comments on commit bbe383e

Please sign in to comment.