Skip to content

Commit

Permalink
Limit Connections (#571)
Browse files Browse the repository at this point in the history
* Add limit-rps and limit-rpm
* HTTP Limits
* Limit Connections
* Add reference links
  • Loading branch information
sadlil authored and tamalsaha committed Oct 6, 2017
1 parent c682474 commit cff6917
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 11 deletions.
28 changes: 28 additions & 0 deletions apis/voyager/v1beta1/annotations.go
Expand Up @@ -213,6 +213,19 @@ const (

// https://github.com/appscode/voyager/issues/525
ErrorFiles = EngressKey + "/errorfiles"

// Limit requests per second per IP address
// http://cbonte.github.io/haproxy-dconv/1.8/configuration.html#7.3.3-sc_conn_rate
// https://serverfault.com/a/679172/349346
// https://discourse.haproxy.org/t/solved-how-to-configure-basic-ddos-protection-when-behind-aws-elb-x-forwarded-for/932
// https://www.haproxy.com/blog/use-a-load-balancer-as-a-first-row-of-defense-against-ddos/
LimitRPS = IngressKey + "/limit-rps"
// Limit requests per minute per IP address
LimitRPM = IngressKey + "/limit-rpm"

// http://cbonte.github.io/haproxy-dconv/1.8/configuration.html#7.3.3-src_conn_cur
// https://www.haproxy.com/blog/use-a-load-balancer-as-a-first-row-of-defense-against-ddos/
LimitConnection = IngressKey + "/limit-connection"
)

const (
Expand Down Expand Up @@ -530,6 +543,21 @@ func (r Ingress) ErrorFilesConfigMapName() string {
return GetString(r.Annotations, ErrorFiles)
}

func (r Ingress) LimitRPS() int {
value, _ := GetInt(r.Annotations, LimitRPS)
return value
}

func (r Ingress) LimitRPM() int {
value, _ := GetInt(r.Annotations, LimitRPM)
return value
}

func (r Ingress) LimitConnections() int {
value, _ := GetInt(r.Annotations, LimitConnection)
return value
}

// ref: https://github.com/kubernetes/kubernetes/blob/078238a461a0872a8eacb887fbb3d0085714604c/staging/src/k8s.io/apiserver/pkg/apis/example/v1/types.go#L134
// Deprecated, for newer ones use '{"k1":"v1", "k2", "v2"}' form
// This expects the form k1=v1,k2=v2
Expand Down
17 changes: 17 additions & 0 deletions hack/docker/voyager/templates/default-frontend.cfg
@@ -1,6 +1,23 @@
frontend http-frontend
bind *:80 {{ if .AcceptProxy }}accept-proxy{{ end }}

# Limit Connections
{{- if .Limit }}
{{- if .Limit.Connection }}
stick-table type ip size 100k expire 2m store conn_cur
acl __mark_as_overload_conn__ sc1_conn_cur gt {{ .Limit.Connection }}
tcp-request content track-sc1 src
http-request deny if __mark_as_overload_conn__
{{ end }}
{{- if .Limit.Rate }}
tcp-request inspect-delay 5s
stick-table type ip size 1m expire 5m store conn_rate({{ .Limit.TimeSecond }}s)
acl __mark_as_overload__ sc1_conn_rate gt {{ .Limit.Rate }}
tcp-request content track-sc1 src
http-request deny if __mark_as_overload__
{{ end -}}
{{ end }}

mode http
option httplog
option forwardfor
Expand Down
19 changes: 18 additions & 1 deletion hack/docker/voyager/templates/http-frontend.cfg
Expand Up @@ -5,12 +5,29 @@ frontend {{ .FrontendName }}
rsprep ^Set-Cookie:\ (.*) Set-Cookie:\ \1;\ Secure
{{- if .EnableHSTS }}
# Add the HSTS header with a 6 month default max-age
rspadd Strict-Transport-Security:\ max-age={{ .HSTSMaxAge }}{{ if .HSTSPreload }};\ preload{{ end }}{{if .HSTSIncludeSubDomains }};\ includeSubDomains{{ end }}
rspadd Strict-Transport-Security:\ max-age={{ .HSTSMaxAge }}{{ if .HSTSPreload }};\ preload{{ end }}{{ if .HSTSIncludeSubDomains }};\ includeSubDomains{{ end }}
{{ end -}}
{{ else -}}
bind *:{{ .Port }} {{ if .AcceptProxy }}accept-proxy{{ end }}
{{ end -}}

# Limit Connections
{{- if .Limit }}
{{- if .Limit.Connection }}
stick-table type ip size 100k expire 2m store conn_cur
acl __mark_as_overload_conn__ sc1_conn_cur gt {{ .Limit.Connection }}
tcp-request content track-sc1 src
http-request deny if __mark_as_overload_conn__
{{ end }}
{{- if .Limit.Rate }}
tcp-request inspect-delay 5s
stick-table type ip size 1m expire 5m store conn_rate({{ .Limit.TimeSecond }}s)
acl __mark_as_overload__ sc1_conn_rate gt {{ .Limit.Rate }}
tcp-request content track-sc1 src
http-request deny if __mark_as_overload__
{{ end -}}
{{ end }}

mode http

{{- if .WhitelistSourceRange }}
Expand Down
23 changes: 23 additions & 0 deletions hack/docker/voyager/templates/tcp-frontend.cfg
@@ -1,6 +1,29 @@
frontend {{ .FrontendName }}
bind *:{{ .Port }} {{ if .AcceptProxy }}accept-proxy{{ end }} {{ if .SecretName }}ssl no-sslv3 no-tlsv10 no-tls-tickets crt /etc/ssl/private/haproxy/{{ .SecretName }}.pem{{ end }} {{ if .ALPNOptions }}{{ .ALPNOptions }}{{ end }}
mode tcp

{{- if .Limit }}
{{- if .Limit.Rate }}
stick-table type ip size 100k expire 10m store conn_rate({{ .Limit.TimeSecond }}s)
tcp-request connection reject if { src_conn_rate ge {{ .Limit.Rate }} }
tcp-request connection track-sc1 src
{{ end -}}
{{ end }}

# Limit Connections
{{- if .Limit }}
{{- if .Limit.Connection }}
stick-table type ip size 100k expire 2m store conn_cur
tcp-request connection reject if { sc1_conn_cur gt {{ .Limit.Connection }} }
tcp-request connection track-sc1 src
{{ end }}
{{- if .Limit.Rate }}
stick-table type ip size 100k expire 10m store conn_rate({{ .Limit.TimeSecond }}s)
tcp-request connection reject if { src_conn_rate gt {{ .Limit.Rate }} }
tcp-request connection track-sc1 src
{{ end -}}
{{ end }}

{{- if .WhitelistSourceRange }}
# Add whitelisted ips
acl network_allowed src {{ .WhitelistSourceRange }}
Expand Down
1 change: 1 addition & 0 deletions pkg/haproxy/template_test.go
Expand Up @@ -59,6 +59,7 @@ func TestTemplate(t *testing.T) {
},
MaxConnections: 3000,
ForceSSLRedirect: true,
Limit: &Limit{Rate: 5, TimeSecond: 20},
}
testParsedConfig := TemplateData{
SharedInfo: si,
Expand Down
7 changes: 7 additions & 0 deletions pkg/haproxy/types.go
Expand Up @@ -38,6 +38,7 @@ type SharedInfo struct {
WhitelistSourceRange string
MaxConnections int
ForceMatchServicePort bool
Limit *Limit
ForceSSLRedirect bool
}

Expand Down Expand Up @@ -136,3 +137,9 @@ type AuthUser struct {
Password string
Encrypted bool
}

type Limit struct {
Connection int
TimeSecond int
Rate int
}
14 changes: 13 additions & 1 deletion pkg/ingress/parser.go
Expand Up @@ -221,8 +221,20 @@ func (c *controller) generateConfig() error {
WhitelistSourceRange: c.Ingress.WhitelistSourceRange(),
MaxConnections: c.Ingress.MaxConnections(),
ForceMatchServicePort: c.Ingress.ForceServicePort(),
ForceSSLRedirect: c.Ingress.ForceSSLRedirect(),
Limit: &haproxy.Limit{
Connection: c.Ingress.LimitConnections(),
},
ForceSSLRedirect: c.Ingress.ForceSSLRedirect(),
}

if val := c.Ingress.LimitRPM(); val > 0 {
si.Limit.TimeSecond = 60
si.Limit.Rate = val
} else if val := c.Ingress.LimitRPS(); val > 0 {
si.Limit.TimeSecond = 1
si.Limit.Rate = val
}

if c.Opt.CloudProvider == "aws" && c.Ingress.LBType() == api.LBTypeLoadBalancer {
si.AcceptProxy = c.Ingress.KeepSourceIP()
}
Expand Down
9 changes: 0 additions & 9 deletions pkg/ingress/rbac.go
@@ -1,9 +1,6 @@
package ingress

import (
"encoding/json"
"fmt"

core_util "github.com/appscode/kutil/core/v1"
rbac_util "github.com/appscode/kutil/rbac/v1beta1"
api "github.com/appscode/voyager/apis/voyager/v1beta1"
Expand Down Expand Up @@ -77,12 +74,6 @@ func (c *controller) ensureRoles() error {
Verbs: []string{"get", "list", "watch"},
})
}

rb, _ := json.MarshalIndent(in, "", " ")
fmt.Println()
fmt.Println(string(rb))
fmt.Println()

return in
})
return err
Expand Down
114 changes: 114 additions & 0 deletions test/e2e/ingress_ops.go
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"time"

"github.com/appscode/go/log"
internalapi "github.com/appscode/voyager/apis/voyager"
api "github.com/appscode/voyager/apis/voyager/v1beta1"
"github.com/appscode/voyager/test/framework"
Expand Down Expand Up @@ -861,4 +862,117 @@ var _ = Describe("IngressOperations", func() {

})
})

Describe("With Limit RPM", func() {
BeforeEach(func() {
ing.Annotations[api.LimitRPM] = "2"
})

It("Should Allow 2 Connections In one minute", func() {
By("Getting HTTP endpoints")
eps, err := f.Ingress.GetHTTPEndpoints(ing)
Expect(err).NotTo(HaveOccurred())
Expect(len(eps)).Should(BeNumerically(">=", 1))

err = f.Ingress.DoHTTP(framework.MaxRetry, "", ing, eps, "GET",
"/testpath/ok",
func(r *testserverclient.Response) bool {
return Expect(r.Status).Should(Equal(http.StatusOK)) &&
Expect(r.Method).Should(Equal("GET")) &&
Expect(r.Path).Should(Equal("/testpath/ok"))
},
)
Expect(err).NotTo(HaveOccurred())

err = f.Ingress.DoHTTP(framework.MaxRetry, "", ing, eps, "GET",
"/testpath/ok",
func(r *testserverclient.Response) bool {
return Expect(r.Status).Should(Equal(http.StatusOK)) &&
Expect(r.Method).Should(Equal("GET")) &&
Expect(r.Path).Should(Equal("/testpath/ok"))
},
)
Expect(err).NotTo(HaveOccurred())

err = f.Ingress.DoHTTP(framework.NoRetry, "", ing, eps, "GET",
"/testpath/ok",
func(r *testserverclient.Response) bool {
return Expect(r.Status).Should(Equal(http.StatusOK)) &&
Expect(r.Method).Should(Equal("GET")) &&
Expect(r.Path).Should(Equal("/testpath/ok"))
},
)
Expect(err).To(HaveOccurred())

log.Warningln("Waiting 2 minute for timer to be reset")
time.Sleep(time.Minute * 2)
log.Warningln("Request should response")
err = f.Ingress.DoHTTP(framework.MaxRetry, "", ing, eps, "GET",
"/testpath/ok",
func(r *testserverclient.Response) bool {
return Expect(r.Status).Should(Equal(http.StatusOK)) &&
Expect(r.Method).Should(Equal("GET")) &&
Expect(r.Path).Should(Equal("/testpath/ok"))
},
)
Expect(err).NotTo(HaveOccurred())
})
})

Describe("With Limit Max Concurrent connection per ip", func() {
BeforeEach(func() {
ing.Annotations[api.LimitConnection] = "2"
ing.Annotations[api.DefaultsTimeOut] = `{"connect": "300s", "server": "300s"}`
})

It("Should Allow 2 From IP Connections", func() {
By("Getting HTTP endpoints")
eps, err := f.Ingress.GetHTTPEndpoints(ing)
Expect(err).NotTo(HaveOccurred())
Expect(len(eps)).Should(BeNumerically(">=", 1))

errChan := make(chan error)
for i := 1; i <= 2; i++ {
go func() {
err := f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 150, "", ing, eps, "GET",
"/testpath/ok?delay=60s",
func(r *testserverclient.Response) bool {
return Expect(r.Status).Should(Equal(http.StatusOK)) &&
Expect(r.Method).Should(Equal("GET")) &&
Expect(r.Path).Should(Equal("/testpath/ok"))
},
)
errChan <- err
}()
}

// Ensure this request must occurred after two long running request
time.Sleep(time.Second * 10)
err = f.Ingress.DoHTTP(framework.NoRetry, "", ing, eps, "GET",
"/testpath/ok",
func(r *testserverclient.Response) bool {
return Expect(r.Status).Should(Equal(http.StatusOK)) &&
Expect(r.Method).Should(Equal("GET")) &&
Expect(r.Path).Should(Equal("/testpath/ok"))
},
)
Expect(err).To(HaveOccurred())

Expect(<-errChan).NotTo(HaveOccurred())
Expect(<-errChan).NotTo(HaveOccurred())

log.Warningln("Waiting 2 minute for timer to be reset")
time.Sleep(time.Minute * 1)
log.Warningln("Request should response")
err = f.Ingress.DoHTTP(framework.MaxRetry, "", ing, eps, "GET",
"/testpath/ok",
func(r *testserverclient.Response) bool {
return Expect(r.Status).Should(Equal(http.StatusOK)) &&
Expect(r.Method).Should(Equal("GET")) &&
Expect(r.Path).Should(Equal("/testpath/ok"))
},
)
Expect(err).NotTo(HaveOccurred())
})
})
})
56 changes: 56 additions & 0 deletions test/e2e/ingress_tcp.go
@@ -1,6 +1,8 @@
package e2e

import (
"time"

api "github.com/appscode/voyager/apis/voyager/v1beta1"
"github.com/appscode/voyager/test/framework"
"github.com/appscode/voyager/test/test-server/testserverclient"
Expand Down Expand Up @@ -192,4 +194,58 @@ var _ = Describe("IngressTCP", func() {
// TODO @ dipta: how to test if whitelist is actually working?
})
})

Describe("Create TCP With Limit RPM", func() {
BeforeEach(func() {
ing.Spec.Rules = []api.IngressRule{
{
IngressRuleValue: api.IngressRuleValue{
TCP: &api.TCPIngressRuleValue{
Port: intstr.FromInt(4001),
Backend: api.IngressBackend{
ServiceName: f.Ingress.TestServerName(),
ServicePort: intstr.FromInt(4343),
},
},
},
},
}
ing.Annotations[api.LimitRPM] = "2"
})

It("Should test TCP Connections", func() {
By("Getting HTTP endpoints")
eps, err := f.Ingress.GetHTTPEndpoints(ing)
Expect(err).NotTo(HaveOccurred())
Expect(len(eps)).Should(BeNumerically(">=", 1))

svc, err := f.Ingress.GetOffShootService(ing)
Expect(err).NotTo(HaveOccurred())
Expect(len(svc.Spec.Ports)).Should(Equal(1))
Expect(svc.Spec.Ports[0].Port).To(Equal(int32(4001)))

err = f.Ingress.DoTCP(framework.MaxRetry, ing, eps, func(r *testserverclient.Response) bool {
return Expect(r.ServerPort).Should(Equal(":4343"))
})
Expect(err).NotTo(HaveOccurred())

err = f.Ingress.DoTCP(framework.MaxRetry, ing, eps, func(r *testserverclient.Response) bool {
return Expect(r.ServerPort).Should(Equal(":4343"))
})
Expect(err).NotTo(HaveOccurred())

err = f.Ingress.DoTCP(3, ing, eps, func(r *testserverclient.Response) bool {
return Expect(r.ServerPort).Should(Equal(":4343"))
})
Expect(err).To(HaveOccurred())

// Wait for the rates to be reset
time.Sleep(time.Minute)

err = f.Ingress.DoTCP(framework.MaxRetry, ing, eps, func(r *testserverclient.Response) bool {
return Expect(r.ServerPort).Should(Equal(":4343"))
})
Expect(err).NotTo(HaveOccurred())
})
})
})

0 comments on commit cff6917

Please sign in to comment.