Merge pull request #1104 from ByConity/cherry-pick3
Daily cherry-pick from internal
dmthuc committed Jan 26, 2024
2 parents 3b68158 + 1fb520c commit cc4e467
Showing 22 changed files with 80 additions and 34 deletions.
10 changes: 2 additions & 8 deletions src/CloudServices/CnchBGThreadPartitionSelector.cpp
@@ -178,8 +178,7 @@ Strings CnchBGThreadPartitionSelector::selectForMerge(const StoragePtr & storage

std::unique_lock lock(mutex);
auto estimators = container[uuid];
/// TODO: (zuochuang.zema) lock guard the whole scope or just copy elements?
// lock.unlock();
lock.unlock();

/// Sometimes there might not be enough data in system.server_part_log, eg: server start without binding K8s PVC.
/// Then we need to select more partitions from Catalog (by round-robin strategy).
@@ -217,8 +216,6 @@ Strings CnchBGThreadPartitionSelector::selectForMerge(const StoragePtr & storage
/// candidates.size() <= n
if (auto size = candidates.size(); size <= n)
{
lock.unlock();

Strings all;
all.reserve(size);
for (const auto & candidate : candidates)
@@ -251,7 +248,6 @@ Strings CnchBGThreadPartitionSelector::selectForMerge(const StoragePtr & storage
[](auto & lhs, auto & rhs) { return lhs->merge_speed < rhs->merge_speed; }
);
}
lock.unlock();

for (const auto & candidate : candidates)
{
@@ -277,8 +273,7 @@ Strings CnchBGThreadPartitionSelector::selectForGC(const StoragePtr & storage

std::unique_lock lock(mutex);
auto estimators = container[uuid];
/// TODO: (zuochuang.zema) lock guard the whole scope or just copy elements?
// lock.unlock();
lock.unlock();

if (estimators.empty())
{
@@ -310,7 +305,6 @@ Strings CnchBGThreadPartitionSelector::selectForGC(const StoragePtr & storage
candidates.end(),
[](auto & lhs, auto & rhs) { return lhs->gc_speed < rhs->gc_speed; }
);
lock.unlock();

for (const auto & candidate : candidates)
{
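The net effect of the selectForMerge/selectForGC changes is that the per-table estimators are copied out of the shared container and the mutex is released immediately, instead of being held (or conditionally unlocked later) across the scoring and sorting work. A minimal sketch of this copy-then-unlock pattern, with simplified stand-in types rather than the real estimator container:

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Stand-ins for the real types: the container maps a table UUID to a map of
// partition -> shared_ptr<estimator>, so copying it only copies pointers.
struct Estimator { double merge_speed = 0.0; double gc_speed = 0.0; };
using Estimators = std::unordered_map<std::string, std::shared_ptr<Estimator>>;

std::mutex mutex;
std::unordered_map<std::string, Estimators> container;

Estimators snapshotEstimators(const std::string & uuid)
{
    std::unique_lock lock(mutex);
    auto estimators = container[uuid]; // copy while holding the lock
    lock.unlock();                     // scoring and sorting no longer need the mutex
    return estimators;
}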
15 changes: 15 additions & 0 deletions src/Interpreters/Context.cpp
@@ -408,6 +408,7 @@ struct ContextSharedPart
BackgroundSchedulePool::TaskHolder meta_checker;

std::optional<CnchHiveSettings> cnchhive_settings;
std::optional<CnchHiveSettings> las_settings;
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
std::optional<CnchFileSettings> cnch_file_settings; /// Settings of CnchFile engines.
std::optional<MergeTreeSettings> replicated_merge_tree_settings; /// Settings of ReplicatedMergeTree* engines.
@@ -3993,6 +3994,20 @@ const CnchHiveSettings & Context::getCnchHiveSettings() const
return *shared->cnchhive_settings;
}

const CnchHiveSettings & Context::getCnchLasSettings() const
{
auto lock = getLock();

if (!shared->las_settings)
{
const auto & config = getConfigRef();
CnchHiveSettings las_settings;
las_settings.loadFromConfig("las", config);
shared->las_settings.emplace(las_settings);
}
return *shared->las_settings;
}

const MergeTreeSettings & Context::getMergeTreeSettings() const
{
auto lock = getLock();
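getCnchLasSettings() mirrors getCnchHiveSettings(): on first use it fills a CnchHiveSettings object from the `las` section of the server configuration under the context lock and caches it in ContextSharedPart. A hedged sketch of a call site; the `context` variable is illustrative, and `hive_metastore_url` is the setting added to CnchHiveSettings in this commit:

// Illustrative call site, not taken from this diff.
const CnchHiveSettings & las_settings = context->getCnchLasSettings();
String metastore_url = las_settings.hive_metastore_url; // default LAS metastore URL from the `las` config section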
1 change: 1 addition & 0 deletions src/Interpreters/Context.h
@@ -1277,6 +1277,7 @@ class Context : public std::enable_shared_from_this<Context>
const MergeTreeSettings & getReplicatedMergeTreeSettings() const;
const StorageS3Settings & getStorageS3Settings() const;
const CnchHiveSettings & getCnchHiveSettings() const;
const CnchHiveSettings & getCnchLasSettings() const;
const CnchFileSettings & getCnchFileSettings() const;

/// Prevents DROP TABLE if its size is greater than max_size (50GB by default, max_size=0 turn off this check)
2 changes: 0 additions & 2 deletions src/Storages/DataLakes/HiveFile/JNIArrowSource.cpp
@@ -44,9 +44,7 @@ Chunk JNIArrowSource::generate()
THROW_RESULT_NOT_OK(record_batch);
auto table = arrow::Table::FromRecordBatches(schema, {record_batch.ValueOrDie()});
THROW_RESULT_NOT_OK(table);
std::cout << table.ValueOrDie()->ToString() << std::endl;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table.ValueOrDie());
std::cout << res.getNumRows() << std::endl;
}
return res;
}
45 changes: 30 additions & 15 deletions src/Storages/DataLakes/StorageCnchLas.cpp
@@ -98,12 +98,8 @@ PrepareContextResult StorageCnchLas::prepareReadContext(
read_properties["buffer_size"] = std::to_string(settings.max_read_buffer_size);

watch.restart();
HiveFiles hive_files;
for (const auto & partition : partitions)
{
HiveFiles res = hive_client->getFilesInPartition(partition);
hive_files.insert(hive_files.end(), std::make_move_iterator(res.begin()), std::make_move_iterator(res.end()));
}
size_t num_workers = local_context->getCurrentWorkerGroup()->getShardsInfo().size();
HiveFiles hive_files = jni_meta_client->getFilesInPartition(partitions, num_workers, static_cast<size_t>(num_streams));

LOG_TRACE(log, "Elapsed {} ms to get {} FileSplits", watch.elapsedMilliseconds(), hive_files.size());
PrepareContextResult result{.hive_files = std::move(hive_files)};
@@ -165,25 +161,44 @@ void registerStorageLas(StorageFactory & factory)
factory.registerStorage("CnchLas", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage CnchHudi require 3 arguments: hive_metastore_url, hudi_db_name and hudi_table_name.");

String hive_metastore_url;
String hive_database;
String hive_table;

for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());

String hive_metastore_url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String hive_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String hive_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
if (engine_args.size() == 3)
{
hive_metastore_url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
hive_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
hive_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else if (engine_args.size() == 2)
{
hive_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
hive_table = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
else
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage CnchHudi require 2 or 3 arguments: {[hive_metastore_url, ] db_name, able_name}.");
}

StorageInMemoryMetadata metadata;
std::shared_ptr<CnchHiveSettings> hive_settings = std::make_shared<CnchHiveSettings>(args.getContext()->getCnchHiveSettings());
std::shared_ptr<CnchHiveSettings> las_settings = std::make_shared<CnchHiveSettings>(args.getContext()->getCnchLasSettings());
if (args.storage_def->settings)
{
hive_settings->loadFromQuery(*args.storage_def);
las_settings->loadFromQuery(*args.storage_def);
metadata.settings_changes = args.storage_def->settings->ptr();
}

if (hive_metastore_url.empty())
{
hive_metastore_url = las_settings->hive_metastore_url;
}

if (!args.columns.empty())
metadata.setColumns(args.columns);

@@ -204,7 +219,7 @@ void registerStorageLas(StorageFactory & factory)
std::move(metadata),
args.getContext(),
args.hive_client,
hive_settings);
las_settings);
},
features);
}
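Taken together, registerStorageLas now resolves its inputs in layers: the engine accepts either three arguments (hive_metastore_url, db_name, table_name) or two (db_name, table_name); per-table SETTINGS from the DDL override the server-wide `las` defaults; and in the two-argument form the metastore URL falls back to the settings value. A condensed sketch of that resolution order, using the names from the hunk above with error handling omitted:

// Condensed from the registration lambda; assumes engine_args were already
// evaluated to literals and hive_metastore_url/hive_database/hive_table declared.
auto las_settings = std::make_shared<CnchHiveSettings>(args.getContext()->getCnchLasSettings()); // server-wide `las` defaults
if (args.storage_def->settings)
    las_settings->loadFromQuery(*args.storage_def); // per-table SETTINGS override the defaults

if (hive_metastore_url.empty()) // two-argument engine form left it empty
    hive_metastore_url = las_settings->hive_metastore_url; // fall back to the `las` settings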
1 change: 1 addition & 0 deletions src/Storages/Hive/CnchHiveSettings.h
@@ -46,6 +46,7 @@ struct Settings;
M(UInt64, max_read_row_group_threads, 32, "", 0) \
M(Bool, cnch_temporary_table, 0, "", 0) \
M(Bool, enable_local_disk_cache, false, "", 0) \
M(String, hive_metastore_url, "", "Hive metastore url", 0) \
M(String, hdfs_fs, "", "Hdfs namenode url", 0) \
M(String, endpoint, "", "S3 endpoint", 0) \
M(String, ak_id, "", "S3 access key", 0) \
4 changes: 0 additions & 4 deletions src/Storages/Hive/Metastore/IMetaClient.h
@@ -37,10 +37,6 @@ class IMetaClient

// each partition is identified by a `key1=val1/key2=val2`
virtual ApacheHive::PartitionsStatsResult getPartitionStats(const String & db_name, const String & table_name, const Strings& col_names, const Strings& partition_keys, const std::vector<Strings>& partition_vals ) = 0;

/// Normally hive files are fetched directly with dist
/// This method is for JNIHiveMetaClient
virtual HiveFiles getFilesInPartition(const HivePartitionPtr &) { return {}; }
};
using IMetaClientPtr = std::shared_ptr<IMetaClient>;

10 changes: 6 additions & 4 deletions src/Storages/Hive/Metastore/JNIHiveMetastore.cpp
@@ -51,11 +51,13 @@ JNIHiveMetastoreClient::getPartitionsByFilter(const String &, const String &, co
return partitions_spec.partitions;
}

HiveFiles JNIHiveMetastoreClient::getFilesInPartition(const HivePartitionPtr & partition)
HiveFiles JNIHiveMetastoreClient::getFilesInPartition(const HivePartitions & partitions, size_t min_split_num, size_t max_threads)
{
Protos::PartitionPaths required_partitions;
required_partitions.set_split_num(1);
required_partitions.add_paths(partition->location);
required_partitions.set_split_num(min_split_num);
required_partitions.set_max_threads(max_threads);
for (const auto & partition : partitions)
required_partitions.add_paths(partition->location);

String input_splits_bytes = jni_metaclient->getFilesInPartition(required_partitions.SerializeAsString());
Protos::InputSplits input_splits;
@@ -64,7 +66,7 @@ HiveFiles JNIHiveMetastoreClient::getFilesInPartition(const HivePartitionPtr & p
for (const auto & input_split : input_splits.input_splits()) {
auto it = hive_files.emplace_back(std::make_shared<HiveInputSplitFile>(input_split, properties));
it->format = IHiveFile::FileFormat::InputSplit;
it->partition = partition;
it->partition = partitions.at(input_split.partition_index());
}
return hive_files;
}
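getFilesInPartition now makes one JNI round trip for all partitions: the request carries every partition location plus split hints (the caller in StorageCnchLas passes the worker group's shard count as min_split_num and the read pipeline's num_streams as max_threads), and each returned split is attributed back to its partition through partition_index. A compact sketch of that request/response contract; it assumes the PartitionPaths and InputSplits protobuf messages expose set_max_threads() and partition_index(), which the new code uses but whose definitions are outside this diff:

Protos::PartitionPaths request;
request.set_split_num(min_split_num);  // caller passes the worker group's shard count
request.set_max_threads(max_threads);  // caller passes the query's num_streams
for (const auto & partition : partitions)
    request.add_paths(partition->location);

Protos::InputSplits response;
response.ParseFromString(jni_metaclient->getFilesInPartition(request.SerializeAsString()));

HiveFiles hive_files;
for (const auto & split : response.input_splits())
{
    auto file = std::make_shared<HiveInputSplitFile>(split, properties);
    file->format = IHiveFile::FileFormat::InputSplit;
    file->partition = partitions.at(split.partition_index()); // map the split back to its partition
    hive_files.push_back(std::move(file));
}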
2 changes: 1 addition & 1 deletion src/Storages/Hive/Metastore/JNIHiveMetastore.h
@@ -44,7 +44,7 @@ class JNIHiveMetastoreClient : public IMetaClient
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getAllDatabases is not implemented");
}

HiveFiles getFilesInPartition(const HivePartitionPtr & partition) override;
HiveFiles getFilesInPartition(const HivePartitions & partitions, size_t min_split_num, size_t max_threads);

std::unordered_map<String, String> & getProperties() { return properties; }

2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCloudTables.h
@@ -24,6 +24,8 @@ class StorageSystemCloudTables final : public shared_ptr_helper<StorageSystemClo
size_t max_block_size,
unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemCloudTables(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchKafkaTables.h
@@ -25,6 +25,8 @@ class StorageSystemCnchKafkaTables : public shared_ptr_helper<StorageSystemCnchK
size_t /*max_block_size*/,
unsigned /*num_streams*/) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemCnchKafkaTables(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchMaterializedMySQL.h
@@ -25,6 +25,8 @@ class StorageSystemCnchMaterializedMySQL : public shared_ptr_helper<StorageSyste
size_t /*max_block_size*/,
unsigned /*num_streams*/) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemCnchMaterializedMySQL(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchPartsInfo.h
@@ -39,6 +39,8 @@ class StorageSystemCnchPartsInfo : public shared_ptr_helper<StorageSystemCnchPar
size_t max_block_size,
unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemCnchPartsInfo(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchPartsInfoLocal.h
@@ -46,6 +46,8 @@ class StorageSystemCnchPartsInfoLocal final : public shared_ptr_helper<StorageSy
size_t max_block_size,
unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemCnchPartsInfoLocal(const StorageID & table_id_);
};
1 change: 1 addition & 0 deletions src/Storages/System/StorageSystemCnchTableInfo.h
@@ -37,6 +37,7 @@ class StorageSystemCnchTableInfo : public shared_ptr_helper<StorageSystemCnchTab
const size_t max_block_size,
const unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
StorageSystemCnchTableInfo(const StorageID & table_id_);
1 change: 1 addition & 0 deletions src/Storages/System/StorageSystemCnchTables.h
@@ -37,6 +37,7 @@ class StorageSystemCnchTables : public shared_ptr_helper<StorageSystemCnchTables
const size_t max_block_size,
const unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
StorageSystemCnchTables(const StorageID & table_id_);
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchTablesHistory.h
@@ -37,6 +37,8 @@ class StorageSystemCnchTablesHistory : public shared_ptr_helper<StorageSystemCnc
const size_t max_block_size,
const unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
StorageSystemCnchTablesHistory(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchTrashItemsInfo.h
@@ -23,6 +23,8 @@ class StorageSystemCnchTrashItemsInfo : public shared_ptr_helper<StorageSystemCn
size_t max_block_size,
unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemCnchTrashItemsInfo(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchTrashItemsInfoLocal.h
@@ -24,6 +24,8 @@ class StorageSystemCnchTrashItemsInfoLocal final : public shared_ptr_helper<Stor
size_t max_block_size,
unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemCnchTrashItemsInfoLocal(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemCnchViewTables.h
@@ -22,6 +22,8 @@ class StorageSystemCnchViewTables : public shared_ptr_helper<StorageSystemCnchVi
const size_t max_block_size,
const unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
StorageSystemCnchViewTables(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemKafkaTables.h
@@ -44,6 +44,8 @@ class StorageSystemKafkaTables : public shared_ptr_helper<StorageSystemKafkaTabl
size_t max_block_size,
unsigned num_streams) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemKafkaTables(const StorageID & table_id_);
};
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemMaterializedMySQL.h
@@ -24,6 +24,8 @@ class StorageSystemMaterializedMySQL : public shared_ptr_helper<StorageSystemMat
size_t /*max_block_size*/,
unsigned /*num_streams*/) override;

bool isSystemStorage() const override { return true; }

protected:
explicit StorageSystemMaterializedMySQL(const StorageID & table_id_);
};
