Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ CONF_Bool(enable_split_rowset_meta_pb, "true");
CONF_Int32(split_rowset_meta_pb_size, "10000"); // split rowset meta pb size, default is 10K
CONF_Bool(enable_split_tablet_schema_pb, "false");
CONF_Int32(split_tablet_schema_pb_size, "10000"); // split tablet schema pb size, default is 10K
CONF_Bool(enable_check_fe_drop_in_safe_time, "true");
CONF_mBool(enable_check_fe_drop_in_safe_time, "true");

CONF_Bool(enable_logging_for_single_version_reading, "false");
CONF_mBool(enable_logging_conflict_keys, "false");
Expand Down
6 changes: 6 additions & 0 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ def get_default_named_ports(self):
def node_type(self):
return Node.TYPE_FDB

def docker_env(self):
envs = super().docker_env()
for key, value in self.cluster.cloud_store_config.items():
envs[key] = value
return envs

def expose_sub_dirs(self):
return super().expose_sub_dirs() + ["data"]

Expand Down
45 changes: 41 additions & 4 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,14 @@ def add_parser(self, args_parsers):
"Add custom host-to-IP mappings (host:ip). For example: --extra-hosts myhost1:192.168.10.1 myhost2:192.168.10.2 . Only use when creating new cluster."
)

parser.add_argument(
"--cloud-config",
nargs="*",
type=str,
help=
"Override cloud store config values. For example: --cloud-config DORIS_CLOUD_AK=xxx DORIS_CLOUD_BUCKET=yyy. Only use when creating new cloud cluster."
)

parser.add_argument("--coverage-dir",
default="",
help="Set code coverage output directory")
Expand Down Expand Up @@ -490,7 +498,7 @@ def add_parser(self, args_parsers):
parser.add_argument(
"--fdb-version",
type=str,
default="7.1.26",
default="7.3.69",
help="fdb image version. Only use in cloud cluster.")

parser.add_argument(
Expand Down Expand Up @@ -590,7 +598,9 @@ def run(self, args):
args.add_recycle_num = 1
if not args.be_cluster:
args.be_cluster = "compute_cluster"
cloud_store_config = self._get_cloud_store_config()
cloud_store_config = self._merge_cloud_store_config(
self._get_cloud_store_config(), args.cloud_config
)
else:
args.add_ms_num = 0
args.add_recycle_num = 0
Expand Down Expand Up @@ -790,8 +800,7 @@ def do_add_node(node_type, add_num, add_ids):
except Exception as e:
LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}")
if is_new_cluster:
cloud_store_config = self._get_cloud_store_config()
db_mgr.create_default_storage_vault(cloud_store_config)
db_mgr.create_default_storage_vault(cluster.cloud_store_config)

if not cluster.is_host_network():
wait_service(True, args.wait_timeout, cluster, add_fe_ids,
Expand Down Expand Up @@ -867,6 +876,34 @@ def _get_cloud_store_config(self):
format(key, CLUSTER.CLOUD_CFG_FILE))
return config

@staticmethod
def _merge_cloud_store_config(base_config, overrides):
if not overrides:
return base_config

merged = dict(base_config)
for item in overrides:
pos = item.find('=')
if pos <= 0:
raise Exception(
"cloud config override '{}' error format, should be like 'name=value'".
format(item)
)
key = item[:pos].strip()
value = item[pos + 1:].strip()
if not key or not value:
raise Exception(
"cloud config override '{}' error format, should be like 'name=value'".
format(item)
)
if key not in merged:
raise Exception(
"Unknown cloud config override '{}', available keys: {}".
format(key, ", ".join(sorted(merged.keys())))
)
merged[key] = value
return merged


class DownCommand(Command):

Expand Down
4 changes: 3 additions & 1 deletion docker/runtime/doris-compose/resource/fdb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ cache-memory = 200MiB
[fdbserver.4500]

[backup_agent]
command = /usr/bin/backup_agent
command = /usr/bin/backup_agent --blob-credentials /opt/apache-doris/fdb/conf/blob_creds.json
logdir = /opt/apache-doris/fdb/log

[backup_agent.1]
19 changes: 19 additions & 0 deletions docker/runtime/doris-compose/resource/init_fdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ init_db() {
return
fi

if [ -n "$DORIS_CLOUD_AK" ] && [ -n "$DORIS_CLOUD_SK" ] && [ -n "$DORIS_CLOUD_ENDPOINT" ]; then
BLOB_CREDS_FILE="${DORIS_HOME}/conf/blob_creds.json"
if [ "$DORIS_CLOUD_PROVIDER" = "OSS" ] || [ "$DORIS_CLOUD_PROVIDER" = "COS" ]; then
AK_WITH_ENDPOINT="${DORIS_CLOUD_AK}@${DORIS_CLOUD_BUCKET}.${DORIS_CLOUD_ENDPOINT}"
else
AK_WITH_ENDPOINT="${DORIS_CLOUD_AK}@${DORIS_CLOUD_ENDPOINT}"
fi
cat > $BLOB_CREDS_FILE << EOF
{
"accounts" : {
"${AK_WITH_ENDPOINT}" : {
"secret" : "${DORIS_CLOUD_SK}"
}
}
}
EOF
health_log "Created blob_creds.json"
fi

for ((i = 0; i < 10; i++)); do
/usr/bin/fdbcli -C ${DORIS_HOME}/conf/fdb.cluster --exec 'configure new single ssd'
if [ $? -eq 0 ]; then
Expand Down
29 changes: 27 additions & 2 deletions docker/runtime/doris-compose/resource/init_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,34 @@ run_fe() {
}

start_cloud_fe() {
if [ -f "$REGISTER_FILE" ]; then
RECOVERY_SCRIPT="${DORIS_HOME}/conf/restore_snapshot.sh"
RECOVERY_ARGS=""
if [ -f "$RECOVERY_SCRIPT" ]; then
JOURNAL_ID=$(grep '^JOURNAL_ID=' "$RECOVERY_SCRIPT" | head -1 | cut -d= -f2)
if [ -z "$JOURNAL_ID" ]; then
health_log "ERROR: Could not extract JOURNAL_ID from recovery script"
exit 1
fi
health_log "Found recovery script with JOURNAL_ID=$JOURNAL_ID, executing..."
bash "$RECOVERY_SCRIPT"
RECOVERY_RES=$?
if [ $RECOVERY_RES -ne 0 ]; then
health_log "ERROR: Recovery script failed with exit code $RECOVERY_RES"
exit $RECOVERY_RES
fi
mv "$RECOVERY_SCRIPT" "${RECOVERY_SCRIPT}.bak"
MV_RES=$?
if [ $MV_RES -ne 0 ]; then
health_log "ERROR: Failed to rename recovery script to ${RECOVERY_SCRIPT}.bak"
exit $MV_RES
fi
health_log "Recovery script executed and renamed to ${RECOVERY_SCRIPT}.bak"
RECOVERY_ARGS="--metadata_failure_recovery --recovery_journal_id $JOURNAL_ID"
fi

if [ -f "$REGISTER_FILE" ] || [ -n "$RECOVERY_ARGS" ]; then
fe_daemon &
run_fe
run_fe $RECOVERY_ARGS
return
fi

Expand Down
51 changes: 51 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,7 @@ private void checkLowerCaseTableNames() {
private void checkCurrentNodeExist() {
boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
if (metadataFailureRecovery) {
updateRecoveryFrontendHostIfNeeded();
return;
}

Expand All @@ -2110,6 +2111,56 @@ private void checkCurrentNodeExist() {
}
}

// During backup-restore recovery, the restored metadata may contain FE entries with old IP
// addresses from the source cluster. Find the FE entry by node name and update its host to
// the current node's actual address. This must run before CloudClusterChecker starts to
// prevent it from dropping the only FE and leaving the BDB group empty.
private void updateRecoveryFrontendHostIfNeeded() {
if (Config.isNotCloudMode()) {
return;
}
Frontend selfFe = frontends.get(nodeName);
if (selfFe == null) {
LOG.error("Recovery mode: frontend with node name '{}' not found in metadata. "
+ "Available frontends: {}. Will exit.", nodeName, frontends.keySet());
System.exit(-1);
}

if (selfFe.getRole() != role) {
LOG.error("Recovery mode: role mismatch for frontend '{}': expected={}, actual={}. Will exit.",
nodeName, role, selfFe.getRole());
System.exit(-1);
}

if (selfFe.getHost().equals(selfNode.getHost()) && selfFe.getEditLogPort() == selfNode.getPort()) {
LOG.info("Recovery mode: frontend '{}' already has correct address {}:{}",
nodeName, selfNode.getHost(), selfNode.getPort());
return;
}

if (selfFe.getEditLogPort() != selfNode.getPort()) {
LOG.error("Recovery mode: edit_log_port mismatch for frontend '{}': restored={}, current={}. "
+ "Port migration during recovery is not supported. Will exit.",
nodeName, selfFe.getEditLogPort(), selfNode.getPort());
System.exit(-1);
}

Frontend conflicting = checkFeExist(selfNode.getHost(), selfNode.getPort());
if (conflicting != null && !conflicting.getNodeName().equals(nodeName)) {
LOG.error("Recovery mode: another frontend '{}' already has address {}:{}. "
+ "Cannot update frontend '{}' to this address. Will exit.",
conflicting.getNodeName(), selfNode.getHost(), selfNode.getPort(), nodeName);
System.exit(-1);
}

LOG.info("Recovery mode: updating frontend '{}' host from {} to {} to match current node",
nodeName, selfFe.getHost(), selfNode.getHost());
selfFe.setHost(selfNode.getHost());

editLog.logModifyFrontend(selfFe);
LOG.info("Recovery mode: frontend host update persisted to edit log");
}

private void checkBeExecVersion() {
if (Config.be_exec_version < Config.min_be_exec_version
|| Config.be_exec_version > Config.max_be_exec_version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class ClusterOptions {
// just as `docker run --add-host myhost:192.168.10.10` do.
List<String> extraHosts = []

// cloud store overrides for cloud docker clusters, each item should be 'name=value'
List<String> cloudStoreConfigs = []

boolean connectToFollower = false

// 1. cloudMode = true, only create cloud cluster.
Expand Down Expand Up @@ -353,6 +356,10 @@ class SuiteCluster {
cmd += ['--extra-hosts']
cmd += options.extraHosts
}
if (!options.cloudStoreConfigs.isEmpty()) {
cmd += ['--cloud-config']
cmd += options.cloudStoreConfigs
}
if (config.dockerCoverageOutputDir != null && config.dockerCoverageOutputDir != '') {
cmd += ['--coverage-dir', config.dockerCoverageOutputDir]
}
Expand Down Expand Up @@ -595,6 +602,18 @@ class SuiteCluster {
runBackendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
}

// indices start from 1, not 0
// if not specific meta-service indices, then start all meta services
void startMetaServices(int... indices) {
runMsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
}

// indices start from 1, not 0
// if not specific recycler indices, then start all recyclers
void startRecyclers(int... indices) {
runRecyclerCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
}

// indices start from 1, not 0
// if not specific fe indices, then stop all frontends
void stopFrontends(int... indices) {
Expand All @@ -609,6 +628,20 @@ class SuiteCluster {
waitHbChanged()
}

// indices start from 1, not 0
// if not specific meta-service indices, then stop all meta services
void stopMetaServices(int... indices) {
runMsCmd(STOP_WAIT_TIMEOUT + 5, "stop --wait-timeout ${STOP_WAIT_TIMEOUT}".toString(), indices)
waitHbChanged()
}

// indices start from 1, not 0
// if not specific recycler indices, then stop all recyclers
void stopRecyclers(int... indices) {
runRecyclerCmd(STOP_WAIT_TIMEOUT + 5, "stop --wait-timeout ${STOP_WAIT_TIMEOUT}".toString(), indices)
waitHbChanged()
}

// indices start from 1, not 0
// if not specific fe indices, then restart all frontends
void restartFrontends(int... indices) {
Expand Down
Loading