Skip to content

Commit

Permalink
removed superfluous stats
Browse files Browse the repository at this point in the history
  • Loading branch information
elmacnifico committed Nov 16, 2013
1 parent d5e296f commit 894827d
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 35 deletions.
6 changes: 0 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ func (consumer *Consumer) RequeueWorking() error {

func (consumer *Consumer) ackPackage(p *Package) error {
answer := consumer.Queue.redisClient.RPop(consumerWorkingQueueKey(consumer.Queue.Name, consumer.Name))
consumer.Queue.trackStats(
consumerAckRateKey(consumer.Queue.Name, consumer.Name),
1,
true,
)
return answer.Err()
}

Expand All @@ -167,7 +162,6 @@ func (consumer *Consumer) failPackage(p *Package) error {
consumerWorkingQueueKey(consumer.Queue.Name, consumer.Name),
queueFailedKey(consumer.Queue.Name),
)
consumer.Queue.trackStats(queueFailedRateKey(consumer.Queue.Name), 1, true)
return answer.Err()
}

Expand Down
12 changes: 0 additions & 12 deletions key_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ func queueInputRateKey(queue string) string {
return queueInputKey(queue) + "::rate"
}

func queueFailedRateKey(queue string) string {
return queueFailedKey(queue) + "::rate"
}

func queueInputSizeKey(queue string) string {
return queueInputKey(queue) + "::size"
}
Expand All @@ -40,21 +36,13 @@ func queueWorkingPrefix(queue string) string {
return "redismq::" + queue + "::working"
}

func queueAckPrefix(queue string) string {
return "redismq::" + queue + "::ack"
}

func consumerWorkingQueueKey(queue, consumer string) string {
return queueWorkingPrefix(queue) + "::" + consumer
}
func consumerWorkingRateKey(queue, consumer string) string {
return consumerWorkingQueueKey(queue, consumer) + "::rate"
}

func consumerAckRateKey(queue, consumer string) string {
return queueAckPrefix(queue) + "::" + consumer + "::rate"
}

func consumerHeartbeatKey(queue, consumer string) string {
return consumerWorkingQueueKey(queue, consumer) + "::heartbeat"
}
23 changes: 7 additions & 16 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ type observer struct {
type queueStat struct {
InputRate int64
WorkRate int64
AckRate int64
FailRate int64
InputSize int64
FailSize int64
consumerStats map[string]*consumerStat
ConsumerStats map[string]*consumerStat
}

type consumerStat struct {
WorkRate int64
AckRate int64
}

func newObserver(redisURL, redisPassword string, redisDb int64) *observer {
Expand Down Expand Up @@ -68,15 +65,13 @@ func (observer *observer) update() {

func (observer *observer) poll(queue string) {
if observer.Stats[queue] == nil {
observer.Stats[queue] = &queueStat{consumerStats: make(map[string]*consumerStat)}
observer.Stats[queue] = &queueStat{ConsumerStats: make(map[string]*consumerStat)}
}
observer.Stats[queue].InputRate = observer.fetchStat(queueInputRateKey(queue))
observer.Stats[queue].FailRate = observer.fetchStat(queueFailedRateKey(queue))
observer.Stats[queue].InputSize = observer.fetchStat(queueInputSizeKey(queue))
observer.Stats[queue].FailSize = observer.fetchStat(queueFailedSizeKey(queue))

observer.Stats[queue].WorkRate = 0
observer.Stats[queue].AckRate = 0

consumers, err := observer.getConsumers(queue)
if err != nil {
Expand All @@ -85,20 +80,16 @@ func (observer *observer) poll(queue string) {
}

for _, consumer := range consumers {
if observer.Stats[queue].consumerStats[consumer] == nil {
observer.Stats[queue].consumerStats[consumer] = &consumerStat{}
if observer.Stats[queue].ConsumerStats[consumer] == nil {
observer.Stats[queue].ConsumerStats[consumer] = &consumerStat{}
}
observer.Stats[queue].consumerStats[consumer] = &consumerStat{}
observer.Stats[queue].consumerStats[consumer].AckRate = observer.fetchStat(consumerAckRateKey(queue, consumer))
observer.Stats[queue].consumerStats[consumer].WorkRate = observer.fetchStat(consumerWorkingRateKey(queue, consumer))

observer.Stats[queue].AckRate += observer.Stats[queue].consumerStats[consumer].AckRate
observer.Stats[queue].WorkRate += observer.Stats[queue].consumerStats[consumer].WorkRate
observer.Stats[queue].ConsumerStats[consumer].WorkRate = observer.fetchStat(consumerWorkingRateKey(queue, consumer))
observer.Stats[queue].WorkRate += observer.Stats[queue].ConsumerStats[consumer].WorkRate
}
}

func (observer *observer) fetchStat(keyName string) int64 {
now := time.Now().UTC().Unix() - 2 // we can only look for already written stats
now := time.Now().UTC().Unix() - 3 // we can only look for already written stats
key := fmt.Sprintf("%s::%d", keyName, now)
answer := observer.redisClient.Get(key)
if answer.Err() != nil {
Expand Down
2 changes: 1 addition & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (queue *Queue) writeStatsCacheToRedis(now int64) {
fmt.Printf("start writing for %s -> %d\n", queue.Name, now)

for sec := range queue.statsCache {
if sec == now {
if sec >= now-1 {
fmt.Println("skipped now")
continue
}
Expand Down

0 comments on commit 894827d

Please sign in to comment.