Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

connection pool for clients to services, as well as smart pool functi…

…onality so that it self updates as it sees instances registered/unregistered from the cluster that match it's internal query
  • Loading branch information...
commit e121188de91d1b95d569041e4ec4a7e4d1d79817 1 parent 8636fc3
@erikstmartin erikstmartin authored
View
12 README.md
@@ -29,6 +29,8 @@ SkyNet uses Doozer to store configuration data about the available services. Co
## Doozer
Skynet makes heavy usage of Doozer. Both clients and services will take a DoozerConfig so that it knows how to communicate with doozer. In the examples directory there is a shell script to startup a cluster of doozer instances locally for testing.
+We recommend using at least 5 doozer instances in your cluster. With only 3, losing 1 leaves no margin: if an additional doozer instance goes down, the cluster loses its majority and stops accepting writes.
+
<pre>
type DoozerConfig struct {
Uri string
@@ -86,6 +88,9 @@ Checkout the examples/service directory for a full example, also a call to skyli
##Clients
Clients are just as simple. They start with a ClientConfig:
+#####Smart Connection Pools
+ServiceClients contain a pool of connections to a given service, up to a specified size, to load-balance requests across. Instances are removed from skynet when they crash; the pools are smart enough to remove connections to instances that are no longer available and replace them with connections to valid instances, maintaining the pool.
+
<pre>
type ClientConfig struct {
Log *log.Logger `json:"-"`
@@ -134,6 +139,7 @@ Commands:
-version - limit results to instances of the specified version of service
-region - limit results to instances in the specified region
-host - limit results to instances on the specified host
+ -registered - (true, false) limit results to instances that are registered (accepting requests)
regions: List all regions available that meet the specified criteria
services: List all services available that meet the specified criteria
-host - limit results to the specified host
@@ -163,11 +169,12 @@ type Query struct {
Version string
Host string
Region string
+ Registered bool
DoozerConn *DoozerConnection
}
</pre>
-The only required field here is a pointer to a doozer connection. All other fields are optional, any field not supplied will be considered as any/all.
+The only required field here is a pointer to a doozer connection. All other fields are optional; any field not supplied will be treated as any/all. Keep in mind that if you're going to make requests to an instance, you'll want to ensure the Registered attribute is true — you don't want your code to be responsible for sending requests to a server that's trying to shut down.
From here you can use any of the following
@@ -196,9 +203,6 @@ query.ServiceVersions()
</pre>
## Work In Progress
-#####Smart Connection Pools
-ServiceClients's will have a pool of connections to a given service, to load balance across. Instances are already removed from skynet when they crash, but local pools will be smart enough to remove any connections to any instances that are no longer available and replace them with connections to valid instances to maintain pool size.
-
#####Process Monitoring / Restarting
Services will restart themselves a specified number of times after crashing and add themselves back to the pool.
View
34 examples/client/client.go
@@ -2,20 +2,52 @@ package main
import (
"github.com/bketelsen/skynet/skylib"
+ "os"
+ "os/signal"
+ "syscall"
"fmt"
)
// Example skynet client: connects to the local doozer cluster, calls the
// TestService "Upcase" method once, then blocks waiting for a termination
// signal.
func main() {
	// Buffered (size 1) so a signal arriving before watchSignals registers
	// and starts receiving is not dropped.
	c := make(chan os.Signal, 1)

	config := &skylib.ClientConfig{
		DoozerConfig: &skylib.DoozerConfig{
			Uri:          "127.0.0.1:8046",
			AutoDiscover: true,
		},
	}

	client := skylib.NewClient(config)

	// This will not fail if no services currently exist, as connections are created on demand
	// this saves from chicken and egg issues with dependencies between services
	service := client.GetService("TestService", "", "", "") // any version, any region, any host

	// This on the other hand will fail if it can't find a service to connect to
	ret, err := service.Send("Upcase", "Upcase me!!")

	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println(ret)

	watchSignals(c)
}
+
// watchSignals blocks forever, exiting the process cleanly when a trappable
// termination signal arrives on c.
//
// Fix: the previous version also passed SIGKILL and SIGSTOP to signal.Notify.
// Those two signals can never be caught or ignored by a process (POSIX), so
// registering for them was dead code; they have been removed. Behavior for
// every catchable signal is unchanged.
func watchSignals(c chan os.Signal) {
	signal.Notify(c, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGSEGV, syscall.SIGTERM)

	for sig := range c {
		switch sig.(syscall.Signal) {
		// Trap signals for clean shutdown
		case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGSEGV, syscall.SIGTERM:
			syscall.Exit(0)
		}
	}
}
View
18 examples/doozer_cluster.sh
@@ -14,8 +14,8 @@ START_WEB_PORT=8080
START_DZNS_PORT=10000
START_DZNS_WEB_PORT=11000
-DZNS_INSTANCES=3
-CLUSTER_INSTANCES=3
+DZNS_INSTANCES=5
+CLUSTER_INSTANCES=5
if [ $# -lt 1 ]
then
@@ -25,7 +25,7 @@ fi
echo "Using Doozerd: $DOOZERD_PATH"
-if [ $1 = "start" ]; then
+function start {
if [ $# -gt 1 ]; then
CLUSTER_INSTANCES=$3
fi
@@ -80,7 +80,17 @@ if [ $1 = "start" ]; then
dz_web_port=$(( $dz_web_port + 1 ))
dz_count=$(( $dz_count + 1 ))
done
+}
-elif [ $1 = "stop" ]; then
+function stop {
killall doozerd
+}
+
+if [ $1 = "start" ]; then
+ start
+elif [ $1 = "stop" ]; then
+ stop
+elif [ $1 = "restart" ]; then
+ stop
+ start
fi
View
44 sky/sky.go
@@ -12,15 +12,15 @@ var VersionFlag *string = flag.String("version", "", "service version")
var ServiceNameFlag *string = flag.String("service", "", "service name")
var HostFlag *string = flag.String("host", "", "host")
var RegionFlag *string = flag.String("region", "", "region")
+var RegisteredFlag *string = flag.String("registered", "", "registered")
var DC *skylib.DoozerConnection
func main() {
flag.Parse()
- Connect()
query := &skylib.Query{
- DoozerConn: DC,
+ DoozerConn: Connect(),
Service: *ServiceNameFlag,
Version: *VersionFlag,
Host: *HostFlag,
@@ -48,7 +48,7 @@ func main() {
}
}
-func Connect() {
+func Connect() (*skylib.DoozerConnection) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Failed to connect to Doozer")
@@ -57,18 +57,35 @@ func Connect() {
}()
// TODO: This needs to come from command line, or environment variable
- DC = &skylib.DoozerConnection{
- Config: &skylib.DoozerConfig {
- Uri: "127.0.0.1:8046",
- },
- }
+ conn := skylib.NewDoozerConnection("127.0.0.1:8046", "", false, nil) // nil as the last param will default to a Stdout logger
+ conn.Connect()
+
+ return conn
}
func ListInstances(q *skylib.Query) {
+ var regFlag *bool
+
+ if *RegisteredFlag == "true" {
+ b := true
+ regFlag = &b
+ } else if *RegisteredFlag == "false" {
+ b := false
+ regFlag = &b
+ }
+
+ q.Registered = regFlag
+
results := q.FindInstances()
for _, instance := range *results {
- fmt.Println(instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port) + " - " + instance.Config.Name + " (" + instance.Config.Version + ")")
+ registered := ""
+
+ if(instance.Registered){
+ registered = " [Registered]"
+ }
+
+ fmt.Println(instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port) + " - " + instance.Config.Name + " " + instance.Config.Version + registered)
}
}
@@ -149,7 +166,13 @@ func PrintTopology(q *skylib.Query) {
fmt.Println("\t\t\tVersion: " + versionName)
for _, instance := range version {
- fmt.Println("\t\t\t\t" + instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port) + " - " + instance.Config.Name + " (" + instance.Config.Version + ")")
+ registered := ""
+
+ if(instance.Registered){
+ registered = " [Registered]"
+ }
+
+ fmt.Println("\t\t\t\t" + instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port) + registered)
}
}
}
@@ -172,6 +195,7 @@ func Help() {
"\n\t\t-version - limit results to instances of the specified version of service" +
"\n\t\t-region - limit results to instances in the specified region" +
"\n\t\t-host - limit results to instances on the specified host" +
+ "\n\t\t-registered - (true, false) limit results to instances that are registered (accepting requests)" +
"\n\tregions: List all regions available that meet the specified criteria" +
View
197 skylib/client.go
@@ -2,19 +2,41 @@ package skylib
import (
"github.com/erikstmartin/msgpack-rpc/go/rpc"
+ "github.com/bketelsen/skynet/skylib/util"
"net"
"log"
"os"
"strconv"
"reflect"
+ "errors"
+ "math/rand"
+ "encoding/json"
+ "bytes"
+ "strings"
)
type Client struct {
DoozerConn *DoozerConnection
+
Config *ClientConfig
Log *log.Logger `json:"-"`
}
// ServiceResource wraps a single RPC session to one service instance so it can
// be managed by the connection pool (it implements pools.Resource).
type ServiceResource struct {
	conn    *rpc.Session
	service Service
	closed  bool
}

// Close marks the resource closed and closes the underlying RPC session.
// NOTE(review): with a value receiver, s.closed = true mutates a copy — the
// ServiceResource held by the pool never observes it, so IsClosed() on the
// pooled value always returns false. Eviction of dead instances currently
// relies on ServiceClient.isClosed (instance-map lookup) instead; confirm
// whether this receiver should be a pointer.
func (s ServiceResource) Close() {
	s.closed = true
	s.conn.Close()
}

// IsClosed reports whether Close was called on this copy of the resource.
func (s ServiceResource) IsClosed() bool {
	return s.closed
}
+
func (c *Client) doozer() *DoozerConnection {
if c.DoozerConn == nil {
c.DoozerConn = &DoozerConnection {
@@ -32,6 +54,10 @@ func NewClient(config *ClientConfig) *Client {
config.Log = log.New(os.Stderr, "", log.LstdFlags)
}
+ if config.ConnectionPoolSize == 0 {
+ config.ConnectionPoolSize = 1
+ }
+
client := &Client {
Config: config,
DoozerConn: &DoozerConnection {
@@ -46,55 +72,166 @@ func NewClient(config *ClientConfig) *Client {
return client
}
-
-// TODO: For now this will return a single connection to this service, each time it's called
-// refactor this so that it returns back a connection pool
-// (should this return back the same pool un subsequent calls? or a new pool?)
+// This will not fail if no services currently exist, this saves from chicken and egg issues with dependencies between services
+// TODO: We should probably determine a way of supplying secondary conditions, for example it's ok to go to a different data center only if there are no instances in our current datacenter
func (c *Client) GetService(name string, version string, region string, host string) (*ServiceClient){
var conn net.Conn
var err error
- query := &Query{
- DoozerConn: c.DoozerConn,
- Service: name,
- Version: version,
- Host: host,
- Region: region,
- }
+ registered := true
+
+ service := &ServiceClient {
+ Log: c.Config.Log,
+ connectionPool: pools.NewRoundRobin(c.Config.ConnectionPoolSize, c.Config.IdleTimeout),
+ query: &Query {
+ DoozerConn: c.DoozerConn,
+ Service: name,
+ Version: version,
+ Host: host,
+ Region: region,
+ Registered: &registered,
+ },
+ instances: make(map[string]Service, 0),
+ }
+
+ // Load initial list of instances
+ results := service.query.FindInstances()
- results := query.FindInstances()
- ok := false
+ if results != nil {
+ for _, instance := range *results {
+ key := instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port)
+ service.instances[key] = *instance
+ }
+ }
+
+ go service.monitorInstances()
+
+ factory := func() (pools.Resource, error) {
+ if len(service.instances) < 1 {
+
+ return nil, errors.New("No services available that match your criteria")
+ }
+
+ // Connect to random instance
+ key := (rand.Int()%len(service.instances))
+
+ var instance Service
+
+ i := 0
+
+ for _, v := range service.instances {
+ if i == key {
+ instance = v
+ break
+ }
+ }
- // Connect to the first instance we find available
- for _, instance := range *results {
- //fmt.Println(instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port) + " - " + instance.Config.Name + " (" + instance.Config.Version + ")")
conn, err = net.Dial("tcp", instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port))
- if err == nil {
- ok = true
- break
+ if err != nil {
+ // TODO: handle failure here and attempt to connect to a different instance
+ return nil, errors.New("Failed to connect to service: " + instance.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(instance.Config.ServiceAddr.Port))
+ }
+
+ resource := ServiceResource {
+ conn: rpc.NewSession(conn, true),
+ service: instance,
}
- }
- if !ok {
- c.Log.Panic("Unable to find available instance for Service: " + name)
+ return resource, nil
}
- client := rpc.NewSession(conn, true)
+ service.connectionPool.Open(factory)
- return &ServiceClient {
- conn: client,
- }
+ return service
}
// ServiceClient is a handle to one logical service: a smart connection pool
// plus the doozer query and live-instance map used to keep the pool pointed
// at healthy, registered instances.
type ServiceClient struct {
	Log            *log.Logger `json:"-"`
	connectionPool *pools.RoundRobin
	query          *Query
	// instances maps "ip:port" -> Service for every currently known,
	// registered instance matching query.
	instances map[string]Service
}
+
// monitorInstances watches doozer for changes under /services/** and keeps
// c.instances in sync: instances that (re)register and match the client's
// query are added; unregistered ones are removed. Runs forever — start it in
// its own goroutine.
// NOTE(review): c.instances is also read by the pool factory goroutine with
// no locking; confirm whether a mutex is needed around the map.
func (c *ServiceClient) monitorInstances() {
	// TODO: Let's watch doozer and keep this list up to date so we don't need to search it every time we spawn a new connection
	doozer := c.query.DoozerConn

	rev := doozer.GetCurrentRevision()

	for {
		// Blocking wait for the next change after the last one handled.
		ev, err := doozer.Wait("/services/**", rev + 1)
		// NOTE(review): rev is taken from ev before err is checked — on error
		// ev.Rev may be the zero value, which would restart the wait from the
		// beginning of history; confirm.
		rev = ev.Rev

		if err == nil {
			var service Service

			buf := bytes.NewBuffer(ev.Body)

			err = json.Unmarshal(buf.Bytes(), &service)

			if err == nil {
				parts := strings.Split(ev.Path, "/")

				if c.query.pathMatches(parts, ev.Path) {
					key := service.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(service.Config.ServiceAddr.Port)

					// A write with Registered=false is treated as removal, so
					// no new connections are made to a draining instance.
					if service.Registered == true {
						c.Log.Println("New Service Instance Discovered: " + key)
						c.instances[key] = service
					} else {
						c.Log.Println("Service Instance Removed: " + key)
						delete(c.instances, key)
					}
				}
			}
		}
	}
}
func (c *ServiceClient) Send(funcName string, arguments ...interface{}) (reflect.Value, error) {
- // TODO: do logging here, average response time, number of calls etc
// TODO: timeout logic
-
- return c.conn.SendV(funcName, arguments)
+ service, _ := c.getConnection(0)
+
+
+ // TODO: Check for connectivity issue so that we can try to get another resource out of the pool
+ val, er := service.conn.SendV(funcName, arguments)
+
+ c.connectionPool.Put(service)
+
+ return val, er
}
+func (c *ServiceClient) getConnection(lvl int) (ServiceResource, error) {
+ if(lvl > 5) {
+ panic("Unable to retrieve a valid connection to the service")
+ }
+
+ conn, err := c.connectionPool.Get()
+
+ if err != nil || c.isClosed(conn.(ServiceResource)) {
+ if conn != nil {
+ c.connectionPool.Put(conn)
+ }
+
+ return c.getConnection(lvl + 1)
+ }
+
+ service := conn.(ServiceResource)
+
+ return service, err
+}
+
+func (c *ServiceClient) isClosed(service ServiceResource) bool {
+ key := getInstanceKey(service.service)
+
+ if _, ok := c.instances[key]; ok {
+ return false
+ }
+
+ return true
+}
+
+func getInstanceKey(service Service) string {
+ return service.Config.ServiceAddr.IPAddress + ":" + strconv.Itoa(service.Config.ServiceAddr.Port)
+}
View
3  skylib/config.go
@@ -11,6 +11,7 @@ package skylib
import (
"flag"
"log"
+ "time"
)
type BindAddr struct {
@@ -31,6 +32,8 @@ type ServiceConfig struct {
type ClientConfig struct {
Log *log.Logger `json:"-"`
DoozerConfig *DoozerConfig `json:"-"`
+ ConnectionPoolSize int
+ IdleTimeout time.Duration
}
func GetServiceConfigFromFlags() *ServiceConfig {
View
38 skylib/doozer.go
@@ -5,6 +5,7 @@ import (
"github.com/4ad/doozer"
"log"
"sync"
+ "os"
)
type DoozerServer struct {
@@ -31,13 +32,19 @@ type DoozerConfig struct {
AutoDiscover bool
}
-func NewDoozerConnection(uri string, boot string, discover bool) (*DoozerConnection){
+func NewDoozerConnection(uri string, boot string, discover bool, logger *log.Logger) (*DoozerConnection){
+ if logger == nil {
+ logger = log.New(os.Stderr, "", log.LstdFlags)
+ }
+
return &DoozerConnection {
Config: &DoozerConfig {
Uri: uri,
BootUri: boot,
AutoDiscover: discover,
},
+
+ Log: logger,
}
}
@@ -174,12 +181,16 @@ func (d *DoozerConnection) recoverFromError(err interface{}) {
// if they enabled Auto Discovery let's try to get a connection from one of the instances we know about
if len(d.doozerInstances) > 0 && d.Config.AutoDiscover == true {
- for _, server := range d.doozerInstances {
+ for key, server := range d.doozerInstances {
success, _ := d.dial(server.Addr, "")
if success == true {
return
- }
+ } else {
+ // Remove failed doozer instance from map
+ delete(d.doozerInstances, key)
+
+ }
}
}
@@ -192,6 +203,8 @@ func (d *DoozerConnection) recoverFromError(err interface{}) {
}
}
+// TODO: Need to track last known revision, so when we are monitor for changes to the doozer cluster
+// we can replay changes that took place while we were looking for a new connection instead of using the latest GetCurrentRevision()
func (d *DoozerConnection) monitorCluster() {
defer func() {
if err := recover(); err != nil {
@@ -201,15 +214,18 @@ func (d *DoozerConnection) monitorCluster() {
}
}()
+ rev := d.GetCurrentRevision()
+
for {
// blocking wait call returns on a change
- ev, err := d.Connection.Wait("/ctl/cal/*", d.GetCurrentRevision())
+ ev, err := d.Wait("/ctl/cal/*", rev + 1)
if err != nil {
d.Log.Panic(err.Error())
}
buf := bytes.NewBuffer(ev.Body)
id := basename(ev.Path)
+ rev = ev.Rev
if buf.String() == "" && d.doozerInstances[id] != nil {
// Server is down, remove from list
@@ -227,6 +243,20 @@ func (d *DoozerConnection) monitorCluster() {
}
}
+func (d *DoozerConnection) Wait(glob string, rev int64) (ev doozer.Event, err error) {
+ defer func() {
+ if err := recover(); err != nil {
+ d.recoverFromError(err)
+
+ ev, err = d.Wait(glob, rev)
+ }
+ }()
+
+ ev, err = d.Connection.Wait(glob, rev)
+
+ return ev, err
+}
+
func (d *DoozerConnection) getDoozerServer(key string) *DoozerServer {
rev := d.GetCurrentRevision()
data, _, err := d.Get("/ctl/node/"+key+"/addr", &rev)
View
6 skylib/query.go
@@ -12,6 +12,7 @@ type Query struct {
Version string
Host string
Region string
+ Registered *bool
DoozerConn *DoozerConnection
DoozerRev int64
@@ -131,6 +132,11 @@ func (q *Query) FindInstances() *[]*Service {
continue
}
+
+ if q.Registered != nil && *q.Registered != service.Registered {
+ continue
+ }
+
results = append(results, &service)
}
View
39 skylib/service.go
@@ -12,6 +12,8 @@ import (
"syscall"
)
+// TODO: Better error handling, should gracefully fail to startup if it can't connect to doozer
+
// A Generic struct to represent any service in the SkyNet system.
type ServiceInterface interface {
Started(s *Service)
@@ -23,7 +25,7 @@ type ServiceInterface interface {
type Service struct {
Config *ServiceConfig
DoozerConn *DoozerConnection `json:"-"`
- Registered bool `json:"-"`
+ Registered bool
doneChan chan bool `json:"-"`
Log *log.Logger `json:"-"`
@@ -54,6 +56,8 @@ func (s *Service) Start(register bool) {
go rpcServ.Run()
go s.Delegate.Started(s) // Call user defined callback
+
+ s.UpdateCluster()
if register == true {
s.Register()
@@ -62,11 +66,11 @@ func (s *Service) Start(register bool) {
// Endless loop to keep app from returning
select {
case _ = <-s.doneChan:
+ syscall.Exit(0)
}
}
-func (s *Service) Register() {
-
+func (s *Service) UpdateCluster() {
b, err := json.Marshal(s)
if err != nil {
s.Log.Panic(err.Error())
@@ -78,30 +82,37 @@ func (s *Service) Register() {
if err != nil {
s.Log.Panic(err.Error())
}
+}
- s.Registered = true
+func (s *Service) RemoveFromCluster() {
+ rev := s.doozer().GetCurrentRevision()
+ path := s.GetConfigPath()
+ err := s.doozer().Del(path, rev)
+ if err != nil {
+ s.Log.Panic(err.Error())
+ }
+}
+
+func (s *Service) Register() {
+ s.Registered = true
+ s.UpdateCluster()
s.Delegate.Registered(s) // Call user defined callback
}
func (s *Service) Unregister() {
- if s.Registered == true {
- rev := s.doozer().GetCurrentRevision()
- path := s.GetConfigPath()
- err := s.doozer().Del(path, rev)
- if err != nil {
- s.Log.Panic(err.Error())
- }
- }
+ s.Registered = false
+ s.UpdateCluster()
s.Delegate.Unregistered(s) // Call user defined callback
}
// Shutdown gracefully stops the service: unregister, remove the instance from
// the cluster, notify the delegate, then signal the main loop to exit.
//
// Fix: the previous version sent on doneChan before calling Delegate.Stopped.
// The receiver in Start() calls syscall.Exit(0) as soon as it reads from
// doneChan, so the Stopped callback raced process exit and could be skipped
// entirely. The callback now fires before the exit signal is sent.
func (s *Service) Shutdown() {
	s.Unregister()

	// TODO: make this wait for requests to finish
	s.RemoveFromCluster()

	s.Delegate.Stopped(s) // Call user defined callback

	// Last: wake the Start() loop, which exits the process.
	s.doneChan <- true
}
View
254 skylib/util/roundrobin.go
@@ -0,0 +1,254 @@
+/*
+Copyright 2012, Google Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+/*
+ Package pools provides functionality to manage and reuse resources
+ like connections.
+*/
+package pools
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+// Factory is a function that can be used to create a resource.
+type Factory func() (Resource, error)
+
+// Every resource needs to suport the Resource interface.
+// Thread synchronization between Close() and IsClosed()
+// is the responsibility the caller.
+type Resource interface {
+ Close()
+ IsClosed() bool
+}
+
+// RoundRobin allows you to use a pool of resources in a round robin fashion.
+type RoundRobin struct {
+ mu sync.Mutex
+ available *sync.Cond
+ resources chan fifoWrapper
+ size int64
+ factory Factory
+ idleTimeout time.Duration
+
+ // stats
+ waitCount int64
+ waitTime time.Duration
+}
+
+type fifoWrapper struct {
+ resource Resource
+ timeUsed time.Time
+}
+
+// NewRoundRobin creates a new RoundRobin pool.
+// capacity is the maximum number of resources RoundRobin will create.
+// factory will be the function used to create resources.
+// If a resource is unused beyond idleTimeout, it's discarded.
+func NewRoundRobin(capacity int, idleTimeout time.Duration) *RoundRobin {
+ r := &RoundRobin{
+ resources: make(chan fifoWrapper, capacity),
+ size: 0,
+ idleTimeout: idleTimeout,
+ }
+ r.available = sync.NewCond(&r.mu)
+ return r
+}
+
+// Open starts allowing the creation of resources
+func (self *RoundRobin) Open(factory Factory) {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+ self.factory = factory
+}
+
// Close empties the pool calling Close on all its resources.
// It waits for all resources to be returned (Put).
func (self *RoundRobin) Close() {
	self.mu.Lock()
	defer self.mu.Unlock()
	for self.size > 0 {
		select {
		case fw := <-self.resources:
			// Close asynchronously; the pool only needs the slot accounted.
			go fw.resource.Close()
			self.size--
		default:
			// Channel empty but size > 0: resources are checked out.
			// Wait releases mu until a Put signals availability.
			self.available.Wait()
		}
	}
	// A nil factory doubles as the "closed" marker (see IsClosed).
	self.factory = nil
}
+
// IsClosed reports whether the pool has been closed (factory cleared by Close).
// NOTE(review): reads self.factory without holding mu, unlike every other
// method on this type — racy if called concurrently with Open/Close; confirm
// callers serialize this.
func (self *RoundRobin) IsClosed() bool {
	return self.factory == nil
}
+
+// Get will return the next available resource. If none is available, and capacity
+// has not been reached, it will create a new one using the factory. Otherwise,
+// it will indefinitely wait till the next resource becomes available.
+func (self *RoundRobin) Get() (resource Resource, err error) {
+ return self.get(true)
+}
+
+// TryGet will return the next available resource. If none is available, and capacity
+// has not been reached, it will create a new one using the factory. Otherwise,
+// it will return nil with no error.
+func (self *RoundRobin) TryGet() (resource Resource, err error) {
+ return self.get(false)
+}
+
// get is the shared implementation behind Get and TryGet. With wait=true it
// blocks until a resource is free or can be created; with wait=false it
// returns (nil, nil) when the pool is at capacity with nothing available.
func (self *RoundRobin) get(wait bool) (resource Resource, err error) {
	self.mu.Lock()
	defer self.mu.Unlock()
	// Any waits in this loop will release the lock, and it will be
	// reacquired before the waits return.
	for {
		select {
		case fw := <-self.resources:
			// Found a free resource in the channel
			if self.idleTimeout > 0 && fw.timeUsed.Add(self.idleTimeout).Sub(time.Now()) < 0 {
				// resource has been idle for too long. Discard & go for next.
				go fw.resource.Close()
				self.size--
				// Nobody else should be waiting, but signal anyway.
				self.available.Signal()
				continue
			}
			return fw.resource, nil
		default:
			// resource channel is empty
			if self.size >= int64(cap(self.resources)) {
				// The pool is full
				if wait {
					start := time.Now()
					self.available.Wait()
					self.recordWait(start)
					continue
				}
				return nil, nil
			}
			// Pool is not full. Create a resource.
			if resource, err = self.waitForCreate(); err != nil {
				// size was decremented, and somebody could be waiting.
				self.available.Signal()
				return nil, err
			}
			// Creation successful. Account for this by incrementing size.
			self.size++
			return resource, err
		}
	}
	panic("unreachable")
}
+
+func (self *RoundRobin) recordWait(start time.Time) {
+ self.waitCount++
+ self.waitTime += time.Now().Sub(start)
+}
+
+func (self *RoundRobin) waitForCreate() (resource Resource, err error) {
+ // Prevent thundering herd: increment size before creating resource, and decrement after.
+ self.size++
+ self.mu.Unlock()
+ defer func() {
+ self.mu.Lock()
+ self.size--
+ }()
+ return self.factory()
+}
+
// Put will return a resource to the pool. You MUST return every resource to the pool,
// even if it's closed. If a resource is closed, Put will discard it. Thread synchronization
// between Close() and IsClosed() is the caller's responsibility.
func (self *RoundRobin) Put(resource Resource) {
	self.mu.Lock()
	defer self.available.Signal()
	defer self.mu.Unlock()

	if self.size > int64(cap(self.resources)) {
		// Capacity was shrunk (SetCapacity) while this resource was checked
		// out: discard it instead of re-queueing.
		go resource.Close()
		self.size--
	} else if resource.IsClosed() {
		// Closed resources are dropped; the freed slot lets get() create a
		// replacement on demand.
		self.size--
	} else {
		if len(self.resources) == cap(self.resources) {
			// More Puts than Gets — indicates a caller bug.
			panic("unexpected")
		}
		self.resources <- fifoWrapper{resource, time.Now()}
	}
}
+
+// Set capacity changes the capacity of the pool.
+// You can use it to expand or shrink.
+func (self *RoundRobin) SetCapacity(capacity int) {
+ self.mu.Lock()
+ defer self.available.Broadcast()
+ defer self.mu.Unlock()
+
+ nr := make(chan fifoWrapper, capacity)
+ // This loop transfers resources from the old channel
+ // to the new one, until it fills up or runs out.
+ // It discards extras, if any.
+ for {
+ select {
+ case fw := <-self.resources:
+ if len(nr) < cap(nr) {
+ nr <- fw
+ } else {
+ go fw.resource.Close()
+ self.size--
+ }
+ continue
+ default:
+ }
+ break
+ }
+ self.resources = nr
+}
+
+func (self *RoundRobin) SetIdleTimeout(idleTimeout time.Duration) {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+ self.idleTimeout = idleTimeout
+}
+
+func (self *RoundRobin) StatsJSON() string {
+ s, c, a, wc, wt, it := self.Stats()
+ return fmt.Sprintf("{\"Size\": %v, \"Capacity\": %v, \"Available\": %v, \"WaitCount\": %v, \"WaitTime\": %v, \"IdleTimeout\": %v}", s, c, a, wc, float64(wt)/1e9, float64(it)/1e9)
+}
+
+func (self *RoundRobin) Stats() (size, capacity, available, waitCount int64, waitTime, idleTimeout time.Duration) {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+ return self.size, int64(cap(self.resources)), int64(len(self.resources)), self.waitCount, self.waitTime, self.idleTimeout
+}
View
182 skylib/util/roundrobin_test.go
@@ -0,0 +1,182 @@
+/*
+Copyright 2012, Google Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+package pools
+
+import (
+ "errors"
+ "testing"
+ "time"
+)
+
+var lastId int64
+
+type TestResource struct {
+ num int64
+ closed bool
+}
+
+func (self *TestResource) Close() {
+ self.closed = true
+}
+
+func (self *TestResource) IsClosed() bool {
+ return self.closed
+}
+
+func PoolFactory() (Resource, error) {
+ lastId++
+ return &TestResource{lastId, false}, nil
+}
+
+func FailFactory() (Resource, error) {
+ return nil, errors.New("Failed")
+}
+
+func SlowFailFactory() (Resource, error) {
+ time.Sleep(1e8)
+ return nil, errors.New("Failed")
+}
+
+func TestPool(t *testing.T) {
+ lastId = 0
+ p := NewRoundRobin(5, time.Duration(10e9))
+ p.Open(PoolFactory)
+ defer p.Close()
+
+ for i := 0; i < 2; i++ {
+ r, err := p.TryGet()
+ if err != nil {
+ t.Errorf("TryGet failed: %v", err)
+ }
+ if r.(*TestResource).num != 1 {
+ t.Errorf("Expecting 1, received %d", r.(*TestResource).num)
+ }
+ p.Put(r)
+ }
+ // p = [1]
+
+ all := make([]Resource, 5)
+ for i := 0; i < 5; i++ {
+ if all[i], _ = p.TryGet(); all[i] == nil {
+ t.Errorf("TryGet failed with nil")
+ }
+ }
+ // all = [1-5], p is empty
+ if none, _ := p.TryGet(); none != nil {
+ t.Errorf("TryGet failed with non-nil")
+ }
+
+ ch := make(chan bool)
+ go ResourceWait(p, t, ch)
+ time.Sleep(1e8)
+ for i := 0; i < 5; i++ {
+ p.Put(all[i])
+ }
+ // p = [1-5]
+ <-ch
+ // p = [1-5]
+ if p.waitCount != 1 {
+ t.Errorf("Expecting 1, received %d", p.waitCount)
+ }
+
+ for i := 0; i < 5; i++ {
+ all[i], _ = p.Get()
+ }
+ // all = [1-5], p is empty
+ all[0].(*TestResource).Close()
+ for i := 0; i < 5; i++ {
+ p.Put(all[i])
+ }
+ // p = [2-5]
+
+ for i := 0; i < 4; i++ {
+ r, _ := p.Get()
+ if r.(*TestResource).num != int64(i+2) {
+ t.Errorf("Expecting %d, received %d", i+2, r.(*TestResource).num)
+ }
+ p.Put(r)
+ }
+
+ p.SetCapacity(3)
+ // p = [2-4]
+ if p.size != 3 {
+ t.Errorf("Expecting 3, received %d", p.size)
+ }
+
+ p.SetIdleTimeout(time.Duration(1e8))
+ time.Sleep(2e8)
+ r, _ := p.Get()
+ if r.(*TestResource).num != 6 {
+ t.Errorf("Expecting 6, received %d", r.(*TestResource).num)
+ }
+ p.Put(r)
+ // p = [6]
+}
+
+func TestPoolFail(t *testing.T) {
+ p := NewRoundRobin(5, time.Duration(10e9))
+ p.Open(FailFactory)
+ defer p.Close()
+ if _, err := p.Get(); err.Error() != "Failed" {
+ t.Errorf("Expecting Failed, received %c", err)
+ }
+}
+
+func TestPoolFullFail(t *testing.T) {
+ p := NewRoundRobin(2, time.Duration(10e9))
+ p.Open(SlowFailFactory)
+ defer p.Close()
+ ch := make(chan bool)
+ // The third get should not wait indefinitely
+ for i := 0; i < 3; i++ {
+ go func() {
+ p.Get()
+ ch <- true
+ }()
+ }
+ for i := 0; i < 3; i++ {
+ <-ch
+ }
+}
+
+func ResourceWait(p *RoundRobin, t *testing.T, ch chan bool) {
+ for i := 0; i < 5; i++ {
+ if r, err := p.Get(); err != nil {
+ t.Errorf("TryGet failed: %v", err)
+ } else if r.(*TestResource).num != int64(i+1) {
+ t.Errorf("Expecting %d, received %d", i+1, r.(*TestResource).num)
+ } else {
+ p.Put(r)
+ }
+ }
+ ch <- true
+}
Please sign in to comment.
Something went wrong with that request. Please try again.