1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions scripts/benchmark/query/load/tpch10.sh
@@ -7,7 +7,7 @@ select version();
 SQL

 for t in customer lineitem nation orders partsupp part region supplier; do
-    echo "DROP TABLE IF EXISTS $t;" | bendsql query
+    echo "DROP TABLE IF EXISTS $t;" | bendsql query
 done

 cat <<SQL | bendsql query
@@ -113,8 +113,8 @@ cat <<SQL | bendsql query
 SQL

 for t in customer lineitem nation orders partsupp part region supplier; do
-    echo "loading into $t ..."
-    cat <<SQL | bendsql query
+    echo "loading into $t ..."
+    cat <<SQL | bendsql query
 COPY INTO $t FROM 's3://repo.databend.rs/datasets/tpch10/${t}/'
 credentials=(aws_key_id='$REPO_ACCESS_KEY_ID' aws_secret_key='$REPO_SECRET_ACCESS_KEY') pattern ='${t}.*'
 file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=0);
4 changes: 2 additions & 2 deletions src/query/expression/src/schema.rs
@@ -393,8 +393,8 @@ impl TableSchema {
         Ok(())
     }

-    pub fn to_column_id_set(&self) -> HashSet<ColumnId> {
-        HashSet::from_iter(self.to_column_ids().iter().cloned())
+    pub fn to_leaf_column_id_set(&self) -> HashSet<ColumnId> {
+        HashSet::from_iter(self.to_leaf_column_ids().iter().cloned())
     }

     pub fn to_column_ids(&self) -> Vec<ColumnId> {
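Reviewer note on the rename above: `to_column_ids` and `to_leaf_column_ids` only differ for nested types, where the container column carries an id of its own in addition to the ids of its leaves. A minimal sketch of the assumed distinction (the schema and id assignment are illustrative, not taken from this PR):

```rust
use std::collections::HashSet;

type ColumnId = u32;

// Hypothetical schema: `id Int32, t Tuple(a Int32, b Int32)`.
// Assumption for illustration: the tuple container owns id 1 and its
// leaves own ids 2 and 3, following the leaf-id convention in schema.rs.
fn to_column_ids() -> Vec<ColumnId> {
    vec![0, 1, 2, 3] // id, t, t:a, t:b
}

fn to_leaf_column_ids() -> Vec<ColumnId> {
    vec![0, 2, 3] // id, t:a, t:b (no container id)
}

fn main() {
    // The renamed helper builds its set from leaf ids only, so membership
    // tests against it answer "does this physical (leaf) column exist?".
    let leaf_set: HashSet<ColumnId> = to_leaf_column_ids().into_iter().collect();
    assert!(leaf_set.contains(&2));
    assert!(!leaf_set.contains(&1)); // the container id is not a leaf
}
```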
@@ -75,7 +75,7 @@ async fn check_segment_column_ids(
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };

     let snapshot = snapshot_reader.read(&params).await?;
@@ -100,7 +100,7 @@ async fn check_segment_column_ids(
         location: seg_loc.clone(),
         len_hint: None,
         ver: SegmentInfo::VERSION,
-        put_cache: true,
+        put_cache: false,
     };
     let segment_info = segment_reader.read(&params).await?;

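A recurring change across these tests is `put_cache: true` flipped to `put_cache: false`. The semantics assumed here: `LoadParams::put_cache` decides whether a read-through populates the process-wide meta cache, and the tests opt out so each read observes storage directly. A self-contained sketch of that behavior (all types are simplified stand-ins, not the real readers):

```rust
use std::collections::HashMap;

struct LoadParams {
    location: String,
    put_cache: bool,
}

struct SnapshotReader {
    cache: HashMap<String, Vec<u8>>,
}

impl SnapshotReader {
    fn read(&mut self, params: &LoadParams) -> Vec<u8> {
        if let Some(bytes) = self.cache.get(&params.location) {
            return bytes.clone(); // cache hit: storage is never touched
        }
        let bytes = vec![0u8; 8]; // stand-in for an object-storage read
        if params.put_cache {
            // Only populate the shared cache when the caller opts in.
            self.cache.insert(params.location.clone(), bytes.clone());
        }
        bytes
    }
}

fn main() {
    let mut reader = SnapshotReader { cache: HashMap::new() };
    let params = LoadParams { location: "snapshot_loc".into(), put_cache: false };
    reader.read(&params);
    // With put_cache: false the read leaves no cached copy behind, so a
    // later read (or another test) cannot observe a stale entry.
    assert!(reader.cache.is_empty());
}
```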
@@ -91,7 +91,7 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };

     let snapshot = reader.read(&load_params).await?;
@@ -127,7 +127,7 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };

     let snapshot = reader.read(&params).await?;
@@ -31,6 +31,7 @@ use common_storage::DataOperator;
 use common_storages_fuse::io::MetaReaders;
 use common_storages_fuse::io::SegmentInfoReader;
 use common_storages_fuse::io::SegmentWriter;
+use common_storages_fuse::io::SegmentsIO;
 use common_storages_fuse::io::TableMetaLocationGenerator;
 use common_storages_fuse::operations::CompactOptions;
 use common_storages_fuse::operations::SegmentCompactMutator;
@@ -617,10 +618,9 @@ async fn test_segment_compactor() -> Result<()> {

 struct CompactSegmentTestFixture {
     threshold: u64,
+    ctx: Arc<dyn TableContext>,
     data_accessor: DataOperator,
     location_gen: TableMetaLocationGenerator,
-    input_segments: Vec<Arc<SegmentInfo>>,
     input_segment_locations: Vec<Location>,
     // blocks of input_segments, order by segment
     input_blocks: Vec<BlockMeta>,
 }
@@ -630,11 +630,10 @@ impl CompactSegmentTestFixture {
         let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
         let data_accessor = ctx.get_data_operator()?;
         Ok(Self {
+            ctx: ctx.clone(),
             threshold: block_per_seg,
             data_accessor,
             location_gen,
-            input_segments: vec![],
             input_segment_locations: vec![],
             input_blocks: vec![],
         })
     }
@@ -649,29 +648,37 @@ impl CompactSegmentTestFixture {
         let location_gen = &self.location_gen;
         let block_writer = BlockWriter::new(data_accessor, location_gen);

+        let schema = TestFixture::default_table_schema();
+        let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
+        let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
+
         let segment_writer = SegmentWriter::new(data_accessor, location_gen);
-        let seg_acc = SegmentCompactor::new(block_per_seg, segment_writer.clone());
+        let seg_acc = SegmentCompactor::new(
+            block_per_seg,
+            max_io_requests,
+            &fuse_segment_io,
+            segment_writer.clone(),
+        );

-        let (segments, locations, blocks) =
+        let (locations, blocks) =
             Self::gen_segments(&block_writer, &segment_writer, num_block_of_segments).await?;
-        self.input_segments = segments;
         self.input_segment_locations = locations;
         self.input_blocks = blocks;
         let limit = limit.unwrap_or(usize::MAX);
         seg_acc
-            .compact(&self.input_segments, &self.input_segment_locations, limit)
+            .compact(locations, limit, |status| {
+                self.ctx.set_status_info(&status);
+            })
             .await
     }

     async fn gen_segments(
         block_writer: &BlockWriter<'_>,
         segment_writer: &SegmentWriter<'_>,
         block_num_of_segments: &[usize],
-    ) -> Result<(Vec<Arc<SegmentInfo>>, Vec<Location>, Vec<BlockMeta>)> {
-        let mut segments = vec![];
+    ) -> Result<(Vec<Location>, Vec<BlockMeta>)> {
         let mut locations = vec![];
         let mut collected_blocks = vec![];
-        for num_blocks in block_num_of_segments {
+        for num_blocks in block_num_of_segments.iter().rev() {
             let (schema, blocks) = TestFixture::gen_sample_blocks_ex(*num_blocks, 1, 1);
             let mut stats_acc = StatisticsAccumulator::default();
             for block in blocks {
@@ -696,11 +703,10 @@ impl CompactSegmentTestFixture {
                 col_stats,
             });
             let location = segment_writer.write_segment_no_cache(&segment_info).await?;
-            segments.push(Arc::new(segment_info));
             locations.push(location);
         }

-        Ok((segments, locations, collected_blocks))
+        Ok((locations, collected_blocks))
     }

     // verify that newly generated segments contain the proper number of blocks
@@ -716,7 +722,7 @@ impl CompactSegmentTestFixture {
                 location: x.to_string(),
                 len_hint: None,
                 ver: SegmentInfo::VERSION,
-                put_cache: true,
+                put_cache: false,
             };

             let seg = segment_reader.read(&load_params).await?;
@@ -791,13 +797,12 @@ impl CompactCase {
         let mut block_num_of_output_segments = vec![];

         // 4. input blocks should be there and in the original order
-        // for location in r.segments_locations.iter().rev() {
-        for location in r.segments_locations.iter() {
+        for location in r.segments_locations.iter().rev() {
             let load_params = LoadParams {
                 location: location.0.clone(),
                 len_hint: None,
                 ver: location.1,
-                put_cache: true,
+                put_cache: false,
             };

             let segment = segment_reader.read(&load_params).await?;
@@ -815,6 +820,7 @@ impl CompactCase {
                 idx += 1;
             }
         }
+        block_num_of_output_segments.reverse();

         // 5. statistics should be the same
         assert_eq!(
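For reference, the fixture above now hands the compactor segment locations, an IO-concurrency bound, and a status callback rather than pre-loaded `SegmentInfo` values. A simplified, self-contained sketch of that call shape (types and chunking are stand-ins; in the real code the reads go through `SegmentsIO` in parallel):

```rust
type Location = (String, u64); // (path, format version)

struct SegmentCompactor {
    block_per_seg: u64,
    max_io_requests: usize,
}

impl SegmentCompactor {
    // Locations in, compacted output elided; progress goes to the callback.
    fn compact<F>(&self, locations: Vec<Location>, limit: usize, mut on_status: F)
    where
        F: FnMut(String),
    {
        let to_process = locations.len().min(limit);
        // Read segments in chunks bounded by max_io_requests, so the
        // compactor never issues more concurrent reads than allowed.
        for (i, chunk) in locations[..to_process].chunks(self.max_io_requests).enumerate() {
            on_status(format!(
                "compact segment: chunk {i}, {} segment(s), target {} blocks/segment",
                chunk.len(),
                self.block_per_seg
            ));
            // ... accumulate block metas here and emit a new segment once
            // `block_per_seg` blocks have been gathered.
        }
    }
}

fn main() {
    let compactor = SegmentCompactor { block_per_seg: 8, max_io_requests: 4 };
    let locations: Vec<Location> = (0..10).map(|i| (format!("seg/{i}"), 2)).collect();
    compactor.compact(locations, usize::MAX, |status| println!("{status}"));
}
```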
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/pruning.rs
@@ -163,7 +163,7 @@ async fn test_block_pruner() -> Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        put_cache: true,
+        put_cache: false,
     };

     let snapshot = reader.read(&load_params).await?;
4 changes: 2 additions & 2 deletions src/query/service/tests/it/storages/fuse/statistics.rs
@@ -155,7 +155,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
         .iter()
         .map(|b| gen_columns_statistics(&b.clone().unwrap(), None, &schema))
         .collect::<common_exception::Result<Vec<_>>>()?;
-    let r = reducers::reduce_block_statistics(&col_stats, None);
+    let r = reducers::reduce_block_statistics(&col_stats);
     assert!(r.is_ok());
     let r = r.unwrap();
     assert_eq!(3, r.len());
@@ -210,7 +210,7 @@ fn test_reduce_block_statistics_in_memory_size() -> common_exception::Result<()>
     // combine two statistics
     let col_stats_left = HashMap::from_iter(iter(0).take(num_of_cols));
     let col_stats_right = HashMap::from_iter(iter(0).take(num_of_cols));
-    let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right], None)?;
+    let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right])?;
     assert_eq!(num_of_cols, r.len());
     // there should be 100 columns in the result
     for idx in 1..=100 {
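The two call sites in this file pass owned maps, while commit.rs below passes references, which suggests the slimmed-down `reduce_block_statistics` is generic over `Borrow`. A sketch of a reducer with that assumed signature (the merge logic is reduced to size accumulation for brevity; the real reduction also merges min/max and null counts):

```rust
use std::borrow::Borrow;
use std::collections::HashMap;

type ColumnId = u32;

#[derive(Clone, Default)]
struct ColumnStatistics {
    in_memory_size: u64,
}

type StatisticsOfColumns = HashMap<ColumnId, ColumnStatistics>;

// Assumed shape: accepts &[StatisticsOfColumns] and &[&StatisticsOfColumns] alike.
fn reduce_block_statistics<T: Borrow<StatisticsOfColumns>>(stats: &[T]) -> StatisticsOfColumns {
    let mut reduced = StatisticsOfColumns::new();
    for s in stats {
        for (col_id, col_stats) in s.borrow() {
            // Merge per-column statistics across all input blocks.
            let entry = reduced.entry(*col_id).or_default();
            entry.in_memory_size += col_stats.in_memory_size;
        }
    }
    reduced
}

fn main() {
    let left: StatisticsOfColumns =
        HashMap::from([(1, ColumnStatistics { in_memory_size: 4 })]);
    let right = left.clone();
    // Works with a slice of references, as in the commit path below.
    let merged = reduce_block_statistics(&[&left, &right]);
    assert_eq!(merged[&1].in_memory_size, 8);
}
```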
1 change: 0 additions & 1 deletion src/query/storages/fuse/Cargo.toml
@@ -44,7 +44,6 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 chrono = { workspace = true }
 futures = "0.3.24"
 futures-util = "0.3.24"
-itertools = "0.10.5"
 metrics = "0.20.1"
 opendal = { workspace = true }
 serde = { workspace = true }
22 changes: 9 additions & 13 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -33,7 +33,7 @@ use crate::io::write::WriteSettings;
 use crate::io::TableMetaLocationGenerator;
 use crate::operations::util;
 use crate::operations::BloomIndexState;
-use crate::statistics::BlockStatistics;
+use crate::statistics::gen_columns_statistics;

 // TODO rename this, it is serialization, or pass in a writer(if not rename)
 pub fn serialize_block(
@@ -122,14 +122,10 @@ impl BlockBuilder {
         // TODO, generate the cluster stats
         let cluster_stats = None;

-        // need to use BlockStatistics any more?
-        let block_statistics = BlockStatistics::from(
-            &data_block,
-            block_location.0.clone(),
-            cluster_stats,
-            column_distinct_count,
-            &self.source_schema,
-        )?;
+        let row_count = data_block.num_rows() as u64;
+        let block_size = data_block.memory_size() as u64;
+        let col_stats =
+            gen_columns_statistics(&data_block, column_distinct_count, &self.source_schema)?;

         let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
         let (file_size, col_metas) = serialize_block(
@@ -140,12 +136,12 @@
         )?;

         let block_meta = BlockMeta {
-            row_count: block_statistics.block_rows_size,
-            block_size: block_statistics.block_bytes_size,
+            row_count,
+            block_size,
             file_size,
-            col_stats: block_statistics.block_column_statistics,
+            col_stats,
             col_metas,
-            cluster_stats: block_statistics.block_cluster_statistics,
+            cluster_stats,
             location: block_location,
             bloom_filter_index_location: bloom_index_state.as_ref().map(|v| v.location.clone()),
             bloom_filter_index_size: bloom_index_state
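The net effect in `BlockBuilder`: row count, byte size, and column statistics are computed directly off the block and moved into `BlockMeta` via field-init shorthand, with the `BlockStatistics` carrier struct gone. A stand-alone sketch of the reshaped construction (stand-in types, and a hypothetical stub in place of `gen_columns_statistics`):

```rust
use std::collections::HashMap;

type ColumnId = u32;
type StatisticsOfColumns = HashMap<ColumnId, u64>; // stand-in per-column stat

struct DataBlock { rows: usize, bytes: usize }

impl DataBlock {
    fn num_rows(&self) -> usize { self.rows }
    fn memory_size(&self) -> usize { self.bytes }
}

struct BlockMeta {
    row_count: u64,
    block_size: u64,
    col_stats: StatisticsOfColumns,
}

// Hypothetical stub standing in for crate::statistics::gen_columns_statistics.
fn gen_columns_statistics(block: &DataBlock) -> StatisticsOfColumns {
    HashMap::from([(0, block.memory_size() as u64)])
}

fn build_meta(data_block: &DataBlock) -> BlockMeta {
    // Compute each piece directly, then move it into the meta struct with
    // no intermediate wrapper to name and immediately unpack.
    let row_count = data_block.num_rows() as u64;
    let block_size = data_block.memory_size() as u64;
    let col_stats = gen_columns_statistics(data_block);
    BlockMeta { row_count, block_size, col_stats }
}

fn main() {
    let meta = build_meta(&DataBlock { rows: 10, bytes: 1024 });
    assert_eq!(meta.row_count, 10);
    assert_eq!(meta.block_size, 1024);
}
```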
9 changes: 8 additions & 1 deletion src/query/storages/fuse/src/operations/commit.rs
@@ -462,7 +462,7 @@ impl FuseTable {
             acc.col_stats = if acc.col_stats.is_empty() {
                 stats.col_stats.clone()
             } else {
-                statistics::reduce_block_statistics(&[&acc.col_stats, &stats.col_stats], None)?
+                statistics::reduce_block_statistics(&[&acc.col_stats, &stats.col_stats])?
             };
             seg_acc.push(location.clone());
             Ok::<_, ErrorCode>((acc, seg_acc))
@@ -521,6 +521,13 @@ impl FuseTable {
         // potentially concurrently appended segments, init it to empty
         let mut concurrently_appended_segment_locations: &[Location] = &[];

+        // Status
+        {
+            let status = "mutation: begin try to commit";
+            ctx.set_status_info(status);
+            info!(status);
+        }
+
         while retries < MAX_RETRIES {
             let mut snapshot_tobe_committed =
                 TableSnapshot::from_previous(latest_snapshot.as_ref());