Skip to content
This repository has been archived by the owner on Dec 1, 2021. It is now read-only.

Commit

Permalink
add rate limiter for metricsforwarder and golangapiserver (#518)
Browse files Browse the repository at this point in the history
* add rate limiter for metricsforwarder

* change the key of bucket token from remoteIP to appID-instanceID

* make the expire duration to be configurable

* move ratelimiter as a common function

* keep fill interval to be configurable only

* init ratelimit bucket with quantum

* add more test cases and add lock when manage/view ratelimit bucket entry

* change ratelimit to be a middleware in golangapi

* add ratelimiter middleware as a common middleware

* check ratelimiter before other middlewares

* add rarelimiter middleware tests for api and mereicsforwarder
  • Loading branch information
zyjiaobj authored and qibobo committed Oct 12, 2019
1 parent c3af7ca commit b844879
Show file tree
Hide file tree
Showing 35 changed files with 1,137 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,6 @@
[submodule "src/github.com/patrickmn/go-cache"]
path = src/github.com/patrickmn/go-cache
url = https://github.com/patrickmn/go-cache
[submodule "src/github.com/juju/ratelimit"]
path = src/github.com/juju/ratelimit
url = https://github.com/juju/ratelimit
2 changes: 2 additions & 0 deletions src/autoscaler/api/cmd/api/api_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ var _ = SynchronizedBeforeSuite(func() []byte {
cfg.Health = models.HealthConfig{
Port: healthport,
}
cfg.RateLimit.MaxAmount = 10
cfg.RateLimit.ValidDuration = 1 * time.Second

configFile = writeConfig(&cfg)
apiClientTLSConfig, err := cfhttp.NewTLSConfig(
Expand Down
6 changes: 4 additions & 2 deletions src/autoscaler/api/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"autoscaler/db/sqldb"
"autoscaler/healthendpoint"
"autoscaler/helpers"
"autoscaler/ratelimiter"

"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager"
Expand Down Expand Up @@ -106,7 +107,8 @@ func main() {
os.Exit(1)
}

publicApiHttpServer, err := publicapiserver.NewPublicApiServer(logger.Session("public_api_http_server"), conf, policyDb, checkBindingFunc, cfClient, httpStatusCollector)
rateLimiter := ratelimiter.DefaultRateLimiter(conf.RateLimit.MaxAmount, conf.RateLimit.ValidDuration, logger.Session("api-ratelimiter"))
publicApiHttpServer, err := publicapiserver.NewPublicApiServer(logger.Session("public_api_http_server"), conf, policyDb, checkBindingFunc, cfClient, httpStatusCollector, rateLimiter)
if err != nil {
logger.Error("failed to create public api http server", err)
os.Exit(1)
Expand All @@ -130,4 +132,4 @@ func main() {
os.Exit(1)
}
logger.Info("exited")
}
}
17 changes: 15 additions & 2 deletions src/autoscaler/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"strings"
"time"

"golang.org/x/crypto/bcrypt"

Expand All @@ -18,7 +19,9 @@ import (
)

const (
DefaultLoggingLevel = "info"
DefaultLoggingLevel = "info"
DefaultMaxAmount = 10
DefaultValidDuration time.Duration = 1 * time.Second
)

type ServerConfig struct {
Expand Down Expand Up @@ -87,6 +90,7 @@ type Config struct {
InfoFilePath string `yaml:"info_file_path"`
MetricsForwarder MetricsForwarderConfig `yaml:"metrics_forwarder"`
Health models.HealthConfig `yaml:"health"`
RateLimit models.RateLimitConfig `yaml:"rate_limit"`
}

func LoadConfig(reader io.Reader) (*Config, error) {
Expand All @@ -98,6 +102,10 @@ func LoadConfig(reader io.Reader) (*Config, error) {
CF: cf.CFConfig{
SkipSSLValidation: false,
},
RateLimit: models.RateLimitConfig{
MaxAmount: DefaultMaxAmount,
ValidDuration: DefaultValidDuration,
},
}

bytes, err := ioutil.ReadAll(reader)
Expand Down Expand Up @@ -143,6 +151,12 @@ func (c *Config) Validate() error {
if c.PolicySchemaPath == "" {
return fmt.Errorf("Configuration error: PolicySchemaPath is empty")
}
if c.RateLimit.MaxAmount <= 0 {
return fmt.Errorf("Configuration error: RateLimit.MaxAmount is equal or less than zero")
}
if c.RateLimit.ValidDuration <= 0 * time.Nanosecond {
return fmt.Errorf("Configuration error: RateLimit.ValidDuration is equal or less than zero nanosecond")
}

if c.InfoFilePath == "" {
return fmt.Errorf("Configuration error: InfoFilePath is empty")
Expand Down Expand Up @@ -200,6 +214,5 @@ func (c *Config) Validate() error {
return fmt.Errorf(errString)
}
}

return nil
}
156 changes: 156 additions & 0 deletions src/autoscaler/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,142 @@ cf:
client_id: client-id
secret: client-secret
skip_ssl_validation: false
`)
})
It("should error", func() {
Expect(err).To(BeAssignableToTypeOf(&yaml.TypeError{}))
Expect(err).To(MatchError(MatchRegexp("cannot unmarshal.*into time.Duration")))
})
})
Context("when max_amount of rate_limit is not an integer", func() {
BeforeEach(func() {
configBytes = []byte(`
broker_server:
port: 8080
public_api_server:
port: 8081
logging:
level: debug
broker_username: brokeruser
broker_password: supersecretpassword
db:
binding_db:
url: postgres://postgres:postgres@localhost/autoscaler?sslmode=disable
max_open_connections: 10
max_idle_connections: 5
connection_max_lifetime: 60s
policy_db:
url: postgres://postgres:postgres@localhost/autoscaler?sslmode=disable
max_open_connections: 10
max_idle_connections: 5
connection_max_lifetime: 60s
catalog_schema_path: '../schemas/catalog.schema.json'
catalog_path: '../exampleconfig/catalog-example.json'
policy_schema_path: '../exampleconfig/policy.schema.json'
scheduler:
scheduler_url: https://localhost:8083
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/sc.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/sc.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
scaling_engine:
scaling_engine_url: https://localhost:8083
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/se.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/se.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
metrics_collector:
metrics_collector_url: https://localhost:8084
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/mc.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/mc.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
event_generator:
event_generator_url: https://localhost:8083
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/eg.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/eg.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
metrics_forwarder:
metrics_forwarder_url: https://localhost:8088
use_buildin_mode: false
info_file_path: /var/vcap/jobs/autoscaer/config/info-file.json
cf:
api: https://api.example.com
client_id: client-id
secret: client-secret
skip_ssl_validation: false
rate_limit:
max_amount: NOT-INTEGER
valid_duration: 1s
`)
})
It("should error", func() {
Expect(err).To(BeAssignableToTypeOf(&yaml.TypeError{}))
Expect(err).To(MatchError(MatchRegexp("cannot unmarshal.*into int")))
})
})
Context("when valid_duration of rate_limit is not a time duration", func() {
BeforeEach(func() {
configBytes = []byte(`
broker_server:
port: 8080
public_api_server:
port: 8081
logging:
level: debug
broker_username: brokeruser
broker_password: supersecretpassword
db:
binding_db:
url: postgres://postgres:postgres@localhost/autoscaler?sslmode=disable
max_open_connections: 10
max_idle_connections: 5
connection_max_lifetime: 60s
policy_db:
url: postgres://postgres:postgres@localhost/autoscaler?sslmode=disable
max_open_connections: 10
max_idle_connections: 5
connection_max_lifetime: 60s
catalog_schema_path: '../schemas/catalog.schema.json'
catalog_path: '../exampleconfig/catalog-example.json'
policy_schema_path: '../exampleconfig/policy.schema.json'
scheduler:
scheduler_url: https://localhost:8083
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/sc.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/sc.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
scaling_engine:
scaling_engine_url: https://localhost:8083
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/se.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/se.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
metrics_collector:
metrics_collector_url: https://localhost:8084
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/mc.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/mc.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
event_generator:
event_generator_url: https://localhost:8083
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/eg.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/eg.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
metrics_forwarder:
metrics_forwarder_url: https://localhost:8088
use_buildin_mode: false
info_file_path: /var/vcap/jobs/autoscaer/config/info-file.json
cf:
api: https://api.example.com
client_id: client-id
secret: client-secret
skip_ssl_validation: false
rate_limit:
max_amount: 2
valid_duration: NOT-TIME-DURATION
`)
})
It("should error", func() {
Expand Down Expand Up @@ -854,6 +990,8 @@ cf:
conf.InfoFilePath = "../exampleconfig/info-file.json"
conf.UseBuildInMode = false

conf.RateLimit.MaxAmount = 10
conf.RateLimit.ValidDuration = 1 * time.Second
})
JustBeforeEach(func() {
err = conf.Validate()
Expand Down Expand Up @@ -1076,6 +1214,24 @@ cf:
})
})

Context("when rate_limit.max_amount is <= zero", func() {
BeforeEach(func() {
conf.RateLimit.MaxAmount = 0
})
It("should err", func() {
Expect(err).To(MatchError(MatchRegexp("Configuration error: RateLimit.MaxAmount is equal or less than zero")))
})
})

Context("when rate_limit.valid_duration is <= 0 ns", func() {
BeforeEach(func() {
conf.RateLimit.ValidDuration = 0 * time.Nanosecond
})
It("should err", func() {
Expect(err).To(MatchError(MatchRegexp("Configuration error: RateLimit.ValidDuration is equal or less than zero nanosecond")))
})
})

Describe("Using BuildIn Mode", func() {
BeforeEach(func() {
conf.UseBuildInMode = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ db:
max_open_connections: 10
max_idle_connections: 5
connection_max_lifetime: 60s
policy_db:
url: postgres://postgres:postgres@localhost/autoscaler?sslmode=disable
max_open_connections: 10
max_idle_connections: 5
connection_max_lifetime: 60s
dashboard_redirect_uri: "https://dashboard-redirect-uri-settings.example.com"
catalog_schema_path: "/var/vcap/jobs/api/packages/api/config/catalog.schema.json"
catalog_path: "/var/vcap/jobs/api/packages/api/config/catalog.json"
Expand All @@ -34,6 +39,12 @@ event_generator:
key_file: /var/vcap/jobs/autoscaler/config/certs/eg.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/eg.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
scheduler:
scheduler_url: http://localhost:8082
tls:
key_file: /var/vcap/jobs/autoscaler/config/certs/eg.key
cert_file: /var/vcap/jobs/autoscaler/config/certs/eg.crt
ca_file: /var/vcap/jobs/autoscaler/config/certs/autoscaler-ca.crt
metrics_forwarder:
metrics_forwarder_url: http://localhost:8088
use_buildin_mode: false
Expand All @@ -43,3 +54,7 @@ cf:
secret: client-secret
skip_ssl_validation: false
grant_type: client_credentials
info_file_path: /var/vcap/jobs/golangapiserver/config/info.json
rate_limit:
max_amount: 10
valid_duration: 1s
2 changes: 1 addition & 1 deletion src/autoscaler/api/publicapiserver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,4 @@ func (mw *Middleware) isValidUserToken(userToken string) bool {
}

return true
}
}
2 changes: 1 addition & 1 deletion src/autoscaler/api/publicapiserver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,4 @@ var _ = Describe("Middleware", func() {
})
})

})
})
2 changes: 1 addition & 1 deletion src/autoscaler/api/publicapiserver/public_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,4 @@ func (h *PublicApiHandler) DeleteCredential(w http.ResponseWriter, r *http.Reque
}
handlers.WriteJSONResponse(w, http.StatusOK, nil)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1721,4 +1721,4 @@ var _ = Describe("PublicApiHandler", func() {
})

})
})
})
9 changes: 7 additions & 2 deletions src/autoscaler/api/publicapiserver/public_api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"autoscaler/cf"
"autoscaler/db"
"autoscaler/healthendpoint"
"autoscaler/ratelimiter"
"autoscaler/routes"

"code.cloudfoundry.org/cfhttp"
Expand All @@ -25,23 +26,26 @@ func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vh(w, r, vars)
}

func NewPublicApiServer(logger lager.Logger, conf *config.Config, policydb db.PolicyDB, checkBindingFunc api.CheckBindingFunc, cfclient cf.CFClient, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) {
func NewPublicApiServer(logger lager.Logger, conf *config.Config, policydb db.PolicyDB, checkBindingFunc api.CheckBindingFunc, cfclient cf.CFClient, httpStatusCollector healthendpoint.HTTPStatusCollector, rateLimiter ratelimiter.Limiter) (ifrit.Runner, error) {
pah := NewPublicApiHandler(logger, conf, policydb)
mw := NewMiddleware(logger, cfclient, checkBindingFunc)
rateLimiterMiddleware := ratelimiter.NewRateLimiterMiddleware("appId", rateLimiter, logger.Session("api-ratelimiter-middleware"))
httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(httpStatusCollector)
r := routes.ApiOpenRoutes()
r.Use(httpStatusCollectMiddleware.Collect)
r.Get(routes.PublicApiInfoRouteName).Handler(VarsFunc(pah.GetApiInfo))
r.Get(routes.PublicApiHealthRouteName).Handler(VarsFunc(pah.GetHealth))

rp := routes.ApiRoutes()
rp.Use(rateLimiterMiddleware.CheckRateLimit)
rp.Use(mw.Oauth)
rp.Use(httpStatusCollectMiddleware.Collect)
rp.Get(routes.PublicApiScalingHistoryRouteName).Handler(VarsFunc(pah.GetScalingHistories))
rp.Get(routes.PublicApiMetricsHistoryRouteName).Handler(VarsFunc(pah.GetInstanceMetricsHistories))
rp.Get(routes.PublicApiAggregatedMetricsHistoryRouteName).Handler(VarsFunc(pah.GetAggregatedMetricsHistories))

rpolicy := routes.ApiPolicyRoutes()
rpolicy.Use(rateLimiterMiddleware.CheckRateLimit)
rpolicy.Use(mw.Oauth)
if !conf.UseBuildInMode {
rpolicy.Use(mw.CheckServiceBinding)
Expand All @@ -52,11 +56,12 @@ func NewPublicApiServer(logger lager.Logger, conf *config.Config, policydb db.Po
rpolicy.Get(routes.PublicApiDetachPolicyRouteName).Handler(VarsFunc(pah.DetachScalingPolicy))

rcredential := routes.ApiCredentialRoutes()
rcredential.Use(rateLimiterMiddleware.CheckRateLimit)
if !conf.UseBuildInMode {
rcredential.Use(mw.RejectCredentialOperationInServiceOffering)
}
rcredential.Use(httpStatusCollectMiddleware.Collect)
rcredential.Use(mw.Oauth)
rcredential.Use(httpStatusCollectMiddleware.Collect)
rcredential.Get(routes.PublicApiCreateCredentialRouteName).Handler(VarsFunc(pah.CreateCredential))
rcredential.Get(routes.PublicApiDeleteCredentialRouteName).Handler(VarsFunc(pah.DeleteCredential))
addr := fmt.Sprintf("0.0.0.0:%d", conf.PublicApiServer.Port)
Expand Down
Loading

0 comments on commit b844879

Please sign in to comment.