Skip to content
Permalink
Browse files

implement hybrid logical clock

Summary:
Implementation closely followed http://www.cse.buffalo.edu/tech-reports/2014-04.pdf. I rearranged the update algorithms to a form that I found more intuitive and practical, so it is advisable that someone look over the tests to rule out that I got something wrong and subsequently wrote wrong tests.
Nothing else with regard to clock skew has been done. In practice, the allowed skew can be updated continuously via SetMaxDrift, depending on what delays are being gossiped/measured.

Test Plan: read paper and compare update routines, manual step-by-step study of some test cases, make test, make testrace, make coverage

Reviewers: andybons, spencerkimball

Reviewed By: spencerkimball

Differential Revision: http://phabricator.cockroachdb.org/D78
  • Loading branch information
tbg committed Jul 16, 2014
1 parent 447610d commit aebb70b0d3e2f0a71e06cbedef45a4fd731f5367
Showing with 451 additions and 53 deletions.
  1. +0 −12 TODO
  2. +4 −14 gossip/client_test.go
  3. +1 −1 gossip/doc.go
  4. +1 −1 gossip/group.go
  5. +3 −1 gossip/group_test.go
  6. +14 −19 gossip/info.go
  7. +227 −0 hlc/hlc.go
  8. +165 −0 hlc/hlc_test.go
  9. +6 −2 rpc/client_test.go
  10. +4 −1 simulation/gossip.go
  11. +1 −2 storage/config.go
  12. +25 −0 util/ordered.go
12 TODO
@@ -1,15 +1,3 @@
Cockroach TODO

* HLC Timestamp. Suggested HLC struct:

type HLTimestamp struct {
WallTime int64 // nanos since epoch
Logical int32
}

- Disallow regression


* Transactions

- Transaction ID: generated via allocation of block of transaction
@@ -26,18 +26,6 @@ import (
"github.com/golang/glog"
)

// waitFor polls cond until it reports true or roughly maxTime has
// been spent sleeping between polls. The condition is evaluated
// before every sleep, so a condition that already holds returns
// immediately. If the condition never holds, the test is marked as
// failed with a message naming desc.
func waitFor(cond func() bool, desc string, t *testing.T) {
	const (
		maxTime = 500 * time.Millisecond
		// step is the pause between two consecutive polls.
		step = maxTime / 100
	)
	for waited := time.Duration(0); waited < maxTime; waited += step {
		if cond() {
			return
		}
		time.Sleep(step)
	}
	t.Errorf("exceeded %s waiting for %s", maxTime, desc)
}

// startGossip creates local and remote gossip instances.
// The remote gossip instance launches its gossip service.
func startGossip(t *testing.T) (local, remote *Gossip, lserver, rserver *rpc.Server) {
@@ -65,11 +53,13 @@ func TestClientGossip(t *testing.T) {
client := newClient(remote.is.NodeAddr)
go client.start(local, disconnected)

waitFor(func() bool {
if err := util.IsTrueWithin(func() bool {
_, lerr := remote.GetInfo("local-key")
_, rerr := local.GetInfo("remote-key")
return lerr == nil && rerr == nil
}, "gossip exchange", t)
}, 500*time.Millisecond); err != nil {
t.Errorf("gossip exchange failed or taking too long")
}

remote.stop()
local.stop()
@@ -37,7 +37,7 @@ least loaded nodes or the 100 disks with most unused capacity).
Single-valued info values can have any type. Values to be used with
groups must either be of type int64, float64, string or implement the
gossip.Ordered interface.
util.Ordered interface.
A map of info objects and a map of Group objects are kept by a Gossip
instance. Single-valued info objects can be added via
@@ -154,7 +154,7 @@ func (g *group) getInfo(key string) *info {
// Check TTL and discard if too old.
now := time.Now().UnixNano()
if i.TTLStamp <= now {
delete(g.Infos, key)
g.removeInternal(i)
return nil
}
return i
@@ -23,6 +23,8 @@ import (
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/util"
)

func newTestInfo(key string, val interface{}) *info {
@@ -316,7 +318,7 @@ type testValue struct {
stringVal string
}

func (t *testValue) Less(o Ordered) bool {
func (t *testValue) Less(o util.Ordered) bool {
return t.intVal < o.(*testValue).intVal
}

@@ -21,29 +21,24 @@ import (
"net"
"strings"

"github.com/cockroachdb/cockroach/util"
"github.com/golang/glog"
)

// Ordered is used to compare info values when managing info groups.
// Info values which are not int64, float64 or string must implement
// this interface to be used with groups.
type Ordered interface {
// Less reports whether this object sorts before the supplied
// Ordered value b, i.e. x.Less(b) is true when x < b. This is the
// direction in which info.less and the test implementation
// (testValue.Less) use it.
Less(b Ordered) bool
}

// info is the basic unit of information traded over the gossip
// network.
type info struct {
Key string // Info key
Val interface{} // Info value: must be one of {int64, float64, string}
Timestamp int64 // Wall time at origination (Unix-nanos)
TTLStamp int64 // Wall time before info is discarded (Unix-nanos)
Hops uint32 // Number of hops from originator
NodeAddr net.Addr // Originating node in "host:port" format
peerAddr net.Addr // Proximate peer which passed us the info
seq int64 // Sequence number for incremental updates
Key string // Info key
// Info value: must be one of {int64, float64, string} or
// implement the util.Ordered interface to be used with groups.
// For single infos any type is allowed.
Val interface{}
Timestamp int64 // Wall time at origination (Unix-nanos)
TTLStamp int64 // Wall time before info is discarded (Unix-nanos)
Hops uint32 // Number of hops from originator
NodeAddr net.Addr // Originating node in "host:port" format
peerAddr net.Addr // Proximate peer which passed us the info
seq int64 // Sequence number for incremental updates
}

// infoPrefix returns the text preceding the last period within
@@ -66,8 +61,8 @@ func (i *info) less(b *info) bool {
case string:
return t < b.Val.(string)
default:
if ord, ok := i.Val.(Ordered); ok {
return ord.Less(b.Val.(Ordered))
if ord, ok := i.Val.(util.Ordered); ok {
return ord.Less(b.Val.(util.Ordered))
}
glog.Fatalf("unhandled info value type: %s", t)
}
@@ -0,0 +1,227 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com)

// Package hlc implements the Hybrid Logical Clock outlined in
// "Logical Physical Clocks and Consistent Snapshots in Globally
// Distributed Databases", available online at
// http://www.cse.buffalo.edu/tech-reports/2014-04.pdf.
package hlc

import (
"sync"
"time"

"github.com/cockroachdb/cockroach/util"
)

// TODO(Tobias): Figure out if it would make sense to save some
// history of the physical clock and react if it jumps backwards
// repeatedly. This is expected during NTP updates, but may
// indicate a broken clock in some cases.

// HLTimestamp represents a state of the hybrid
// logical clock.
type HLTimestamp struct {
	// WallTime holds a wall time, typically a unix epoch time
	// expressed in nanoseconds.
	WallTime int64
	// Logical captures causality for events whose wall times
	// are equal. It is effectively bounded by
	// (maximum clock skew)/(minimal ns between events)
	// and nearly impossible to overflow.
	Logical uint64
}

// Less reports whether t orders strictly before s. Timestamps are
// compared lexicographically: the wall times decide, and the logical
// component only breaks ties between equal wall times. A timestamp
// is never less than itself.
//
// NOTE(review): this method takes an HLTimestamp rather than an
// interface value; whether it satisfies util.Ordered depends on that
// interface's declaration, which is not part of this file -- confirm
// before relying on it.
func (t HLTimestamp) Less(s HLTimestamp) bool {
	if t.WallTime != s.WallTime {
		return t.WallTime < s.WallTime
	}
	return t.Logical < s.Logical
}

// HLClock is a hybrid logical clock. Objects of this
// type model causality while maintaining a relation
// to physical time. Roughly speaking, timestamps
// consist of the largest wall clock time among all
// events, and a logical clock that ticks whenever
// an event happens in the future of the local physical
// clock.
// The data structure is thread safe and thus can safely
// be shared by multiple goroutines.
//
// See NewHLClock for details.
type HLClock struct {
// physicalClock is the source of physical time supplied to
// NewHLClock. Now and Update call it once per invocation and
// compare its int64 result against the stored wall time.
physicalClock func() int64
// HLClock contains a mutex used to lock the below
// fields while methods operate on them.
sync.Mutex
// state is the most recent timestamp handed out by Now or
// Update; its zero value is the state of a freshly created clock.
state HLTimestamp
// maxDrift specifies how far ahead of the physical
// clock the wall time can be.
// A value of zero disables the check in Update.
// See SetMaxDrift.
maxDrift uint
}

// ManualClock is an integer-backed physical clock that only changes
// when the caller assigns a new value to it. It exists to make it
// easy to build a hybrid logical clock whose physical time is under
// manual control.
type ManualClock int64

// UnixNano reports the value currently stored in the manual clock.
func (m *ManualClock) UnixNano() int64 {
	current := *m
	return int64(current)
}

// UnixNano reads the local machine's wall clock and returns it as
// nanoseconds since the unix epoch. It has the signature expected by
// NewHLClock, so a clock backed by real time can be created via
// c := hlc.NewHLClock(hlc.UnixNano).
func UnixNano() int64 {
	now := time.Now()
	return now.UnixNano()
}

// NewHLClock returns a hybrid logical clock that reads physical time
// from the supplied function. The new clock starts out with both its
// wall time and its logical component at zero, and with drift
// checking disabled.
//
// Usually physicalClock is hlc.UnixNano, i.e. the local machine's
// wall time in unix epoch nanoseconds, but any int64 time source
// may be used.
func NewHLClock(physicalClock func() int64) *HLClock {
	clock := new(HLClock)
	clock.physicalClock = physicalClock
	return clock
}

// SetMaxDrift configures, in nanoseconds, how far ahead of the
// physical clock a call to Update is allowed to move the wall time.
// Choose a value large enough to tolerate a reasonable amount of
// clock skew, yet small enough to keep ill-configured nodes from
// dragging the clock's wall time far into the future.
//
// Passing zero switches the check off; this is also the setting of
// a newly created clock.
func (c *HLClock) SetMaxDrift(delta uint) {
	c.Lock()
	c.maxDrift = delta
	c.Unlock()
}

// MaxDrift reports the currently configured drift limit in
// nanoseconds. Zero means that drift checking is switched off.
// See SetMaxDrift for details.
func (c *HLClock) MaxDrift() uint {
	c.Lock()
	limit := c.maxDrift
	c.Unlock()
	return limit
}

// Timestamp hands out a copy of the timestamp the clock currently
// holds. The clock itself is not advanced or otherwise adjusted.
func (c *HLClock) Timestamp() HLTimestamp {
	c.Lock()
	current := c.timestamp()
	c.Unlock()
	return current
}

// timestamp returns a copy of the clock's state. It takes no lock
// and is meant for internal callers that already hold the mutex.
func (c *HLClock) timestamp() HLTimestamp {
	// HLTimestamp is a plain value type, so returning the field
	// yields an independent copy.
	return c.state
}

// Now advances the clock for an event originating on the local
// machine and returns the resulting timestamp, which may then be
// sent to other members of the distributed network. Its counterpart
// is Update, which handles timestamps received from other members.
func (c *HLClock) Now() (result HLTimestamp) {
	c.Lock()
	defer c.Unlock()

	if pt := c.physicalClock(); pt > c.state.WallTime {
		// The physical clock has moved past the wall time: adopt
		// it and start the logical component over.
		c.state.WallTime, c.state.Logical = pt, 0
	} else {
		// The wall time is at or ahead of the physical clock, so
		// only the logical component ticks.
		c.state.Logical++
	}
	// Read the state while the mutex is still held.
	return c.timestamp()
}

// Update takes a hybrid timestamp, usually originating from
// an event received from another member of a distributed
// system. The clock is updated and the hybrid timestamp
// associated to the receipt of the event returned.
//
// An error may only occur if drift checking is active (see
// SetMaxDrift) and the remote wall time is further ahead of the
// local physical clock than the configured limit. In that case the
// state of the clock is left untouched and the returned timestamp
// is the unchanged current state.
//
// To timestamp events of local origin, use Now instead.
func (c *HLClock) Update(rt HLTimestamp) (result HLTimestamp, err error) {
	c.Lock()
	defer c.Unlock()

	physicalClock := c.physicalClock()
	if physicalClock > c.state.WallTime && physicalClock > rt.WallTime {
		// Our physical clock is ahead of both wall times. It is used
		// as the new wall time and the logical clock is reset.
		c.state.WallTime = physicalClock
		c.state.Logical = 0
		return c.timestamp(), nil
	}

	// In the remaining cases, our physical clock plays no role
	// as it is not ahead of both the local and remote wall times.
	// Instead, the logical clock comes into play.
	switch {
	case rt.WallTime > c.state.WallTime:
		// Reaching this branch implies physicalClock <= rt.WallTime,
		// so the difference below is not negative. It is compared as
		// uint64 because converting the int64 difference to uint
		// would truncate on platforms where uint is 32 bits wide and
		// could let a far-ahead remote timestamp pass the check.
		if c.maxDrift > 0 && uint64(rt.WallTime-physicalClock) > uint64(c.maxDrift) {
			// The remote wall time is too far ahead to be trustworthy.
			return c.timestamp(), util.Errorf("Remote wall time drifts from local physical clock: %d (%dns ahead)", rt.WallTime, rt.WallTime-physicalClock)
		}
		// The remote clock is ahead of ours, and we update
		// our own logical clock with theirs.
		c.state.WallTime = rt.WallTime
		c.state.Logical = rt.Logical + 1
	case c.state.WallTime > rt.WallTime:
		// Our wall time is larger, so it remains but we tick
		// the logical clock.
		c.state.Logical++
	default:
		// Both wall times are equal, and the larger logical
		// clock is used for the update.
		if rt.Logical > c.state.Logical {
			c.state.Logical = rt.Logical
		}
		c.state.Logical++
	}
	// The state is read while the mutex is still held; the deferred
	// Unlock runs after the return values have been evaluated.
	return c.timestamp(), nil
}

0 comments on commit aebb70b

Please sign in to comment.
You can’t perform that action at this time.