scripts/ci/deploy/config/databend-query-node-2.toml (2 changes: 1 addition & 1 deletion)
@@ -56,7 +56,7 @@ definition = "CREATE FUNCTION ping(STRING) RETURNS STRING LANGUAGE python HANDLE
[log]

[log.file]
level = "INFO"
level = "DEBUG"
format = "text"
dir = "./.databend/logs_2"
limit = 12 # 12 files, 1 file per hour
scripts/ci/deploy/config/databend-query-node-3.toml (2 changes: 1 addition & 1 deletion)
@@ -57,7 +57,7 @@ definition = "CREATE FUNCTION ping(STRING) RETURNS STRING LANGUAGE python HANDLE
[log]

[log.file]
level = "INFO"
level = "DEBUG"
format = "text"
limit = 12 # 12 files, 1 file per hour
dir = "./.databend/logs_3"
@@ -84,6 +84,7 @@ fn build_partition_bucket_experimental(
ctx.clone(),
output_num,
shared_partition_stream.clone(),
true,
)?;
let input_port = InputPort::create();
let output_port = OutputPort::create();
@@ -31,6 +31,7 @@ use databend_common_pipeline_transforms::MemorySettings;
use databend_common_storage::DataOperator;
use databend_common_storages_parquet::ReadSettings;
use log::debug;
use log::info;
use parking_lot::Mutex;
use parquet::file::metadata::RowGroupMetaData;

@@ -46,6 +47,8 @@ use crate::spillers::SpillsDataWriter;

struct PayloadWriter {
path: String,
// TODO: this may change to lazy init; for now it creates up to 128*thread_num files
// even if the writer is never used to write.
writer: SpillsDataWriter,
}
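
The TODO above notes that each PayloadWriter opens its spill file eagerly, so up to 128*thread_num files can be created even when nothing is ever written to them. A minimal sketch of the lazy-init idea, assuming a hypothetical create_spill_writer(path) constructor; the names are illustrative, not the actual SpillsDataWriter API:

// Hypothetical lazy variant: the spill file is only created on the first write,
// so partitions that never spill do not touch storage.
struct LazyPayloadWriter {
    path: String,
    writer: Option<SpillsDataWriter>,
}

impl LazyPayloadWriter {
    fn write_block(&mut self, block: DataBlock) -> Result<()> {
        if self.writer.is_none() {
            // `create_spill_writer` stands in for whichever constructor SpillsDataWriter exposes.
            self.writer = Some(create_spill_writer(&self.path)?);
        }
        // Safe to unwrap: initialized just above.
        self.writer.as_mut().unwrap().write(block)
    }
}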

@@ -110,16 +113,23 @@ struct AggregatePayloadWriters {
writers: Vec<PayloadWriter>,
write_stats: WriteStats,
ctx: Arc<QueryContext>,
is_local: bool,
}

impl AggregatePayloadWriters {
pub fn create(prefix: &str, partition_count: usize, ctx: Arc<QueryContext>) -> Self {
pub fn create(
prefix: &str,
partition_count: usize,
ctx: Arc<QueryContext>,
is_local: bool,
) -> Self {
AggregatePayloadWriters {
spill_prefix: prefix.to_string(),
partition_count,
writers: vec![],
write_stats: WriteStats::default(),
ctx,
is_local,
}
}

@@ -166,6 +176,18 @@ impl AggregatePayloadWriters {
for (partition_id, writer) in writers.into_iter().enumerate() {
let (path, written_size, row_groups) = writer.close()?;

if written_size != 0 {
info!(
"Write aggregate spill finished({}): (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
if self.is_local { "local" } else { "exchange" },
partition_id,
path,
written_size,
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
row_groups.len()
);
}

self.ctx.add_spill_file(
Location::Remote(path.clone()),
Layout::Aggregate,
@@ -276,13 +298,15 @@ impl NewAggregateSpiller {
ctx: Arc<QueryContext>,
partition_count: usize,
partition_stream: SharedPartitionStream,
is_local: bool,
) -> Result<Self> {
let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?;
let table_ctx: Arc<dyn TableContext> = ctx.clone();
let read_setting = ReadSettings::from_settings(&table_ctx.get_settings())?;
let spill_prefix = ctx.query_id_spill_prefix();

let payload_writers = AggregatePayloadWriters::create(&spill_prefix, partition_count, ctx);
let payload_writers =
AggregatePayloadWriters::create(&spill_prefix, partition_count, ctx, is_local);

Ok(Self {
memory_settings,
@@ -320,12 +344,21 @@ impl NewAggregateSpiller {

let operator = DataOperator::instance().spill_operator();
let buffer_pool = SpillsBufferPool::instance();
let mut reader = buffer_pool.reader(operator.clone(), location, vec![row_group.clone()])?;

let read_bytes = row_group.total_byte_size() as usize;
let mut reader =
buffer_pool.reader(operator.clone(), location.clone(), vec![row_group.clone()])?;

let instant = Instant::now();
let data_block = reader.read(self.read_setting)?;
flush_read_profile(&instant, read_bytes);
let elapsed = instant.elapsed();

let read_size = reader.read_bytes();
flush_read_profile(&elapsed, read_size);

info!(
"Read aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, elapsed: {:?})",
bucket, location, read_size, row_group.num_rows(), elapsed
);

if let Some(block) = data_block {
Ok(AggregateMeta::Serialized(SerializedPayload {
@@ -339,12 +372,12 @@
}
}

fn flush_read_profile(instant: &Instant, read_bytes: usize) {
fn flush_read_profile(elapsed: &Duration, read_bytes: usize) {
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1);
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes);
Profile::record_usize_profile(
ProfileStatisticsName::RemoteSpillReadTime,
instant.elapsed().as_millis() as usize,
elapsed.as_millis() as usize,
);
}
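
On the read side, the profile and the new log line now report the bytes actually read by the reader (reader.read_bytes()) instead of the row group's declared total_byte_size(), and a single elapsed value feeds both. A condensed sketch of that path, simplified from the method above:

let instant = Instant::now();
let data_block = reader.read(self.read_setting)?;
let elapsed = instant.elapsed();

// Report what was actually read, not row_group.total_byte_size().
let read_size = reader.read_bytes();
flush_read_profile(&elapsed, read_size);

info!(
    "Read aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, elapsed: {:?})",
    bucket, location, read_size, row_group.num_rows(), elapsed
);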

@@ -43,7 +43,7 @@ impl NewTransformAggregateSpillWriter {
) -> Result<Box<dyn Processor>> {
let partition_count = MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize;
let spiller =
NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream)?;
NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream, true)?;

Ok(AccumulatingTransformer::create(
input,
@@ -192,6 +192,7 @@ pub fn agg_spilling_aggregate_payload(
let mut spilled_buckets_payloads = Vec::with_capacity(partition_count);
// Record how many rows are spilled.
let mut rows = 0;
let mut buckets_count = 0;
let location = spiller.create_unique_location();
for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() {
if payload.len() == 0 {
@@ -200,6 +201,7 @@

let data_block = payload.aggregate_flush_all()?.consume_convert_to_full();
rows += data_block.num_rows();
buckets_count += 1;

let begin = write_size;
let mut columns_data = Vec::with_capacity(data_block.num_columns());
@@ -227,6 +229,7 @@
let (location, write_bytes) = spiller
.spill_stream_aggregate_buffer(Some(location), write_data)
.await?;
let elapsed = instant.elapsed();
// perf
{
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1);
@@ -236,7 +239,7 @@
);
Profile::record_usize_profile(
ProfileStatisticsName::RemoteSpillWriteTime,
instant.elapsed().as_millis() as usize,
elapsed.as_millis() as usize,
);
}

Expand All @@ -249,9 +252,8 @@ pub fn agg_spilling_aggregate_payload(
}

info!(
"Write aggregate spill {} successfully, elapsed: {:?}",
location,
instant.elapsed()
"Write aggregate spill finished(local): (location: {}, bytes: {}, rows: {}, buckets_count: {}, elapsed: {:?})",
location, write_bytes, rows, buckets_count, elapsed
);
}

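Both this local path and the exchange path below capture the elapsed time once, immediately after the spill write returns, so the RemoteSpillWriteTime statistic and the log line report the same duration. A simplified sketch of the pattern, condensed from the surrounding code:

let instant = Instant::now();
let (location, write_bytes) = spiller
    .spill_stream_aggregate_buffer(Some(location), write_data)
    .await?;
// Measure once; reuse the value for the profile statistic and the log message.
let elapsed = instant.elapsed();

Profile::record_usize_profile(
    ProfileStatisticsName::RemoteSpillWriteTime,
    elapsed.as_millis() as usize,
);
info!(
    "Write aggregate spill finished(local): (location: {}, bytes: {}, rows: {}, buckets_count: {}, elapsed: {:?})",
    location, write_bytes, rows, buckets_count, elapsed
);
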
@@ -107,11 +107,13 @@ impl TransformExchangeAggregateSerializer {
let spiller = if params.enable_experiment_aggregate {
let spillers = partition_streams
.into_iter()
.map(|stream| {
.enumerate()
.map(|(pos, stream)| {
NewAggregateSpiller::try_create(
ctx.clone(),
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
stream.clone(),
pos == local_pos,
)
})
.collect::<Result<Vec<NewAggregateSpiller>>>()?;
@@ -356,6 +358,7 @@ fn exchange_agg_spilling_aggregate_payload(
) -> Result<BoxFuture<'static, Result<DataBlock>>> {
let partition_count = partitioned_payload.partition_count();
let mut write_size = 0;
let mut buckets_count = 0;
let mut write_data = Vec::with_capacity(partition_count);
let mut buckets_column_data = Vec::with_capacity(partition_count);
let mut data_range_start_column_data = Vec::with_capacity(partition_count);
@@ -371,6 +374,7 @@

let data_block = payload.aggregate_flush_all()?;
rows += data_block.num_rows();
buckets_count += 1;

let old_write_size = write_size;
let columns = data_block.columns().to_vec();
@@ -398,6 +402,7 @@
let (location, write_bytes) = spiller
.spill_stream_aggregate_buffer(None, write_data)
.await?;
let elapsed = instant.elapsed();
// perf
{
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1);
@@ -407,7 +412,7 @@
);
Profile::record_usize_profile(
ProfileStatisticsName::RemoteSpillWriteTime,
instant.elapsed().as_millis() as usize,
elapsed.as_millis() as usize,
);
}

Expand All @@ -422,9 +427,8 @@ fn exchange_agg_spilling_aggregate_payload(
}

info!(
"Write aggregate spill {} successfully, elapsed: {:?}",
location,
instant.elapsed()
"Write aggregate spill finished(exchange): (location: {}, bytes: {}, rows: {}, buckets_count: {}, elapsed: {:?})",
location, write_bytes, rows, buckets_count, elapsed
);

let data_block = DataBlock::new_from_columns(vec![
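Taken together, in the hunks shown here the new is_local flag only selects the tag printed in the spill logs ("local" vs "exchange"). A condensed sketch of how it is wired, assembled from the changes above; local_pos is the index of the current node among the exchange destinations, as in the surrounding code:

// Local spill writer: always spills for the local node.
let spiller =
    NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream, true)?;

// Exchange serializer: one spiller per destination stream; only the stream at
// `local_pos` belongs to this node, the rest spill on behalf of the exchange.
let spillers = partition_streams
    .into_iter()
    .enumerate()
    .map(|(pos, stream)| {
        NewAggregateSpiller::try_create(
            ctx.clone(),
            MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
            stream.clone(),
            pos == local_pos,
        )
    })
    .collect::<Result<Vec<NewAggregateSpiller>>>()?;

// When the payload writers are closed, the flag only picks the log tag:
let tag = if is_local { "local" } else { "exchange" };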