Skip to content

Commit

Permalink
Merge b3bfdcf into 66117a1
Browse files Browse the repository at this point in the history
  • Loading branch information
ninokop committed Apr 18, 2018
2 parents 66117a1 + b3bfdcf commit 4cd1653
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 67 deletions.
2 changes: 2 additions & 0 deletions core/router/cse/router_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ServiceComb/go-chassis/core/config/model"
"github.com/ServiceComb/go-chassis/core/lager"
"github.com/ServiceComb/go-chassis/core/router"
wp "github.com/ServiceComb/go-chassis/core/router/weightpool"
)

const routeFileSourceName = "RouteFileSource"
Expand Down Expand Up @@ -40,6 +41,7 @@ func (r *routeRuleEventListener) Event(e *core.Event) {

if router.ValidateRule(map[string][]*model.RouteRule{e.Key: routeRules}) {
SetRouteRuleByKey(e.Key, routeRules)
wp.GetPool().Reset(e.Key)
lager.Logger.Infof("Update [%s] route rule success", e.Key)
}
}
Expand Down
74 changes: 10 additions & 64 deletions core/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,20 @@
package router

import (
"errors"
"regexp"
"strconv"

"errors"
"github.com/ServiceComb/go-chassis/core/common"
"github.com/ServiceComb/go-chassis/core/config/model"
"github.com/ServiceComb/go-chassis/core/invocation"
"github.com/ServiceComb/go-chassis/core/registry"
"sync"
wp "github.com/ServiceComb/go-chassis/core/router/weightpool"
)

//Templates is for source match template settings
var Templates = make(map[string]*model.Match)

// SafeMap safe map structure
type SafeMap struct {
sync.RWMutex
Map map[string]int
}

func (sm *SafeMap) get(key string) (int, bool) {
sm.RLock()
value, ok := sm.Map[key]
sm.RUnlock()
return value, ok
}

func (sm *SafeMap) set(key string, value int) {
sm.Lock()
sm.Map[key] = value
sm.Unlock()
}

var invokeCount = initMap()

// initMap initialize map
func initMap() *SafeMap {
sm := new(SafeMap)
sm.Map = make(map[string]int)
return sm
}

//Router return route rule, you can also set custom route rule
type Router interface {
SetRouteRule(map[string][]*model.RouteRule)
Expand Down Expand Up @@ -83,9 +55,8 @@ func Route(header map[string]string, si *registry.SourceInfo, inv *invocation.In
rules := SortRules(inv.MicroServiceName)
for _, rule := range rules {
if Match(rule.Match, header, si) {
tag, _ := FitRate(rule.Routes, inv.MicroServiceName)
tag := FitRate(rule.Routes, inv.MicroServiceName)
if tag != nil {

inv.Version = tag.Tags[common.BuildinTagVersion]
if tag.Tags[common.BuildinTagApp] != "" {
inv.AppID = tag.Tags[common.BuildinTagApp]
Expand All @@ -112,43 +83,18 @@ func Route(header map[string]string, si *registry.SourceInfo, inv *invocation.In
}

// FitRate fit rate
func FitRate(tags []*model.RouteTag, dest string) (tag *model.RouteTag, err error) {
func FitRate(tags []*model.RouteTag, dest string) *model.RouteTag {
if tags[0].Weight == 100 {
tag = tags[0]
return tag, nil
return tags[0]
}

totalKey := dest + "-t-" + tags[0].Tags[common.BuildinTagVersion] + "-" + tags[0].Tags[common.BuildinTagApp]
firstKey := dest + "-" + tags[0].Tags[common.BuildinTagVersion] + "-" + tags[0].Tags[common.BuildinTagApp]
total, ok := invokeCount.get(totalKey)
// invoke request num for dest is 0
pool, ok := wp.GetPool().Get(dest)
if !ok {
total = 0
invokeCount.set(firstKey, 0)
}

invokeCount.set(totalKey, total+1)
// first request or only contain one rule tag, route to tags[0]
if total == 0 {
tag = tags[0]
invokeCount.set(firstKey, 1)
return tag, nil
}

for _, t := range tags {
key := dest + "-" + t.Tags[common.BuildinTagVersion] + "-" + t.Tags[common.BuildinTagApp]
percent, exist := invokeCount.get(key)
if !exist {
percent = 0
}
//currently, t does not get enough requests, then route this one to t
if (percent * 100 / total) <= t.Weight {
tag = t
invokeCount.set(key, percent+1)
break
}
// first request route to tags[0]
wp.GetPool().Set(dest, wp.NewPool(tags...))
return tags[0]
}
return tag, nil
return pool.PickOne()
}

// Match check the route rule
Expand Down
6 changes: 3 additions & 3 deletions core/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ func TestMatch(t *testing.T) {

func TestFitRate(t *testing.T) {
tags := InitTags("0.1", "0.2")
tag, _ := router.FitRate(tags, "service") //0,0
tag := router.FitRate(tags, "service") //0,0
assert.Equal(t, "0.1", tag.Tags["version"])
tag, _ = router.FitRate(tags, "service") //100%, 0
tag = router.FitRate(tags, "service") //100%, 0
assert.Equal(t, "0.2", tag.Tags["version"])
tag, _ = router.FitRate(tags, "service") //50%, 50%
tag = router.FitRate(tags, "service") //50%, 50%
assert.Equal(t, "0.1", tag.Tags["version"])

count := 100
Expand Down
137 changes: 137 additions & 0 deletions core/router/weightpool/weightpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package weightpool

import (
"sync"

"github.com/ServiceComb/go-chassis/core/common"
"github.com/ServiceComb/go-chassis/core/config/model"
)

var weightPool *SafePool
var once sync.Once

func init() { once.Do(func() { weightPool = &SafePool{pool: map[string]*Pool{}} }) }

// GetPool returns singleton of weightPool
func GetPool() *SafePool { return weightPool }

// SafePool is a cache for pool of all destination
type SafePool struct {
sync.RWMutex
pool map[string]*Pool
}

// Get returns specific pool for key
func (s *SafePool) Get(key string) (*Pool, bool) {
s.RLock()
value, ok := s.pool[key]
s.RUnlock()
return value, ok
}

// Set can set pool to safe cache
func (s *SafePool) Set(key string, value *Pool) {
s.Lock()
s.pool[key] = value
s.Unlock()
}

// Reset can delete pool for specific key
func (s *SafePool) Reset(key string) {
s.Lock()
delete(s.pool, key)
s.Unlock()
}

/* Weighted Round-Robin Scheduling
http://zh.linuxvirtualserver.org/node/37
while (true) {
i = (i + 1) mod n;
if (i == 0) {
cw = cw - gcd(S);
if (cw <= 0) {
cw = max(S);
if (cw == 0)
return NULL;
}
}
if (W(Si) >= cw)
return Si;
}*/

// Pool defines sets of weighted tags
type Pool struct {
tags []model.RouteTag

gcd int
max int
i int
cw int
num int
}

// NewPool returns pool for provided tags
func NewPool(routeTags ...*model.RouteTag) *Pool {
var total int
p := &Pool{tags: make([]model.RouteTag, len(routeTags))}
for i, t := range routeTags {
if t.Weight > 0 {
total += t.Weight
p.refreshGCD(t)
}
p.tags[i] = *t
}

if total < 100 {
latestT := model.RouteTag{
Weight: 100 - total,
Tags: map[string]string{
common.BuildinTagVersion: common.LatestVersion,
},
}
p.refreshGCD(&latestT)
p.tags = append(p.tags, latestT)
}

p.num = len(p.tags)
return p
}

// PickOne returns tag according to its weight
func (p *Pool) PickOne() *model.RouteTag {
if p.num == 0 || p.max == 0 {
return nil
}
if p.num == 1 {
return &p.tags[0]
}

for {
p.i = (p.i + 1) % p.num
if p.i == 0 {
p.cw = p.cw - p.gcd
if p.cw <= 0 {
p.cw = p.max
}
}

if p.tags[p.i].Weight >= p.cw {
return &p.tags[p.i]
}
}
}

func (p *Pool) refreshGCD(t *model.RouteTag) {
p.gcd = gcd(p.gcd, t.Weight)
if p.max < t.Weight {
p.max = t.Weight
}
}

func gcd(a, b int) int {
if b == 0 {
return a
}
return gcd(b, a%b)
}

0 comments on commit 4cd1653

Please sign in to comment.