Skip to content

Commit

Permalink
Merge pull request #1120 from ByConity/cherry-pick3
Browse files Browse the repository at this point in the history
Daily cherry-pick from internal
  • Loading branch information
dmthuc committed Jan 30, 2024
2 parents 6c24d54 + 46fee9a commit 333a3d9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
16 changes: 10 additions & 6 deletions src/CloudServices/CnchServerServiceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
#include <CloudServices/CnchDataWriter.h>
#include <CloudServices/DedupWorkerManager.h>
#include <CloudServices/DedupWorkerStatus.h>
#include <CloudServices/CnchBGThreadsMap.h>
#include <Catalog/CatalogUtils.h>
#include <WorkerTasks/ManipulationType.h>
#include <Storages/Kafka/CnchKafkaConsumeManager.h>
#include <Storages/PartCacheManager.h>
#include <Access/AccessControlManager.h>
#include <Access/KVAccessStorage.h>


#if USE_MYSQL
#include <Storages/StorageMaterializeMySQL.h>
#include <Databases/MySQL/MaterializedMySQLSyncThreadManager.h>
Expand Down Expand Up @@ -659,7 +661,7 @@ void CnchServerServiceImpl::getBackgroundThreadStatus(
RPCHelpers::serviceHandler(
done,
response,
[request = request, response = response, done = done, log = log] {
[request = request, response = response, done = done, global_context = getContext(), log = log] {
brpc::ClosureGuard done_guard(done);

try
Expand All @@ -669,10 +671,8 @@ void CnchServerServiceImpl::getBackgroundThreadStatus(
auto type = CnchBGThreadType(request->type());
if (type >= CnchBGThreadType::ServerMinType && type <= CnchBGThreadType::ServerMaxType)
{
#if 0
auto threads = global_context.getCnchBGThreads(type);
auto threads = global_context->getCnchBGThreadsMap(type);
res = threads->getStatusMap();
#endif
}
else
{
Expand Down Expand Up @@ -703,7 +703,7 @@ void CnchServerServiceImpl::getNumBackgroundThreads(
{
}
void CnchServerServiceImpl::controlCnchBGThread(
google::protobuf::RpcController * /*cntl*/,
google::protobuf::RpcController * cntl,
const Protos::ControlCnchBGThreadReq * request,
Protos::ControlCnchBGThreadResp * response,
google::protobuf::Closure * done)
Expand All @@ -712,7 +712,7 @@ void CnchServerServiceImpl::controlCnchBGThread(
RPCHelpers::serviceHandler(
done,
response,
[request = request, response = response, done = done, & global_context = *context_ptr, log = log] {
[cntl = cntl, request = request, response = response, done = done, & global_context = *context_ptr, log = log] {
brpc::ClosureGuard done_guard(done);

try
Expand All @@ -722,6 +722,10 @@ void CnchServerServiceImpl::controlCnchBGThread(
storage_id = RPCHelpers::createStorageID(request->storage_id());
auto type = CnchBGThreadType(request->type());
auto action = CnchBGThreadAction(request->action());
auto & controller = static_cast<brpc::Controller &>(*cntl);
LOG_DEBUG(log, "Received controlBGThread for {} type {} action {} from {}",
storage_id.empty() ? "empty storage" : storage_id.getNameForLogs(),
toString(type), toString(action), butil::endpoint2str(controller.remote_side()).c_str());
global_context.controlCnchBGThread(storage_id, type, action);
}
catch (...)
Expand Down
13 changes: 12 additions & 1 deletion src/DaemonManager/DaemonJobServerBGThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ std::map<String, UInt64> fetchServerStartTimes(Context & context, CnchTopologyMa
String rpc_address = host_port.getRPCAddress();
CnchServerClientPtr client_ptr = context.getCnchServerClientPool().get(host_port);
if (!client_ptr)
{
LOG_WARNING(log, "Not able to connect to server with rpc address {}", rpc_address);
continue;
}

try
{
Expand All @@ -218,7 +221,7 @@ std::map<String, UInt64> fetchServerStartTimes(Context & context, CnchTopologyMa
}
catch (...)
{
LOG_INFO(log, "Failed to reach server: {}", rpc_address);
LOG_INFO(log, "Failed to reach server with rpc address: {}", rpc_address);
}
}

Expand Down Expand Up @@ -553,8 +556,12 @@ std::optional<std::unordered_multimap<UUID, BGJobInfoFromServer>> fetchBGThreadF
{
CnchServerClientPtr client_ptr = context.getCnchServerClientPool().get(server);
if (!client_ptr)
{
LOG_ERROR(log, "Not able to connect to server with address {}", server);
return {};
}
auto tasks = client_ptr->getBackGroundStatus(type);
LOG_DEBUG(log, "Get {} jobs from server with address {}", tasks.size(), server);
for (const auto & task : tasks)
{
StorageID storage_id = RPCHelpers::createStorageID(task.storage_id());
Expand Down Expand Up @@ -1039,10 +1046,14 @@ BackgroundJobs DaemonJobServerBGThread::fetchCnchBGThreadStatus()
for (const auto & cnch_server : cnch_servers)
{
if (!cnch_server)
{
LOG_WARNING(log, "Not able to connect to server with address {}", cnch_server->getRPCAddress());
continue;
}

server_start_times[cnch_server->getRPCAddress()] = cnch_server->getServerStartTime();
auto tasks = cnch_server->getBackGroundStatus(type);
LOG_DEBUG(log, "Get {} jobs from server with address {}", tasks.size(), cnch_server->getRPCAddress());
for (const auto & task : tasks)
{
StorageID storage_id = RPCHelpers::createStorageID(task.storage_id());
Expand Down
10 changes: 2 additions & 8 deletions src/Databases/DatabaseCnch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,11 @@ void DatabaseCnch::drop(ContextPtr local_context)
throw Exception("Cnch transaction is not initialized", ErrorCodes::CNCH_TRANSACTION_NOT_INITIALIZED);

// get the lock of tables in current database
std::vector<StoragePtr> tables_to_drop;
std::vector<IntentLockPtr> locks;

for (auto iterator = getTablesIterator(getContext(), [](const String &) { return true; }); iterator->isValid(); iterator->next())
for (auto iterator = getTablesIteratorLightweight(getContext(), [](const String &) { return true; }); iterator->isValid(); iterator->next())
{
StoragePtr table = iterator->table();
if (!table)
continue;
const auto & storage_id = table->getStorageID();
locks.emplace_back(txn->createIntentLock(IntentLock::TB_LOCK_PREFIX, storage_id.database_name, storage_id.table_name));
tables_to_drop.emplace_back(table);
locks.emplace_back(txn->createIntentLock(IntentLock::TB_LOCK_PREFIX, database_name, iterator->name()));
}

for (const auto & lock : locks)
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4870,6 +4870,8 @@ UInt64 Context::tryGetTimestamp(const String & pretty_func_name) const
}
catch (...)
{
if (!getConfigRef().getBool("tso_service.use_fallback", true))
throw;
tryLogCurrentException(
pretty_func_name.c_str(), fmt::format("Unable to reach TSO from {} during call to tryGetTimestamp", tryGetTSOLeaderHostPort()));
return TxnTimestamp::fallbackTS();
Expand Down

0 comments on commit 333a3d9

Please sign in to comment.