Skip to content
Permalink
Browse files

Initial commit

  • Loading branch information...
tomwilkie committed Jun 22, 2016
0 parents commit 1482486bb347690af94c6777554438b697d37207
Showing with 368 additions and 0 deletions.
  1. +168 −0 consul_client.go
  2. +66 −0 distributor.go
  3. +40 −0 notes.md
  4. +5 −0 prog/main.go
  5. +89 −0 ring.go
@@ -0,0 +1,168 @@
package frankenstein

import (
"bytes"
"encoding/json"
"fmt"
"time"

log "github.com/Sirupsen/logrus"
consul "github.com/hashicorp/consul/api"
)

const (
// longPollDuration is the WaitTime sent with every List query issued by
// WatchPrefix.
longPollDuration = 10 * time.Second
)

// ConsulClient is a high-level client for Consul, that exposes operations
// such as CAS and Watch which take callbacks. It also deals with serialisation.
// Values are stored as JSON by the consulClient implementation in this file.
type ConsulClient interface {
// Get fetches key and decodes its value into out.
Get(key string, out interface{}) error
// CAS reads key into out, hands the result to f and writes back whatever
// f returns using a compare-and-swap.
CAS(key string, out interface{}, f CASCallback) error
// WatchPrefix calls f with each key under prefix and its value decoded
// into out, until done is closed or f returns false.
WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool)
}

// CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil.
//
// in is the currently stored value, or nil when there is none. out is the
// value to write back. When err is non-nil, retry tells CAS whether to try
// again (true) or to give up and return err (false).
type CASCallback func(in interface{}) (out interface{}, retry bool, err error)

// NewConsulClient builds a ConsulClient that talks to the consul agent at
// addr over plain HTTP. It returns the error from the underlying consul
// client constructor unchanged.
func NewConsulClient(addr string) (ConsulClient, error) {
	cfg := consul.Config{
		Scheme:  "http",
		Address: addr,
	}
	api, err := consul.NewClient(&cfg)
	if err != nil {
		return nil, err
	}
	// Only the key/value endpoint is needed by this package.
	return &consulClient{kv: api.KV()}, nil
}

var (
// queryOptions is shared by the reads in Get and CAS; it asks consul for
// consistent reads.
queryOptions = &consul.QueryOptions{
RequireConsistent: true,
}
// writeOptions is passed to the compare-and-swap write in CAS.
writeOptions = &consul.WriteOptions{}

// ErrNotFound is returned by ConsulClient.Get when consul has no entry
// for the requested key.
ErrNotFound = fmt.Errorf("Not found")
)

// kv is the subset of the consul key/value API that consulClient uses.
// NewConsulClient satisfies it with the value returned by client.KV().
type kv interface {
CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error)
Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error)
List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error)
}

// consulClient is the ConsulClient implementation returned by
// NewConsulClient. It embeds kv, so the raw consul calls are reached as
// c.kv.Get, c.kv.CAS and c.kv.List.
type consulClient struct {
kv
}

// Get fetches key from consul and JSON-decodes its value into out.
//
// It returns the consul error if the lookup fails, ErrNotFound if the key
// does not exist, and the decoder's error if the stored value is not valid
// JSON for out.
func (c *consulClient) Get(key string, out interface{}) error {
	pair, _, err := c.kv.Get(key, queryOptions)
	switch {
	case err != nil:
		return err
	case pair == nil:
		return ErrNotFound
	}
	return json.NewDecoder(bytes.NewReader(pair.Value)).Decode(out)
}

// CAS atomically modifies the value stored at key using a compare-and-swap
// loop.
//
// On every attempt the current value is fetched and JSON-decoded into out,
// which is then handed to f. If the key does not exist f receives nil. The
// value f returns is JSON-encoded and written back only if the key has not
// been modified since it was read. Up to 10 attempts are made.
//
// CAS returns nil once a write succeeds, the callback's error if f fails
// with retry == false, and a generic error when all attempts are used up.
// It panics if f returns a nil value together with a nil error.
func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
	const retries = 10

	for i := 0; i < retries; i++ {
		// Per-attempt state: these must start from zero on every attempt so
		// that, if the key disappeared since the previous attempt, f sees nil
		// and the write uses index 0 rather than a stale ModifyIndex.
		var (
			index   uint64
			current interface{}
		)

		kvp, _, err := c.kv.Get(key, queryOptions)
		if err != nil {
			log.Errorf("Error getting %s: %v", key, err)
			continue
		}
		if kvp != nil {
			if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil {
				log.Errorf("Error deserialising %s: %v", key, err)
				continue
			}
			index = kvp.ModifyIndex // if key doesn't exist, index stays 0
			current = out
		}

		updated, retry, err := f(current)
		if err != nil {
			log.Errorf("Error CASing %s: %v", key, err)
			if !retry {
				return err
			}
			continue
		}

		// A nil value with a nil error violates the CASCallback contract.
		if updated == nil {
			panic("Callback must instantiate value!")
		}

		var value bytes.Buffer
		if err := json.NewEncoder(&value).Encode(updated); err != nil {
			log.Errorf("Error serialising value for %s: %v", key, err)
			continue
		}
		ok, _, err := c.kv.CAS(&consul.KVPair{
			Key:         key,
			Value:       value.Bytes(),
			ModifyIndex: index,
		}, writeOptions)
		if err != nil {
			log.Errorf("Error CASing %s: %v", key, err)
			continue
		}
		if !ok {
			// Somebody else modified the key between our read and write.
			log.Errorf("Error CASing %s, trying again %d", key, index)
			continue
		}
		return nil
	}
	return fmt.Errorf("Failed to CAS %s", key)
}

// WatchPrefix long-polls consul for the keys under prefix and reports them
// to f.
//
// Each time the List query returns a LastIndex different from the previous
// one, every returned value is JSON-decoded into out and f is called with the
// key and out. The same out is reused for every decode, so f must not retain
// it. Entries that fail to decode are logged and skipped.
//
// WatchPrefix returns when f returns false or when done is closed. done is
// only checked between queries and during the error backoff, so a close is
// noticed once the in-flight List call returns.
func (c *consulClient) WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) {
	// errorBackoff is how long to wait after a failed query before trying
	// again, so an unreachable consul does not turn this into a busy loop.
	const errorBackoff = 1 * time.Second

	index := uint64(0)
	for {
		select {
		case <-done:
			return
		default:
		}

		kvps, meta, err := c.kv.List(prefix, &consul.QueryOptions{
			RequireConsistent: true,
			WaitIndex:         index,
			WaitTime:          longPollDuration,
		})
		if err != nil {
			log.Errorf("Error getting path %s: %v", prefix, err)
			// Back off before retrying, but still honour done promptly.
			select {
			case <-done:
				return
			case <-time.After(errorBackoff):
			}
			continue
		}
		// An unchanged index means the long poll timed out with no updates.
		if index == meta.LastIndex {
			continue
		}
		index = meta.LastIndex

		for _, kvp := range kvps {
			if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil {
				log.Errorf("Error deserialising %s: %v", kvp.Key, err)
				continue
			}
			if !f(kvp.Key, out) {
				return
			}
		}
	}
}
@@ -0,0 +1,66 @@
package frankenstein

import (
"sync"

"github.com/prometheus/common/model"
)

// distributor watches consul for collectors and keeps a Ring of them up to
// date from a background goroutine started by NewDistributor.
type distributor struct {
// consul is the client used to watch cfg.consulPrefix.
consul ConsulClient
// ring is updated with every collector reported by the watch.
ring *Ring
cfg distributorConfig

// quit is closed by Stop to ask the watch loop to exit.
quit chan struct{}
// done is released by the watch loop when it returns; Stop waits on it.
done sync.WaitGroup
}

// distributorConfig holds the settings NewDistributor needs.
type distributorConfig struct {
// consulHost is the address handed to NewConsulClient.
consulHost string
// consulPrefix is the key prefix the distributor watches.
consulPrefix string
}

// collector is a member of the Ring: Ring.Update registers it under each
// value in tokens.
//
// NOTE(review): distributor.loop passes a *collector to WatchPrefix, which
// fills it with encoding/json. encoding/json only sets exported fields, so
// as written both fields stay at their zero value after decoding — confirm
// and export the fields or add an UnmarshalJSON method.
type collector struct {
hostname string
tokens []uint64
}

// NewDistributor connects to the consul agent named in cfg, creates an empty
// ring and starts the background goroutine that keeps the ring in sync with
// consul. Call Stop to shut that goroutine down.
//
// It returns an error if the consul client cannot be created.
func NewDistributor(cfg distributorConfig) (*distributor, error) {
	client, err := NewConsulClient(cfg.consulHost)
	if err != nil {
		return nil, err
	}

	dist := &distributor{
		cfg:    cfg,
		consul: client,
		ring:   NewRing(),
		quit:   make(chan struct{}),
	}

	// Register the goroutine before starting it so Stop can wait on it.
	dist.done.Add(1)
	go dist.loop()

	return dist, nil
}

// Stop signals the watch loop to exit by closing quit and blocks until the
// loop goroutine has returned. Because it closes a channel, calling Stop
// more than once panics.
func (d *distributor) Stop() {
close(d.quit)
d.done.Wait()
}

// loop runs in its own goroutine for the lifetime of the distributor. It
// watches the configured consul prefix and feeds every collector it sees into
// the ring, returning (and releasing done) once quit is closed.
func (d *distributor) loop() {
	defer d.done.Done()

	// onUpdate copies the decoded collector into the ring and always asks the
	// watch to keep going.
	onUpdate := func(key string, value interface{}) bool {
		d.ring.Update(*value.(*collector))
		return true
	}

	d.consul.WatchPrefix(d.cfg.consulPrefix, &collector{}, d.quit, onUpdate)
}

// Append implements storage.SampleAppender.
// It is currently a stub: the sample is ignored and nil is returned.
func (d *distributor) Append(*model.Sample) error {
return nil
}

// NeedsThrottling implements storage.SampleAppender.
// It is currently a stub that always reports false.
func (*distributor) NeedsThrottling() bool {
return false
}
@@ -0,0 +1,40 @@
= Project Frankenstein

Design Doc: https://docs.google.com/document/d/1C7yhMnb1x2sfeoe45f4mnnKConvroWhJ8KQZwIHJOuw/edit#heading=h.f7lkb8wswewc

== Retrieval

Use existing prometheus binary; add a --retrieval-only flag to existing prometheus? and use one of the remote storage protocols or add a new one.

- Brian's generic write PR https://github.com/prometheus/prometheus/pull/1487

== Distribution

Use a consistent hashing library to distribute timeseries to collectors

== Collection

Use the existing prometheus binary with an added push interface. Adapt memorySeriesStorage with support for flushing chunks to something else.

== Query

Use existing prometheus binary with flags to point it at distribution?

== Storage

Hash key = [userid, hour bucket, metric name]
Range key = [label, value, chunk id]

What queries don't use the metric name?
- number of metrics
- timeseries per job

Would adding a layer of indirection for metric name hurt?

We need to know what labels an existing chunk has, to return to the user.
- each chunk could contain these values
- or the chunk id could be the finger print, and have a separate lookup table

How to assign ids?
- hashes have collisions (fnv1a)
- unique prefixes and locally
@@ -0,0 +1,5 @@
package main

// main is a placeholder entry point; it does nothing yet.
func main() {

}
89 ring.go
@@ -0,0 +1,89 @@
package frankenstein

// Based on https://raw.githubusercontent.com/stathat/consistent/master/consistent.go

import (
"errors"
"sort"
"strconv"
"sync"
)

// uint64s is a slice of uint64 that implements sort.Interface, ordering its
// elements in ascending numeric order.
type uint64s []uint64

// Len reports the number of elements in the slice.
func (x uint64s) Len() int {
	return len(x)
}

// Less reports whether the element at i is strictly smaller than the one at j.
func (x uint64s) Less(i, j int) bool {
	return x[j] > x[i]
}

// Swap exchanges the elements at i and j.
func (x uint64s) Swap(i, j int) {
	tmp := x[i]
	x[i] = x[j]
	x[j] = tmp
}

// ErrEmptyRing is the error returned by Ring.Get when no collector has been
// added to the ring yet.
var ErrEmptyRing = errors.New("empty circle")

// Ring holds the information about the members of the consistent hash circle.
// The embedded RWMutex guards circle and sortedHashes: Update takes the write
// lock and Get takes the read lock.
type Ring struct {
sync.RWMutex

// circle maps each token to the collector registered for it.
circle map[uint64]collector
// sortedHashes holds the keys of circle in ascending order; it is rebuilt
// by updateSortedHashes after every Update.
sortedHashes uint64s
// NOTE(review): count and scratch are not referenced by any code visible
// in this file — confirm whether they are still needed.
count int64
scratch [64]byte
}

// NewRing creates an empty Ring. Collectors are added with Update; until
// then Get returns ErrEmptyRing.
func NewRing() *Ring {
	ring := new(Ring)
	ring.circle = make(map[uint64]collector)
	return ring
}

// eltKey generates a string key for an element with an index: the decimal
// form of idx immediately followed by elt, with no separator.
func (c *Ring) eltKey(elt string, idx int) string {
	prefix := strconv.Itoa(idx)
	return prefix + elt
}

// Update registers col in the ring under each of its tokens, replacing
// whatever collector previously owned a token, and then rebuilds the sorted
// token list. Tokens already in the ring that are not in col.tokens are left
// untouched. It holds the write lock for the duration of the call.
func (c *Ring) Update(col collector) {
	c.Lock()
	defer c.Unlock()

	for i := range col.tokens {
		c.circle[col.tokens[i]] = col
	}
	c.updateSortedHashes()
}

// Get returns the collector that owns key: the one registered under the
// smallest token strictly greater than key, wrapping around to the smallest
// token when key is at or past the largest. It returns ErrEmptyRing if the
// ring has no members. It holds the read lock for the duration of the call.
func (c *Ring) Get(key uint64) (collector, error) {
	c.RLock()
	defer c.RUnlock()

	if len(c.circle) == 0 {
		return collector{}, ErrEmptyRing
	}

	token := c.sortedHashes[c.search(key)]
	return c.circle[token], nil
}

// search returns the index in sortedHashes of the first token strictly
// greater than key, or 0 when there is no such token (wrap-around). The
// caller must hold the lock.
func (c *Ring) search(key uint64) (i int) {
	n := len(c.sortedHashes)
	i = sort.Search(n, func(pos int) bool {
		return c.sortedHashes[pos] > key
	})
	if i >= n {
		return 0
	}
	return i
}

// updateSortedHashes rebuilds sortedHashes from the keys of circle, in
// ascending order. The caller must hold the write lock.
func (c *Ring) updateSortedHashes() {
	// Reuse the existing backing array when it is big enough; otherwise
	// allocate one of exactly the required size up front instead of letting
	// append grow it step by step.
	hashes := c.sortedHashes[:0]
	if cap(c.sortedHashes) < len(c.circle) {
		hashes = make(uint64s, 0, len(c.circle))
	}
	for k := range c.circle {
		hashes = append(hashes, k)
	}
	sort.Sort(hashes)
	c.sortedHashes = hashes
}

0 comments on commit 1482486

Please sign in to comment.
You can’t perform that action at this time.