Skip to content

Commit

Permalink
Set datakey for first table in stream writer (#1054)
Browse files Browse the repository at this point in the history
If encryption is enabled on a DB and stream writer is used then the
first file SST would not be encrypted, only the SSTs after the first one
would be encrypted. This commit fixes it.
  • Loading branch information
Ibrahim Jarif committed Sep 30, 2019
1 parent a425b0e commit e3b5652
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
35 changes: 24 additions & 11 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,15 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
return err
}

for streamId, req := range streamReqs {
writer, ok := sw.writers[streamId]
for streamID, req := range streamReqs {
writer, ok := sw.writers[streamID]
if !ok {
writer = sw.newWriter(streamId)
sw.writers[streamId] = writer
var err error
writer, err = sw.newWriter(streamID)
if err != nil {
return errors.Wrapf(err, "failed to create writer with ID %d", streamID)
}
sw.writers[streamID] = writer
}
writer.reqCh <- req
}
Expand Down Expand Up @@ -157,7 +161,10 @@ func (sw *StreamWriter) Flush() error {

// Encode and write the value log head into a new table.
data := maxHead.Encode()
headWriter := sw.newWriter(headStreamId)
headWriter, err := sw.newWriter(headStreamId)
if err != nil {
return errors.Wrap(err, "failed to create head writer")
}
if err := headWriter.Add(
y.KeyWithTs(head, sw.maxVersion),
y.ValueStruct{Value: data}); err != nil {
Expand Down Expand Up @@ -201,26 +208,32 @@ type sortedWriter struct {

builder *table.Builder
lastKey []byte
streamId uint32
streamID uint32
reqCh chan *request
head valuePointer
}

func (sw *StreamWriter) newWriter(streamId uint32) *sortedWriter {
func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
dk, err := sw.db.registry.latestDataKey()
if err != nil {
return nil, err
}

bopts := table.Options{
BlockSize: sw.db.opt.BlockSize,
BloomFalsePositive: sw.db.opt.BloomFalsePositive,
DataKey: dk,
}
w := &sortedWriter{
db: sw.db,
streamId: streamId,
streamID: streamID,
throttle: sw.throttle,
builder: table.NewTableBuilder(bopts),
reqCh: make(chan *request, 3),
}
sw.closer.AddRunning(1)
go w.handleRequests(sw.closer)
return w
return w, nil
}

// ErrUnsortedKey is returned when any out of order key arrives at sortedWriter during call to Add.
Expand Down Expand Up @@ -364,7 +377,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
// better than that.
lhandler = lc.levels[len(lc.levels)-1]
}
if w.streamId == headStreamId {
if w.streamID == headStreamId {
// This is a special !badger!head key. We should store it at level 0, separate from all the
// other keys to avoid an overlap.
lhandler = lc.levels[0]
Expand All @@ -384,6 +397,6 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
// Release the ref held by OpenTable.
_ = tbl.DecrRef()
w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
fileID, lhandler.level, w.streamId, humanize.Bytes(uint64(tbl.Size())))
fileID, lhandler.level, w.streamID, humanize.Bytes(uint64(tbl.Size())))
return nil
}
1 change: 0 additions & 1 deletion stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ func TestStreamWriter5(t *testing.T) {
// if those are going to same table.
func TestStreamWriter6(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
fmt.Println(db.opt.Dir)
list := &pb.KVList{}
str := []string{"a", "a", "b", "b", "c", "c"}
ver := 1
Expand Down

0 comments on commit e3b5652

Please sign in to comment.