Skip to content

Commit

Permalink
omit coping in EventIterator, use write batch for apply (tikv#18)
Browse files Browse the repository at this point in the history
* omit coping in EventIterator, use write batch for apply

Signed-off-by: Yu Juncen <yujuncen@pingcap.com>

* remove unused import

Signed-off-by: Yu Juncen <yujuncen@pingcap.com>

* address comments

Signed-off-by: Yu Juncen <yu745514916@live.com>
  • Loading branch information
YuJuncen committed Feb 21, 2022
1 parent c724d85 commit d31be6f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 30 deletions.
11 changes: 6 additions & 5 deletions components/sst_importer/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use kvproto::import_sstpb::*;
use encryption::{encryption_method_to_db_encryption_method, DataKeyManager};
use engine_rocks::{get_env, RocksSstReader};
use engine_traits::{
name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, SSTMetaInfo,
SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder, CF_DEFAULT,
CF_WRITE,
name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, Mutable,
SSTMetaInfo, SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder,
WriteBatch, CF_DEFAULT, CF_WRITE,
};
use file_system::{get_io_rate_limiter, OpenOptions};
use kvproto::kvrpcpb::ApiVersion;
Expand Down Expand Up @@ -333,6 +333,7 @@ impl SSTImporter {
let mut largest_key = None;

let start = Instant::now();
let mut wb = engine.write_batch();
loop {
if !event_iter.valid() {
break;
Expand Down Expand Up @@ -382,9 +383,9 @@ impl SSTImporter {
}
let value = Cow::Borrowed(event_iter.value());
// TODO handle delete cf
engine.put_cf(cf, &key, &value)?;
wb.put_cf(cf, &key, &value)?;
}
engine.flush_cf(cf, true)?;
wb.write()?;
let label = if perform_rewrite { "rewrite" } else { "normal" };
info!("apply file finished {}", name);
IMPORTER_APPLY_DURATION
Expand Down
54 changes: 29 additions & 25 deletions components/tikv_util/src/codec/stream_event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{codec::Result, Either};
use byteorder::ByteOrder;
use bytes::{Buf, Bytes};
use std::io::prelude::*;
use std::io::Cursor;
Expand All @@ -15,54 +16,57 @@ pub trait Iterator {
}

pub struct EventIterator {
buf: Cursor<Vec<u8>>,
index: usize,
len: usize,
key: Vec<u8>,
val: Vec<u8>,
buf: Vec<u8>,
offset: usize,
key_offset: usize,
value_offset: usize,
key_len: usize,
value_len: usize,
}

impl EventIterator {
pub fn new(buf: Vec<u8>) -> EventIterator {
let len = buf.len();
EventIterator {
buf: Cursor::new(buf),
index: 0,
len,
key: vec![],
val: vec![],
buf,
offset: 0,
key_offset: 0,
key_len: 0,
value_offset: 0,
value_len: 0,
}
}

fn get_size(&mut self) -> u32 {
let result = byteorder::LE::read_u32(&self.buf[self.offset..]);
self.offset += 4;
result
}
}

impl Iterator for EventIterator {
fn next(&mut self) -> Result<()> {
if self.valid() {
let len = self.buf.get_u32_le() as usize;
self.index += 4;
self.key.resize(len, 0);
self.buf.read_exact(self.key.as_mut_slice())?;
self.index += len;

let len = self.buf.get_u32_le() as usize;
self.index += 4;
self.val.resize(len, 0);
self.buf.read_exact(self.val.as_mut_slice())?;
self.index += len;
self.key_len = self.get_size() as usize;
self.key_offset = self.offset;
self.offset += self.key_len;

self.value_len = self.get_size() as usize;
self.value_offset = self.offset;
self.offset += self.value_len;
}
Ok(())
}

fn valid(&self) -> bool {
self.index < self.len
self.offset < self.buf.len()
}

fn key(&self) -> &[u8] {
&self.key
&self.buf[self.key_offset..self.key_offset + self.key_len]
}

fn value(&self) -> &[u8] {
&self.val
&self.buf[self.value_offset..self.value_offset + self.value_len]
}
}

Expand Down

0 comments on commit d31be6f

Please sign in to comment.