Skip to content
Permalink
Browse files

Support and test mutation log queries at intermediate timestamps (#1380)

* Move HighWatermarks tests to integration/storagetest

* Test inter-quantum reads

* Don't return timestamps with a finer granularity than we actually have.

* New HighWatermark tests that respect the interface.

Replace tests that wanted to predict the exact value of HighWatermark
with tests that rely on the behavior of ReadLog to assert that the value
returned is indeed correct.

This allows different storage layers to use their own strategies
for returning high watermarks.

It also removes the requirement that HighWatermark values themselves
be stored with nanosecond precision.
  • Loading branch information
gdbelvin committed Nov 12, 2019
1 parent 36d41a9 commit ea1b47d7bb96b2c759d220c51d54aae50ce018d0
@@ -16,6 +16,7 @@ package storagetest

import (
"context"
"fmt"
"testing"
"time"

@@ -70,7 +71,7 @@ func (mutationLogsTests) TestReadLog(ctx context.Context, t *testing.T, newForTe
}
}

for _, tc := range []struct {
for i, tc := range []struct {
limit int32
want int
}{
@@ -80,13 +81,15 @@ func (mutationLogsTests) TestReadLog(ctx context.Context, t *testing.T, newForTe
{limit: 4, want: 6}, // Reading 4 items gets us into the second batch of size 3.
{limit: 100, want: 30}, // Reading all the items gets us the 30 items we wrote.
} {
rows, err := m.ReadLog(ctx, directoryID, logID, minWatermark, time.Now(), tc.limit)
if err != nil {
t.Fatalf("ReadLog(%v): %v", tc.limit, err)
}
if got := len(rows); got != tc.want {
t.Fatalf("ReadLog(%v): len: %v, want %v", tc.limit, got, tc.want)
}
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
rows, err := m.ReadLog(ctx, directoryID, logID, minWatermark, time.Now(), tc.limit)
if err != nil {
t.Fatalf("ReadLog(%v): %v", tc.limit, err)
}
if got := len(rows); got != tc.want {
t.Fatalf("ReadLog(%v): len: %v, want %v", tc.limit, got, tc.want)
}
})
}
}

@@ -107,26 +110,31 @@ func (mutationLogsTests) TestReadLogExact(ctx context.Context, t *testing.T, new
idx = append(idx, ts)
}

for _, tc := range []struct {
low, high int
for i, tc := range []struct {
low, high time.Time
want []byte
}{
{low: 0, high: 0, want: []byte{}},
{low: 0, high: 1, want: []byte{0}},
{low: 0, high: 9, want: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8}},
{low: 1, high: 9, want: []byte{1, 2, 3, 4, 5, 6, 7, 8}},
{low: idx[0], high: idx[0], want: []byte{}},
{low: idx[0], high: idx[1], want: []byte{0}},
{low: idx[0], high: idx[9], want: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8}},
{low: idx[1], high: idx[9], want: []byte{1, 2, 3, 4, 5, 6, 7, 8}},
// Ensure that adding 1 correctly modifies the range semantics.
{low: idx[0].Add(1), high: idx[9], want: []byte{1, 2, 3, 4, 5, 6, 7, 8}},
{low: idx[0].Add(1), high: idx[9].Add(1), want: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}},
} {
rows, err := m.ReadLog(ctx, directoryID, logID, idx[tc.low], idx[tc.high], 100)
if err != nil {
t.Fatalf("ReadLog(): %v", err)
}
got := make([]byte, 0, len(rows))
for _, r := range rows {
i := r.Mutation.Entry[0]
got = append(got, i)
}
if !cmp.Equal(got, tc.want) {
t.Fatalf("ReadLog(%v,%v): got %v, want %v", tc.low, tc.high, got, tc.want)
}
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
rows, err := m.ReadLog(ctx, directoryID, logID, tc.low, tc.high, 100)
if err != nil {
t.Fatalf("ReadLog(): %v", err)
}
got := make([]byte, 0, len(rows))
for _, r := range rows {
i := r.Mutation.Entry[0]
got = append(got, i)
}
if !cmp.Equal(got, tc.want) {
t.Fatalf("ReadLog(%v,%v): got %v, want %v", tc.low, tc.high, got, tc.want)
}
})
}
}
@@ -0,0 +1,199 @@
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storagetest

import (
"context"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/keytransparency/core/sequencer"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
)

// LogsReadWriter supports the tests' ability to write to and read from the mutation logs.
// It combines the read-side methods of sequencer.LogsReader with a Send method for writing.
type LogsReadWriter interface {
sequencer.LogsReader
// Send submits the whole group of mutations atomically to a log.
// It returns the timestamp that the mutation batch got written at.
// This timestamp can be used as the lower bound of ReadLog's [low, high) range.
// To acquire a value to use for the upper bound of ReadLog's [low, high) range, use HighWatermark.
// TODO(gbelvin): Create a batch level object to make it clear that this is a batch of updates.
Send(ctx context.Context, directoryID string, logID int64, mutation ...*pb.EntryUpdate) (time.Time, error)
}

// logsRWFactory returns a new database object, and a function for cleaning it up.
// The tests call it with the directory ID and the log IDs they intend to use, and
// invoke the returned cleanup function (with a context) when the test finishes.
// NOTE(review): presumably the factory must make the given logs writable for dirID
// before returning — confirm against each storage implementation.
type logsRWFactory func(ctx context.Context, t *testing.T, dirID string, logIDs ...int64) (LogsReadWriter, func(context.Context))

// RunMutationLogsReaderTests executes every mutation-log reader test against
// the storage implementation produced by factory. Each test runs as a named
// subtest of t, and all of them share one background context.
func RunMutationLogsReaderTests(t *testing.T, factory logsRWFactory) {
	type testFunc func(ctx context.Context, t *testing.T, f logsRWFactory)

	suite := &mutationLogsReaderTests{}
	// TODO(gbelvin): Discover test methods via reflection.
	cases := map[string]testFunc{
		"TestHighWatermarkPreserve": suite.TestHighWatermarkPreserve,
		"TestHighWatermarkRead":     suite.TestHighWatermarkRead,
		"TestHighWatermarkBatch":    suite.TestHighWatermarkBatch,
	}

	ctx := context.Background()
	for name, run := range cases {
		// t.Run blocks until the subtest returns, so capturing the loop
		// variable in the closure is safe here.
		t.Run(name, func(t *testing.T) { run(ctx, t, factory) })
	}
}

// mutationLogsReaderTests groups the HighWatermark test methods that
// RunMutationLogsReaderTests runs; it carries no state of its own.
type mutationLogsReaderTests struct{}

// setupWatermarks seeds the mutation log (dirID, logID) with maxIndex+1
// single-byte entries whose payloads are 0..maxIndex, one Send per entry.
// After every Send it queries HighWatermark starting from minWatermark and
// fails the test unless the reported count equals the number of entries
// written so far.
//
// It returns two parallel slices indexed by entry number: the timestamps that
// Send reported, and the high water marks observed right after each Send.
// NOTE(review): the count check uses a batch size of 100, so this presumably
// only works for fewer than 100 entries — confirm before raising maxIndex.
func setupWatermarks(ctx context.Context, t *testing.T, m LogsReadWriter, dirID string, logID int64, maxIndex int) ([]time.Time, []time.Time) {
	t.Helper()
	var (
		sendTimes  = make([]time.Time, 0) // Timestamps that Send reported.
		watermarks = make([]time.Time, 0) // High water marks collected after each Send.
	)
	for idx := 0; idx <= maxIndex; idx++ {
		update := &pb.EntryUpdate{Mutation: &pb.SignedEntry{Entry: []byte{byte(idx)}}}
		sentAt, err := m.Send(ctx, dirID, logID, update)
		if err != nil {
			t.Fatalf("Send(%v): %v", logID, err)
		}

		count, wm, err := m.HighWatermark(ctx, dirID, logID, minWatermark, 100 /*batchSize*/)
		if err != nil {
			t.Fatalf("HighWatermark(): %v", err)
		}
		// Every entry written so far must be visible from minWatermark.
		if want := int32(idx) + 1; count != want {
			t.Fatalf("HighWatermark(): count %v, want %v", count, want)
		}

		sendTimes = append(sendTimes, sentAt)
		watermarks = append(watermarks, wm)
	}
	return sendTimes, watermarks
}

// TestHighWatermarkPreserve queries HighWatermark with varying parameters and
// validates the returned values directly. Both cases expect a count of zero,
// and in that situation the returned watermark must equal the start time that
// was passed in.
func (mutationLogsReaderTests) TestHighWatermarkPreserve(ctx context.Context, t *testing.T, newForTest logsRWFactory) {
	directoryID, logID, maxIndex := "TestHighWatermarkPreserve", int64(1), 9
	m, done := newForTest(ctx, t, directoryID, logID)
	defer done(ctx)
	sent, _ := setupWatermarks(ctx, t, m, directoryID, logID, maxIndex)

	type testCase struct {
		desc  string
		start time.Time
		batch int32
		want  time.Time
	}
	arbitraryTime := time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC)
	pastLastSend := sent[maxIndex].Add(time.Second)
	cases := []testCase{
		// A batch size of 0 must preserve the starting time.
		{desc: "batch 0", start: arbitraryTime, batch: 0, want: arbitraryTime},
		// A start beyond the last written row yields no rows and must
		// preserve the starting time as well.
		{desc: "rows 0", start: pastLastSend, batch: 1, want: pastLastSend},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.desc, func(t *testing.T) {
			count, got, err := m.HighWatermark(ctx, directoryID, logID, tc.start, tc.batch)
			if err != nil {
				t.Errorf("HighWatermark(): %v", err)
			}
			if !got.Equal(tc.want) {
				t.Errorf("HighWatermark(%v, %v) high: %v, want %v", tc.start, tc.batch, got, tc.want)
			}
			if count != 0 {
				t.Errorf("HighWatermark(): count %v, want 0", count)
			}
		})
	}
}

// TestHighWatermarkRead uses the high water marks collected during setup as
// the upper bound of ReadLog and checks which entries come back. Every read
// starts from minWatermark.
func (mutationLogsReaderTests) TestHighWatermarkRead(ctx context.Context, t *testing.T, newForTest logsRWFactory) {
	directoryID, logID, maxIndex := "TestHighWatermarkRead", int64(1), 9
	m, done := newForTest(ctx, t, directoryID, logID)
	defer done(ctx)
	_, hwm := setupWatermarks(ctx, t, m, directoryID, logID, maxIndex)

	type testCase struct {
		desc     string
		readHigh time.Time
		want     []byte
	}
	cases := []testCase{
		// The final high water mark must expose all the data written so far.
		{desc: "all", readHigh: hwm[maxIndex], want: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}},
		// An earlier high water mark must keep returning the same data even
		// though more entries were written after it was taken.
		{desc: "stable", readHigh: hwm[2], want: []byte{0, 1, 2}},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.desc, func(t *testing.T) {
			low := minWatermark
			rows, err := m.ReadLog(ctx, directoryID, logID, low, tc.readHigh, 100)
			if err != nil {
				t.Fatalf("ReadLog(): %v", err)
			}
			// Each entry's payload starts with the index it was written at.
			got := make([]byte, len(rows))
			for j, row := range rows {
				got[j] = row.Mutation.Entry[0]
			}
			if !cmp.Equal(got, tc.want) {
				t.Fatalf("ReadLog(%v,%v): got %v, want %v", low, tc.readHigh, got, tc.want)
			}
		})
	}
}

// TestHighWatermarkBatch queries HighWatermark with varying start times and
// batch sizes, then validates the returned watermark indirectly: reading the
// log over [start, watermark) must yield exactly the expected entries. This
// avoids predicting the exact watermark value, which is left to the storage
// implementation.
func (mutationLogsReaderTests) TestHighWatermarkBatch(ctx context.Context, t *testing.T, newForTest logsRWFactory) {
	directoryID := "TestHighWatermarkBatch"
	logID := int64(1)
	m, done := newForTest(ctx, t, directoryID, logID)
	defer done(ctx)
	maxIndex := 9
	sent, _ := setupWatermarks(ctx, t, m, directoryID, logID, maxIndex)
	for _, tc := range []struct {
		desc  string
		start time.Time
		batch int32
		want  []byte
	}{
		// Verify that limiting batch size controls the number of items returned.
		{desc: "limit batch", start: sent[0], batch: 2, want: []byte{0, 1}},
		// Verify that advancing start by 1 with the same batch size advances the results by one.
		{desc: "start 1", start: sent[1], batch: 2, want: []byte{1, 2}},
		// Verify that watermarks in between primary keys resolve correctly.
		{desc: "start 0.1", start: sent[0].Add(1), batch: 2, want: []byte{1, 2}},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			count, wm, err := m.HighWatermark(ctx, directoryID, logID, tc.start, tc.batch)
			if err != nil {
				// The watermark is used as ReadLog's upper bound below; if
				// the query failed it is not trustworthy, so stop here rather
				// than report misleading follow-on failures.
				t.Fatalf("HighWatermark(): %v", err)
			}
			// A wrong count is reported but not fatal: the ReadLog check
			// below still provides useful diagnostics.
			if want := int32(len(tc.want)); count != want {
				t.Errorf("HighWatermark() count: %v, want %v", count, want)
			}

			rows, err := m.ReadLog(ctx, directoryID, logID, tc.start, wm, tc.batch)
			if err != nil {
				t.Fatalf("ReadLog(): %v", err)
			}
			// Each entry's payload starts with the index it was written at.
			got := make([]byte, 0, len(rows))
			for _, r := range rows {
				got = append(got, r.Mutation.Entry[0])
			}
			if !cmp.Equal(got, tc.want) {
				t.Fatalf("ReadLog(%v,%v): got %v, want %v", tc.start, wm, got, tc.want)
			}
		})
	}
}
@@ -33,3 +33,14 @@ func TestMutationLogsIntegration(t *testing.T) {
return m, func(context.Context) {}
})
}

// TestMutationLogsReaderIntegration runs the shared storagetest mutation-log
// reader suite against the mutation logs created by NewMutationLogs.
func TestMutationLogsReaderIntegration(t *testing.T) {
	// Nothing needs tearing down after each test.
	noopCleanup := func(context.Context) {}

	newLogs := func(ctx context.Context, t *testing.T, dirID string, logIDs ...int64) (storagetest.LogsReadWriter, func(context.Context)) {
		logs := NewMutationLogs()
		// Register the requested logs for the directory before handing the
		// store to the test.
		err := logs.AddLogs(ctx, dirID, logIDs...)
		if err != nil {
			t.Fatal(err)
		}
		return logs, noopCleanup
	}

	storagetest.RunMutationLogsReaderTests(t, newLogs)
}
@@ -28,6 +28,10 @@ import (
pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
)

// quantum is the resolution of the Timestamp column: the smallest time
// difference that the column can represent (microseconds).
// Timestamps are aligned to a multiple of quantum before being written or
// used in queries.
// See https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
const quantum = time.Microsecond

// SetWritable enables or disables new writes from going to logID.
func (m *Mutations) SetWritable(ctx context.Context, directoryID string, logID int64, enabled bool) error {
result, err := m.db.ExecContext(ctx,
@@ -80,7 +84,7 @@ func (m *Mutations) Send(ctx context.Context, directoryID string, logID int64, u
}
// TODO(gbelvin): Implement retry with backoff for retryable errors if
// we get timestamp contention.
ts := time.Now()
ts := time.Now().Truncate(quantum)
if err := m.send(ctx, ts, directoryID, logID, updateData...); err != nil {
return time.Time{}, err
}
@@ -141,9 +145,6 @@ func (m *Mutations) send(ctx context.Context, ts time.Time, directoryID string,
return status.Errorf(codes.Internal, "could not find max timestamp: %v", err)
}

// The Timestamp column has a maximum fidelity of microseconds.
// See https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
ts = ts.Truncate(time.Microsecond)
if !ts.After(maxTime.Time) {
return status.Errorf(codes.Aborted,
"current timestamp: %v, want > max-timestamp of queued mutations: %v", ts, maxTime)
@@ -163,6 +164,7 @@ func (m *Mutations) send(ctx context.Context, ts time.Time, directoryID string,
// equal to batchSize items greater than start.
func (m *Mutations) HighWatermark(ctx context.Context, directoryID string, logID int64,
start time.Time, batchSize int32) (int32, time.Time, error) {
startQuery := start.Add(quantum - 1).Truncate(quantum)
var count int32
var high sql.NullTime
if err := m.db.QueryRowContext(ctx,
@@ -173,7 +175,7 @@ func (m *Mutations) HighWatermark(ctx context.Context, directoryID string, logID
ORDER BY Q.Time ASC
LIMIT ?
) AS T1`,
directoryID, logID, start, batchSize).
directoryID, logID, startQuery, batchSize).
Scan(&count, &high); err != nil {
return 0, start, err
}
@@ -188,6 +190,10 @@ func (m *Mutations) HighWatermark(ctx context.Context, directoryID string, logID
// ReadLog may return more rows than batchSize in order to fetch all the rows at a particular timestamp.
func (m *Mutations) ReadLog(ctx context.Context, directoryID string,
logID int64, low, high time.Time, batchSize int32) ([]*mutator.LogMessage, error) {
// Advance the low and high marks to the next highest quantum to preserve read semantics.
low = low.Add(quantum - 1).Truncate(quantum)
high = high.Add(quantum - 1).Truncate(quantum)

rows, err := m.db.QueryContext(ctx,
`SELECT Time, LocalID, Mutation FROM Queue
WHERE DirectoryID = ? AND LogID = ? AND Time >= ? AND Time < ?

0 comments on commit ea1b47d

Please sign in to comment.
You can’t perform that action at this time.