Permalink
Browse files

Replace a Map of LogSources with a List of LogSources (#1124)

* Remove map from map metadata
* Switch from map[logID]watermark to []SourceSlice
* Disable noizy golangci scopelint
* s/ShardId/SliceIndex/g
  • Loading branch information...
gdbelvin committed Nov 26, 2018
1 parent 4930b71 commit a85af3408ae985350b9c46660bf1ab7abf2f2bd1
@@ -17,6 +17,7 @@ linters:
enable-all: true
disable:
- maligned
- scopelint
issues:
exclude:
@@ -16,8 +16,6 @@ package keyserver
import (
"encoding/base64"
"math"
"sort"
"github.com/golang/protobuf/proto"
"github.com/google/keytransparency/core/mutator"
@@ -44,11 +42,11 @@ func DecodeToken(token string, msg proto.Message) error {
return proto.Unmarshal(b, msg)
}
// SourceMap is a paginator for a map of sources.
type SourceMap map[int64]*spb.MapMetadata_SourceSlice
// SourceList is a paginator for a list of source slices.
type SourceList []*spb.MapMetadata_SourceSlice
// ParseToken will return the first token if token is "", otherwise it will try to parse the read token.
func (s SourceMap) ParseToken(token string) (*rtpb.ReadToken, error) {
func (s SourceList) ParseToken(token string) (*rtpb.ReadToken, error) {
if token == "" {
return s.First(), nil
}
@@ -60,77 +58,36 @@ func (s SourceMap) ParseToken(token string) (*rtpb.ReadToken, error) {
}
// First returns the first read parameters for this source.
func (s SourceMap) First() *rtpb.ReadToken {
func (s SourceList) First() *rtpb.ReadToken {
if len(s) == 0 {
// Empty struct means there is nothing else to page through.
return &rtpb.ReadToken{}
}
firstLog := int64(math.MaxInt64)
for logID := range s {
if logID < firstLog {
firstLog = logID
}
}
return &rtpb.ReadToken{
ShardId: firstLog,
LowWatermark: s[firstLog].LowestWatermark,
SliceIndex: 0,
LowWatermark: s[0].LowestWatermark,
}
}
// Next returns the next read token. Returns an empty struct when the read is finished.
// lastRow is the (batchSize)th row from the last read, or nil if fewer than
// batchSize + 1 rows were returned.
func (s SourceMap) Next(rt *rtpb.ReadToken, lastRow *mutator.LogMessage) *rtpb.ReadToken {
func (s SourceList) Next(rt *rtpb.ReadToken, lastRow *mutator.LogMessage) *rtpb.ReadToken {
if lastRow != nil {
// There are more items in this shard.
// There are more items in this source slice.
return &rtpb.ReadToken{
ShardId: rt.ShardId,
SliceIndex: rt.SliceIndex,
LowWatermark: lastRow.ID,
}
}
// Advance to the next shard.
nextShard, ok := s.NextShard(rt.ShardId)
if !ok {
// Advance to the next slice.
if rt.SliceIndex >= int64(len(s))-1 {
// There are no more source slices to iterate over.
return &rtpb.ReadToken{} // Encodes to ""
}
return &rtpb.ReadToken{
ShardId: nextShard,
LowWatermark: s[nextShard].LowestWatermark,
}
}
// NextShard returns the next shardID from the SourceMap.
// Returns false if there are no more shards or shardID is not in SourceMap.
func (s SourceMap) NextShard(shardID int64) (int64, bool) {
// Sorted list of shardIDs.
shardIDs := sortedKeys(s)
// Index of current shard.
i := sort.Search(len(shardIDs), func(i int) bool { return shardIDs[i] >= shardID })
if i == -1 {
// shardID isn't in SourceMap.
return 0, false
SliceIndex: rt.SliceIndex + 1,
LowWatermark: s[rt.SliceIndex+1].LowestWatermark,
}
if i >= len(shardIDs)-1 {
// there are no more shards to iterate over.
return 0, false
}
return shardIDs[i+1], true
}
// sortedSources returns the map keys, sorted low to high.
func sortedKeys(sources SourceMap) []int64 {
shardIDs := make(int64Slice, 0, len(sources))
for id := range sources {
shardIDs = append(shardIDs, id)
}
sort.Sort(shardIDs)
return shardIDs
}
type int64Slice []int64
func (p int64Slice) Len() int { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
@@ -21,7 +21,6 @@ import (
rtpb "github.com/google/keytransparency/core/keyserver/readtoken_go_proto"
"github.com/google/keytransparency/core/mutator"
spb "github.com/google/keytransparency/core/sequencer/sequencer_go_proto"
)
func TestEncodeToken(t *testing.T) {
@@ -43,7 +42,7 @@ func TestEncodeToken(t *testing.T) {
}
func TestTokenEncodeDecode(t *testing.T) {
rt1 := &rtpb.ReadToken{ShardId: 2, LowWatermark: 5}
rt1 := &rtpb.ReadToken{SliceIndex: 2, LowWatermark: 5}
rt1Token, err := EncodeToken(rt1)
if err != nil {
t.Fatalf("EncodeToken(%v): %v", rt1, err)
@@ -70,17 +69,17 @@ func TestTokenEncodeDecode(t *testing.T) {
func TestFirst(t *testing.T) {
for _, tc := range []struct {
s SourceMap
s SourceList
want *rtpb.ReadToken
}{
{
s: SourceMap{
2: &spb.MapMetadata_SourceSlice{LowestWatermark: 1, HighestWatermark: 10},
3: &spb.MapMetadata_SourceSlice{LowestWatermark: 10, HighestWatermark: 20},
s: SourceList{
{LogId: 2, LowestWatermark: 1, HighestWatermark: 10},
{LogId: 3, LowestWatermark: 10, HighestWatermark: 20},
},
want: &rtpb.ReadToken{ShardId: 2, LowWatermark: 1},
want: &rtpb.ReadToken{SliceIndex: 0, LowWatermark: 1},
},
{s: SourceMap{}, want: &rtpb.ReadToken{}},
{s: SourceList{}, want: &rtpb.ReadToken{}},
} {
if got := tc.s.First(); !proto.Equal(got, tc.want) {
t.Errorf("First(): %v, want %v", got, tc.want)
@@ -89,12 +88,12 @@ func TestFirst(t *testing.T) {
}
func TestNext(t *testing.T) {
a := SourceMap{
2: &spb.MapMetadata_SourceSlice{LowestWatermark: 1, HighestWatermark: 10},
3: &spb.MapMetadata_SourceSlice{LowestWatermark: 10, HighestWatermark: 20},
a := SourceList{
{LogId: 2, LowestWatermark: 1, HighestWatermark: 10},
{LogId: 3, LowestWatermark: 10, HighestWatermark: 20},
}
for _, tc := range []struct {
s SourceMap
s SourceList
desc string
rt *rtpb.ReadToken
lastRow *mutator.LogMessage
@@ -103,28 +102,28 @@ func TestNext(t *testing.T) {
{
desc: "first page",
s: a,
rt: &rtpb.ReadToken{ShardId: 2, LowWatermark: 1},
rt: &rtpb.ReadToken{SliceIndex: 0, LowWatermark: 1},
lastRow: &mutator.LogMessage{ID: 6},
want: &rtpb.ReadToken{ShardId: 2, LowWatermark: 6},
want: &rtpb.ReadToken{SliceIndex: 0, LowWatermark: 6},
},
{
desc: "next source",
s: a,
rt: &rtpb.ReadToken{ShardId: 2, LowWatermark: 1},
rt: &rtpb.ReadToken{SliceIndex: 0, LowWatermark: 1},
lastRow: nil,
want: &rtpb.ReadToken{ShardId: 3, LowWatermark: 10},
want: &rtpb.ReadToken{SliceIndex: 1, LowWatermark: 10},
},
{
desc: "last page",
s: a,
rt: &rtpb.ReadToken{ShardId: 3, LowWatermark: 1},
rt: &rtpb.ReadToken{SliceIndex: 1, LowWatermark: 1},
lastRow: nil,
want: &rtpb.ReadToken{},
},
{
desc: "empty",
s: SourceMap{},
rt: &rtpb.ReadToken{ShardId: 3, LowWatermark: 1},
s: SourceList{},
rt: &rtpb.ReadToken{SliceIndex: 1, LowWatermark: 1},
lastRow: nil,
want: &rtpb.ReadToken{},
},
@@ -22,8 +22,8 @@ import "v1/keytransparency.proto";
// ReadToken can be serialized and handed to users for pagination.
message ReadToken {
// shard_id identifies the source for reading.
int64 shard_id = 1;
// slice_index identifies the source for reading.
int64 slice_index = 1;
// low_watemark identifies the lowest (exclusive) row to return.
int64 low_watermark = 2;
}

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
@@ -133,17 +133,18 @@ func (s *Server) ListMutations(ctx context.Context, in *pb.ListMutationsRequest)
if err != nil {
return nil, status.Errorf(codes.Internal, "ReadBatch(%v, %v): %v", in.DirectoryId, in.Revision, err)
}
rt, err := SourceMap(meta.Sources).ParseToken(in.PageToken)
rt, err := SourceList(meta.Sources).ParseToken(in.PageToken)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Failed parsing page_token: %v: %v", in.PageToken, err)
}
// Read PageSize + 1 messages from the log to see if there is another page.
high := meta.Sources[rt.ShardId].HighestWatermark
msgs, err := s.logs.ReadLog(ctx, d.DirectoryID, rt.ShardId, rt.LowWatermark, high, in.PageSize+1)
high := meta.Sources[rt.SliceIndex].HighestWatermark
logID := meta.Sources[rt.SliceIndex].LogId
msgs, err := s.logs.ReadLog(ctx, d.DirectoryID, logID, rt.LowWatermark, high, in.PageSize+1)
if err != nil {
glog.Errorf("ListMutations(): ReadLog(%v, log: %v/(%v, %v], batchSize: %v): %v",
d.DirectoryID, rt.ShardId, rt.LowWatermark, high, in.PageSize, err)
d.DirectoryID, logID, rt.LowWatermark, high, in.PageSize, err)
return nil, status.Error(codes.Internal, "Reading mutations range failed")
}
moreInLogID := len(msgs) == int(in.PageSize+1)
@@ -172,7 +173,7 @@ func (s *Server) ListMutations(ctx context.Context, in *pb.ListMutationsRequest)
for i, p := range proofs {
mutations[i].LeafProof = p
}
nextToken, err := EncodeToken(SourceMap(meta.Sources).Next(rt, lastRow))
nextToken, err := EncodeToken(SourceList(meta.Sources).Next(rt, lastRow))
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed creating next token: %v", err)
Oops, something went wrong.

0 comments on commit a85af34

Please sign in to comment.