Skip to content

Commit

Permalink
POC init
Browse files Browse the repository at this point in the history
Signed-off-by: Kirill Danshin <kirill@danshin.pro>
  • Loading branch information
kirillDanshin committed Jun 5, 2019
1 parent 5f8ecf1 commit 9853ec7
Show file tree
Hide file tree
Showing 10 changed files with 476 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -10,3 +10,7 @@

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# default binary paths
testutils/mockservers/mockservers
connmgrd/connmgrd
50 changes: 50 additions & 0 deletions connmgrd/main.go
@@ -0,0 +1,50 @@
package main

import (
"log"
"time"

"github.com/highloadltd/connmgr"
"github.com/valyala/fasthttp"
)

type state struct {
httpPool *connmgr.Pool
}

func (s *state) proxy(ctx *fasthttp.RequestCtx) {
h := s.httpPool.GetNextHost()
if h == nil {
ctx.Error("Service Temporary Unavailable", 502)
return
}
addr := h.Addr()

ctx.Request.SetRequestURI(addr)
err := fasthttp.Do(&ctx.Request, &ctx.Response)
if err != nil {
log.Printf("error while processing request: %s", err)
}
}

func main() {
s := &state{
httpPool: connmgr.NewPool(),
}
h1 := connmgr.NewHost(connmgr.HostTypeHTTP, "http://localhost:9111", "http://localhost:9111")
h1.SetTimeout(100 * time.Millisecond)
h2 := connmgr.NewHost(connmgr.HostTypeHTTP, "http://localhost:9111", "http://localhost:9112")
h2.SetTimeout(100 * time.Millisecond)
s.httpPool.Add(h1)
s.httpPool.Add(h2)
server := fasthttp.Server{
Handler: s.proxy,
}
must(server.ListenAndServe(":9110"))
}

func must(err error) {
if err != nil {
panic(err)
}
}
68 changes: 68 additions & 0 deletions httpResolver.go
@@ -0,0 +1,68 @@
package connmgr

import (
"fmt"
"strings"
"time"

"github.com/valyala/fasthttp"
)

type HTTPResolver struct {
method string
body string
timeout time.Duration
}

func NewHTTPResolver() Resolver {
return &HTTPResolver{}
}

func (r *HTTPResolver) Load(cfg KV) error {
m := cfg.GetWithDefault("method", "GET")
method, ok := m.(string)
if !ok {
return fmt.Errorf("invalid method type: got %T", m)
}

r.method = strings.ToUpper(method)

b := cfg.GetWithDefault("body", "")
body, ok := b.(string)
if ok && body != "" && (r.method == "GET" || r.method == "HEAD") {
return fmt.Errorf("unexpected body for method %q", r.method)
}

r.body = body

t := cfg.GetWithDefault("timeout", time.Duration(0))
timeout, ok := t.(time.Duration)
if !ok {
return fmt.Errorf("unexpected timeout type: %T", t)
}
if timeout >= 0 {
r.timeout = timeout
}

return nil
}

func (r *HTTPResolver) Do(addr string) error {
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()

req.Header.SetMethod(r.method)
req.SetBodyString(r.body)
req.SetRequestURI(addr)
var err error
if r.timeout != 0 {
err = fasthttp.DoDeadline(req, resp, time.Now().Add(r.timeout))
} else {
err = fasthttp.Do(req, resp)
}

fasthttp.ReleaseRequest(req)
fasthttp.ReleaseResponse(resp)

return err
}
40 changes: 40 additions & 0 deletions kv.go
@@ -0,0 +1,40 @@
package connmgr

type kvPair struct {
key string
value interface{}
}

type KV struct {
kv []kvPair
}

func (kv *KV) Get(key string) (value interface{}) {
for _, pair := range kv.kv {
if pair.key == key {
return pair.value
}
}
return nil
}

func (kv *KV) Set(key string, value interface{}) {
for i, pair := range kv.kv {
if pair.key == key {
kv.kv[i].value = value
return
}
}

kv.kv = append(kv.kv, kvPair{
key: key,
value: value,
})
}

func (kv *KV) GetWithDefault(key string, defaultValue interface{}) (value interface{}) {
if x := kv.Get(key); x != nil {
return x
}
return defaultValue
}
9 changes: 9 additions & 0 deletions nanotime_no_unsafe.go
@@ -0,0 +1,9 @@
// +build appengine windows

package connmgr

import "time"

func nanotime() int64 {
return time.Now().UnixNano()
}
12 changes: 12 additions & 0 deletions nanotime_unsafe.go
@@ -0,0 +1,12 @@
// +build !appengine,!windows

package connmgr

import (
_ "unsafe" // required to use //go:linkname
)

// Nanotime is monotonic time provider.
//go:noescape
//go:linkname nanotime runtime.nanotime
func nanotime() int64
133 changes: 133 additions & 0 deletions pool.go
@@ -0,0 +1,133 @@
package connmgr

import (
"sort"
"sync"
"sync/atomic"
"time"
)

type Pool struct {
mu *sync.RWMutex
hosts []*Host
lastRTTStats atomic.Value
lastRTTStatsUpdatedAt int64
n int64
}

func NewPool(hosts ...*Host) *Pool {
if hosts == nil {
hosts = make([]*Host, 0)
}
p := &Pool{
mu: &sync.RWMutex{},
hosts: hosts,
}

p.sortedHosts(true)
go p.maintain()
return p
}

func (p *Pool) maintain() {
for {
p.sortedHosts(true)
time.Sleep(1 * time.Second)
}
}

func (p *Pool) Add(h *Host) {
p.mu.Lock()
p.hosts = append(p.hosts, h)
p.mu.Unlock()
}

func (p *Pool) GetHost() *Host {
return p.getLastFastestHost()
}

func (p *Pool) GetHostUncached() *Host {
return p.getFastestHost(false)
}

func (p *Pool) GetNextHost() *Host {
x := atomic.AddInt64(&p.n, 1)
hosts := p.lastSortedHosts()
if len(hosts) == 0 {
return nil
}
return hosts[x%int64(len(hosts))].host
}

type rttStats struct {
host *Host
rtt time.Duration
}

func (p *Pool) getFastestHost(forceUpdate bool) *Host {
hosts := p.sortedHosts(forceUpdate)
if len(hosts) == 0 {
return nil
}

return hosts[0].host
}

func (p *Pool) getLastFastestHost() *Host {
if nanotime()-atomic.LoadInt64(&p.lastRTTStatsUpdatedAt) > int64(10000*time.Millisecond) {
return p.getFastestHost(true)
}
hosts, ok := p.lastRTTStats.Load().([]rttStats)
if !ok || len(hosts) == 0 {
return p.getFastestHost(true)
}

return hosts[0].host
}

func (p *Pool) lastSortedHosts() []rttStats {
if nanotime()-atomic.LoadInt64(&p.lastRTTStatsUpdatedAt) > int64(10000*time.Millisecond) {
return p.sortedHosts(true)
}
hosts, ok := p.lastRTTStats.Load().([]rttStats)
if !ok || len(hosts) == 0 {
return p.sortedHosts(true)
}

return hosts
}
func (p *Pool) sortedHosts(forceUpdate bool) []rttStats {
p.mu.RLock()
res := []rttStats{}
mu := sync.Mutex{}
wg := sync.WaitGroup{}
for i, host := range p.hosts {
wg.Add(1)
go func(i int, host *Host) {
rtt, err := host.RTT()
mu.Lock()
if err == nil {
res = append(res, rttStats{
host: host,
rtt: rtt,
})
}
mu.Unlock()
wg.Done()
}(i, host)
}
wg.Wait()
p.mu.RUnlock()
sort.SliceStable(res, func(i, j int) bool {
return res[i].rtt < res[j].rtt
})

if forceUpdate || nanotime()-atomic.LoadInt64(&p.lastRTTStatsUpdatedAt) > int64(10000*time.Millisecond) {
copiedStats := make([]rttStats, len(res))
copy(copiedStats, res)
p.lastRTTStats.Store(copiedStats)
atomic.StoreInt64(&p.lastRTTStatsUpdatedAt, nanotime())
}

return res
}
31 changes: 31 additions & 0 deletions resolverRegistry.go
@@ -0,0 +1,31 @@
package connmgr

import (
"sync"
)

type resolvers struct {
reg map[HostType]func() Resolver
mu sync.RWMutex
}

var resolverRegistry = &resolvers{
mu: sync.RWMutex{},
reg: map[HostType]func() Resolver{
HostTypeHTTP: NewHTTPResolver,
},
}

// AddResolverFor given HostType. Resolver's methods must not panic (we do not handle panics).
func AddResolverFor(t HostType, createResolver func() Resolver) {
resolverRegistry.mu.Lock()
resolverRegistry.reg[t] = createResolver
resolverRegistry.mu.Unlock()
}

func getResolverFor(t HostType) (func() Resolver, bool) {
resolverRegistry.mu.Lock()
r, ok := resolverRegistry.reg[t]
resolverRegistry.mu.Unlock()
return r, ok
}

0 comments on commit 9853ec7

Please sign in to comment.