@@ -773,19 +773,23 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
773773 lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
774774 ww.mu.Lock()
775775 defer ww.mu.Unlock()
776+ numWriters := ww.mu.nextWriterIndex
776777 // Every iteration starts and ends with the mutex held.
777778 //
778- // Invariant: ww.mu.nextWriterIndex >= 1.
779+ // Invariant: numWriters >= 1.
779780 //
780781 // We will loop until we have closed the lastWriter (and use
781782 // lastWriter.err). We also need to call close on all LogWriters
782783 // that will not close themselves, i.e., those that have already been
783784 // created and installed in failoverWriter.writers (this set may change
784785 // while failoverWriter.Close runs).
785- for !lastWriter.closed {
786- numWriters := ww.mu.nextWriterIndex
786+ for !lastWriter.closed || numWriters > lastWriter.index+1 {
787787 if numWriters > closeCalledCount {
788- // INVARIANT: numWriters > closeCalledCount.
788+ // lastWriter.index may or may not have advanced. If it has advanced, we
789+ // need to reinitialize lastWriterState. If it hasn't advanced, and
790+ // numWriters > closeCalledCount, we know that we haven't called close
791+ // on it, so nothing in lastWriterState needs to be retained. For
792+ // simplicity, we overwrite in both cases.
789793 lastWriter = lastWriterState{
790794 index: numWriters - 1,
791795 }
@@ -857,7 +861,34 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
857861 if !lastWriter.closed {
858862 // Either waiting for creation of last writer or waiting for the close
859863 // to finish, or something else to become the last writer.
864+ //
865+ // It is possible that what we think of as the last writer (lastWriter)
866+ // closes itself while ww.mu is no longer held here, and a new LogWriter
867+ // is created too. All the records are synced, but the real last writer
868+ // may still be writing some records. Specifically, consider the
869+ // following sequence while this wait does not hold the mutex:
870+ //
871+ // - recordQueue has an entry, with index 50, that does not require a
872+ // sync.
873+ // - Last writer created at index 10 and entry 50 is handed to it.
874+ // - lastWriter.index is still 9 and it closes itself and signals this
875+ // cond. It has written entry 50 and synced (since close syncs).
876+ // - The wait completes.
877+ //
878+ // Now the writer at index 10 will never be closed and will never sync.
879+ // A crash can cause some part of what it writes to be lost. Note that
880+ // there is no data loss, but there are some unfortunate consequences:
881+ //
882+ // - We never closed a file descriptor.
883+ // - virtualWALReader.NextRecord can return an error on finding a
884+ // malformed chunk in the last writer (at index 10) instead of
885+ // swallowing the error. This can cause DB.Open to fail.
886+ //
887+ // To avoid this, we grab the latest value of numWriters on reacquiring
888+ // the mutex, and will continue looping until the writer at index 10 is
889+ // closed (or writer at index 11 is created).
860890 ww.mu.cond.Wait()
891+ numWriters = ww.mu.nextWriterIndex
861892 }
862893 }
863894 if ww.mu.writers[lastWriter.index].w != nil {
@@ -867,7 +898,10 @@ func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
867898 err = lastWriter.err
868899 ww.mu.metrics = lastWriter.metrics
869900 ww.mu.closed = true
870- _, _ = ww.q.popAll(err)
901+ n, m := ww.q.popAll(err)
902+ if err == nil && (n > 0 || m > 0) {
903+ panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
904+ }
871905 return logicalOffset, err
872906}
873907
0 commit comments