From d31be6f93a8e2de2e0db2c3d87dc1b8121a07dce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Mon, 21 Feb 2022 15:55:44 +0800 Subject: [PATCH] omit copying in EventIterator, use write batch for apply (#18) * omit copying in EventIterator, use write batch for apply Signed-off-by: Yu Juncen * remove unused import Signed-off-by: Yu Juncen * address comments Signed-off-by: Yu Juncen --- components/sst_importer/src/sst_importer.rs | 11 ++-- .../tikv_util/src/codec/stream_event.rs | 54 ++++++++++--------- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index 25294645ce0..f1caa5a819f 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -15,9 +15,9 @@ use kvproto::import_sstpb::*; use encryption::{encryption_method_to_db_encryption_method, DataKeyManager}; use engine_rocks::{get_env, RocksSstReader}; use engine_traits::{ - name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, SSTMetaInfo, - SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder, CF_DEFAULT, - CF_WRITE, + name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, Mutable, + SSTMetaInfo, SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder, + WriteBatch, CF_DEFAULT, CF_WRITE, }; use file_system::{get_io_rate_limiter, OpenOptions}; use kvproto::kvrpcpb::ApiVersion; @@ -333,6 +333,7 @@ impl SSTImporter { let mut largest_key = None; let start = Instant::now(); + let mut wb = engine.write_batch(); loop { if !event_iter.valid() { break; @@ -382,9 +383,9 @@ impl SSTImporter { } let value = Cow::Borrowed(event_iter.value()); // TODO handle delete cf - engine.put_cf(cf, &key, &value)?; + wb.put_cf(cf, &key, &value)?; } - engine.flush_cf(cf, true)?; + wb.write()?; let label = if perform_rewrite { 
"rewrite" } else { "normal" }; info!("apply file finished {}", name); IMPORTER_APPLY_DURATION diff --git a/components/tikv_util/src/codec/stream_event.rs b/components/tikv_util/src/codec/stream_event.rs index e5c9dab2d34..43b63c4a892 100644 --- a/components/tikv_util/src/codec/stream_event.rs +++ b/components/tikv_util/src/codec/stream_event.rs @@ -1,5 +1,6 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. use crate::{codec::Result, Either}; +use byteorder::ByteOrder; use bytes::{Buf, Bytes}; use std::io::prelude::*; use std::io::Cursor; @@ -15,54 +16,57 @@ pub trait Iterator { } pub struct EventIterator { - buf: Cursor>, - index: usize, - len: usize, - key: Vec, - val: Vec, + buf: Vec, + offset: usize, + key_offset: usize, + value_offset: usize, + key_len: usize, + value_len: usize, } impl EventIterator { pub fn new(buf: Vec) -> EventIterator { - let len = buf.len(); EventIterator { - buf: Cursor::new(buf), - index: 0, - len, - key: vec![], - val: vec![], + buf, + offset: 0, + key_offset: 0, + key_len: 0, + value_offset: 0, + value_len: 0, } } + + fn get_size(&mut self) -> u32 { + let result = byteorder::LE::read_u32(&self.buf[self.offset..]); + self.offset += 4; + result + } } impl Iterator for EventIterator { fn next(&mut self) -> Result<()> { if self.valid() { - let len = self.buf.get_u32_le() as usize; - self.index += 4; - self.key.resize(len, 0); - self.buf.read_exact(self.key.as_mut_slice())?; - self.index += len; - - let len = self.buf.get_u32_le() as usize; - self.index += 4; - self.val.resize(len, 0); - self.buf.read_exact(self.val.as_mut_slice())?; - self.index += len; + self.key_len = self.get_size() as usize; + self.key_offset = self.offset; + self.offset += self.key_len; + + self.value_len = self.get_size() as usize; + self.value_offset = self.offset; + self.offset += self.value_len; } Ok(()) } fn valid(&self) -> bool { - self.index < self.len + self.offset < self.buf.len() } fn key(&self) -> &[u8] { - &self.key + 
&self.buf[self.key_offset..self.key_offset + self.key_len] } fn value(&self) -> &[u8] { - &self.val + &self.buf[self.value_offset..self.value_offset + self.value_len] } }