Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]support export on cn when run on shared_data mode (backport #31208) #34018

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 9 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.ExportJob.JobState;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.task.ExportExportingTask;
import com.starrocks.task.ExportPendingTask;
import com.starrocks.task.LeaderTaskExecutor;
Expand Down Expand Up @@ -171,25 +171,25 @@ private boolean checkJobNeedCancel(ExportJob job) {

boolean beHasErr = false;
String errMsg = "";
for (Long beId : job.getBeStartTimeMap().keySet()) {
Backend be = GlobalStateMgr.getCurrentSystemInfo().getBackend(beId);
if (null == be) {
for (Long nodeId : job.getBeStartTimeMap().keySet()) {
ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNode(nodeId);
if (null == node) {
// The current implementation, if the be in the job is not found,
// the job will be cancelled
beHasErr = true;
errMsg = "be not found during exec export job. be:" + beId;
errMsg = "Be or cn not found during exec export job. Node:" + nodeId;
LOG.warn(errMsg + " job: {}", job);
break;
}
if (!be.isAlive()) {
if (!node.isAlive()) {
beHasErr = true;
errMsg = "be not alive during exec export job. be:" + beId;
errMsg = "Be or cn not alive during exec export job. Node:" + nodeId;
LOG.warn(errMsg + " job: {}", job);
break;
}
if (be.getLastStartTime() > job.getBeStartTimeMap().get(beId)) {
if (node.getLastStartTime() > job.getBeStartTimeMap().get(nodeId)) {
beHasErr = true;
errMsg = "be has rebooted during exec export job. be:" + beId;
errMsg = "Be or cn has rebooted during exec export job. Node:" + nodeId;
LOG.warn(errMsg + " job: {}", job);
break;
}
Expand Down
6 changes: 4 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import com.starrocks.sql.ast.LoadStmt;
import com.starrocks.sql.ast.PartitionNames;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.task.AgentClient;
import com.starrocks.thrift.TAgentResult;
import com.starrocks.thrift.THdfsProperties;
Expand Down Expand Up @@ -720,6 +721,7 @@ public Status releaseSnapshotPaths() {
TNetworkAddress address = snapshotPath.first;
String host = address.getHostname();
int port = address.getPort();

Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackendWithBePort(host, port);
if (backend == null) {
continue;
Expand Down Expand Up @@ -752,8 +754,8 @@ public Status releaseMetadataLocks() {
TNetworkAddress address = location.getServer();
String host = address.getHostname();
int port = address.getPort();
Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackendWithBePort(host, port);
if (backend == null) {
ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNodeWithBePort(host, port);
if (!GlobalStateMgr.getCurrentSystemInfo().checkNodeAvailable(node)) {
continue;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,13 @@ public boolean checkBackendAvailable(long backendId) {
return backend != null && backend.isAvailable();
}

public boolean checkNodeAvailable(ComputeNode node) {
if (node instanceof Backend) {
return node != null && node.isAvailable();
}
return node != null && node.isAlive();
}

public boolean checkBackendAlive(long backendId) {
Backend backend = idToBackendRef.get(backendId);
return backend != null && backend.isAlive();
Expand Down Expand Up @@ -616,6 +623,14 @@ public Backend getBackendWithBePort(String host, int bePort) {
return null;
}

public ComputeNode getBackendOrComputeNodeWithBePort(String host, int bePort) {
ComputeNode node = getBackendWithBePort(host, bePort);
if (node == null) {
node = getComputeNodeWithBePort(host, bePort);
}
return node;
}

public List<Backend> getBackendOnlyWithHost(String host) {
List<Backend> resultBackends = new ArrayList<>();
for (Backend backend : idToBackendRef.values()) {
Expand Down
18 changes: 8 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import com.starrocks.rpc.BrpcProxy;
import com.starrocks.rpc.LakeService;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TAgentResult;
import com.starrocks.thrift.TInternalScanRange;
import com.starrocks.thrift.TNetworkAddress;
Expand Down Expand Up @@ -124,18 +124,16 @@ private Status makeSnapshots() {
TNetworkAddress address = location.getServer();
String host = address.getHostname();
int port = address.getPort();
Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackendWithBePort(host, port);
if (backend == null) {
ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNodeWithBePort(host, port);
if (!GlobalStateMgr.getCurrentSystemInfo().checkNodeAvailable(node)) {
return Status.CANCELLED;
}
long backendId = backend.getId();
if (!GlobalStateMgr.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
return Status.CANCELLED;
}
this.job.setBeStartTime(backendId, backend.getLastStartTime());

long nodeId = node.getId();
this.job.setBeStartTime(nodeId, node.getLastStartTime());
Status status;
if (job.exportLakeTable()) {
status = lockTabletMetadata(internalScanRange, backend);
status = lockTabletMetadata(internalScanRange, node);
} else {
status = makeSnapshot(internalScanRange, address);
}
Expand All @@ -147,7 +145,7 @@ private Status makeSnapshots() {
return Status.OK;
}

private Status lockTabletMetadata(TInternalScanRange internalScanRange, Backend backend) {
private Status lockTabletMetadata(TInternalScanRange internalScanRange, ComputeNode backend) {
try {
LakeService lakeService = BrpcProxy.getLakeService(backend.getHost(), backend.getBrpcPort());
LockTabletMetadataRequest request = new LockTabletMetadataRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.starrocks.common.UserException;
import com.starrocks.load.ExportJob.JobState;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import mockit.Mock;
import mockit.MockUp;
Expand Down Expand Up @@ -44,7 +45,7 @@ public synchronized void cancel(ExportFailMsg.CancelType type, String msg) throw

new MockUp<SystemInfoService>() {
@Mock
public Backend getBackend(long backendId) {
public ComputeNode getBackendOrComputeNode(long backendId) {
return be;
}
};
Expand Down Expand Up @@ -79,7 +80,7 @@ public Backend getBackend(long backendId) {

new MockUp<SystemInfoService>() {
@Mock
public Backend getBackend(long backendId) {
public ComputeNode getBackendOrComputeNode(long backendId) {
return null;
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.load;

import com.starrocks.common.Status;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import com.starrocks.task.ExportPendingTask;
import com.starrocks.thrift.TInternalScanRange;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TScanRange;
import com.starrocks.thrift.TScanRangeLocation;
import com.starrocks.thrift.TScanRangeLocations;
import com.starrocks.thrift.TStatusCode;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;

public class ExportPendingTaskTest {

@Test
public void testMakeSnapshots(@Mocked ExportJob job) throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {

ComputeNode node = new ComputeNode(4L, "127.0.0.1", 12345);

new MockUp<SystemInfoService>() {
@Mock
public ComputeNode getBackendOrComputeNode(long backendId) {
return node;
}

@Mock
public ComputeNode getBackendOrComputeNodeWithBePort(String host, int bePort) {
return node;
}
};

TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress("127.0.0.1", 12346));
scanRangeLocations.addToLocations(scanRangeLocation);
TScanRange scanRange = new TScanRange();
scanRange.setInternal_scan_range(new TInternalScanRange());
scanRangeLocations.setScan_range(scanRange);

new Expectations() {
{
job.getTabletLocations();
result = Collections.singletonList(scanRangeLocations);
minTimes = 0;

job.exportLakeTable();
result = true;
minTimes = 0;
}
};

ExportPendingTask task = new ExportPendingTask(job);

Method method = ExportPendingTask.class.getDeclaredMethod("makeSnapshots");
method.setAccessible(true);

Status status = (Status) method.invoke(task, null);
Assert.assertEquals(Status.CANCELLED, status);

node.setAlive(true);
status = (Status) method.invoke(task, null);
Assert.assertEquals(TStatusCode.CANCELLED, status.getErrorCode());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ public void testGetBackendOrComputeNode() {
Backend be = new Backend(10001, "host1", 1000);
service.addBackend(be);
ComputeNode cn = new ComputeNode(10002, "host2", 1000);
cn.setBePort(1001);
service.addComputeNode(cn);

Assert.assertEquals(be, service.getBackendOrComputeNode(be.getId()));
Assert.assertEquals(cn, service.getBackendOrComputeNode(cn.getId()));
Assert.assertNull(service.getBackendOrComputeNode(/* Not Exist */ 100));

Assert.assertEquals(cn, service.getBackendOrComputeNodeWithBePort("host2", 1001));
Assert.assertFalse(service.checkNodeAvailable(cn));
Assert.assertFalse(service.checkNodeAvailable(be));

List<ComputeNode> nodes = service.backendAndComputeNodeStream().collect(Collectors.toList());
Assert.assertEquals(2, nodes.size());
Assert.assertEquals(be, nodes.get(0));
Expand Down