Skip to content
Permalink
Browse files

Queue integration test (#1364)

* MutationStorage integration test

* Separate queue admin tests from queue tests

* Fix license year

* Document MutationLogs

* Improve LogsAdmin tests

* Fix linter errs

* Nits

* nits
  • Loading branch information...
gdbelvin committed Oct 8, 2019
1 parent 7526357 commit cc70d02c264c28f939e26485001a73c65a8aa477
@@ -1,3 +1,17 @@
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storagetest

import (
@@ -0,0 +1,84 @@
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storagetest

import (
"context"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/google/keytransparency/core/keyserver"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
)

type MutationLogsFactory func(ctx context.Context, t *testing.T, dirID string, logIDs ...int64) keyserver.MutationLogs

// RunMutationLogsTests runs all the tests against the provided storage implementation.
func RunMutationLogsTests(t *testing.T, factory MutationLogsFactory) {
ctx := context.Background()
b := &mutationLogsTests{}
for name, f := range map[string]func(ctx context.Context, t *testing.T, f MutationLogsFactory){
// TODO(gbelvin): Discover test methods via reflection.
"TestReadLog": b.TestReadLog,
} {
t.Run(name, func(t *testing.T) { f(ctx, t, factory) })
}
}

type mutationLogsTests struct{}

func mustMarshal(t *testing.T, p proto.Message) []byte {
t.Helper()
b, err := proto.Marshal(p)
if err != nil {
t.Fatalf("proto.Marshal(): %v", err)
}
return b
}

// TestReadLog ensures that reads happen in atomic units of batch size.
func (mutationLogsTests) TestReadLog(ctx context.Context, t *testing.T, newForTest MutationLogsFactory) {
directoryID := "TestReadLog"
logID := int64(5) // Any log ID.
m := newForTest(ctx, t, directoryID, logID)
// Write ten batches, three entries each.
for i := byte(0); i < 10; i++ {
entry := &pb.EntryUpdate{Mutation: &pb.SignedEntry{Entry: mustMarshal(t, &pb.Entry{Index: []byte{i}})}}
if _, err := m.Send(ctx, directoryID, entry, entry, entry); err != nil {
t.Fatalf("Send(): %v", err)
}
}

for _, tc := range []struct {
limit int32
count int
}{
{limit: 0, count: 0},
{limit: 1, count: 3}, // We asked for 1 item, which gets us into the first batch, so we return 3 items.
{limit: 3, count: 3}, // We asked for 3 items, which gets us through the first batch, so we return 3 items.
{limit: 4, count: 6}, // Reading 4 items gets us into the second batch of size 3.
{limit: 100, count: 30}, // Reading all the items gets us the 30 items we wrote.
} {
rows, err := m.ReadLog(ctx, directoryID, logID, 0, time.Now().UnixNano(), tc.limit)
if err != nil {
t.Fatalf("ReadLog(%v): %v", tc.limit, err)
}
if got, want := len(rows), tc.count; got != want {
t.Fatalf("ReadLog(%v): len: %v, want %v", tc.limit, got, want)
}
}
}
@@ -0,0 +1,95 @@
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storagetest

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/keytransparency/core/adminserver"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type LogsAdminFactory func(ctx context.Context, t *testing.T, dirID string, logIDs ...int64) adminserver.LogsAdmin

// RunLogsAdminTests runs all the admin tests against the provided storage implementation.
func RunLogsAdminTests(t *testing.T, factory LogsAdminFactory) {
ctx := context.Background()
b := &logsAdminTests{}
for name, f := range map[string]func(ctx context.Context, t *testing.T, f LogsAdminFactory){
// TODO(gbelvin): Discover test methods via reflection.
"TestSetWritable": b.TestSetWritable,
"TestListLogs": b.TestListLogs,
} {
t.Run(name, func(t *testing.T) { f(ctx, t, factory) })
}
}

type logsAdminTests struct{}

func (logsAdminTests) TestSetWritable(ctx context.Context, t *testing.T, f LogsAdminFactory) {
directoryID := "TestSetWritable"
m := f(ctx, t, directoryID, 1)
if st := status.Convert(m.SetWritable(ctx, directoryID, 2, true)); st.Code() != codes.NotFound {
t.Errorf("SetWritable(non-existent logid): %v, want %v", st, codes.NotFound)
}
}

func (logsAdminTests) TestListLogs(ctx context.Context, t *testing.T, f LogsAdminFactory) {
directoryID := "TestListLogs"
for _, tc := range []struct {
desc string
logIDs []int64
setWritable map[int64]bool // Explicitly call SetWritable with true or false for each log in this map.
wantLogIDs []int64
wantCode codes.Code
}{
{desc: "one row", logIDs: []int64{10}, wantLogIDs: []int64{10}},
{desc: "one row disabled", logIDs: []int64{10}, setWritable: map[int64]bool{10: false}, wantCode: codes.NotFound},
{desc: "one row enabled", logIDs: []int64{1, 2, 3}, setWritable: map[int64]bool{1: false, 2: false}, wantLogIDs: []int64{3}},
{desc: "multi", logIDs: []int64{1, 2, 3}, setWritable: map[int64]bool{1: true, 2: false}, wantLogIDs: []int64{1, 3}},
} {
t.Run(tc.desc, func(t *testing.T) {
m := f(ctx, t, directoryID, tc.logIDs...)
wantLogs := make(map[int64]bool)
for _, logID := range tc.wantLogIDs {
wantLogs[logID] = true
}

for logID, enabled := range tc.setWritable {
if err := m.SetWritable(ctx, directoryID, logID, enabled); err != nil {
t.Errorf("SetWritable(): %v", err)
}
}

logIDs, err := m.ListLogs(ctx, directoryID, true /* Only Writable */)
if status.Code(err) != tc.wantCode {
t.Errorf("ListLogs(): %v, want %v", err, tc.wantCode)
}
if err != nil {
return
}
logs := make(map[int64]bool)
for _, log := range logIDs {
logs[log] = true
}
if got, want := logs, wantLogs; !cmp.Equal(got, want) {
t.Errorf("ListLogs(): %v, want %v", got, want)
}
})
}
}
@@ -72,11 +72,15 @@ type WriteWatermark struct {

// MutationLogs provides sets of time ordered message logs.
type MutationLogs interface {
// Send submits an item to a random log.
// Send submits the whole group of mutations atomically to a random log.
// TODO(gbelvin): Create a batch level object to make it clear that this a batch of updates.
Send(ctx context.Context, directoryID string, mutation ...*pb.EntryUpdate) (*WriteWatermark, error)
// ReadLog returns the messages in the (low, high] range stored in the specified log.
// ReadLog returns the messages in the (low, high] range stored in the
// specified log. ReadLog always returns complete units of the original
// batches sent via Send, and will return more items than limit if
// needed to do so.
ReadLog(ctx context.Context, directoryID string, logID, low, high int64,
batchSize int32) ([]*mutator.LogMessage, error)
limit int32) ([]*mutator.LogMessage, error)
}

// BatchReader reads batch definitions.
@@ -20,77 +20,45 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/keytransparency/core/adminserver"
"github.com/google/keytransparency/core/integration/storagetest"
"github.com/google/keytransparency/core/keyserver"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
_ "github.com/mattn/go-sqlite3"
)

const directoryID = "default"

func newForTest(ctx context.Context, t testing.TB, logIDs ...int64) *Mutations {
func newForTest(ctx context.Context, t testing.TB, dirID string, logIDs ...int64) *Mutations {
m, err := New(newDB(t))
if err != nil {
t.Fatalf("Failed to create Mutations: %v", err)
t.Fatalf("Failed to create mutation storage: %v", err)
}
if err := m.AddLogs(ctx, directoryID, logIDs...); err != nil {
if err := m.AddLogs(ctx, dirID, logIDs...); err != nil {
t.Fatalf("AddLogs(): %v", err)
}
return m
}

func TestSetWritable(t *testing.T) {
ctx := context.Background()

for _, tc := range []struct {
desc string
logIDs []int64
set map[int64]bool
wantLogIDs []int64
wantCode codes.Code
}{
{desc: "one row", logIDs: []int64{10}, wantLogIDs: []int64{10}},
{desc: "one row disabled", logIDs: []int64{10}, set: map[int64]bool{10: false}, wantCode: codes.NotFound},
{desc: "one row enabled", logIDs: []int64{1, 2, 3}, set: map[int64]bool{1: false, 2: false}, wantLogIDs: []int64{3}},
{desc: "multi", logIDs: []int64{1, 2, 3}, set: map[int64]bool{1: true, 2: false}, wantLogIDs: []int64{1, 3}},
} {
t.Run(tc.desc, func(t *testing.T) {
m := newForTest(ctx, t, tc.logIDs...)
wantLogs := make(map[int64]bool)
for _, logID := range tc.wantLogIDs {
wantLogs[logID] = true
}

for logID, enabled := range tc.set {
if err := m.SetWritable(ctx, directoryID, logID, enabled); err != nil {
t.Errorf("SetWritable(): %v", err)
}
}
func TestMutationLogsIntegration(t *testing.T) {
storagetest.RunMutationLogsTests(t,
func(ctx context.Context, t *testing.T, dirID string, logIDs ...int64) keyserver.MutationLogs {
return newForTest(ctx, t, dirID, logIDs...)
})
}

logs := make(map[int64]bool)
// Run enough times to ensure that random sampling will get us all logs.
for i := 0; i < 10*len(tc.logIDs); i++ {
log, err := m.randLog(ctx, directoryID)
if status.Code(err) != tc.wantCode {
t.Errorf("randLog(): %v, want %v", err, tc.wantCode)
}
if err != nil {
break
}
logs[log] = true
}
if got, want := logs, wantLogs; !cmp.Equal(got, want) {
t.Errorf("randLog(): %v, want %v", got, want)
}
func TestLogsAdminIntegration(t *testing.T) {
storagetest.RunLogsAdminTests(t,
func(ctx context.Context, t *testing.T, dirID string, logIDs ...int64) adminserver.LogsAdmin {
return newForTest(ctx, t, dirID, logIDs...)
})
}
}

func TestRandLog(t *testing.T) {
ctx := context.Background()
directoryID := "TestRandLog"

for _, tc := range []struct {
desc string
@@ -107,7 +75,7 @@ func TestRandLog(t *testing.T) {
}},
} {
t.Run(tc.desc, func(t *testing.T) {
m := newForTest(ctx, t, tc.send...)
m := newForTest(ctx, t, directoryID, tc.send...)
logs := make(map[int64]bool)
for i := 0; i < 10*len(tc.wantLogs); i++ {
logID, err := m.randLog(ctx, directoryID)
@@ -128,8 +96,9 @@ func TestRandLog(t *testing.T) {

func BenchmarkSend(b *testing.B) {
ctx := context.Background()
directoryID := "BenchmarkSend"
logID := int64(1)
m := newForTest(ctx, b, logID)
m := newForTest(ctx, b, directoryID, logID)

update := &pb.EntryUpdate{Mutation: &pb.SignedEntry{Entry: []byte("xxxxxxxxxxxxxxxxxx")}}
for _, tc := range []struct {
@@ -162,7 +131,8 @@ func BenchmarkSend(b *testing.B) {
func TestSend(t *testing.T) {
ctx := context.Background()

m := newForTest(ctx, t, 1, 2)
directoryID := "TestSend"
m := newForTest(ctx, t, directoryID, 1, 2)
update := []byte("bar")
ts1 := time.Now()
ts2 := ts1.Add(time.Duration(1))
@@ -190,8 +160,9 @@ func TestSend(t *testing.T) {

func TestWatermark(t *testing.T) {
ctx := context.Background()
directoryID := "TestWatermark"
logIDs := []int64{1, 2}
m := newForTest(ctx, t, logIDs...)
m := newForTest(ctx, t, directoryID, logIDs...)
update := []byte("bar")

startTS := time.Now()
@@ -234,42 +205,3 @@ func TestWatermark(t *testing.T) {
})
}
}

func mustMarshal(t *testing.T, p proto.Message) []byte {
t.Helper()
b, err := proto.Marshal(p)
if err != nil {
t.Fatalf("proto.Marshal(): %v", err)
}
return b
}

func TestReadLog(t *testing.T) {
ctx := context.Background()
logID := int64(5)
m := newForTest(ctx, t, logID)
for i := byte(0); i < 10; i++ {
entry := &pb.EntryUpdate{Mutation: &pb.SignedEntry{Entry: mustMarshal(t, &pb.Entry{Index: []byte{i}})}}
if _, err := m.Send(ctx, directoryID, entry, entry, entry); err != nil {
t.Fatalf("Send(): %v", err)
}
}

for _, tc := range []struct {
batchSize int32
count int
}{
{batchSize: 0, count: 0},
{batchSize: 1, count: 3},
{batchSize: 4, count: 6},
{batchSize: 100, count: 30},
} {
rows, err := m.ReadLog(ctx, directoryID, logID, 0, time.Now().UnixNano(), tc.batchSize)
if err != nil {
t.Fatalf("ReadLog(%v): %v", tc.batchSize, err)
}
if got, want := len(rows), tc.count; got != want {
t.Fatalf("ReadLog(%v): len: %v, want %v", tc.batchSize, got, want)
}
}
}

0 comments on commit cc70d02

Please sign in to comment.
You can’t perform that action at this time.