engine: persist data to kernel #143
Conversation
It seems we have multiple usage modes; the two most obvious are "as-storage-lib" and "as-storage-service". That makes us think about a question:
Yes, we need to persist the kernel options once a kernel is created. It is very dangerous to use a kernel in one way and then switch to another. So we should provide some validation when a kernel is opened.
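A minimal sketch of what that validation could look like, using hypothetical KernelOptions and MetaStore types rather than the actual engula API: persist the options when the kernel is created, and refuse to reopen it with incompatible options.

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
pub struct KernelOptions {
    // Hypothetical option: "storage-lib" or "storage-service".
    pub mode: String,
}

#[derive(Default)]
pub struct MetaStore {
    meta: HashMap<String, String>,
}

impl MetaStore {
    /// Called once when the kernel is created.
    pub fn persist_options(&mut self, opts: &KernelOptions) {
        self.meta.insert("mode".to_owned(), opts.mode.clone());
    }

    /// Called every time the kernel is opened.
    pub fn validate_options(&self, opts: &KernelOptions) -> Result<(), String> {
        match self.meta.get("mode") {
            Some(saved) if *saved == opts.mode => Ok(()),
            Some(saved) => Err(format!(
                "kernel was created in {saved:?} mode, refusing to open it in {:?} mode",
                opts.mode
            )),
            None => Err("kernel options were never persisted".to_owned()),
        }
    }
}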
pub async fn set(&self, ts: Timestamp, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
    let current = self.current_version().await;
    current.set(ts, key, value).await?;
    if let Some((imm, version)) = current.should_flush().await {
Maybe there is a concurrency problem here? Two threads calling set at the same time might generate two immutable tables and versions... 🧐
Well, that's why I acquire a lock at https://github.com/engula/engula/pull/143/files#diff-4ee1308253a5d8885bc1d6a84ecf80e15233c086174740a737302e56654ac6a2R58.
We still need to prevent flushing multiple memory tables at the same time, though. There is still a lot of work to do before this engine can work decently 🤣
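A rough sketch of the concern and the fix, with hypothetical Memtable and Version types rather than the PR's actual ones: the check-and-rotate step has to happen under a single lock so that two concurrent writers cannot both decide to flush and both install a new version.

use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Default)]
struct Memtable;

#[derive(Default)]
struct Version {
    mem: Arc<Memtable>,
}

struct Engine {
    current: Arc<Mutex<Arc<Version>>>,
}

impl Engine {
    /// Returns the immutable memtable to flush, or None if the current
    /// memtable is not full (or another writer already rotated it).
    async fn rotate_if_full(&self, is_full: impl Fn(&Memtable) -> bool) -> Option<Arc<Memtable>> {
        // Check and rotate atomically: without the lock, two writers could
        // both see a full memtable and both install a new version.
        let mut current = self.current.lock().await;
        if !is_full(&current.mem) {
            return None;
        }
        let imm = current.mem.clone();
        *current = Arc::new(Version::default());
        Some(imm) // exactly one caller gets this immutable memtable to flush
    }
}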
@huachaohuang thanks for the reminder! I'm reviewing this PR now.
Thanks for preparing this PR! Comments inline.
I'll take a closer look at engine.rs later.
for object in &update.add_objects {
    // We assume that objects are flushed from the oldest immtable to the newest.
    let reader = version.bucket.new_sequential_reader(object).await?;
    let table_reader = TableReader::new(reader).await?;
Marking a possible improvement~ maybe we can build all the readers first and then consume them :)
Logic like:
for {
    // build readers
}
for {
    // build table readers
}
Then for a gRPC implementation, it can pipeline the requests to the remote.
Sounds like a good idea.
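A sketch of the suggested two-pass shape, with stub types standing in for the PR's Bucket and TableReader (not the actual engula code): open all sequential readers first, then build the table readers, so an RPC-backed bucket gets a chance to pipeline the open requests instead of fully consuming one object before opening the next.

struct SequentialReader;

struct Bucket;

impl Bucket {
    async fn new_sequential_reader(&self, _object: &str) -> std::io::Result<SequentialReader> {
        Ok(SequentialReader)
    }
}

struct TableReader;

impl TableReader {
    async fn new(_reader: SequentialReader) -> std::io::Result<TableReader> {
        Ok(TableReader)
    }
}

async fn open_tables(bucket: &Bucket, objects: &[String]) -> std::io::Result<Vec<TableReader>> {
    // Pass 1: issue all reader-open requests.
    let mut readers = Vec::with_capacity(objects.len());
    for object in objects {
        readers.push(bucket.new_sequential_reader(object).await?);
    }
    // Pass 2: consume the readers and build table readers.
    let mut tables = Vec::with_capacity(readers.len());
    for reader in readers {
        tables.push(TableReader::new(reader).await?);
    }
    Ok(tables)
}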
Co-authored-by: tison <wander4096@gmail.com>
for event in events {
    ts += 1;
    let (key, value) = codec::decode_record(&event.data)?;
    current.put(ts, key.to_owned(), value.to_owned()).await;
Do we need to check MEMTABLE_SIZE here after recovery~?
It seems some RocksDB-like engines do that (they also use a smaller memtable size threshold than the normal flush operation, so the restart process may produce more SST files). I'm not sure whether it's a practice we need to follow 😕
Yes, I think so. But maybe leave it as a future improvement. This PR has done enough.
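A hedged sketch of that future improvement, with hypothetical names (MEMTABLE_SIZE and the recovery hook below are illustrative, not the PR's code): after replaying the stream, compare the recovered memtable's size against a threshold and trigger a flush right away instead of waiting for the next write.

/// Hypothetical flush threshold in bytes; not the engine's actual constant.
const MEMTABLE_SIZE: usize = 4 << 20;

struct RecoveredMemtable {
    approximate_size: usize,
}

impl RecoveredMemtable {
    /// Some RocksDB-like engines use a smaller threshold after recovery, at
    /// the cost of producing a few extra small table files on restart.
    fn should_flush_after_recovery(&self) -> bool {
        self.approximate_size >= MEMTABLE_SIZE / 2
    }
}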
@@ -40,8 +37,8 @@ pub struct Engine<K: Kernel> {
    stream: K::Stream,
    bucket: K::Bucket,
    current: Arc<Mutex<Arc<EngineVersion<K>>>>,
    last_ts: Arc<Mutex<Timestamp>>,
    last_number: Arc<AtomicU64>,
    last_timestamp: Arc<Mutex<Timestamp>>,
Why do you rename the field to last_timestamp?
Hmm, I can't give a strong reason here. Just thought that if I have multiple last_xxx fields here, maybe I'd better make them more verbose.
@tisonkun Any more comments? If not, I am going to merge it then.
LGTM
LGTM.
Thanks for your review. I believe this is a big step toward completing the bridge between Engine and Kernel.
let mut update = KernelUpdate::default();
let last_ts = encode_u64_meta(imm.last_update_timestamp().await);
update.set_meta(LAST_TIMESTAMP, last_ts);
After some learning, it seems we'd better add a check that last_ts > [current_last_ts] before doing the update.
Someone ingesting an SST might bring a higher ts value than imm.last_update_timestamp.
But it's OK for now, because we don't support ingestion 😄
Yeah, the engine is quite buggy for now.
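A minimal sketch of the suggested guard, using a hypothetical helper rather than anything in the PR: only advance the persisted LAST_TIMESTAMP when the flushed memtable's timestamp is strictly greater, so a value recorded earlier (for example by a future SST ingestion carrying higher timestamps) can never be moved backwards.

/// Hypothetical helper, not part of the PR: decide whether the flushed
/// memtable's last timestamp may replace the persisted LAST_TIMESTAMP.
fn next_last_timestamp(persisted: Option<u64>, imm_last_ts: u64) -> Option<u64> {
    match persisted {
        // Never move LAST_TIMESTAMP backwards.
        Some(current) if imm_last_ts <= current => None,
        _ => Some(imm_last_ts),
    }
}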
This PR enables the hash engine to persist data in a kernel. It demonstrates the interaction between Engine and Kernel. Note that our kernel is still buggy as described here, so the engine recovery doesn't work if we test with more records here. I will fix that on the kernel side later.
Closes #59