Merge "Init daylimit check daemon for rabbit"

2 parents 0ab7ccc + e5ddd25 commit bb84d264c02a66a6b6c097b76e093cb74c97ef31, andyzh committed with Gerrit Code Review on Dec 1, 2012
36 tools/daylimit/src/daylimit/iptablesrun/iptables-tpl
@@ -0,0 +1,36 @@
+# Generated by iptables-save v1.4.4 on Mon Nov 26 18:50:22 2012
+*nat
+:PREROUTING ACCEPT [20:1256]
+:INPUT ACCEPT [18:1088]
+:OUTPUT ACCEPT [85639:5777388]
+:POSTROUTING ACCEPT [37193:2231578]
+:warden-instance-16fe0br1hc5 - [0:0]
+:warden-prerouting - [0:0]
+[18:1088] -A PREROUTING -i eth0 -j warden-prerouting
+[37193:2231578] -A OUTPUT -o lo -j warden-prerouting
+[48448:3545978] -A POSTROUTING -o eth0 -j MASQUERADE
+[0:0] -A warden-instance-16fe0br1hc5 -p tcp -m tcp --dport 10001 -j DNAT --to-destination 10.254.0.2:10001
+[677:40620] -A warden-prerouting -j warden-instance-16fe0br1hc5
+COMMIT
+# Completed on Mon Nov 26 18:50:22 2012
+# Generated by iptables-save v1.4.4 on Mon Nov 26 18:50:22 2012
+*filter
+:INPUT ACCEPT [1902417:1655560434]
+:FORWARD ACCEPT [67552:5674368]
+:OUTPUT ACCEPT [1230814:136650485]
+:throughput-count - [0:0]
+:warden-default - [0:0]
+:warden-dispatch - [0:0]
+:warden-instance-16fe0br1hc5 - [0:0]
+[0:0] -A INPUT -i w-+ -j warden-dispatch
+[67552:5674368] -A FORWARD -o w-+ -j throughput-count
+[2941:247044] -A FORWARD -i w-+ -j warden-dispatch
+[0:0] -A FORWARD -o w-+ -j throughput-count
+[0:0] -A throughput-count -i w-16fe0br1hc5-0 -j TP_STATUS
+[0:0] -A throughput-count -o w-16fe0br1hc5-0 -j TP_STATUS
+[0:0] -A warden-dispatch -p tcp -m tcp ! --tcp-flags FIN,SYN,RST,ACK SYN -j throughput-count
+[0:0] -A warden-dispatch -i w-16fe0br1hc5-0 -g warden-instance-16fe0br1hc5
+[2941:247044] -A warden-dispatch -j DROP
+[0:0] -A warden-instance-16fe0br1hc5 -g warden-default
+COMMIT
+# Completed on Mon Nov 26 18:50:22 2012
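The TP_STATUS token in the throughput-count rules above is not a real iptables target; it is a placeholder that the tests in this commit substitute with ACCEPT or DROP before loading the file. A minimal sketch of that substitution, assuming the fixture path used by the tests (everything else here is illustrative, not code from the commit):

    // Sketch: resolve the TP_STATUS placeholder before piping the result
    // into /sbin/iptables-restore -c, as the test helpers do.
    package main

    import (
        "fmt"
        "io/ioutil"
        "strings"
    )

    func main() {
        raw, err := ioutil.ReadFile("./iptables-tpl")
        if err != nil {
            panic(err)
        }
        // Replace every TP_STATUS token with a concrete target (ACCEPT or DROP)
        rules := strings.Replace(string(raw), "TP_STATUS", "ACCEPT", -1)
        fmt.Print(rules)
    }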
174 tools/daylimit/src/daylimit/iptablesrun/iptablesrun.go
@@ -0,0 +1,174 @@
+package iptablesrun
+
+import (
+ "bytes"
+ "fmt"
+ "os/exec"
+ "regexp"
+ "strconv"
+ "strings"
+)
+
+const (
+ SAVECMD = "/sbin/iptables-save"
+ RESTORECMD = "/sbin/iptables-restore"
+ ARGS = "-c"
+ // Regexp matching an inbound throughput-count rule
+ INRULE = "\\[(\\d+):(\\d+)\\] -A throughput-count -i w-(\\w+)-0 -j (ACCEPT|DROP)"
+ // Regexp matching an outbound throughput-count rule
+ OUTRULE = "\\[(\\d+):(\\d+)\\] -A throughput-count -o w-(\\w+)-0 -j (ACCEPT|DROP)"
+ RULETPL = "[0:0] -A throughput-count -%c w-%s-0 -j %s"
+ TAGIPTABLESIN = 'i'
+ TAGIPTABLESOUT = 'o'
+)
+
+const (
+ ACCEPT = 1
+ DROP = 2
+)
+
+const (
+ IN = 0
+ OUT = 1
+)
+
+// Rule info per container id
+type RuleInfo struct {
+ Size int64
+ Status int8
+ InRule string
+ OutRule string
+}
+
+var blockList = make(map[string]int8)
+var unblockList = make(map[string]int8)
+var rules = make(map[string]*RuleInfo)
+var rawRule string
+var changeFrom = map[string]string{
+ "ACCEPT": "DROP",
+ "DROP": "ACCEPT",
+}
+
+func GetBlockList() map[string]int8 {
+ return blockList
+}
+
+func GetUnblockList() map[string]int8 {
+ return unblockList
+}
+
+func SetRules(info map[string]*RuleInfo) {
+ rules = info
+}
+
+func Block(id string) (ok bool) {
+ if _, ok = rules[id]; ok {
+ blockList[id] = 1
+ }
+ return ok
+}
+
+func Unblock(id string) (ok bool) {
+ if _, ok = rules[id]; ok {
+ unblockList[id] = 1
+ }
+ return ok
+}
+
+func Update() (err error) {
+ log := Logger()
+ if len(blockList)+len(unblockList) <= 0 {
+ return
+ }
+ oldNews := make([]string, 0, 4*(len(blockList)+len(unblockList)))
+ for id, _ := range unblockList {
+ oldNews = append(oldNews, rules[id].InRule, fmt.Sprintf(RULETPL, TAGIPTABLESIN, id, "ACCEPT"))
+ oldNews = append(oldNews, rules[id].OutRule, fmt.Sprintf(RULETPL, TAGIPTABLESOUT, id, "ACCEPT"))
+ }
+ for id, _ := range blockList {
+ oldNews = append(oldNews, rules[id].InRule, fmt.Sprintf(RULETPL, TAGIPTABLESIN, id, "DROP"))
+ oldNews = append(oldNews, rules[id].OutRule, fmt.Sprintf(RULETPL, TAGIPTABLESOUT, id, "DROP"))
+ }
+
+ ruleRep := strings.NewReplacer(oldNews...)
+ newRules := ruleRep.Replace(rawRule)
+ resCmd := exec.Command(RESTORECMD, ARGS)
+ var buf bytes.Buffer
+ buf.WriteString(newRules)
+ stdin, err := resCmd.StdinPipe()
+ if err != nil {
+ log.Errorf("Get stdin pipe error [%s]", err)
+ return
+ }
+ if err = resCmd.Start(); err != nil {
+ log.Errorf("Start iptables-restore error [%s]", err)
+ return
+ }
+ if _, err = buf.WriteTo(stdin); err != nil {
+ log.Errorf("Write rules to iptables-restore error [%s]", err)
+ return
+ }
+ stdin.Close()
+ if err = resCmd.Wait(); err != nil {
+ log.Errorf("Wait iptables-restore error [%s]", err)
+ return
+ }
+ blockList = make(map[string]int8)
+ unblockList = make(map[string]int8)
+ return
+}
+
+func FetchAll() (ret map[string]*RuleInfo, err error) {
+ log := Logger()
+ type reg struct {
+ rExp *regexp.Regexp
+ rule string
+ }
+
+ regRules := map[string]*reg{
+ "in": &reg{rule: INRULE},
+ "out": &reg{rule: OUTRULE},
+ }
+
+ for _, r := range regRules {
+ if r.rExp, err = regexp.Compile(r.rule); err != nil {
+ log.Errorf("Compile regexp [%s] error [%s]", r.rule, err)
+ return
+ }
+ }
+
+ cmd := exec.Command(SAVECMD, ARGS)
+ var out []byte
+ out, err = cmd.Output()
+ if err != nil {
+ log.Errorf("Run iptables-save error [%s]", err)
+ return
+ }
+ rawRule = string(out)
+ var size int64
+ ret = make(map[string]*RuleInfo)
+ for _, line := range strings.Split(rawRule, "\n") {
+ for inOut, r := range map[int]*regexp.Regexp{IN: regRules["in"].rExp, OUT: regRules["out"].rExp} {
+ if subs := r.FindStringSubmatch(line); subs != nil {
+ id := subs[3]
+ if size, err = strconv.ParseInt(subs[2], 10, 64); err != nil {
+ return
+ }
+ if _, ok := ret[id]; !ok {
+ ret[id] = &RuleInfo{}
+ }
+ ret[id].Size += size
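+ // Default to DROP; mark ACCEPT when the matched rule's target is ACCEPT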
+ if ret[id].Status = DROP; subs[4] == "ACCEPT" {
+ ret[id].Status = ACCEPT
+ }
+ if inOut == IN {
+ ret[id].InRule = subs[0]
+ } else {
+ ret[id].OutRule = subs[0]
+ }
+ }
+ }
+ }
+ rules = ret
+ return
+}
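The package is meant to be driven in a fetch/decide/apply loop: FetchAll runs iptables-save -c, caches the raw output, and parses the per-container throughput counters; Block and Unblock only queue ids; Update rewrites the cached rules with a single iptables-restore run, so FetchAll must have run first. A minimal sketch of a caller under these assumptions (the import path presumes the repo's GOPATH layout, and the 1 MiB threshold is illustrative, not the daemon's actual logic):

    package main

    import "daylimit/iptablesrun"

    func main() {
        iptablesrun.InitLog("") // empty path logs to stdout
        rules, err := iptablesrun.FetchAll() // parse iptables-save -c output
        if err != nil {
            panic(err)
        }
        for id, info := range rules {
            // Illustrative threshold; main.go applies its own window logic
            if info.Status == iptablesrun.ACCEPT && info.Size > 1<<20 {
                iptablesrun.Block(id) // queue this container for a DROP rewrite
            }
        }
        // Apply all queued changes with a single iptables-restore call
        if err := iptablesrun.Update(); err != nil {
            panic(err)
        }
    }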
152 tools/daylimit/src/daylimit/iptablesrun/iptablesrun_test.go
@@ -0,0 +1,152 @@
+package iptablesrun
+
+import (
+ "bytes"
+ "io"
+ "io/ioutil"
+ "os/exec"
+ "strings"
+ "testing"
+)
+
+const (
+ RULETP = "./iptables-tpl"
+)
+
+var originalRule string
+
+func bakupIptables() error {
+ saveCmd := exec.Command(SAVECMD, ARGS)
+ if out, err := saveCmd.Output(); err != nil {
+ return err
+ } else {
+ originalRule = string(out)
+ }
+ return nil
+}
+
+func restoreIptables(rules string) (err error) {
+ if len(rules) <= 0 {
+ return
+ }
+ restoreCmd := exec.Command(RESTORECMD, ARGS)
+ var stdin io.WriteCloser
+ if stdin, err = restoreCmd.StdinPipe(); err != nil {
+ return
+ }
+ if err = restoreCmd.Start(); err != nil {
+ return
+ }
+ var buf bytes.Buffer
+ buf.WriteString(rules)
+ buf.WriteTo(stdin)
+ stdin.Close()
+ if err = restoreCmd.Wait(); err != nil {
+ return
+ }
+ return
+}
+
+func restoreTp(filename string, status int) error {
+ if rules, err := ioutil.ReadFile(filename); err != nil {
+ return err
+ } else {
+ var rs string
+ if status == ACCEPT {
+ rs = strings.Replace(string(rules), "TP_STATUS", "ACCEPT", -1)
+ } else {
+ rs = strings.Replace(string(rules), "TP_STATUS", "DROP", -1)
+ }
+ return restoreIptables(rs)
+ }
+ return nil
+}
+
+func TestFetchAll(t *testing.T) {
+ if err := bakupIptables(); err != nil {
+ t.Fatalf("Backup iptables error [%s]\n", err)
+ }
+ if err := restoreTp(RULETP, ACCEPT); err != nil {
+ t.Fatalf("Restore iptables template error [%s]\n", err)
+ }
+ rules, err := FetchAll()
+ if err != nil {
+ t.Fatalf("Fetch all rules error [%s]\n", err)
+ }
+ for id, rule := range rules {
+ if rule.Status == DROP {
+ t.Errorf("Id [%s] status should not be DROP", id)
+ }
+ }
+ restoreIptables(originalRule)
+ if !t.Failed() {
+ t.Log("FetchAll Passed")
+ }
+ return
+}
+
+func TestBlock(t *testing.T) {
+ if err := bakupIptables(); err != nil {
+ t.Fatalf("Backup iptables error [%s]\n", err)
+ }
+ if err := restoreTp(RULETP, ACCEPT); err != nil {
+ t.Fatalf("Restore iptables template error [%s]\n", err)
+ }
+ rules, err := FetchAll()
+ if err != nil {
+ t.Fatalf("Fetch all rules error [%s]\n", err)
+ }
+ for id, _ := range rules {
+ Block(id)
+ }
+ if err := Update(); err != nil {
+ t.Fatalf("Update rules error [%s]\n", err)
+ }
+ rules, err = FetchAll()
+ if err != nil {
+ t.Fatalf("Fetch all rules again error [%s]\n", err)
+ }
+ for id, rule := range rules {
+ if rule.Status == ACCEPT {
+ t.Errorf("Container [%s] is not blocked", id)
+ }
+ }
+ restoreIptables(originalRule)
+ if !t.Failed() {
+ t.Log("Block Passed")
+ }
+ return
+}
+
+func TestUnblock(t *testing.T) {
+ if err := bakupIptables(); err != nil {
+ t.Fatalf("Backup iptables error [%s]\n", err)
+ }
+ if err := restoreTp(RULETP, DROP); err != nil {
+ t.Fatalf("Restore iptables template error [%s]\n", err)
+ }
+ rules, err := FetchAll()
+ if err != nil {
+ t.Fatalf("Fetch all rules error [%s]\n", err)
+ }
+ for id, _ := range rules {
+ Unblock(id)
+ }
+ if err := Update(); err != nil {
+ t.Fatalf("Update rules error [%s]\n", err)
+ }
+ rules, err = FetchAll()
+ if err != nil {
+ t.Fatalf("Fetch all rules again error [%s]\n", err)
+ }
+ for id, rule := range rules {
+ if rule.Status == DROP {
+ t.Errorf("Container [%s] is not unblocked", id)
+ }
+ }
+ restoreIptables(originalRule)
+ if !t.Failed() {
+ t.Log("UnBlock Passed")
+ }
+ return
+}
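Note that these tests exercise the real /sbin/iptables-save and /sbin/iptables-restore binaries: each one snapshots the host's live rule set, loads iptables-tpl with TP_STATUS resolved to ACCEPT or DROP, and restores the snapshot at the end, so they must run as root on a machine whose rules may be briefly replaced.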
27 tools/daylimit/src/daylimit/iptablesrun/logger.go
@@ -0,0 +1,27 @@
+package iptablesrun
+
+import (
+ steno "github.com/cloudfoundry/gosteno"
+ "os"
+)
+
+var obj steno.Logger
+
+func Logger() steno.Logger {
+ return obj
+}
+
+func InitLog(logFile string) {
+ c := &steno.Config{
+ Level: steno.LOG_INFO,
+ Codec: steno.NewJsonCodec(),
+ EnableLOC: true,
+ }
+ if logFile == "" {
+ c.Sinks = []steno.Sink{steno.NewIOSink(os.Stdout)}
+ } else {
+ c.Sinks = []steno.Sink{steno.NewFileSink(logFile)}
+ }
+ steno.Init(c)
+ obj = steno.NewLogger("")
+}
116 tools/daylimit/src/daylimit/main.go
@@ -0,0 +1,116 @@
+package main
+
+import (
+ "daylimit/iptablesrun"
+ "flag"
+ "fmt"
+ "os"
+ "syscall"
+ "time"
+)
+
+const (
+ MAXERR = 3
+)
+
+type serviceCheckPoint struct {
+ Id string
+ LastCheck time.Time
+ Size int64
+ Status int8
+ LastSize int64
+}
+
+var items map[string]*serviceCheckPoint
+
+type CmdOptions struct {
+ LimitWindow int64
+ LimitSize int64
+ LogFile string
+ FetchInteval int64
+}
+
+var opts CmdOptions
+
+func SizeCheck(id string, netInfo *iptablesrun.RuleInfo) {
+ ckInfo, ok := items[id]
+ if !ok {
+ items[id] = &serviceCheckPoint{
+ Id: id,
+ LastCheck: time.Now(),
+ LastSize: netInfo.Size,
+ Size: 0,
+ Status: 0}
+ ckInfo = items[id]
+ }
+ // Size is set only for ACCEPT rules
+ if netInfo.Status == iptablesrun.ACCEPT {
+ ckInfo.Size = netInfo.Size
+ }
+ if time.Since(ckInfo.LastCheck) > time.Duration(opts.LimitWindow)*time.Second {
+ tw := time.Duration(opts.LimitWindow)
+ ckInfo.LastSize = ckInfo.Size
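+ // Advance LastCheck by a whole number of LimitWindow periods so the checkpoint stays aligned to window boundaries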
+ ckInfo.LastCheck = ckInfo.LastCheck.Add(time.Since(ckInfo.LastCheck) / time.Second / tw * tw * time.Second)
+ if netInfo.Status == iptablesrun.DROP {
+ // Unblock connection
+ iptablesrun.Unblock(ckInfo.Id)
+ iptablesrun.Logger().Infof("Unblock container [%s]", ckInfo.Id)
+ ckInfo.Size = 0
+ ckInfo.LastSize = 0
+ }
+ }
+ if ckInfo.Size-ckInfo.LastSize > opts.LimitSize && netInfo.Status == iptablesrun.ACCEPT {
+ // Block connection
+ iptablesrun.Block(ckInfo.Id)
+ iptablesrun.Logger().Infof("Block container [%s]", ckInfo.Id)
+ ckInfo.Size = 0
+ ckInfo.LastSize = 0
+ }
+}
+
+func runDaemon() {
+ var errNum int8
+ for {
+ // Fetch all iptables throughput rules
+ info, err := iptablesrun.FetchAll()
+ if err != nil {
+ iptablesrun.Logger().Errorf("Fetch iptables info error:%s", err)
+ time.Sleep(time.Duration(opts.FetchInteval) * time.Second)
+ errNum++
+ if errNum >= MAXERR {
+ os.Exit(2)
+ }
+ continue
+ }
+ errNum = 0
+ // Check limit match
+ for id, netInfo := range info {
+ SizeCheck(id, netInfo)
+ }
+ // Update rules
+ iptablesrun.Update()
+ time.Sleep(time.Duration(opts.FetchInteval) * time.Second)
+ }
+}
+
+func main() {
+ // Parse options
+ flag.StringVar(&opts.LogFile, "l", "", "Log file path")
+ flag.Int64Var(&opts.LimitWindow, "lw", 86400, "Limit time window in seconds")
+ flag.Int64Var(&opts.LimitSize, "ls", 1*1024*1024, "Limit size in bytes")
+ flag.Int64Var(&opts.FetchInteval, "fi", 5*60, "Interval in seconds between iptables fetches")
+ flag.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage: %s [-l log_file] [-lw limit_window] [-ls limit_size] [-fi fetch_interval]\n", os.Args[0])
+ flag.PrintDefaults()
+ os.Exit(2)
+ }
+ flag.Parse()
+
+ items = make(map[string]*serviceCheckPoint)
+
+ iptablesrun.InitLog(opts.LogFile)
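+ // iptables-save/iptables-restore need root, so refuse to run without uid 0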
+ if err := syscall.Setuid(0); err != nil {
+ panic(err)
+ }
+ runDaemon()
+}
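For reference, with the flags defined above the daemon would be started along the lines of daylimit -l /var/log/daylimit.log -lw 86400 -ls 1073741824 -fi 300 (the log path and the 1 GiB limit are illustrative; the window and fetch interval shown are the defaults). It has to run as root because of the Setuid(0) check and the iptables commands it issues.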
52 tools/daylimit/src/daylimit/main_test.go
@@ -0,0 +1,52 @@
+package main
+
+import (
+ "daylimit/iptablesrun"
+ "fmt"
+ "testing"
+ "time"
+)
+
+func TestSizeCheck(t *testing.T) {
+ items = make(map[string]*serviceCheckPoint)
+ opts = CmdOptions{
+ LimitWindow: 10,
+ LimitSize: 1000,
+ FetchInteval: 5,
+ }
+ id := "test"
+
+ nInfo := &iptablesrun.RuleInfo{
+ Size: 0,
+ Status: iptablesrun.ACCEPT,
+ InRule: fmt.Sprintf("[0:0] -A throughput-count -i w-%s-0 -j ACCEPT", id),
+ OutRule: fmt.Sprintf("[0:0] -A throughput-count -o w-%s-0 -j ACCEPT", id),
+ }
+
+ iptablesrun.SetRules(
+ map[string]*iptablesrun.RuleInfo{
+ id: nInfo,
+ })
+ SizeCheck(id, nInfo)
+ nInfo.Size = opts.LimitSize + 1
+ SizeCheck(id, nInfo)
+ bl := iptablesrun.GetBlockList()
+ if _, ok := bl[id]; !ok {
+ t.Fatalf("Id [%s] is not blocked when size greater then limit size [%d]\n", id, opts.LimitSize)
+ }
+ t.Log("Passed the block case")
+
+ time.Sleep(time.Duration(opts.LimitWindow+1) * time.Second)
+ nInfo = &iptablesrun.RuleInfo{
+ Size: 0,
+ Status: iptablesrun.DROP,
+ InRule: fmt.Sprintf("[0:0] -A throughput-count -i w-%s-0 -j DROP", id),
+ OutRule: fmt.Sprintf("[0:0] -A throughput-count -o w-%s-0 -j DROP", id),
+ }
+ SizeCheck(id, nInfo)
+ bl = iptablesrun.GetUnblockList()
+ if _, ok := bl[id]; !ok {
+ t.Fatalf("Id [%s] is not unblocked when size less then limit size [%d]\n", id, opts.LimitSize)
+ }
+ t.Log("Passed the unblock case")
+}
