Permalink
Browse files

Set write watermark in keyserver (#1208)

  • Loading branch information...
gdbelvin committed Feb 11, 2019
1 parent d4a0678 commit 96003962ac966edf6ac6020192c337db9aecea9a
@@ -17,6 +17,7 @@ package keyserver

import (
"context"
"fmt"
"sync"

"github.com/golang/glog"
@@ -54,10 +55,16 @@ func createMetrics(mf monitoring.MetricFactory) {
directoryIDLabel, logIDLabel)
}

// WriteWatermark is the metadata that Send creates.
type WriteWatermark struct {
LogID int64
Watermark int64
}

// MutationLogs provides sets of time ordered message logs.
type MutationLogs interface {
// Send submits an item to a random log.
Send(ctx context.Context, directoryID string, mutation ...*pb.EntryUpdate) error
Send(ctx context.Context, directoryID string, mutation ...*pb.EntryUpdate) (*WriteWatermark, error)
// ReadLog returns the messages in the (low, high] range stored in the specified log.
ReadLog(ctx context.Context, directoryID string, logID, low, high int64,
batchSize int32) ([]*mutator.LogMessage, error)
@@ -562,13 +569,18 @@ func (s *Server) BatchQueueUserUpdate(ctx context.Context, in *pb.BatchQueueUser
}
}

// TODO(gbelvin): Should we validate mutations here? It is expensive in terms of latency.
// TODO(#1177): Verify parts of the mutation that don't reference the current map value here.

// Save mutation to the database.
if err := s.logs.Send(ctx, directory.DirectoryID, in.Updates...); err != nil {
wm, err := s.logs.Send(ctx, directory.DirectoryID, in.Updates...)
if err != nil {
glog.Errorf("mutations.Write failed: %v", err)
return nil, status.Errorf(codes.Internal, "Mutation write error")
}
if wm != nil {
watermarkWritten.Set(float64(wm.Watermark), directory.DirectoryID, fmt.Sprintf("%v", wm.LogID))
}

return &empty.Empty{}, nil
}

@@ -69,8 +69,8 @@ func (b batchStorage) ReadBatch(ctx context.Context, dirID string, rev int64) (*

type mutations map[int64][]*mutator.LogMessage // Map of logID to Slice of LogMessages

func (m *mutations) Send(ctx context.Context, dirID string, mutation ...*pb.EntryUpdate) error {
return errors.New("unimplemented")
func (m *mutations) Send(ctx context.Context, dirID string, mutation ...*pb.EntryUpdate) (*WriteWatermark, error) {
return nil, errors.New("unimplemented")
}

func (m *mutations) ReadLog(ctx context.Context, dirID string,
@@ -22,6 +22,7 @@ import (

"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/google/keytransparency/core/keyserver"
"github.com/google/keytransparency/core/mutator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -46,27 +47,32 @@ func (m *Mutations) AddLogs(ctx context.Context, directoryID string, logIDs ...i
}

// Send writes mutations to the leading edge (by sequence number) of the mutations table.
// Returns the logID/watermark pair that was written, or nil if nothing was written.
// TODO(gbelvin): Make updates a slice.
func (m *Mutations) Send(ctx context.Context, directoryID string, updates ...*pb.EntryUpdate) error {
func (m *Mutations) Send(ctx context.Context, directoryID string, updates ...*pb.EntryUpdate) (*keyserver.WriteWatermark, error) {
glog.Infof("mutationstorage: Send(%v, <mutation>)", directoryID)
if len(updates) == 0 {
return nil
return nil, nil
}
logID, err := m.randLog(ctx, directoryID)
if err != nil {
return err
return nil, err
}
updateData := make([][]byte, 0, len(updates))
for _, u := range updates {
data, err := proto.Marshal(u)
if err != nil {
return err
return nil, err
}
updateData = append(updateData, data)
}
// TODO(gbelvin): Implement retry with backoff for retryable errors if
// we get timestamp contention.
return m.send(ctx, time.Now(), directoryID, logID, updateData...)
ts := time.Now()
if err := m.send(ctx, ts, directoryID, logID, updateData...); err != nil {
return nil, err
}
return &keyserver.WriteWatermark{LogID: logID, Watermark: ts.UnixNano()}, nil
}

// ListLogs returns a list of all logs for directoryID, optionally filtered for writable logs.
@@ -102,7 +102,7 @@ func BenchmarkSend(b *testing.B) {
updates = append(updates, update)
}
for n := 0; n < b.N; n++ {
if err := m.Send(ctx, directoryID, updates...); err != nil {
if _, err := m.Send(ctx, directoryID, updates...); err != nil {
b.Errorf("Send(): %v", err)
}
}
@@ -201,7 +201,7 @@ func TestReadLog(t *testing.T) {
m := newForTest(ctx, t, logID)
for i := byte(0); i < 10; i++ {
entry := &pb.EntryUpdate{Mutation: &pb.SignedEntry{Entry: mustMarshal(t, &pb.Entry{Index: []byte{i}})}}
if err := m.Send(ctx, directoryID, entry, entry, entry); err != nil {
if _, err := m.Send(ctx, directoryID, entry, entry, entry); err != nil {
t.Fatalf("Send(): %v", err)
}
}

0 comments on commit 9600396

Please sign in to comment.