-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Write-ahead Logging #261
Conversation
There are some things that need help:
|
Codecov Report
@@ Coverage Diff @@
## main #261 +/- ##
==========================================
- Coverage 44.25% 39.94% -4.32%
==========================================
Files 93 100 +7
Lines 9814 10868 +1054
==========================================
- Hits 4343 4341 -2
- Misses 5053 6109 +1056
Partials 418 418
... and 5 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
You should have switched to a new flushChannel on switching the buffer. Theoretically, a flush channel sends a notice which indicates its corresponding buffer gets flushed. Based on that, the flush channel seems a field in the |
@HHoflittlefish777 Please fix test failures. |
I have two channel field in the |
Make sense to me. I submitted the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took a quick look at the implementation. There are several channels to sync operations. The std testing framework can't handle this pattern appropriately. I recommend :
- Using https://github.com/onsi/ginkgo as other packages to test async operations.
- https://github.com/uber-go/goleak will help check the goroutine leaks
There is a comprehensive test case which you could get some inspiration from:
https://github.com/apache/skywalking-banyandb/blob/359fd2a84a525351a8fc0e2313b31f7e601f88e7/banyand/tsdb/buffer_test.go
pkg/wal/wal.go
Outdated
timer := time.NewTimer(bufferBatchInterval * time.Millisecond) | ||
select { | ||
case request := <-log.writeChannel: | ||
bufferSize += seriesIDLength + timestampLength + len(request.data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The length of seriesID
is not fixed.
pkg/wal/wal.go
Outdated
buf := log.buffer | ||
// Clear buffer to receive Log request. | ||
log.newBuffer() | ||
if log.closeFlag { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't archive synchronization through a local varable. To safely share data between goroutines is by using channel or context.
pkg/wal/wal.go
Outdated
}() | ||
} | ||
|
||
func (log *Log) asyncBatchflush(buffer buffer, flushCh chan string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (log *Log) asyncBatchflush(buffer buffer, flushCh chan string) { | |
func (log *Log) asyncBatchFlush(buffer buffer, flushCh chan string) { |
pkg/wal/wal.go
Outdated
@@ -43,18 +128,450 @@ type WAL interface { | |||
// Write a logging entity. | |||
// It will return immediately when the data is written in the buffer, | |||
// The returned function will be called when the entity is flushed on the persistent storage. | |||
Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error) | |||
Write(seriesID []byte, timestamp time.Time, data []byte) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you drop off the returned function to get flushed event?
pkg/wal/wal.go
Outdated
} | ||
|
||
// Compression and flush. | ||
compressionData := snappy.Encode(nil, bytesBuffer.Bytes()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of compressing the entire segment, it is recommended to separately encode/compress timestamps and values.
The pkg/encoding/xor
package offers methods for encoding uint64. These can be used to encode/decode the UNIX epoch (nanosecond) of a timestamp.
For quick compression of values, snappy can be utilized.
By following this approach, there is no need for bytesBuffer
during flushing. Instead, data can be written directly to workSegment.file
.
This PR seems not to update for weeks. Is this still being processed? |
Certainly, I will processed with it later, after completing load testing. |
I want to try to continue, Thanks |
@hailin0 Thanks for you interest in it, but WAL doesn't have much work to continue with, just fix the review. There is still a lot of work to be done in BanyanDB, and you can do some other work you like. |
@hailin0 Please address the issues outlined in the review. If you encounter any difficulties or have questions, feel free to reach out for assistance. @HHoflittlefish777 |
That is fine, Looking forward to your further contributions to BanyanDB @hailin0. |
thanks, i am doing |
@hailin0 any update? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you add benchmark to the write and rotate operations?
Yes, I would add benchmark testing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thank you to everyone who participated.
Close apache/skywalking#10301