Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
197 lines (161 sloc) 5.04 KB
package hystrix
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// CircuitBreaker is created for each ExecutorPool to track whether requests
// should be attempted, or rejected if the Health of the circuit is too low.
type CircuitBreaker struct {
// Name is the command name this circuit was created for; it is the key in
// the circuitBreakers registry and the argument passed to getSettings.
Name string
// open is true while the circuit is tripped; setOpen sets it and setClose
// clears it.
open bool
// forceOpen is set through toggleForceOpen; IsOpen reports the circuit as
// open whenever it is true, regardless of metrics.
forceOpen bool
// mutex guards open; IsOpen also reads forceOpen while holding it.
mutex *sync.RWMutex
// openedOrLastTestedTime is a UnixNano timestamp: written by setOpen when
// the circuit trips, and advanced by allowSingleTest each time it lets a
// trial request through.
openedOrLastTestedTime int64
// executorPool is built by newExecutorPool for this circuit's name;
// ReportEvent reads its Max and ActiveCount to compute concurrency in use.
executorPool *executorPool
// metrics is built by newMetricExchange for this circuit's name; IsOpen
// consults it for request volume and health, and ReportEvent sends command
// executions to its Updates channel.
metrics *metricExchange
}
var (
// circuitBreakersMutex guards every access to circuitBreakers; it is
// allocated in init.
circuitBreakersMutex *sync.RWMutex
// circuitBreakers is the registry of circuits keyed by command name;
// GetCircuit adds entries and Flush removes them.
circuitBreakers map[string]*CircuitBreaker
)
func init() {
circuitBreakersMutex = &sync.RWMutex{}
circuitBreakers = make(map[string]*CircuitBreaker)
}
// GetCircuit looks up the circuit registered under name, creating and
// registering a new one when none exists yet. The boolean result is true only
// when this call created the circuit. The error result is always nil.
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
	// Fast path: the circuit already exists, a shared lock is enough.
	circuitBreakersMutex.RLock()
	existing, found := circuitBreakers[name]
	circuitBreakersMutex.RUnlock()
	if found {
		return existing, false, nil
	}

	// Slow path: take the exclusive lock to create the circuit.
	circuitBreakersMutex.Lock()
	defer circuitBreakersMutex.Unlock()

	// The shared lock was dropped before the exclusive lock was taken, so
	// another goroutine may have registered the circuit in between.
	if raced, found := circuitBreakers[name]; found {
		return raced, false, nil
	}

	created := newCircuitBreaker(name)
	circuitBreakers[name] = created
	return created, true, nil
}
// Flush empties the circuit registry. For every registered circuit it resets
// the circuit's metrics and its executor pool's metrics, then removes the
// circuit from the registry. The whole operation runs under the exclusive
// registry lock.
func Flush() {
	circuitBreakersMutex.Lock()
	defer circuitBreakersMutex.Unlock()

	for key := range circuitBreakers {
		breaker := circuitBreakers[key]
		breaker.metrics.Reset()
		breaker.executorPool.Metrics.Reset()
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(circuitBreakers, key)
	}
}
// newCircuitBreaker builds a closed CircuitBreaker for name, with its own
// metric exchange, executor pool and lock. It does not add the circuit to the
// registry; GetCircuit does that.
func newCircuitBreaker(name string) *CircuitBreaker {
	return &CircuitBreaker{
		Name:         name,
		metrics:      newMetricExchange(name),
		executorPool: newExecutorPool(name),
		mutex:        &sync.RWMutex{},
	}
}
// toggleForceOpen manually forces the circuit for this command open
// (toggle == true) or releases that override (toggle == false). While forced
// open, IsOpen reports true, so callers are sent to their fallback logic.
//
// The receiver is used only for its Name: the flag is set on whichever
// circuit GetCircuit returns for that name. Any error from GetCircuit is
// returned unchanged; otherwise the result is nil.
func (circuit *CircuitBreaker) toggleForceOpen(toggle bool) error {
	registered, _, err := GetCircuit(circuit.Name)
	if err != nil {
		return err
	}

	// IsOpen reads forceOpen while holding the read lock, so the write must
	// hold the write lock; an unguarded store here is a data race.
	registered.mutex.Lock()
	registered.forceOpen = toggle
	registered.mutex.Unlock()
	return nil
}
// IsOpen reports whether the circuit is open, i.e. disabled, and is consulted
// before a command executes.
//
// It returns true immediately when the circuit is forced open or already
// open. Otherwise it returns false while the recent request count is below
// the configured RequestVolumeThreshold. Once that volume is reached, an
// unhealthy metric reading opens the circuit as a side effect and returns
// true; a healthy reading returns false.
func (circuit *CircuitBreaker) IsOpen() bool {
	circuit.mutex.RLock()
	tripped := circuit.forceOpen || circuit.open
	circuit.mutex.RUnlock()

	switch {
	case tripped:
		return true
	case uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold:
		// Not enough traffic yet to judge health.
		return false
	case circuit.metrics.IsHealthy(time.Now()):
		return false
	}

	// Too many failures: trip the circuit.
	circuit.setOpen()
	return true
}
// AllowRequest reports whether a command may execute right now. A circuit
// that is not open always allows the request. An open circuit allows it only
// when allowSingleTest grants a trial request, which is how recovery of the
// external service gets probed.
func (circuit *CircuitBreaker) AllowRequest() bool {
	if circuit.IsOpen() {
		return circuit.allowSingleTest()
	}
	return true
}
// allowSingleTest reports whether one trial request may pass through an open
// circuit. It returns true only when the circuit is open, more than the
// configured SleepWindow has elapsed since openedOrLastTestedTime, and this
// caller wins the compare-and-swap that advances that timestamp to now.
// A successful swap is logged.
func (circuit *CircuitBreaker) allowSingleTest() bool {
	circuit.mutex.RLock()
	defer circuit.mutex.RUnlock()

	now := time.Now().UnixNano()
	last := atomic.LoadInt64(&circuit.openedOrLastTestedTime)

	if !circuit.open {
		return false
	}
	if now <= last+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
		// Still inside the sleep window.
		return false
	}
	// Only the caller whose swap succeeds gets the trial; concurrent callers
	// that loaded the same timestamp lose the swap.
	if !atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, last, now) {
		return false
	}

	log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
	return true
}
// setOpen trips the circuit under the write lock. If the circuit is already
// open it does nothing; otherwise it logs the transition, records the current
// time in openedOrLastTestedTime and marks the circuit open.
func (circuit *CircuitBreaker) setOpen() {
	circuit.mutex.Lock()
	defer circuit.mutex.Unlock()

	if !circuit.open {
		log.Printf("hystrix-go: opening circuit %v", circuit.Name)
		circuit.openedOrLastTestedTime = time.Now().UnixNano()
		circuit.open = true
	}
}
// setClose closes the circuit under the write lock. If the circuit is already
// closed it does nothing; otherwise it logs the transition, clears the open
// flag and resets the circuit's metrics.
func (circuit *CircuitBreaker) setClose() {
	circuit.mutex.Lock()
	defer circuit.mutex.Unlock()

	if circuit.open {
		log.Printf("hystrix-go: closing circuit %v", circuit.Name)
		circuit.open = false
		circuit.metrics.Reset()
	}
}
// ReportEvent records the outcome of one command execution so that recent
// error rates can be tracked and exposed to the dashboard.
//
// eventTypes must contain at least one entry, otherwise an error is returned.
// When the first entry is "success" and the circuit is currently open, the
// circuit is closed. The execution is then offered to the metrics Updates
// channel without blocking; if the channel cannot accept it, a CircuitError
// is returned and the update is dropped.
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
	if len(eventTypes) == 0 {
		return fmt.Errorf("no event types sent for metrics")
	}

	circuit.mutex.RLock()
	wasOpen := circuit.open
	circuit.mutex.RUnlock()

	// A success while the circuit is open closes it again.
	if wasOpen && eventTypes[0] == "success" {
		circuit.setClose()
	}

	// Fraction of the pool in use; left at zero when the pool has no capacity
	// so there is no division by zero.
	concurrencyInUse := 0.0
	if limit := circuit.executorPool.Max; limit > 0 {
		concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(limit)
	}

	update := &commandExecution{
		Types:            eventTypes,
		Start:            start,
		RunDuration:      runDuration,
		ConcurrencyInUse: concurrencyInUse,
	}

	// Non-blocking send: never stall the caller on a full metrics channel.
	select {
	case circuit.metrics.Updates <- update:
		return nil
	default:
		return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
	}
}