Skip to content

Commit

Permalink
Merge branch 'rpclock'
Browse files Browse the repository at this point in the history
This simplifies the rpc implementation by removing the monitor/coordinate
goroutine.  Table state is now protected by a conn-level mutex.  Many false
abstractions removed, since most everything needs to be able to operate at the
Conn level.  Teardown is now guaranteed to happen regardless of abort or close.

There's no change in exported API, save for a new ConnOption: ConnLog.  ConnLog
sets the connection's logging destination.

The PingPong benchmark shows some small performance gains, but nothing
substantial.  Future work will be to reduce the number of allocations made.
  • Loading branch information
zombiezen committed Sep 9, 2016
2 parents 26069c8 + 10d94aa commit c55cda4
Show file tree
Hide file tree
Showing 20 changed files with 1,328 additions and 1,263 deletions.
98 changes: 54 additions & 44 deletions internal/fulfiller/fulfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,53 +178,49 @@ type pcall struct {
ecall
}

// embargoClient is a client that flushes a queue of calls.
type embargoClient struct {
// EmbargoClient is a client that flushes a queue of calls.
// Fulfiller will create these automatically when pipelined calls are
// made on unresolved answers. EmbargoClient is exported so that rpc
// can avoid making calls on its own Conn.
type EmbargoClient struct {
client capnp.Client

mu sync.RWMutex
q queue.Queue
mu sync.RWMutex
q queue.Queue
calls ecallList
}

func newEmbargoClient(client capnp.Client, queue []ecall) capnp.Client {
ec := &embargoClient{client: client}
qq := make(ecallList, callQueueSize)
n := copy(qq, queue)
ec.q.Init(qq, n)
ec := &EmbargoClient{
client: client,
calls: make(ecallList, callQueueSize),
}
ec.q.Init(ec.calls, copy(ec.calls, queue))
go ec.flushQueue()
return ec
}

func (ec *embargoClient) push(cl *capnp.Call) capnp.Answer {
func (ec *EmbargoClient) push(cl *capnp.Call) capnp.Answer {
f := new(Fulfiller)
cl, err := cl.Copy(nil)
if err != nil {
return capnp.ErrorAnswer(err)
}
if ok := ec.q.Push(ecall{cl, f}); !ok {
i := ec.q.Push()
if i == -1 {
return capnp.ErrorAnswer(errCallQueueFull)
}
ec.calls[i] = ecall{cl, f}
return f
}

func (ec *embargoClient) peek() ecall {
if ec.q.Len() == 0 {
return ecall{}
}
return ec.q.Peek().(ecall)
}

func (ec *embargoClient) pop() ecall {
if ec.q.Len() == 0 {
return ecall{}
}
return ec.q.Pop().(ecall)
}

// flushQueue is run in its own goroutine.
func (ec *embargoClient) flushQueue() {
func (ec *EmbargoClient) flushQueue() {
var c ecall
ec.mu.Lock()
c := ec.peek()
if i := ec.q.Front(); i != -1 {
c = ec.calls[i]
}
ec.mu.Unlock()
for c.call != nil {
ans := ec.client.Call(c.call)
Expand All @@ -237,13 +233,19 @@ func (ec *embargoClient) flushQueue() {
}
}(c.f, ans)
ec.mu.Lock()
ec.pop()
c = ec.peek()
ec.q.Pop()
if i := ec.q.Front(); i != -1 {
c = ec.calls[i]
} else {
c = ecall{}
}
ec.mu.Unlock()
}
}

func (ec *embargoClient) WrappedClient() capnp.Client {
// Client returns the underlying client if the embargo has been lifted
// and nil otherwise.
func (ec *EmbargoClient) Client() capnp.Client {
ec.mu.RLock()
ok := ec.isPassthrough()
ec.mu.RUnlock()
Expand All @@ -253,11 +255,13 @@ func (ec *embargoClient) WrappedClient() capnp.Client {
return ec.client
}

func (ec *embargoClient) isPassthrough() bool {
func (ec *EmbargoClient) isPassthrough() bool {
return ec.q.Len() == 0
}

func (ec *embargoClient) Call(cl *capnp.Call) capnp.Answer {
// Call either queues a call to the underlying client or starts a call
// if the embargo has been lifted.
func (ec *EmbargoClient) Call(cl *capnp.Call) capnp.Answer {
// Fast path: queue is flushed.
ec.mu.RLock()
ok := ec.isPassthrough()
Expand All @@ -278,12 +282,26 @@ func (ec *embargoClient) Call(cl *capnp.Call) capnp.Answer {
return ans
}

func (ec *embargoClient) Close() error {
// TryQueue will attempt to queue a call or return nil if the embargo
// has been lifted.
func (ec *EmbargoClient) TryQueue(cl *capnp.Call) capnp.Answer {
ec.mu.Lock()
if ec.isPassthrough() {
ec.mu.Unlock()
return nil
}
ans := ec.push(cl)
ec.mu.Unlock()
return ans
}

// Close closes the underlying client, rejecting any queued calls.
func (ec *EmbargoClient) Close() error {
ec.mu.Lock()
// reject all queued calls
for ec.q.Len() > 0 {
c := ec.pop()
c.f.Reject(errQueueCallCancel)
ec.calls[ec.q.Front()].f.Reject(errQueueCallCancel)
ec.q.Pop()
}
ec.mu.Unlock()
return ec.client.Close()
Expand All @@ -301,16 +319,8 @@ func (el ecallList) Len() int {
return len(el)
}

func (el ecallList) At(i int) interface{} {
return el[i]
}

func (el ecallList) Set(i int, x interface{}) {
if x == nil {
el[i] = ecall{}
} else {
el[i] = x.(ecall)
}
func (el ecallList) Clear(i int) {
el[i] = ecall{}
}

var (
Expand Down
57 changes: 28 additions & 29 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ type Queue struct {
q Interface
start int
n int
cap int
}

// New creates a new queue that starts with n elements.
// New creates a new queue that starts with n elements. The interface's
// length must not change over the course of the queue's usage.
func New(q Interface, n int) *Queue {
qq := new(Queue)
qq.Init(q, n)
Expand All @@ -18,54 +20,51 @@ func New(q Interface, n int) *Queue {
// Init initializes a queue. The old queue is untouched.
func (q *Queue) Init(r Interface, n int) {
q.q = r
q.start, q.n = 0, n
q.start = 0
q.n = n
q.cap = r.Len()
}

// Len returns the length of the queue. This is different from the
// underlying interface's length.
// underlying interface's length, which is the queue's capacity.
func (q *Queue) Len() int {
return q.n
}

// Push pushes an element on the queue. If the queue is full,
// Push returns false. If x is nil, Push panics.
func (q *Queue) Push(x interface{}) bool {
n := q.q.Len()
if q.n >= n {
return false
// Push reserves space for an element on the queue, returning its index.
// If the queue is full, Push returns -1.
func (q *Queue) Push() int {
if q.n >= q.cap {
return -1
}
i := (q.start + q.n) % n
q.q.Set(i, x)
i := (q.start + q.n) % q.cap
q.n++
return true
return i
}

// Peek returns the element at the front of the queue.
// If the queue is empty, Peek panics.
func (q *Queue) Peek() interface{} {
// Front returns the index of the front of the queue, or -1 if the queue is empty.
func (q *Queue) Front() int {
if q.n == 0 {
panic("Queue.Pop called on empty queue")
return -1
}
return q.q.At(q.start)
return q.start
}

// Pop pops an element from the queue.
// If the queue is empty, Pop panics.
func (q *Queue) Pop() interface{} {
x := q.Peek()
q.q.Set(q.start, nil)
q.start = (q.start + 1) % q.q.Len()
// Pop pops an element from the queue, returning whether it succeeded.
func (q *Queue) Pop() bool {
if q.n == 0 {
return false
}
q.q.Clear(q.start)
q.start = (q.start + 1) % q.cap
q.n--
return x
return true
}

// A type implementing Interface can be used to store elements in a Queue.
type Interface interface {
// Len returns the number of elements available.
Len() int
// At returns the element at i.
At(i int) interface{}
// Set sets the element at i to x.
// If x is nil, that element should be cleared.
Set(i int, x interface{})
// Clear removes the element at i.
Clear(i int)
}
Loading

0 comments on commit c55cda4

Please sign in to comment.