Skip to content
Permalink
Browse files

Set local ID for messages in memory impl (#1410)

  • Loading branch information
pavelkalinnikov committed Dec 20, 2019
1 parent 157c75f commit 36b088b2da035048609afe89958966558b5f7a4e
Showing with 35 additions and 10 deletions.
  1. +26 −8 core/integration/storagetest/mutation_logs.go
  2. +9 −2 impl/memory/mutation_logs.go
@@ -17,6 +17,7 @@ package storagetest
import (
"context"
"fmt"
"reflect"
"testing"

"github.com/golang/protobuf/proto"
@@ -60,18 +61,28 @@ func (mutationLogsTests) TestReadLog(ctx context.Context, t *testing.T, newForTe
logID := int64(5) // Any log ID.
m, done := newForTest(ctx, t, directoryID, logID)
defer done(ctx)

type entryID struct {
logID int64
wm water.Mark
localID int64
}
ids := make([]entryID, 0, 30)

// Write ten batches.
var lastWM water.Mark
for i := byte(0); i < 10; i++ {
entry := &pb.EntryUpdate{Mutation: &pb.SignedEntry{Entry: mustMarshal(t, &pb.Entry{Index: []byte{i}})}}
wm, err := m.Send(ctx, directoryID, logID, entry, entry, entry)
wm, err := m.Send(ctx, directoryID, logID, entry, entry, entry) // Send 3 entries.
if err != nil {
t.Fatalf("Send(): %v", err)
}
lastWM = wm
for local := 0; local < 3; local++ { // Save the 3 entries' IDs.
ids = append(ids, entryID{logID: logID, wm: wm, localID: int64(local)})
}
}
highWM := ids[len(ids)-1].wm.Add(1)

for i, tc := range []struct {
for _, tc := range []struct {
limit int32
want int
}{
@@ -81,13 +92,20 @@ func (mutationLogsTests) TestReadLog(ctx context.Context, t *testing.T, newForTe
{limit: 4, want: 6}, // Reading 4 items gets us into the second batch of size 3.
{limit: 100, want: 30}, // Reading all the items gets us the 30 items we wrote.
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
rows, err := m.ReadLog(ctx, directoryID, logID, water.Mark{}, lastWM.Add(1), tc.limit)
t.Run(fmt.Sprintf("%d", tc.limit), func(t *testing.T) {
rows, err := m.ReadLog(ctx, directoryID, logID, water.Mark{}, highWM, tc.limit)
if err != nil {
t.Fatalf("ReadLog(%v): %v", tc.limit, err)
t.Fatalf("ReadLog: %v", err)
}
if got := len(rows); got != tc.want {
t.Fatalf("ReadLog(%v): len: %v, want %v", tc.limit, got, tc.want)
t.Fatalf("ReadLog: len: %v, want %v", got, tc.want)
}
gotIDs := make([]entryID, 0, len(rows))
for _, row := range rows {
gotIDs = append(gotIDs, entryID{logID: row.LogID, wm: row.ID, localID: row.LocalID})
}
if want := ids[:tc.want]; !reflect.DeepEqual(gotIDs, want) {
t.Errorf("ReadLog: IDs mismatch: got %v, want %v", gotIDs, want)
}
})
}
@@ -80,8 +80,15 @@ func (m MutationLogs) Send(_ context.Context, _ string, logID int64, mutation ..

// Convert []SignedEntry into []LogMessage for storage.
msgs := make([]*mutator.LogMessage, 0, len(entries))
for _, e := range entries {
msgs = append(msgs, &mutator.LogMessage{LogID: logID, ID: wm, CreatedAt: time.Now(), Mutation: e})
for i, e := range entries {
m := &mutator.LogMessage{
LogID: logID,
ID: wm,
LocalID: int64(i),
CreatedAt: time.Now(),
Mutation: e,
}
msgs = append(msgs, m)
}
m[logID] = append(logShard, batch{wm: wm, msgs: msgs})
return wm, nil

0 comments on commit 36b088b

Please sign in to comment.
You can’t perform that action at this time.