@@ -493,7 +493,9 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32,
493493}
494494
495495func (vlog * valueLog ) rewrite (f * logFile , tr trace.Trace ) error {
496- maxFid := atomic .LoadUint32 (& vlog .maxFid )
496+ vlog .filesLock .RLock ()
497+ maxFid := vlog .maxFid
498+ vlog .filesLock .RUnlock ()
497499 y .AssertTruef (uint32 (f .fid ) < maxFid , "fid to move: %d. Current max fid: %d" , f .fid , maxFid )
498500 tr .LazyPrintf ("Rewriting fid: %d" , f .fid )
499501
@@ -808,10 +810,9 @@ func (vlog *valueLog) dropAll() (int, error) {
808810 }
809811
810812 vlog .db .opt .Infof ("Value logs deleted. Creating value log file: 0" )
811- if _ , err := vlog .createVlogFile (0 ); err != nil {
813+ if _ , err := vlog .createVlogFile (0 ); err != nil { // Called while writes are stopped.
812814 return count , err
813815 }
814- atomic .StoreUint32 (& vlog .maxFid , 0 )
815816 return count , nil
816817}
817818
@@ -832,12 +833,12 @@ type valueLog struct {
832833 // guards our view of which files exist, which to be deleted, how many active iterators
833834 filesLock sync.RWMutex
834835 filesMap map [uint32 ]* logFile
836+ maxFid uint32
835837 filesToBeDeleted []uint32
836838 // A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
837839 numActiveIterators int32
838840
839841 db * DB
840- maxFid uint32 // accessed via atomics.
841842 writableLogOffset uint32 // read by read, written by write. Must access via atomics.
842843 numEntriesWritten uint32
843844 opt Options
@@ -997,14 +998,15 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
997998 if err = lf .mmap (2 * vlog .opt .ValueLogFileSize ); err != nil {
998999 return nil , errFile (err , lf .path , "Mmap value log file" )
9991000 }
1001+
1002+ vlog .filesLock .Lock ()
1003+ vlog .filesMap [fid ] = lf
1004+ vlog .maxFid = fid
10001005 // writableLogOffset is only written by write func, by read by Read func.
10011006 // To avoid a race condition, all reads and updates to this variable must be
10021007 // done via atomics.
10031008 atomic .StoreUint32 (& vlog .writableLogOffset , vlogHeaderSize )
10041009 vlog .numEntriesWritten = 0
1005-
1006- vlog .filesLock .Lock ()
1007- vlog .filesMap [fid ] = lf
10081010 vlog .filesLock .Unlock ()
10091011
10101012 return lf , nil
@@ -1155,12 +1157,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
11551157 // plain text mode or vice versa. A single vlog file can't have both
11561158 // encrypted entries and plain text entries.
11571159 if last .encryptionEnabled () != vlog .db .shouldEncrypt () {
1158- newid := atomic . AddUint32 ( & vlog .maxFid , 1 )
1160+ newid := vlog .maxFid + 1
11591161 _ , err := vlog .createVlogFile (newid )
11601162 if err != nil {
11611163 return y .Wrapf (err , "Error while creating log file %d in valueLog.open" , newid )
11621164 }
1163- last , ok = vlog .filesMap [vlog . maxFid ]
1165+ last , ok = vlog .filesMap [newid ]
11641166 y .AssertTrue (ok )
11651167 }
11661168 lastOffset , err := last .fd .Seek (0 , io .SeekEnd )
@@ -1222,7 +1224,7 @@ func (vlog *valueLog) Close() error {
12221224 err = munmapErr
12231225 }
12241226
1225- maxFid := atomic . LoadUint32 ( & vlog .maxFid )
1227+ maxFid := vlog .maxFid
12261228 if ! vlog .opt .ReadOnly && id == maxFid {
12271229 // truncate writable log file to correct offset.
12281230 if truncErr := f .fd .Truncate (
@@ -1320,7 +1322,7 @@ func (vlog *valueLog) sync(fid uint32) error {
13201322 }
13211323
13221324 vlog .filesLock .RLock ()
1323- maxFid := atomic . LoadUint32 ( & vlog .maxFid )
1325+ maxFid := vlog .maxFid
13241326 // During replay it is possible to get sync call with fid less than maxFid.
13251327 // Because older file has already been synced, we can return from here.
13261328 if fid < maxFid || len (vlog .filesMap ) == 0 {
@@ -1353,7 +1355,7 @@ func (vlog *valueLog) write(reqs []*request) error {
13531355 return nil
13541356 }
13551357 vlog .filesLock .RLock ()
1356- maxFid := atomic . LoadUint32 ( & vlog .maxFid )
1358+ maxFid := vlog .maxFid
13571359 curlf := vlog .filesMap [maxFid ]
13581360 vlog .filesLock .RUnlock ()
13591361
@@ -1385,7 +1387,7 @@ func (vlog *valueLog) write(reqs []*request) error {
13851387 return err
13861388 }
13871389
1388- newid := atomic . AddUint32 ( & vlog .maxFid , 1 )
1390+ newid := vlog .maxFid + 1
13891391 y .AssertTruef (newid > 0 , "newid has overflown uint32: %v" , newid )
13901392 newlf , err := vlog .createVlogFile (newid )
13911393 if err != nil {
@@ -1446,28 +1448,33 @@ func (vlog *valueLog) write(reqs []*request) error {
14461448
14471449// Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
14481450// (if non-nil)
1449- func (vlog * valueLog ) getFileRLocked (fid uint32 ) (* logFile , error ) {
1451+ func (vlog * valueLog ) getFileRLocked (vp valuePointer ) (* logFile , error ) {
14501452 vlog .filesLock .RLock ()
14511453 defer vlog .filesLock .RUnlock ()
1452- ret , ok := vlog .filesMap [fid ]
1454+ ret , ok := vlog .filesMap [vp . Fid ]
14531455 if ! ok {
14541456 // log file has gone away, will need to retry the operation.
14551457 return nil , ErrRetry
14561458 }
1459+
1460+ // Check for valid offset if we are reading from writable log.
1461+ maxFid := vlog .maxFid
1462+ if vp .Fid == maxFid {
1463+ currentOffset := vlog .woffset ()
1464+ if vp .Offset >= currentOffset {
1465+ return nil , errors .Errorf (
1466+ "Invalid value pointer offset: %d greater than current offset: %d" ,
1467+ vp .Offset , currentOffset )
1468+ }
1469+ }
1470+
14571471 ret .lock .RLock ()
14581472 return ret , nil
14591473}
14601474
14611475// Read reads the value log at a given location.
14621476// TODO: Make this read private.
14631477func (vlog * valueLog ) Read (vp valuePointer , s * y.Slice ) ([]byte , func (), error ) {
1464- // Check for valid offset if we are reading from writable log.
1465- maxFid := atomic .LoadUint32 (& vlog .maxFid )
1466- if vp .Fid == maxFid && vp .Offset >= vlog .woffset () {
1467- return nil , nil , errors .Errorf (
1468- "Invalid value pointer offset: %d greater than current offset: %d" ,
1469- vp .Offset , vlog .woffset ())
1470- }
14711478 buf , lf , err := vlog .readValueBytes (vp , s )
14721479 // log file is locked so, decide whether to lock immediately or let the caller to
14731480 // unlock it, after caller uses it.
@@ -1517,10 +1524,11 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
15171524// readValueBytes return vlog entry slice and read locked log file. Caller should take care of
15181525// logFile unlocking.
15191526func (vlog * valueLog ) readValueBytes (vp valuePointer , s * y.Slice ) ([]byte , * logFile , error ) {
1520- lf , err := vlog .getFileRLocked (vp . Fid )
1527+ lf , err := vlog .getFileRLocked (vp )
15211528 if err != nil {
15221529 return nil , nil , err
15231530 }
1531+
15241532 buf , err := lf .read (vp , s )
15251533 return buf , lf , err
15261534}
0 commit comments