Permalink
Browse files

Separate queue from mutation storage (#903)

* wip: split sequenced from unsequenced mutations

* wip: sequencer

* Split queue from mutations

* Finish sequencer changes

* Fixup keyserver and tests

* Reviewer comments
  • Loading branch information...
gdbelvin committed Jan 11, 2018
1 parent ede30a9 commit b0e58c4ddc6403881657c2514b59bff7db014803
@@ -21,12 +21,12 @@ import (
"github.com/google/keytransparency/core/crypto/vrf/p256"
"github.com/google/keytransparency/core/fake"
"github.com/google/keytransparency/core/internal"
"github.com/google/trillian"
"github.com/google/trillian/crypto/keys/pem"
"github.com/google/trillian/merkle/hashers"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
pb "github.com/google/keytransparency/core/api/v1/keytransparency_proto"
@@ -54,11 +54,14 @@ hnGbXDPbdFlL1nmayhnqyEfRdXNlpBT2U9hXcSxliKI1rHrAJFDx3ncttA==
)
func mustMetadataAsAny(t *testing.T, meta *pb.MapperMetadata) *any.Any {
m, err := internal.MetadataAsAny(meta)
if meta == nil {
meta = &pb.MapperMetadata{}
}
metaAny, err := ptypes.MarshalAny(meta)
if err != nil {
t.Fatal(err)
}
return m
return metaAny
}
// signs signs smr with s.
@@ -16,50 +16,49 @@ package fake
import (
"context"
"fmt"
pb "github.com/google/keytransparency/core/api/v1/keytransparency_proto"
"github.com/google/keytransparency/core/mutator"
)
// MutationStorage implements mutator.Mutation
type MutationStorage struct {
mtns map[string][]*pb.EntryUpdate
// mtns is a map of domains to epoch numbers to a list of mutations.
mtns map[string]map[int64][]*pb.Entry
}
// NewMutationStorage returns a fake mutator.Mutation
func NewMutationStorage() *MutationStorage {
return &MutationStorage{
mtns: make(map[string][]*pb.EntryUpdate),
mtns: make(map[string]map[int64][]*pb.Entry),
}
}
// ReadPage paginates through the list of mutations
func (m *MutationStorage) ReadPage(_ context.Context, domainID string, start, end int64, pageSize int32) (int64, []*pb.Entry, error) {
if start > int64(len(m.mtns[domainID])) {
panic("start > len(m.mtns[domainID])")
func (m *MutationStorage) ReadPage(_ context.Context, domainID string, revision, start int64, pageSize int32) (int64, []*pb.Entry, error) {
domain, ok := m.mtns[domainID]
if !ok {
return 0, nil, fmt.Errorf("DomainID %v not found", domainID)
}
// Adjust end.
if end-start > int64(pageSize) {
end = start + int64(pageSize)
mutationList, ok := domain[revision]
if !ok {
return 0, nil, fmt.Errorf("DomainID: %v, revision %v not found", domainID, revision)
}
if end > int64(len(m.mtns[domainID])) {
end = int64(len(m.mtns[domainID]))
if int(start) > len(mutationList) {
return start, nil, nil
}
entryUpdates := m.mtns[domainID][start:end]
mutations := make([]*pb.Entry, 0, len(entryUpdates))
for _, e := range entryUpdates {
mutations = append(mutations, e.Mutation)
end := int(start) + int(pageSize)
if end > len(mutationList) {
end = len(mutationList)
}
return end, mutations, nil
return int64(end), mutationList[int(start):end], nil
}
// ReadBatch is unimplemented
func (m *MutationStorage) ReadBatch(context.Context, string, int64, int32) (int64, []*mutator.QueueMessage, error) {
return 0, nil, nil
}
// Write stores a mutation
func (m *MutationStorage) Write(_ context.Context, domainID string, mutation *pb.EntryUpdate) (int64, error) {
m.mtns[domainID] = append(m.mtns[domainID], mutation)
return int64(len(m.mtns[domainID])), nil
// WriteBatch stores a set of mutations that are associated with a revision.
func (m *MutationStorage) WriteBatch(_ context.Context, domainID string, revision int64, mutations []*pb.Entry) error {
if _, ok := m.mtns[domainID]; !ok {
m.mtns[domainID] = make(map[int64][]*pb.Entry)
}
m.mtns[domainID][revision] = mutations
return nil
}

This file was deleted.

Oops, something went wrong.
View
@@ -19,8 +19,6 @@ import (
"fmt"
"strconv"
"github.com/google/keytransparency/core/internal"
"github.com/golang/glog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -78,7 +76,7 @@ func (s *Server) GetEpoch(ctx context.Context, in *pb.GetEpochRequest) (*pb.Epoc
}, nil
}
// GetEpochStream is a streaming API similar to GetMutations.
// GetEpochStream is a streaming API similar to ListMutations.
func (*Server) GetEpochStream(in *pb.GetEpochRequest, stream pb.KeyTransparency_GetEpochStreamServer) error {
return status.Errorf(codes.Unimplemented, "GetEpochStream is unimplemented")
}
@@ -96,18 +94,14 @@ func (s *Server) ListMutations(ctx context.Context, in *pb.ListMutationsRequest)
return nil, status.Errorf(codes.Internal, "Cannot fetch domain info")
}
lowestSeq, err := s.lowestSequenceNumber(ctx, domain.MapID, in.Epoch, in.PageToken)
if err != nil {
return nil, err
}
highestSeq, err := s.getHighestFullyCompletedSeq(ctx, domain.MapID, in.Epoch)
start, err := parseToken(in.PageToken)
if err != nil {
return nil, err
}
// Read mutations from the database.
maxSequence, entries, err := s.mutations.ReadPage(ctx, domain.DomainID, lowestSeq, highestSeq, in.PageSize)
max, entries, err := s.mutations.ReadPage(ctx, domain.DomainID, in.GetEpoch(), start, in.GetPageSize())
if err != nil {
glog.Errorf("mutations.ReadRange(%v, %v, %v): %v", lowestSeq, highestSeq, in.PageSize, err)
glog.Errorf("mutations.ReadRange(%v, %v, %v, %v): %v", domain.MapID, in.GetEpoch(), start, in.GetPageSize(), err)
return nil, status.Error(codes.Internal, "Reading mutations range failed")
}
indexes := make([][]byte, 0, len(entries))
@@ -126,8 +120,8 @@ func (s *Server) ListMutations(ctx context.Context, in *pb.ListMutationsRequest)
}
nextPageToken := ""
if len(mutations) == int(in.PageSize) && maxSequence != highestSeq {
nextPageToken = fmt.Sprintf("%d", maxSequence)
if len(mutations) == int(in.PageSize) {
nextPageToken = fmt.Sprintf("%d", max+1)
}
return &pb.ListMutationsResponse{
Mutations: mutations,
@@ -184,46 +178,9 @@ func (s *Server) logProofs(ctx context.Context, logID, firstTreeSize int64, epoc
return logRoot, logConsistency, logInclusion, nil
}
// lowestSequenceNumber picks a lower bound on sequence number.
// It returns the max between token and the high water mark of the previous epoch.
func (s *Server) lowestSequenceNumber(ctx context.Context, mapID, epoch int64, token string) (int64, error) {
// Pick starting location from token or the last epoch.
lastSeq, err := s.getHighestFullyCompletedSeq(ctx, mapID, epoch-1)
if err != nil {
return 0, err
}
tokenSeq, err := s.parseToken(token)
if err != nil {
return 0, err
}
return max(lastSeq, tokenSeq), nil
}
// getHighestFullyCompletedSeq fetches the map root at epoch and returns
// the highest fully completed sequence number.
func (s *Server) getHighestFullyCompletedSeq(ctx context.Context, mapID, epoch int64) (int64, error) {
// Get signed map root by revision.
resp, err := s.tmap.GetSignedMapRootByRevision(ctx, &tpb.GetSignedMapRootByRevisionRequest{
MapId: mapID,
Revision: epoch,
})
if err != nil {
glog.Errorf("GetSignedMapRootByRevision(%v, %v): %v", mapID, epoch, err)
return 0, status.Error(codes.Internal, "Get signed map root failed")
}
// Get highest and lowest sequence number.
meta, err := internal.MetadataFromMapRoot(resp.GetMapRoot())
if err != nil {
return 0, status.Error(codes.Internal, err.Error())
}
return meta.GetHighestFullyCompletedSeq(), nil
}
// parseToken returns the sequence number in token.
// If token is unset, return 0.
func (s *Server) parseToken(token string) (int64, error) {
func parseToken(token string) (int64, error) {
if token == "" {
return 0, nil
}
@@ -235,13 +192,6 @@ func (s *Server) parseToken(token string) (int64, error) {
return seq, nil
}
func max(a, b int64) int64 {
if a > b {
return a
}
return b
}
func (s *Server) inclusionProofs(ctx context.Context, domainID string, indexes [][]byte, epoch int64) ([]*tpb.MapLeafInclusion, error) {
// Lookup log and map info.
domain, err := s.domains.Read(ctx, domainID, false)
Oops, something went wrong.

0 comments on commit b0e58c4

Please sign in to comment.