Skip to content
This repository has been archived by the owner on Jul 16, 2021. It is now read-only.

Commit

Permalink
Use a real Trillian Log in integration tests! (#918)
Browse files Browse the repository at this point in the history
* Use a real log env

* Don't request proofs for things that haven't been sequenced yet

* CreateDomain: Wait for first MapHead to be sequenced.

The log's SignedLogRoot isn't always immediately available.
Retry a few times before giving up.

* Add ctx to checkProfile

* fix ListHistory setup timing bug

Make WaitForUserUpdate wait for multiple epochs

* Comments and nits

* Fix fakes
  • Loading branch information
gdbelvin committed Feb 16, 2018
1 parent 5dc53e8 commit 3c1a648
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 66 deletions.
42 changes: 22 additions & 20 deletions core/adminserver/admin_server.go
Expand Up @@ -174,11 +174,11 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
// Generate VRF key.
wrapped, err := s.keygen(ctx, vrfKeySpec)
if err != nil {
return nil, fmt.Errorf("keygen: %v", err)
return nil, fmt.Errorf("adminserver: keygen(): %v", err)
}
vrfPriv, err := p256.NewFromWrappedKey(ctx, wrapped)
if err != nil {
return nil, fmt.Errorf("NewFromWrappedKey(): %v", err)
return nil, fmt.Errorf("adminserver: NewFromWrappedKey(): %v", err)
}
vrfPublicPB, err := der.ToPublicProto(vrfPriv.Public())
if err != nil {
Expand All @@ -190,26 +190,26 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
logTreeArgs.Tree.Description = fmt.Sprintf("KT domain %s's SMH Log", in.GetDomainId())
logTree, err := client.CreateAndInitTree(ctx, &logTreeArgs, s.logAdmin, s.tmap, s.tlog)
if err != nil {
return nil, fmt.Errorf("CreateTree(log): %v", err)
return nil, fmt.Errorf("adminserver: CreateTree(log): %v", err)
}
mapTreeArgs := *mapArgs
mapTreeArgs.Tree.Description = fmt.Sprintf("KT domain %s's Map", in.GetDomainId())
mapTree, err := client.CreateAndInitTree(ctx, &mapTreeArgs, s.mapAdmin, s.tmap, s.tlog)
if err != nil {
return nil, fmt.Errorf("CreateAndInitTree(map): %v", err)
return nil, fmt.Errorf("adminserver: CreateAndInitTree(map): %v", err)
}
minInterval, err := ptypes.Duration(in.MinInterval)
if err != nil {
return nil, fmt.Errorf("Duration(%v): %v", in.MinInterval, err)
return nil, fmt.Errorf("adminserver: Duration(%v): %v", in.MinInterval, err)
}
maxInterval, err := ptypes.Duration(in.MaxInterval)
if err != nil {
return nil, fmt.Errorf("Duration(%v): %v", in.MaxInterval, err)
return nil, fmt.Errorf("adminserver: Duration(%v): %v", in.MaxInterval, err)
}

// Initialize log with first map root.
if err := s.initialize(ctx, logTree, mapTree); err != nil {
return nil, fmt.Errorf("initialize of log %v and map %v failed: %v",
return nil, fmt.Errorf("adminserver: initialize of log %v and map %v failed: %v",
logTree.TreeId, mapTree.TreeId, err)
}

Expand All @@ -222,7 +222,7 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
MinInterval: minInterval,
MaxInterval: maxInterval,
}); err != nil {
return nil, fmt.Errorf("adminstorage.Write(): %v", err)
return nil, fmt.Errorf("adminserver: domains.Write(): %v", err)
}
glog.Infof("Created domain %v", in.GetDomainId())
return &pb.Domain{
Expand All @@ -240,20 +240,27 @@ func (s *Server) initialize(ctx context.Context, logTree, mapTree *tpb.Tree) err
logID := logTree.GetTreeId()
mapID := mapTree.GetTreeId()

logRoot, err := s.tlog.GetLatestSignedLogRoot(ctx,
&tpb.GetLatestSignedLogRootRequest{LogId: logID})
logClient, err := client.NewFromTree(s.tlog, logTree)
if err != nil {
return fmt.Errorf("adminserver: could not create log client: %v", err)
}

// Wait for the latest log root to become available.
logRoot, err := logClient.UpdateRoot(ctx)
if err != nil {
return fmt.Errorf("GetLatestSignedLogRoot(%v): %v", logID, err)
return fmt.Errorf("adminserver: UpdateRoot(): %v", err)
}

// TODO(gbelvin): does this need to be in a retry loop?
mapRoot, err := s.tmap.GetSignedMapRoot(ctx,
&tpb.GetSignedMapRootRequest{MapId: mapID})
if err != nil {
return fmt.Errorf("GetSignedMapRoot(%v): %v", mapID, err)
return fmt.Errorf("adminserver: GetSignedMapRoot(%v): %v", mapID, err)
}

// If the tree is empty and the map is empty,
// add the empty map root to the log.
if logRoot.GetSignedLogRoot().GetTreeSize() != 0 ||
if logRoot.GetTreeSize() != 0 ||
mapRoot.GetMapRoot().GetMapRevision() != 0 {
return nil // Init not needed.
}
Expand All @@ -265,13 +272,8 @@ func (s *Server) initialize(ctx context.Context, logTree, mapTree *tpb.Tree) err
return err
}

logClient, err := client.NewFromTree(s.tlog, logTree)
if err != nil {
return fmt.Errorf("could not create log client: %v", err)
}
if err := logClient.QueueLeaf(ctx, smrJSON); err != nil {
return fmt.Errorf("trillianLog.QueueLeaf(logID: %v, leaf: %v): %v",
logID, smrJSON, err)
if err := logClient.AddLeaf(ctx, smrJSON); err != nil {
return fmt.Errorf("adminserver: log.AddLeaf(): %v", err)
}
return nil
}
Expand Down
11 changes: 9 additions & 2 deletions core/adminserver/admin_server_test.go
Expand Up @@ -50,9 +50,16 @@ func TestCreateRead(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create trillian map server: %v", err)
}
tlog := fake.NewTrillianLogClient()

svr := New(tlog, mapEnv.Map, mapEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)
// Log server
numSequencers := 1
unused := ""
logEnv, err := integration.NewLogEnv(ctx, numSequencers, unused)
if err != nil {
t.Fatalf("Failed to create trillian log server: %v", err)
}

svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)

for _, tc := range []struct {
domainID string
Expand Down
56 changes: 41 additions & 15 deletions core/client/client.go
Expand Up @@ -218,7 +218,7 @@ func (c *Client) Update(ctx context.Context, u *tpb.User, signers []signatures.S
}

// 3. Wait for update.
m, err = c.WaitForUserUpdate(ctx, m)
m, err = c.waitOnceForUserUpdate(ctx, m)
for {
switch {
case err == ErrWait:
Expand All @@ -234,7 +234,7 @@ func (c *Client) Update(ctx context.Context, u *tpb.User, signers []signatures.S
default:
return m, err
}
m, err = c.WaitForUserUpdate(ctx, m)
m, err = c.waitOnceForUserUpdate(ctx, m)
}
}

Expand Down Expand Up @@ -284,19 +284,35 @@ func (c *Client) newMutation(ctx context.Context, u *tpb.User) (*entry.Mutation,
return mutation, nil
}

// WaitForUserUpdate waits for the STH to be updated, indicating the next epoch has been created,
// WaitForUserUpdate waits for the mutation to be applied or the context to timeout or cancel.
func (c *Client) WaitForUserUpdate(ctx context.Context, m *entry.Mutation) (*entry.Mutation, error) {
for {
m, err := c.waitOnceForUserUpdate(ctx, m)
switch {
case err == ErrWait:
// Try again.
case status.Code(err) == codes.DeadlineExceeded:
// Sometimes the timeout occurs during an rpc.
// Convert to a standard context.DeadlineExceeded for consistent error handling.
return m, context.DeadlineExceeded
default:
return m, err
}
}
}

// waitOnceForUserUpdate waits for the STH to be updated, indicating the next epoch has been created,
// it then queries the current value for the user and checks it against the requested mutation.
// If the current value has not changed, WaitForUpdate returns ErrWait.
// If the current value has changed, but does not match the requested mutation,
// WaitForUpdate returns a new mutation, built with the current value and ErrRetry.
// If the current value matches the request, no mutation and no error are returned.
func (c *Client) WaitForUserUpdate(ctx context.Context, m *entry.Mutation) (*entry.Mutation, error) {
func (c *Client) waitOnceForUserUpdate(ctx context.Context, m *entry.Mutation) (*entry.Mutation, error) {
if m == nil {
return nil, fmt.Errorf("nil mutation")
}
sth := &c.trusted
// Wait for STH to change.
if err := c.WaitForSTHUpdate(ctx, sth); err != nil {
if err := c.WaitForSTHUpdate(ctx, c.trusted.TreeSize+1); err != nil {
return m, err
}

Expand Down Expand Up @@ -333,9 +349,20 @@ func (c *Client) WaitForUserUpdate(ctx context.Context, m *entry.Mutation) (*ent
}
}

// sthForRevision returns the smallest log TreeSize whose STH is guaranteed
// to cover the given map revision. Map revision N is stored at log index N,
// so the log must hold at least N+1 leaves before revision N is visible.
func sthForRevision(revision int64) int64 {
	// Leaf index N is contained once the tree size reaches N+1.
	treeSize := revision + 1
	return treeSize
}

// WaitForRevision blocks until the given map revision is available — i.e.
// until the log's STH covers the log entry holding that revision — or until
// ctx is cancelled or times out, in which case the context's error is returned.
func (c *Client) WaitForRevision(ctx context.Context, revision int64) error {
	return c.WaitForSTHUpdate(ctx, sthForRevision(revision))
}

// WaitForSTHUpdate blocks until the log root reported by the server has moved
// beyond sth or times out.
func (c *Client) WaitForSTHUpdate(ctx context.Context, sth *trillian.SignedLogRoot) error {
// to at least treeSize or times out.
func (c *Client) WaitForSTHUpdate(ctx context.Context, treeSize int64) error {
b := &backoff.Backoff{
Min: 100 * time.Millisecond,
Max: 10 * time.Second,
Expand All @@ -347,18 +374,17 @@ func (c *Client) WaitForSTHUpdate(ctx context.Context, sth *trillian.SignedLogRo
select {
case <-time.After(b.Duration()):
resp, err := c.cli.GetLatestEpoch(ctx, &pb.GetLatestEpochRequest{
DomainId: c.domainID,
FirstTreeSize: sth.TreeSize,
DomainId: c.domainID,
})
if err != nil {
return err
}
if resp.GetLogRoot().TreeSize <= sth.TreeSize {
// The LogRoot is not updated yet.
// Wait some more.
continue
if resp.GetLogRoot().TreeSize >= treeSize {
return nil // We're done!
}
return nil // We're done!
// The LogRoot is not updated yet.
// Wait some more.
continue

case <-ctx.Done():
return ctx.Err()
Expand Down
2 changes: 2 additions & 0 deletions core/client/get_and_verify.go
Expand Up @@ -17,6 +17,7 @@ package client
import (
"context"

"github.com/golang/glog"
pb "github.com/google/keytransparency/core/api/v1/keytransparency_proto"
)

Expand All @@ -36,6 +37,7 @@ func (c *Client) VerifiedGetEntry(ctx context.Context, appID, userID string) (*p
return nil, err
}
c.trusted = *e.GetLogRoot()
glog.Infof("VerifiedGetEntry: Trusted root updated to TreeSize %v", c.trusted.TreeSize)
Vlog.Printf("✓ Log root updated.")

return e, nil
Expand Down
10 changes: 10 additions & 0 deletions core/fake/trillian_log_client.go
Expand Up @@ -101,3 +101,13 @@ func (l *LogServer) InitLog(ctx context.Context, in *tpb.InitLogRequest, opts ..
l.TreeSize = 0
return &tpb.InitLogResponse{}, nil
}

// AddSequencedLeaf adds a single leaf with an assigned sequence number.
// This fake does not support pre-sequenced leaves and always returns
// codes.Unimplemented.
func (l *LogServer) AddSequencedLeaf(ctx context.Context, in *tpb.AddSequencedLeafRequest, opts ...grpc.CallOption) (*tpb.AddSequencedLeafResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "")
}

// AddSequencedLeaves adds a batch of leaves with assigned sequence numbers to
// the tree. This fake does not support pre-sequenced leaves and always returns
// codes.Unimplemented.
func (l *LogServer) AddSequencedLeaves(ctx context.Context, in *tpb.AddSequencedLeavesRequest, opts ...grpc.CallOption) (*tpb.AddSequencedLeavesResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "")
}
30 changes: 18 additions & 12 deletions core/integration/client_tests.go
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"github.com/google/keytransparency/core/authentication"
"github.com/google/keytransparency/core/client"
"github.com/google/keytransparency/core/crypto/dev"
"github.com/google/keytransparency/core/crypto/signatures"
"github.com/google/keytransparency/core/crypto/signatures/factory"
Expand Down Expand Up @@ -181,7 +180,7 @@ func TestEmptyGetAndUpdate(ctx context.Context, env *Env, t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
// Check profile.
if err := env.checkProfile(tc.userID, appID, tc.want); err != nil {
if err := env.checkProfile(ctx, tc.userID, appID, tc.want); err != nil {
t.Errorf("checkProfile(%v, %v) failed: %v", tc.userID, tc.want, err)
}
// Update profile.
Expand All @@ -201,7 +200,7 @@ func TestEmptyGetAndUpdate(ctx context.Context, env *Env, t *testing.T) {
}
env.Receiver.Flush(tc.ctx)
if _, err := env.Client.WaitForUserUpdate(tc.ctx, m); err != nil {
t.Errorf("Retry(%v): %v, want nil", m, err)
t.Errorf("WaitForUserUpdate(%v): %v, want nil", m, err)
}
}
})
Expand All @@ -210,8 +209,8 @@ func TestEmptyGetAndUpdate(ctx context.Context, env *Env, t *testing.T) {

// checkProfile ensures that the returned profile is as expected along with the
// keys it carries.
func (e *Env) checkProfile(userID, appID string, want bool) error {
profile, _, err := e.Client.GetEntry(context.Background(), userID, appID)
func (e *Env) checkProfile(ctx context.Context, userID, appID string, want bool) error {
profile, _, err := e.Client.GetEntry(ctx, userID, appID)
if err != nil {
return fmt.Errorf("GetEntry(%v): %v, want nil", userID, err)
}
Expand Down Expand Up @@ -265,10 +264,10 @@ func TestUpdateValidation(ctx context.Context, env *Env, t *testing.T) {
}
env.Receiver.Flush(tc.ctx)
if _, err := env.Client.WaitForUserUpdate(tc.ctx, m); err != nil {
t.Errorf("Retry(%v): %v, want nil", m, err)
t.Errorf("WaitForUserUpdate(): %v, want nil", err)
}
} else {
if got, want := err, client.ErrWait; got == want {
if got, want := err, context.DeadlineExceeded; got == want {
t.Fatalf("Update(%v): %v, don't want %v", tc.userID, got, want)
}
}
Expand Down Expand Up @@ -329,9 +328,9 @@ func (e *Env) setupHistory(ctx context.Context, domain *pb.Domain, userID string
// that the user is submitting. The user profile history contains the
// following profiles:
// Profile Value: err nil 1 2 2 2 3 3 4 5 5 5 5 5 5 6 6 5 7 7
// Map Revision: 0 1 2 3 4 5 6 7 8 9 10 ...
// Log Max Index: 0 1 2 3 4 5 6 7 8 9 10 ...
// Log TreeSize: 0 1 2 3 4 5 6 7 8 9 10 ...
// Map Revision: 1 2 3 4 5 6 7 8 9 10 ...
// Log Max Index: 1 2 3 4 5 6 7 8 9 10 ...
// Log TreeSize: 2 3 4 5 6 7 8 9 10 11 ...
// Note that profile 5 is submitted twice by the user to test that
// filtering case.
for i, p := range [][]byte{
Expand All @@ -350,11 +349,18 @@ func (e *Env) setupHistory(ctx context.Context, domain *pb.Domain, userID string
cctx, cancel := context.WithTimeout(ctx, updateTimeout)
defer cancel()
// The first update response is always a retry.
if _, err := e.Client.Update(cctx, u, signers); err != context.DeadlineExceeded {
m, err := e.Client.Update(cctx, u, signers)
if err != context.DeadlineExceeded {
return fmt.Errorf("Update(%v, %v): %v, want %v", userID, i, err, context.DeadlineExceeded)
}
e.Receiver.Flush(ctx)
if _, err := e.Client.WaitForUserUpdate(ctx, m); err != nil {
return fmt.Errorf("WaitForUserUpdate(%v): %v, want nil", m, err)
}
} else {
// Create an empty epoch.
e.Receiver.Flush(ctx)
}
e.Receiver.Flush(ctx)
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions core/integration/monitor_tests.go
Expand Up @@ -119,6 +119,9 @@ func TestMonitor(ctx context.Context, env *Env, t *testing.T) {
}

env.Receiver.Flush(ctx)
if err := env.Client.WaitForRevision(ctx, tc.queryEpoch); err != nil {
t.Fatalf("WaitForRevision(): %v", err)
}

domainID := env.Domain.DomainId
cctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
Expand Down
3 changes: 3 additions & 0 deletions core/keyserver/epochs.go
Expand Up @@ -180,6 +180,9 @@ func (s *Server) logProofs(ctx context.Context, d *domain.Domain, firstTreeSize

// Inclusion proof.
secondTreeSize := logRoot.GetTreeSize()
if epoch >= secondTreeSize {
return nil, status.Errorf(codes.NotFound, "keyserver: Epoch %v has not been released yet", epoch)
}
logInclusion, err := s.tlog.GetInclusionProof(ctx,
&tpb.GetInclusionProofRequest{
LogId: d.LogID,
Expand Down

0 comments on commit 3c1a648

Please sign in to comment.