Skip to content

Commit

Permalink
Bow realize current limiting function
Browse files Browse the repository at this point in the history
  • Loading branch information
abericyang@gmail.com committed May 14, 2019
1 parent 6177c40 commit 10d96ce
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 5 deletions.
16 changes: 12 additions & 4 deletions bow/bow.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ type Bow struct {

// RouteService 路由对象
type RouteService struct {
Name string
InURI string
OutRemote string
OutURI string
Name string // 服务名称
InURI string // 路由入口 URI
OutRemote string // 路由出口地址
OutURI string // 路由出口 URI
Limit *Limit // 服务限流策略
}

// Add 新增路由服务数组
Expand All @@ -60,6 +61,9 @@ func (s *Bow) Add(routeServiceArr ...*RouteService) {
routeService := routeServiceArr[index]
routeServices[routeService.Name] = routeService
GetBowInstance().register(routeService)
if nil != routeService.Limit {
go routeService.Limit.limit()
}
serviceCount++
}
}
Expand Down Expand Up @@ -106,6 +110,10 @@ func RunBowCallback(context *gin.Context, serviceName string, filter func(contex
context.JSON(http.StatusOK, result)
return
}
// 限流
if nil != routeService.Limit {
routeService.Limit.LimitChan <- 1
}
if nil == f {
request.SyncPoolGetRequest().Call(context, context.Request.Method, routeService.OutRemote, routeService.OutURI)
} else {
Expand Down
68 changes: 68 additions & 0 deletions bow/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2019. ENNOO - All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bow

import (
"github.com/ennoo/rivet/utils/log"
"strconv"
"time"
)

// Limit 限流对象
type Limit struct {
LimitMillisecond int64 // 请求限定的时间段(毫秒)
LimitCount int // 请求限定的时间段内允许的请求次数
LimitIntervalMillisecond int64 // 请求允许的最小间隔时间(毫秒),0表示不限
LimitChan chan int // 限流通道
Times []int64 // 请求时间数组
}

func (l *Limit) timeInit() {
l.Times = make([]int64, 0)
for i := 0; i < l.LimitCount; i++ {
time.Sleep(10 * time.Millisecond)
l.add(time.Now().UnixNano() / 1e6)
}
}

func (l *Limit) limit() {
for {
timeNow := time.Now().UnixNano() / 1e6
if len(l.Times) < l.LimitCount {
l.add(time.Now().UnixNano() / 1e6)
} else if timeNow-l.Times[0] > l.LimitMillisecond && timeNow-l.Times[len(l.Times)-1] > l.LimitIntervalMillisecond {
limitChanResult := strconv.Itoa(<-l.LimitChan)
log.Bow.Debug("取出一个元素,放行 <-c = " + limitChanResult)
if len(l.Times) == l.LimitCount {
l.remove()
}
l.add(time.Now().UnixNano() / 1e6)
} else {
time.Sleep(100 * time.Millisecond)
}
}
}

// add 新增一个元素
func (l *Limit) add(time int64) {
if len(l.Times) < l.LimitCount {
l.Times = append(l.Times, time)
}
}

// remove 移除第一个元素
func (l *Limit) remove() {
l.Times = l.Times[1:]
}
79 changes: 79 additions & 0 deletions bow/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2019. ENNOO - All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bow

import (
"fmt"
"testing"
"time"
)

func TestLimit(t *testing.T) {
limit := Limit{
LimitMillisecond: int64(1 * 1000),
LimitCount: 3,
LimitIntervalMillisecond: 150,
LimitChan: make(chan int, 10),
}
fmt.Printf("时间戳(秒):%v;\n", time.Now().Unix())
fmt.Printf("时间戳(纳秒):%v;\n", time.Now().UnixNano())
fmt.Printf("时间戳(毫秒):%v;\n", time.Now().UnixNano()/1e6)
fmt.Printf("时间戳(纳秒转换为秒):%v;\n", time.Now().UnixNano()/1e9)
limit.timeInit()
fmt.Printf("%v\n", limit.Times)
time.Sleep(1 * time.Second)
limit.remove()
fmt.Printf("%v\n", limit.Times)
time.Sleep(1 * time.Second)
limit.add(time.Now().UnixNano() / 1e6)
fmt.Printf("%v\n", limit.Times)
go limit.limit()
loop(&limit)
}

func loop(limit *Limit) {
i := 0
for i <= 20 {
fmt.Println("被堵住了 c len = ", len(limit.LimitChan))
limit.LimitChan <- i
fmt.Printf("被放行了 %v\n", limit.Times)
i++
time.Sleep(100 * time.Millisecond)
}
}

func TestLimitMap(t *testing.T) {
serviceName := "test"
routeService := RouteService{
Name: serviceName,
InURI: "hello1",
OutRemote: "http://localhost:8081",
OutURI: "rivet/shunt",
Limit: &Limit{
LimitMillisecond: int64(1 * 1000),
LimitCount: 3,
LimitIntervalMillisecond: 150,
LimitChan: make(chan int, 10),
},
}
go routeService.Limit.limit()
loopMap(routeService.Limit)
}

func loopMap(limit *Limit) {
if nil != limit {
loop(limit)
}
}
12 changes: 12 additions & 0 deletions examples/bow2/bow2.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ func main() {
InURI: "hello1",
OutRemote: "http://localhost:8081",
OutURI: "rivet/shunt",
Limit: &bow.Limit{
LimitMillisecond: int64(3 * 1000),
LimitCount: 3,
LimitIntervalMillisecond: 150,
LimitChan: make(chan int, 10),
},
},
&bow.RouteService{
Name: "test2",
InURI: "hello2",
OutRemote: "https://localhost:8092",
OutURI: "rivet/shunt",
Limit: &bow.Limit{
LimitMillisecond: int64(3 * 1000),
LimitCount: 3,
LimitIntervalMillisecond: 150,
LimitChan: make(chan int, 10),
},
},
)
rivet.ListenAndServe(&rivet.ListenServe{
Expand Down
7 changes: 6 additions & 1 deletion examples/routine/routine2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func main() {
//提前将队列放满
c <- 1
c <- 2
fmt.Println("c len = ", len(c))
fmt.Println("开始尝试执行")
go cross(c)
process(c)
Expand All @@ -51,6 +52,10 @@ func cross(c chan int) {
}

fmt.Println("释放一个通行证")
<-c //取出元素,则chan可以继续放入数据,将唤醒**行代码
fmt.Println("<-c 1 = ", <-c)
fmt.Println("<-c 2 = ", <-c)
fmt.Println("<-c 3 = ", <-c)
fmt.Println("<-c 4 = ", <-c)
//取出元素,则chan可以继续放入数据,将唤醒**行代码

}
6 changes: 6 additions & 0 deletions rivet/rivet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ennoo/rivet/trans/response"
"github.com/ennoo/rivet/utils/env"
"github.com/ennoo/rivet/utils/log"
"github.com/ennoo/rivet/utils/sql"
"github.com/rs/xid"
)

Expand Down Expand Up @@ -62,3 +63,8 @@ func Request() *request.Request {
func Bow() *bow.Bow {
return bow.GetBowInstance()
}

// SQL 提供实例化调用数据库连接对象
func SQL() *sql.SQL {
return sql.GetSQLInstance()
}

0 comments on commit 10d96ce

Please sign in to comment.