From 9ef9909bf72ec37b8983dafff7e38b859e526e48 Mon Sep 17 00:00:00 2001 From: kevin xu Date: Wed, 16 Nov 2016 13:47:51 +0800 Subject: [PATCH 1/4] design doc for IPAM added Signed-off-by: kevin xu --- design-docs/ipam.md | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 design-docs/ipam.md diff --git a/design-docs/ipam.md b/design-docs/ipam.md new file mode 100644 index 00000000..7ea44a5d --- /dev/null +++ b/design-docs/ipam.md @@ -0,0 +1,37 @@ +# IP Address Manager +The IPAM for Swan is supposed to manage lifecycle of a predefined group +of IPs, all IP addresses should be avaliable within the same layer 2 +subnet as well as the hosts, all of them are reserved for containers, +each container could be assigned a unique ip, with underlaying Macvlan +bridge created by docker. + +## This is not a docker plugin +The initial thought would be make this as a docker plugin like DHCP +IPAM, so docker daemon could reach the IPAM remotely to where it stay in the Swan +managers. But the truth is that as we have our own scheduler by default so this +IPAM was not intend to run in standlone mode without scheduler, as though the only consumer of +the IPAM would be the schdueler itself, it might be better choice make +the IPAM not a plugin but part of scheduler which can access both from +HTTP API and call directly. + +## How to initialize the IP list pool +IP list pool supposed to be entered mannuly through HTTP API, which +each ip should be unique and accessible within the same layer 2 subnet. + +## Lifecycle of a ip + + * `avaliable` avaliable to be allocate to a container + * `reserved` reserved, should not allocated to any container + * `allocated` currently used by a container + * `releasing` released but not avaliable soon, will be turn into + avaliable state after certain time periods + +## How to interact with IPAM, the APIs + + * `list` avaliable ips, no matter what state they are + * `initialize` the ip pool + * `empty` the ip pool + * `allocate` the ip from pool + * `release` a ip back to the pool + + From 222dd92c9a8b05c723ca9630d728ee5acc9e87b9 Mon Sep 17 00:00:00 2001 From: kevin xu Date: Fri, 18 Nov 2016 14:46:16 +0800 Subject: [PATCH 2/4] ipam feature done Signed-off-by: kevin xu --- .gitignore | 1 + api/router/ipam/ipam.go | 141 +++++++++++++++++++++ api/router/ipam/routes.go | 37 ++++++ commands.go | 140 +++++++++++++++++++++ examplejson/example-ipam.json | 3 + ipam/ip.go | 105 ++++++++++++++++ ipam/ip_test.go | 64 ++++++++++ ipam/ipam.go | 170 +++++++++++++++++++++++++ ipam/ipam_store.go | 163 ++++++++++++++++++++++++ ipam/ipam_store_interface.go | 18 +++ ipam/ipam_store_test.go | 104 ++++++++++++++++ ipam/ipam_test.go | 227 ++++++++++++++++++++++++++++++++++ 12 files changed, 1173 insertions(+) create mode 100644 api/router/ipam/ipam.go create mode 100644 api/router/ipam/routes.go create mode 100644 commands.go create mode 100644 examplejson/example-ipam.json create mode 100644 ipam/ip.go create mode 100644 ipam/ip_test.go create mode 100644 ipam/ipam.go create mode 100644 ipam/ipam_store.go create mode 100644 ipam/ipam_store_interface.go create mode 100644 ipam/ipam_store_test.go create mode 100644 ipam/ipam_test.go diff --git a/.gitignore b/.gitignore index 39862c2b..b5483682 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ coverage-all.out coverage.out coverage.html .bolt.db +.bolt-foobar.db diff --git a/api/router/ipam/ipam.go b/api/router/ipam/ipam.go new file mode 100644 index 00000000..75024b97 --- /dev/null +++ b/api/router/ipam/ipam.go @@ -0,0 +1,141 @@ +package ipam + +import ( + "encoding/json" + "errors" + "net/http" + + "github.com/Dataman-Cloud/swan/api/utils" + ipamanger "github.com/Dataman-Cloud/swan/ipam" +) + +func (r *Router) AllocateIP(w http.ResponseWriter, req *http.Request) error { + if err := utils.CheckForJSON(req); err != nil { + return err + } + + if err := req.ParseForm(); err != nil { + return err + } + + ipStr := req.Form.Get("ip") + if ipStr == "" { + return errors.New("no ip specified") + } + + ip, err := r.ipam.AllocateIp(ipamanger.IP{Ip: ipStr}) + if err != nil { + return err + } + + return json.NewEncoder(w).Encode(ip) +} + +func (r *Router) AllocateNextAvailable(w http.ResponseWriter, req *http.Request) error { + if err := utils.CheckForJSON(req); err != nil { + return err + } + + ip, err := r.ipam.AllocateNextAvailableIP() + if err != nil { + return err + } + + return json.NewEncoder(w).Encode(ip) +} + +func (r *Router) ListAvailableIps(w http.ResponseWriter, req *http.Request) error { + if err := utils.CheckForJSON(req); err != nil { + return err + } + + list, err := r.ipam.IPsAvailable() + if err != nil { + return err + } + + return json.NewEncoder(w).Encode(list) +} + +func (r *Router) ListAllocatedIps(w http.ResponseWriter, req *http.Request) error { + if err := utils.CheckForJSON(req); err != nil { + return err + } + + list, err := r.ipam.IPsAllocated() + if err != nil { + return err + } + + return json.NewEncoder(w).Encode(list) +} + +func (r *Router) ReleaseIP(w http.ResponseWriter, req *http.Request) error { + if err := utils.CheckForJSON(req); err != nil { + return err + } + + if err := req.ParseForm(); err != nil { + return err + } + + var param struct { + IP string `json:"ip"` + } + + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(¶m); err != nil { + return err + } + + err := r.ipam.Release(ipamanger.IP{Ip: param.IP}) + if err != nil { + return err + } + + return nil +} + +func (r *Router) RefillIPs(w http.ResponseWriter, req *http.Request) error { + if err := utils.CheckForJSON(req); err != nil { + return err + } + + if err := req.ParseForm(); err != nil { + return err + } + + var param struct { + IPs []string `json:"ips"` + } + + var ips []ipamanger.IP + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(¶m); err != nil { + return err + } + + for _, ipStr := range param.IPs { + ips = append(ips, ipamanger.IP{Ip: ipStr}) + } + + err := r.ipam.Refill(ips) + if err != nil { + return err + } + + return nil +} + +func (r *Router) ListIPs(w http.ResponseWriter, req *http.Request) error { + if err := utils.CheckForJSON(req); err != nil { + return err + } + + list, err := r.ipam.AllIPs() + if err != nil { + return err + } + + return json.NewEncoder(w).Encode(list) +} diff --git a/api/router/ipam/routes.go b/api/router/ipam/routes.go new file mode 100644 index 00000000..98bd30bd --- /dev/null +++ b/api/router/ipam/routes.go @@ -0,0 +1,37 @@ +package ipam + +import ( + "github.com/Dataman-Cloud/swan/api/router" + manager "github.com/Dataman-Cloud/swan/ipam" +) + +type Router struct { + routes []*router.Route + ipam *manager.IPAM +} + +// NewRouter initializes a new ipam router. +func NewRouter(manager *manager.IPAM) *Router { + r := &Router{ + ipam: manager, + } + + r.initRoutes() + return r +} + +func (r *Router) Routes() []*router.Route { + return r.routes +} + +func (r *Router) initRoutes() { + r.routes = []*router.Route{ + router.NewRoute("GET", "/v1/ipam/allocate_randomly", r.AllocateNextAvailable), + router.NewRoute("GET", "/v1/ipam/allocated_ips", r.ListAllocatedIps), + router.NewRoute("GET", "/v1/ipam/available_ips", r.ListAvailableIps), + router.NewRoute("POST", "/v1/ipam/release", r.ReleaseIP), + router.NewRoute("GET", "/v1/ipam/allocate", r.AllocateIP), + router.NewRoute("POST", "/v1/ipam/ips", r.RefillIPs), + router.NewRoute("GET", "/v1/ipam/ips", r.ListIPs), + } +} diff --git a/commands.go b/commands.go new file mode 100644 index 00000000..6fa9d5f9 --- /dev/null +++ b/commands.go @@ -0,0 +1,140 @@ +package main + +import ( + "fmt" + "net/url" + "os" + + "github.com/Dataman-Cloud/swan/api" + "github.com/Dataman-Cloud/swan/api/router" + "github.com/Dataman-Cloud/swan/api/router/application" + ipamapi "github.com/Dataman-Cloud/swan/api/router/ipam" + "github.com/Dataman-Cloud/swan/backend" + "github.com/Dataman-Cloud/swan/health" + "github.com/Dataman-Cloud/swan/ipam" + "github.com/Dataman-Cloud/swan/mesosproto/mesos" + "github.com/Dataman-Cloud/swan/scheduler" + . "github.com/Dataman-Cloud/swan/store/local" + + "github.com/Dataman-Cloud/swan/types" + "github.com/Sirupsen/logrus" + "github.com/andygrunwald/megos" + "github.com/golang/protobuf/proto" + "github.com/urfave/cli" +) + +var Commands = []cli.Command{ + { + Name: "server", + Aliases: []string{"s"}, + Usage: "spawn swan server", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "addr", + Usage: "API Server address ", + Value: "0.0.0.0:9999", + }, + + cli.StringFlag{ + Name: "masters", + Usage: "masters address ", + Value: "127.0.0.0:5050", + }, + + cli.StringFlag{ + Name: "user", + Usage: "mesos user", + Value: "root", + }, + }, + Action: func(c *cli.Context) error { + ServerCommand(c) + return nil + }, + }, +} + +func ServerCommand(c *cli.Context) { + hostname, err := os.Hostname() + if err != nil { + hostname = "UNKNOWN" + } + + fw := &mesos.FrameworkInfo{ + User: proto.String(c.String("user")), + Name: proto.String("swan"), + Hostname: proto.String(hostname), + FailoverTimeout: proto.Float64(60 * 60 * 24 * 7), + } + + store, err := NewBoltStore(".bolt.db") + if err != nil { + logrus.Errorf("Init store engine failed:%s", err) + return + } + + frameworkId, err := store.FetchFrameworkID() + if err != nil { + logrus.Errorf("Fetch framework id failed: %s", err) + return + } + + if frameworkId != "" { + fw.Id = &mesos.FrameworkID{ + Value: proto.String(frameworkId), + } + } + + msgQueue := make(chan types.ReschedulerMsg, 1) + + masters := []string{c.String("masters")} + masterUrls := make([]*url.URL, 0) + for _, master := range masters { + masterUrl, _ := url.Parse(fmt.Sprintf("http://%s", master)) + masterUrls = append(masterUrls, masterUrl) + } + + mesos := megos.NewClient(masterUrls, nil) + state, err := mesos.GetStateFromCluster() + if err != nil { + panic(err) + } + + cluster := state.Cluster + if cluster == "" { + cluster = "Unnamed" + } + + sched := scheduler.NewScheduler( + state.Leader, + fw, + store, + cluster, + health.NewHealthCheckManager(store, msgQueue), + msgQueue, + ) + + backend := backend.NewBackend(sched, store) + + ipamStore, err := ipam.NewBoltStore(".bolt-foobar.db") + if err != nil { + logrus.Errorf("Init store engine failed:%s", err) + return + } + ipamager := ipam.NewIPAM(ipamStore) + + srv := api.NewServer(c.String("addr")) + + routers := []router.Router{ + application.NewRouter(backend), + ipamapi.NewRouter(ipamager), + } + + srv.InitRouter(routers...) + + go func() { + srv.ListenAndServe() + }() + + <-sched.Start() +} diff --git a/examplejson/example-ipam.json b/examplejson/example-ipam.json new file mode 100644 index 00000000..a2036a52 --- /dev/null +++ b/examplejson/example-ipam.json @@ -0,0 +1,3 @@ +{ + "ips": ["192.168.1.2", "192.168.1.3", "192.168.1.4", "192.168.1.5", "192.168.1.6", "192.168.1.7", "192.168.1.8", "192.168.1.9", "192.168.1.10", "192.168.1.11", "192.168.1.12", "192.168.1.13", "192.168.1.14", "192.168.1.15", "192.168.1.16", "192.168.1.17", "192.168.1.18", "192.168.1.19", "192.168.1.20", "192.168.1.21", "192.168.1.22", "192.168.1.23", "192.168.1.24", "192.168.1.25", "192.168.1.26", "192.168.1.27", "192.168.1.28", "192.168.1.29", "192.168.1.30", "192.168.1.31", "192.168.1.32", "192.168.1.33", "192.168.1.34", "192.168.1.35", "192.168.1.36", "192.168.1.37", "192.168.1.38", "192.168.1.39", "192.168.1.40", "192.168.1.41", "192.168.1.42", "192.168.1.43", "192.168.1.44", "192.168.1.45", "192.168.1.46", "192.168.1.47", "192.168.1.48", "192.168.1.49", "192.168.1.50", "192.168.1.51", "192.168.1.52", "192.168.1.53", "192.168.1.54", "192.168.1.55", "192.168.1.56", "192.168.1.57", "192.168.1.58", "192.168.1.59", "192.168.1.60", "192.168.1.61", "192.168.1.62", "192.168.1.63", "192.168.1.64", "192.168.1.65", "192.168.1.66", "192.168.1.67", "192.168.1.68", "192.168.1.69", "192.168.1.70", "192.168.1.71", "192.168.1.72", "192.168.1.73", "192.168.1.74", "192.168.1.75", "192.168.1.76", "192.168.1.77", "192.168.1.78", "192.168.1.79", "192.168.1.80", "192.168.1.81", "192.168.1.82", "192.168.1.83", "192.168.1.84", "192.168.1.85", "192.168.1.86", "192.168.1.87", "192.168.1.88", "192.168.1.89", "192.168.1.90", "192.168.1.91", "192.168.1.92", "192.168.1.93", "192.168.1.94", "192.168.1.95", "192.168.1.96", "192.168.1.97", "192.168.1.98", "192.168.1.99", "192.168.1.100", "192.168.1.101", "192.168.1.102", "192.168.1.103", "192.168.1.104", "192.168.1.105", "192.168.1.106", "192.168.1.107", "192.168.1.108", "192.168.1.109", "192.168.1.110", "192.168.1.111", "192.168.1.112", "192.168.1.113", "192.168.1.114", "192.168.1.115", "192.168.1.116", "192.168.1.117", "192.168.1.118", "192.168.1.119", "192.168.1.120", "192.168.1.121", "192.168.1.122", "192.168.1.123", "192.168.1.124", "192.168.1.125", "192.168.1.126", "192.168.1.127", "192.168.1.128", "192.168.1.129", "192.168.1.130", "192.168.1.131", "192.168.1.132", "192.168.1.133", "192.168.1.134", "192.168.1.135", "192.168.1.136", "192.168.1.137", "192.168.1.138", "192.168.1.139", "192.168.1.140", "192.168.1.141", "192.168.1.142", "192.168.1.143", "192.168.1.144", "192.168.1.145", "192.168.1.146", "192.168.1.147", "192.168.1.148", "192.168.1.149", "192.168.1.150", "192.168.1.151", "192.168.1.152", "192.168.1.153", "192.168.1.154", "192.168.1.155", "192.168.1.156", "192.168.1.157", "192.168.1.158", "192.168.1.159", "192.168.1.160", "192.168.1.161", "192.168.1.162", "192.168.1.163", "192.168.1.164", "192.168.1.165", "192.168.1.166", "192.168.1.167", "192.168.1.168", "192.168.1.169", "192.168.1.170", "192.168.1.171", "192.168.1.172", "192.168.1.173", "192.168.1.174", "192.168.1.175", "192.168.1.176", "192.168.1.177", "192.168.1.178", "192.168.1.179", "192.168.1.180", "192.168.1.181", "192.168.1.182", "192.168.1.183", "192.168.1.184", "192.168.1.185", "192.168.1.186", "192.168.1.187", "192.168.1.188", "192.168.1.189", "192.168.1.190", "192.168.1.191", "192.168.1.192", "192.168.1.193", "192.168.1.194", "192.168.1.195", "192.168.1.196", "192.168.1.197", "192.168.1.198", "192.168.1.199", "192.168.1.200", "192.168.1.201", "192.168.1.202", "192.168.1.203", "192.168.1.204", "192.168.1.205", "192.168.1.206", "192.168.1.207", "192.168.1.208", "192.168.1.209", "192.168.1.210", "192.168.1.211", "192.168.1.212", "192.168.1.213", "192.168.1.214", "192.168.1.215", "192.168.1.216", "192.168.1.217", "192.168.1.218", "192.168.1.219", "192.168.1.220", "192.168.1.221", "192.168.1.222", "192.168.1.223", "192.168.1.224", "192.168.1.225", "192.168.1.226", "192.168.1.227", "192.168.1.228", "192.168.1.229", "192.168.1.230", "192.168.1.231", "192.168.1.232", "192.168.1.233", "192.168.1.234", "192.168.1.235", "192.168.1.236", "192.168.1.237", "192.168.1.238", "192.168.1.239", "192.168.1.240", "192.168.1.241", "192.168.1.242", "192.168.1.243", "192.168.1.244", "192.168.1.245", "192.168.1.246", "192.168.1.247", "192.168.1.248", "192.168.1.249", "192.168.1.250", "192.168.1.251", "192.168.1.252", "192.168.1.253", "192.168.1.254"] +} diff --git a/ipam/ip.go b/ipam/ip.go new file mode 100644 index 00000000..650ff905 --- /dev/null +++ b/ipam/ip.go @@ -0,0 +1,105 @@ +package ipam + +import ( + "errors" + "fmt" + "net" + "strconv" + "strings" + "time" +) + +const ( + IP_STATE_AVAILABLE = "available" + IP_STATE_ALLOCATED = "allocated" +) + +var ( + ErrParseIp = errors.New("ip parse IP") + ErrIsMultiCastIP = errors.New("ip is a multicast IP") + ErrIsLoopbackIP = errors.New("ip is a loopback IP") + ErrIsUnspecified = errors.New("ip is unspecified") + ErrIsLinkLocalUnicast = errors.New("ip is link local unicast") +) + +// see `https://golang.org/src/net/ip.go` + +type IP struct { + Ip string `json:"Ip"` + State string `json:"State"` + ReleaseAt time.Time `json:"releaseAt"` + TaskId string `json:"TaskId"` +} + +func NewIpFromIp(ip string) (IP, error) { + if err := validIp(ip); err != nil { + return IP{}, err + } + + return IP{ + Ip: ip, + State: IP_STATE_AVAILABLE, + }, nil +} + +func validIp(ipString string) error { + ip := net.ParseIP(ipString) + + if ip == nil { + return ErrParseIp + } + + if ip.IsUnspecified() { + return ErrIsUnspecified + } + + if ip.IsLoopback() { + return ErrIsLoopbackIP + } + + if ip.IsLinkLocalUnicast() { + return ErrIsLinkLocalUnicast + } + + if ip.IsMulticast() { + return ErrIsMultiCastIP + } + + return nil +} + +func (ip IP) ToIP() net.IP { + ipv4 := net.ParseIP(ip.Ip) + return ipv4 +} + +func (ip IP) ToString() string { + return fmt.Sprintf("ip<%s> state<%s> taskId<%s>", ip.Ip, ip.State, ip.TaskId) +} + +func (ip IP) ToInteger() int64 { + ip4 := ip.ToIP().To4() + bin := make([]string, len(ip4)) + for i, v := range ip4 { + bin[i] = fmt.Sprintf("%08b", v) + } + i, _ := strconv.ParseInt(strings.Join(bin, ""), 2, 64) + + return i +} + +func (ip IP) Key() string { + return ip.Ip +} + +type IPList []IP + +func (s IPList) Len() int { + return len(s) +} +func (s IPList) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s IPList) Less(i, j int) bool { + return s[i].ToInteger() < s[j].ToInteger() +} diff --git a/ipam/ip_test.go b/ipam/ip_test.go new file mode 100644 index 00000000..d51bec95 --- /dev/null +++ b/ipam/ip_test.go @@ -0,0 +1,64 @@ +package ipam + +import ( + "net" + "sort" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIpToString(t *testing.T) { + ip := IP{Ip: "127.0.0.1", State: IP_STATE_AVAILABLE, TaskId: "task-id"} + assert.Equal(t, ip.ToString(), "ip<127.0.0.1> state taskId") +} + +func TestIpKey(t *testing.T) { + ip := IP{Ip: "127.0.0.1", State: IP_STATE_AVAILABLE, TaskId: "task-id"} + assert.Equal(t, "127.0.0.1", ip.Key()) +} + +func TestToIP(t *testing.T) { + ip := IP{Ip: "127.0.0.1", State: IP_STATE_AVAILABLE, TaskId: "task-id"} + ipExpected := net.ParseIP("127.0.0.1") + assert.Equal(t, ipExpected, ip.ToIP()) +} + +func TestToInteger(t *testing.T) { + ip := IP{Ip: "127.0.0.1", State: IP_STATE_AVAILABLE, TaskId: "task-id"} + i, _ := strconv.ParseInt("01111111000000000000000000000001", 2, 64) + assert.Equal(t, i, ip.ToInteger()) +} + +func TestIPListSort(t *testing.T) { + ip1 := IP{Ip: "127.0.0.1", State: IP_STATE_AVAILABLE, TaskId: "task-id"} + ip2 := IP{Ip: "127.0.0.2", State: IP_STATE_AVAILABLE, TaskId: "task-id"} + + ips := IPList([]IP{ip2, ip1}) + assert.Equal(t, 2, len(ips)) + assert.Equal(t, ip2, ips[0]) + sort.Sort(ips) + assert.Equal(t, ip1, ips[0]) +} + +func TestValidIp(t *testing.T) { + nilIP := "foobar" + assert.Equal(t, ErrParseIp, validIp(nilIP)) + + unspeficiedIP := "0.0.0.0" + assert.Equal(t, ErrIsUnspecified, validIp(unspeficiedIP)) + + loobackIP := "127.0.0.1" + assert.Equal(t, ErrIsLoopbackIP, validIp(loobackIP)) + + linkLocalUnicast := "169.254.1.1" + assert.Equal(t, ErrIsLinkLocalUnicast, validIp(linkLocalUnicast)) + + multicast := "224.0.0.0" + assert.Equal(t, ErrIsMultiCastIP, validIp(multicast)) + + validIpStr := "192.168.1.2" + assert.Nil(t, validIp(validIpStr)) + +} diff --git a/ipam/ipam.go b/ipam/ipam.go new file mode 100644 index 00000000..1e68b7d9 --- /dev/null +++ b/ipam/ipam.go @@ -0,0 +1,170 @@ +package ipam + +import ( + "errors" + "sync" +) + +var ( + ErrIPAMPoolEmpty = errors.New("ipam pool empty") + ErrIpRequestedAllocated = errors.New("ip allocated") + ErrIpRequestedNotFound = errors.New("ip not found") + ErrNotInAllocatedState = errors.New("ip not in valid state") +) + +type IPAM struct { + mutex sync.Mutex // protected from multiple goroutine might access same IP at the same time + store IPAMStore +} + +// for time now, IPAM only acts as accessor of ips. +// it should be a sigleton but not necessary for now. +func NewIPAM(store IPAMStore) *IPAM { + return &IPAM{ + store: store, + } +} + +func (ipam *IPAM) GetIp(key string) (IP, error) { + return ipam.store.RetriveIP(key) +} + +func (ipam *IPAM) Refill(listOfIps IPList) error { + if err := ipam.Clear(); err != nil { + return err + } + + for _, ip := range listOfIps { + ip.State = IP_STATE_AVAILABLE + err := ipam.store.SaveIP(ip) + if err != nil { + return err + } + } + + return nil +} + +// remove all IPs in the pool, used when want refresh IPAM pool +func (ipam *IPAM) Clear() error { + return ipam.store.EmptyPool() +} + +// list all ips managed by IPAM +func (ipam *IPAM) AllIPs() (IPList, error) { + return ipam.store.ListAllIPs() +} + +// list all IPs that in state `available' +func (ipam *IPAM) IPsAvailable() (IPList, error) { + list, err := ipam.AllIPs() + ips := []IP{} + if err != nil { + return IPList(ips), nil + } + + for _, ip := range list { + if ip.State == IP_STATE_AVAILABLE { + ips = append(ips, ip) + } + } + + return IPList(ips), nil +} + +// list all IPs that in state `allocated' +func (ipam *IPAM) IPsAllocated() (IPList, error) { + list, err := ipam.AllIPs() + ips := []IP{} + if err != nil { + return IPList(ips), nil + } + + for _, ip := range list { + if ip.State == IP_STATE_ALLOCATED { + ips = append(ips, ip) + } + } + + return IPList(ips), nil +} + +// retrive next avaliable IP, mark that IP as `allocated` +func (ipam *IPAM) AllocateNextAvailableIP() (IP, error) { + iplist, err := ipam.IPsAvailable() + if err != nil { + return IP{}, err + } + + if len(iplist) == 0 { + return IP{}, ErrIPAMPoolEmpty + } + + ipam.mutex.Lock() + defer ipam.mutex.Unlock() + + ip := iplist[0] + ip.State = IP_STATE_ALLOCATED + err = ipam.store.UpdateIP(ip) + if err != nil { + return IP{}, err + } + + return ip, nil +} + +func (ipam *IPAM) AllocateIp(ip IP) (IP, error) { + iplist, err := ipam.AllIPs() + if err != nil { + return IP{}, err + } + + if len(iplist) == 0 { + return IP{}, ErrIPAMPoolEmpty + } + + ipam.mutex.Lock() + defer ipam.mutex.Unlock() + + ipRet := IP{} + for _, v := range iplist { + if v.Ip == ip.Ip { + ipRet = v + } + } + + if ipRet.State == IP_STATE_ALLOCATED { + return ipRet, ErrIpRequestedAllocated + } + + if ipRet.Ip != ip.Ip { + return ipRet, ErrIpRequestedNotFound + } + + ip.State = IP_STATE_ALLOCATED + err = ipam.store.UpdateIP(ip) + if err != nil { + return IP{}, err + } + + return ip, nil +} + +func (ipam *IPAM) Release(ip IP) error { + ip, err := ipam.GetIp(ip.Key()) + if err != nil { + return err + } + + if ip.State != IP_STATE_ALLOCATED { + return ErrNotInAllocatedState + } + + ip.State = IP_STATE_AVAILABLE + err = ipam.store.UpdateIP(ip) + if err != nil { + return err + } + + return nil +} diff --git a/ipam/ipam_store.go b/ipam/ipam_store.go new file mode 100644 index 00000000..8abda343 --- /dev/null +++ b/ipam/ipam_store.go @@ -0,0 +1,163 @@ +package ipam + +import ( + "encoding/json" + "errors" + "time" + + "github.com/boltdb/bolt" +) + +const ( + BUCKET_IPAM = "ipam" +) + +const ( + BOLTDB_OPEN_TIMEOUT = time.Second * 5 +) + +type BoltStore struct { + conn *bolt.DB + path string +} + +func NewBoltStore(path string) (*BoltStore, error) { + handle, err := bolt.Open(path, 0600, &bolt.Options{Timeout: BOLTDB_OPEN_TIMEOUT}) + if err != nil { + return nil, err + } + + store := &BoltStore{ + conn: handle, + path: path, + } + + if err := store.initialize(); err != nil { + store.Close() + return nil, err + } + + return store, nil +} + +func (b *BoltStore) initialize() error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + buckets_need_create := []string{ + "swan", + "applications", + "tasks", + "versions", + "checks", + BUCKET_IPAM, + } + + for _, bucket := range buckets_need_create { + if _, err := tx.CreateBucketIfNotExists([]byte(bucket)); err != nil { + return err + } + } + + return tx.Commit() +} + +func (b *BoltStore) Close() error { + return b.conn.Close() +} + +func (b *BoltStore) SaveIP(ip IP) error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + bucket := tx.Bucket([]byte(BUCKET_IPAM)) + + data, err := json.Marshal(ip) + if err != nil { + return err + } + + if err := bucket.Put([]byte(ip.Key()), data); err != nil { + return err + } + + return tx.Commit() +} + +func (b *BoltStore) RetriveIP(key string) (IP, error) { + tx, err := b.conn.Begin(true) + if err != nil { + return IP{}, err + } + defer tx.Rollback() + + bucket := tx.Bucket([]byte(BUCKET_IPAM)) + + data := bucket.Get([]byte(key)) + if data == nil { + return IP{}, errors.New("Not Found") + } + + var ip IP + if err := json.Unmarshal(data, &ip); err != nil { + return IP{}, err + } + + return ip, nil +} + +func (b *BoltStore) ListAllIPs() (IPList, error) { + tx, err := b.conn.Begin(true) + if err != nil { + return nil, err + } + defer tx.Rollback() + + bucket := tx.Bucket([]byte(BUCKET_IPAM)) + + ips := []IP{} + bucket.ForEach(func(k, v []byte) error { + var ip IP + if err := json.Unmarshal(v, &ip); err != nil { + return err + } + ips = append(ips, ip) + return nil + }) + + return ips, nil +} + +func (b *BoltStore) UpdateIP(ip IP) error { + _, err := b.RetriveIP(ip.Key()) + if err != nil { + return err + } + + if err := b.SaveIP(ip); err != nil { + return err + } + + return nil +} + +func (b *BoltStore) EmptyPool() error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + bucket := tx.Bucket([]byte(BUCKET_IPAM)) + + bucket.ForEach(func(k, v []byte) error { + bucket.Delete(k) + return nil + }) + + return tx.Commit() +} diff --git a/ipam/ipam_store_interface.go b/ipam/ipam_store_interface.go new file mode 100644 index 00000000..55360a33 --- /dev/null +++ b/ipam/ipam_store_interface.go @@ -0,0 +1,18 @@ +package ipam + +type IPAMStore interface { + // to store an ip into a persist store + SaveIP(ip IP) error + + // retrive an ip from a store + RetriveIP(k string) (ip IP, err error) + + // retrive all ip from a store + ListAllIPs() (IPList, error) + + // update a ip address + UpdateIP(ip IP) error + + // wipe out all ip and recreate the bucket + EmptyPool() error +} diff --git a/ipam/ipam_store_test.go b/ipam/ipam_store_test.go new file mode 100644 index 00000000..8c11c9ee --- /dev/null +++ b/ipam/ipam_store_test.go @@ -0,0 +1,104 @@ +package ipam + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSaveIP(t *testing.T) { + bolt, _ := NewBoltStore("/tmp/boltdbtest") + defer func() { + bolt.Close() + os.Remove("/tmp/boltdbtest") + }() + + ip := IP{Ip: "127.0.0.1"} + + err := bolt.SaveIP(ip) + assert.Nil(t, err) + + ip, err1 := bolt.RetriveIP("127.0.0.1") + assert.Nil(t, err1) + + assert.Equal(t, "127.0.0.1", ip.Ip) +} + +func TestRetriveIP(t *testing.T) { + bolt, _ := NewBoltStore("/tmp/boltdbtest") + defer func() { + bolt.Close() + os.Remove("/tmp/boltdbtest") + }() + + ip := IP{Ip: "127.0.0.1"} + + err := bolt.SaveIP(ip) + assert.Nil(t, err) + + ip, err1 := bolt.RetriveIP("127.0.0.1") + assert.Nil(t, err1) + + assert.Equal(t, "127.0.0.1", ip.Ip) +} + +func TestListIPs(t *testing.T) { + bolt, _ := NewBoltStore("/tmp/boltdbtest") + defer func() { + bolt.Close() + os.Remove("/tmp/boltdbtest") + }() + + ip := IP{Ip: "127.0.0.1"} + err := bolt.SaveIP(ip) + assert.Nil(t, err) + + ip1 := IP{Ip: "127.0.0.2"} + err1 := bolt.SaveIP(ip1) + assert.Nil(t, err1) + + list, err2 := bolt.ListAllIPs() + assert.Nil(t, err2) + assert.Equal(t, 2, len(list)) +} + +func TestUpdateIP(t *testing.T) { + bolt, _ := NewBoltStore("/tmp/boltdbtest") + defer func() { + bolt.Close() + os.Remove("/tmp/boltdbtest") + }() + + ip := IP{Ip: "127.0.0.1"} + err := bolt.SaveIP(ip) + assert.Nil(t, err) + + ip2 := IP{Ip: "127.0.0.1", State: IP_STATE_ALLOCATED} + err2 := bolt.UpdateIP(ip2) + assert.Nil(t, err2) + + ip, err1 := bolt.RetriveIP("127.0.0.1") + assert.Nil(t, err1) + + assert.Equal(t, IP_STATE_ALLOCATED, ip.State) +} + +func TestEmptyPool(t *testing.T) { + bolt, _ := NewBoltStore("/tmp/boltdbtest") + defer func() { + bolt.Close() + os.Remove("/tmp/boltdbtest") + }() + + ip := IP{Ip: "127.0.0.1"} + err := bolt.SaveIP(ip) + assert.Nil(t, err) + + errEmpty := bolt.EmptyPool() + assert.Nil(t, errEmpty) + + ip, err1 := bolt.RetriveIP("127.0.0.1") + assert.NotNil(t, err1) + assert.Equal(t, "", ip.Ip) +} diff --git a/ipam/ipam_test.go b/ipam/ipam_test.go new file mode 100644 index 00000000..057f4980 --- /dev/null +++ b/ipam/ipam_test.go @@ -0,0 +1,227 @@ +package ipam + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewIPAM(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + + assert.Nil(t, err) + + ipam := NewIPAM(bolt) + assert.NotNil(t, ipam) +} + +func TestRefillIp(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + listRet, err := ipam.AllIPs() + + assert.Equal(t, 2, len(listRet)) +} + +func TestClear(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ipam.Clear() + + listRet, err := ipam.AllIPs() + assert.Equal(t, 0, len(listRet)) +} + +func TestAllIps(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + listRet, err := ipam.AllIPs() + assert.Equal(t, 2, len(listRet)) +} + +func TestAllIpAvailable(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1", State: IP_STATE_AVAILABLE} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ip1.State = IP_STATE_ALLOCATED + ipam.AllocateIp(ip1) + listRet, err := ipam.IPsAvailable() + assert.Equal(t, 1, len(listRet)) +} + +func TestAllIpsAllocated(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1", State: IP_STATE_AVAILABLE} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ip1.State = IP_STATE_ALLOCATED + ipam.AllocateIp(ip1) + listRet, err := ipam.IPsAllocated() + assert.Equal(t, 1, len(listRet)) +} + +func TestAllocatedNextAvailabelIP(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ip, err := ipam.AllocateNextAvailableIP() + assert.Nil(t, err) + assert.Equal(t, "192.168.1.1", ip.Ip) + + ip1, err1 := ipam.AllocateNextAvailableIP() + assert.Nil(t, err1) + assert.Equal(t, "192.168.1.2", ip1.Ip) +} + +func TestAllocatedIp(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ip, err := ipam.AllocateIp(IP{Ip: "192.168.1.1"}) + assert.Nil(t, err) + assert.Equal(t, "192.168.1.1", ip.Ip) +} + +func TestAllocatedIpEmptyPool(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ip, err := ipam.AllocateIp(IP{Ip: "192.168.1.1"}) + assert.Nil(t, err) + assert.Equal(t, "192.168.1.1", ip.Ip) + + _, err1 := ipam.AllocateIp(IP{Ip: "192.168.1.1"}) + assert.NotNil(t, err1) + assert.Equal(t, ErrIpRequestedAllocated, err1) +} + +func TestAllocatedIpAlreadyAllocated(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + list := IPList([]IP{ip1}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ip, err := ipam.AllocateIp(IP{Ip: "192.168.1.1"}) + assert.Nil(t, err) + assert.Equal(t, "192.168.1.1", ip.Ip) + + _, err1 := ipam.AllocateIp(IP{Ip: "192.168.1.1"}) + assert.NotNil(t, err1) + assert.Equal(t, ErrIpRequestedAllocated, err1) +} + +func TestRelease(t *testing.T) { + bolt, err := NewBoltStore("/tmp/xxxx") + defer func() { + bolt.Close() + os.Remove("/tmp/xxxx") + }() + ipam := NewIPAM(bolt) + + ip1 := IP{Ip: "192.168.1.1"} + ip2 := IP{Ip: "192.168.1.2"} + list := IPList([]IP{ip1, ip2}) + err = ipam.Refill(list) + assert.Nil(t, err) + + ip, err := ipam.AllocateIp(IP{Ip: "192.168.1.1"}) + assert.Nil(t, err) + assert.Equal(t, "192.168.1.1", ip.Ip) + + err1 := ipam.Release(IP{Ip: "192.168.1.1"}) + assert.Nil(t, err1) +} From 14fb283291427712b8eaee796c960f5f123ace7e Mon Sep 17 00:00:00 2001 From: kevin xu Date: Mon, 21 Nov 2016 10:38:45 +0800 Subject: [PATCH 3/4] corrections base on feedback Signed-off-by: kevin xu --- ipam/ip.go | 2 +- ipam/ipam_store.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ipam/ip.go b/ipam/ip.go index 650ff905..ead23a4a 100644 --- a/ipam/ip.go +++ b/ipam/ip.go @@ -27,7 +27,7 @@ var ( type IP struct { Ip string `json:"Ip"` State string `json:"State"` - ReleaseAt time.Time `json:"releaseAt"` + ReleaseAt time.Time `json:"ReleaseAt"` TaskId string `json:"TaskId"` } diff --git a/ipam/ipam_store.go b/ipam/ipam_store.go index 8abda343..4813f8f7 100644 --- a/ipam/ipam_store.go +++ b/ipam/ipam_store.go @@ -91,7 +91,7 @@ func (b *BoltStore) SaveIP(ip IP) error { } func (b *BoltStore) RetriveIP(key string) (IP, error) { - tx, err := b.conn.Begin(true) + tx, err := b.conn.Begin(false) if err != nil { return IP{}, err } @@ -113,7 +113,7 @@ func (b *BoltStore) RetriveIP(key string) (IP, error) { } func (b *BoltStore) ListAllIPs() (IPList, error) { - tx, err := b.conn.Begin(true) + tx, err := b.conn.Begin(false) if err != nil { return nil, err } From 517e4e168b2ac4afb873592255be36d1a5ce8b5d Mon Sep 17 00:00:00 2001 From: kevin xu Date: Mon, 21 Nov 2016 10:42:54 +0800 Subject: [PATCH 4/4] remove unused commands.go Signed-off-by: kevin xu --- commands.go | 140 ---------------------------------------------------- 1 file changed, 140 deletions(-) delete mode 100644 commands.go diff --git a/commands.go b/commands.go deleted file mode 100644 index 6fa9d5f9..00000000 --- a/commands.go +++ /dev/null @@ -1,140 +0,0 @@ -package main - -import ( - "fmt" - "net/url" - "os" - - "github.com/Dataman-Cloud/swan/api" - "github.com/Dataman-Cloud/swan/api/router" - "github.com/Dataman-Cloud/swan/api/router/application" - ipamapi "github.com/Dataman-Cloud/swan/api/router/ipam" - "github.com/Dataman-Cloud/swan/backend" - "github.com/Dataman-Cloud/swan/health" - "github.com/Dataman-Cloud/swan/ipam" - "github.com/Dataman-Cloud/swan/mesosproto/mesos" - "github.com/Dataman-Cloud/swan/scheduler" - . "github.com/Dataman-Cloud/swan/store/local" - - "github.com/Dataman-Cloud/swan/types" - "github.com/Sirupsen/logrus" - "github.com/andygrunwald/megos" - "github.com/golang/protobuf/proto" - "github.com/urfave/cli" -) - -var Commands = []cli.Command{ - { - Name: "server", - Aliases: []string{"s"}, - Usage: "spawn swan server", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "addr", - Usage: "API Server address ", - Value: "0.0.0.0:9999", - }, - - cli.StringFlag{ - Name: "masters", - Usage: "masters address ", - Value: "127.0.0.0:5050", - }, - - cli.StringFlag{ - Name: "user", - Usage: "mesos user", - Value: "root", - }, - }, - Action: func(c *cli.Context) error { - ServerCommand(c) - return nil - }, - }, -} - -func ServerCommand(c *cli.Context) { - hostname, err := os.Hostname() - if err != nil { - hostname = "UNKNOWN" - } - - fw := &mesos.FrameworkInfo{ - User: proto.String(c.String("user")), - Name: proto.String("swan"), - Hostname: proto.String(hostname), - FailoverTimeout: proto.Float64(60 * 60 * 24 * 7), - } - - store, err := NewBoltStore(".bolt.db") - if err != nil { - logrus.Errorf("Init store engine failed:%s", err) - return - } - - frameworkId, err := store.FetchFrameworkID() - if err != nil { - logrus.Errorf("Fetch framework id failed: %s", err) - return - } - - if frameworkId != "" { - fw.Id = &mesos.FrameworkID{ - Value: proto.String(frameworkId), - } - } - - msgQueue := make(chan types.ReschedulerMsg, 1) - - masters := []string{c.String("masters")} - masterUrls := make([]*url.URL, 0) - for _, master := range masters { - masterUrl, _ := url.Parse(fmt.Sprintf("http://%s", master)) - masterUrls = append(masterUrls, masterUrl) - } - - mesos := megos.NewClient(masterUrls, nil) - state, err := mesos.GetStateFromCluster() - if err != nil { - panic(err) - } - - cluster := state.Cluster - if cluster == "" { - cluster = "Unnamed" - } - - sched := scheduler.NewScheduler( - state.Leader, - fw, - store, - cluster, - health.NewHealthCheckManager(store, msgQueue), - msgQueue, - ) - - backend := backend.NewBackend(sched, store) - - ipamStore, err := ipam.NewBoltStore(".bolt-foobar.db") - if err != nil { - logrus.Errorf("Init store engine failed:%s", err) - return - } - ipamager := ipam.NewIPAM(ipamStore) - - srv := api.NewServer(c.String("addr")) - - routers := []router.Router{ - application.NewRouter(backend), - ipamapi.NewRouter(ipamager), - } - - srv.InitRouter(routers...) - - go func() { - srv.ListenAndServe() - }() - - <-sched.Start() -}