Skip to content

Commit d981f47

Browse files
authored
return error if the vlog writes exceeds more that 4GB. (#1400)
1 parent 7f4e4b5 commit d981f47

File tree

2 files changed

+107
-0
lines changed

2 files changed

+107
-0
lines changed

value.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ import (
4545
"golang.org/x/net/trace"
4646
)
4747

48+
// maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
49+
// uint32, so limiting at max uint32.
50+
var maxVlogFileSize = math.MaxUint32
51+
4852
// Values have their first byte being byteData or byteDelete. This helps us distinguish between
4953
// a key that has never been seen and a key that has been explicitly deleted.
5054
const (
@@ -1364,11 +1368,52 @@ func (vlog *valueLog) woffset() uint32 {
13641368
return atomic.LoadUint32(&vlog.writableLogOffset)
13651369
}
13661370

1371+
// validateWrites will check whether the given requests can fit into 4GB vlog file.
1372+
// NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
1373+
// uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
1374+
func (vlog *valueLog) validateWrites(reqs []*request) error {
1375+
vlogOffset := uint64(vlog.woffset())
1376+
for _, req := range reqs {
1377+
// calculate size of the request.
1378+
size := estimateRequestSize(req)
1379+
estimatedVlogOffset := vlogOffset + size
1380+
if estimatedVlogOffset > uint64(maxVlogFileSize) {
1381+
return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
1382+
estimatedVlogOffset, maxVlogFileSize)
1383+
}
1384+
1385+
if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
1386+
// We'll create a new vlog file if the estimated offset is greater or equal to
1387+
// max vlog size. So, resetting the vlogOffset.
1388+
vlogOffset = 0
1389+
continue
1390+
}
1391+
// Estimated vlog offset will become current vlog offset if the vlog is not rotated.
1392+
vlogOffset = estimatedVlogOffset
1393+
}
1394+
return nil
1395+
}
1396+
1397+
// estimateRequestSize returns the size that needed to be written for the given request.
1398+
func estimateRequestSize(req *request) uint64 {
1399+
size := uint64(0)
1400+
for _, e := range req.Entries {
1401+
size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
1402+
}
1403+
return size
1404+
}
1405+
13671406
// write is thread-unsafe by design and should not be called concurrently.
13681407
func (vlog *valueLog) write(reqs []*request) error {
13691408
if vlog.db.opt.InMemory {
13701409
return nil
13711410
}
1411+
// Validate writes before writing to vlog. Because, we don't want to partially write and return
1412+
// an error.
1413+
if err := vlog.validateWrites(reqs); err != nil {
1414+
return err
1415+
}
1416+
13721417
vlog.filesLock.RLock()
13731418
maxFid := vlog.maxFid
13741419
curlf := vlog.filesMap[maxFid]

value_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"io/ioutil"
24+
"math"
2425
"math/rand"
2526
"os"
2627
"reflect"
@@ -1237,3 +1238,64 @@ func TestValueEntryChecksum(t *testing.T) {
12371238
require.NoError(t, db.Close())
12381239
})
12391240
}
1241+
1242+
func TestValidateWrite(t *testing.T) {
1243+
// Mocking the file size, so that we don't allocate big memory while running test.
1244+
maxVlogFileSize = 400
1245+
defer func() {
1246+
maxVlogFileSize = math.MaxUint32
1247+
}()
1248+
1249+
bigBuf := make([]byte, maxVlogFileSize+1)
1250+
log := &valueLog{
1251+
opt: DefaultOptions("."),
1252+
}
1253+
1254+
// Sending a request with big values which will overflow uint32.
1255+
key := []byte("HelloKey")
1256+
req := &request{
1257+
Entries: []*Entry{
1258+
{
1259+
Key: key,
1260+
Value: bigBuf,
1261+
},
1262+
{
1263+
Key: key,
1264+
Value: bigBuf,
1265+
},
1266+
{
1267+
Key: key,
1268+
Value: bigBuf,
1269+
},
1270+
},
1271+
}
1272+
1273+
err := log.validateWrites([]*request{req})
1274+
require.Error(t, err)
1275+
1276+
// Testing with small values.
1277+
smallBuf := make([]byte, 4)
1278+
req1 := &request{
1279+
Entries: []*Entry{
1280+
{
1281+
Key: key,
1282+
Value: smallBuf,
1283+
},
1284+
{
1285+
Key: key,
1286+
Value: smallBuf,
1287+
},
1288+
{
1289+
Key: key,
1290+
Value: smallBuf,
1291+
},
1292+
},
1293+
}
1294+
1295+
err = log.validateWrites([]*request{req1})
1296+
require.NoError(t, err)
1297+
1298+
// Batching small and big request.
1299+
err = log.validateWrites([]*request{req1, req})
1300+
require.Error(t, err)
1301+
}

0 commit comments

Comments
 (0)