Export: accept compute nodes as well as backends in the export checker,
metadata-lock release and pending-task code paths.

Reviewer notes on this reconstruction (text before the first "diff --git"
line is ignored by git apply):
  * The patch had been collapsed onto a few physical lines. Line breaks were
    restored from the +/- markers and validated against each hunk's line
    counts. Indentation (4-space Java style) and the exact position of blank
    context lines are ASSUMED; use `git apply --ignore-whitespace` if a
    context line does not match, and confirm against the target tree.
  * Generic type arguments that had been stripped (MockUp<SystemInfoService>,
    List<Backend>, List<ComputeNode>) are restored by inference - confirm.
  * Review changes to added lines only: checkNodeAvailable uses a single null
    guard (instanceof already implies non-null), both new SystemInfoService
    methods carry Javadoc (hunk headers recomputed), and the test calls
    Method.invoke(task) instead of the inexact varargs form invoke(task, null).
    The post-image hashes on the "index" lines therefore describe the original
    commit, not this revision.

diff --git a/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java b/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java
index 942bde443f422..be2679afc77e6 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java
@@ -40,7 +40,7 @@
 import com.starrocks.common.util.FrontendDaemon;
 import com.starrocks.load.ExportJob.JobState;
 import com.starrocks.server.GlobalStateMgr;
-import com.starrocks.system.Backend;
+import com.starrocks.system.ComputeNode;
 import com.starrocks.task.ExportExportingTask;
 import com.starrocks.task.ExportPendingTask;
 import com.starrocks.task.LeaderTaskExecutor;
@@ -171,25 +171,25 @@ private boolean checkJobNeedCancel(ExportJob job) {
 
         boolean beHasErr = false;
         String errMsg = "";
-        for (Long beId : job.getBeStartTimeMap().keySet()) {
-            Backend be = GlobalStateMgr.getCurrentSystemInfo().getBackend(beId);
-            if (null == be) {
+        for (Long nodeId : job.getBeStartTimeMap().keySet()) {
+            ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNode(nodeId);
+            if (null == node) {
                 // The current implementation, if the be in the job is not found,
                 // the job will be cancelled
                 beHasErr = true;
-                errMsg = "be not found during exec export job. be:" + beId;
+                errMsg = "Be or cn not found during exec export job. Node:" + nodeId;
                 LOG.warn(errMsg + " job: {}", job);
                 break;
             }
-            if (!be.isAlive()) {
+            if (!node.isAlive()) {
                 beHasErr = true;
-                errMsg = "be not alive during exec export job. be:" + beId;
+                errMsg = "Be or cn not alive during exec export job. Node:" + nodeId;
                 LOG.warn(errMsg + " job: {}", job);
                 break;
             }
-            if (be.getLastStartTime() > job.getBeStartTimeMap().get(beId)) {
+            if (node.getLastStartTime() > job.getBeStartTimeMap().get(nodeId)) {
                 beHasErr = true;
-                errMsg = "be has rebooted during exec export job. be:" + beId;
+                errMsg = "Be or cn has rebooted during exec export job. Node:" + nodeId;
                 LOG.warn(errMsg + " job: {}", job);
                 break;
             }
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/ExportJob.java b/fe/fe-core/src/main/java/com/starrocks/load/ExportJob.java
index 03f06c9ad7798..288b86b49687f 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/ExportJob.java
@@ -88,6 +88,7 @@
 import com.starrocks.sql.ast.LoadStmt;
 import com.starrocks.sql.ast.PartitionNames;
 import com.starrocks.system.Backend;
+import com.starrocks.system.ComputeNode;
 import com.starrocks.task.AgentClient;
 import com.starrocks.thrift.TAgentResult;
 import com.starrocks.thrift.THdfsProperties;
@@ -720,6 +721,7 @@ public Status releaseSnapshotPaths() {
             TNetworkAddress address = snapshotPath.first;
             String host = address.getHostname();
             int port = address.getPort();
+
             Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackendWithBePort(host, port);
             if (backend == null) {
                 continue;
@@ -752,8 +754,8 @@ public Status releaseMetadataLocks() {
                 TNetworkAddress address = location.getServer();
                 String host = address.getHostname();
                 int port = address.getPort();
-                Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackendWithBePort(host, port);
-                if (backend == null) {
+                ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNodeWithBePort(host, port);
+                if (!GlobalStateMgr.getCurrentSystemInfo().checkNodeAvailable(node)) {
                     continue;
                 }
                 try {
diff --git a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
index de9a7e6bd3926..125f830049350 100644
--- a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
@@ -509,6 +509,18 @@ public boolean checkBackendAvailable(long backendId) {
         return backend != null && backend.isAvailable();
     }
 
+    /**
+     * Checks whether {@code node} can be used: a {@link Backend} must report
+     * {@code isAvailable()}, any other compute node must report {@code isAlive()}.
+     * A {@code null} node is never available.
+     */
+    public boolean checkNodeAvailable(ComputeNode node) {
+        if (node == null) {
+            return false;
+        }
+        return node instanceof Backend ? node.isAvailable() : node.isAlive();
+    }
+
     public boolean checkBackendAlive(long backendId) {
         Backend backend = idToBackendRef.get(backendId);
         return backend != null && backend.isAlive();
@@ -616,6 +628,19 @@ public Backend getBackendWithBePort(String host, int bePort) {
         return null;
     }
 
+    /**
+     * Looks up a node by host and BE port: tries {@code getBackendWithBePort}
+     * first and falls back to {@code getComputeNodeWithBePort} when no backend
+     * matches.
+     */
+    public ComputeNode getBackendOrComputeNodeWithBePort(String host, int bePort) {
+        ComputeNode node = getBackendWithBePort(host, bePort);
+        if (node == null) {
+            node = getComputeNodeWithBePort(host, bePort);
+        }
+        return node;
+    }
+
     public List<Backend> getBackendOnlyWithHost(String host) {
         List<Backend> resultBackends = new ArrayList<>();
         for (Backend backend : idToBackendRef.values()) {
diff --git a/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java b/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java
index 8d087b4fc08f3..c06db2aa53f64 100644
--- a/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java
+++ b/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java
@@ -43,7 +43,7 @@
 import com.starrocks.rpc.BrpcProxy;
 import com.starrocks.rpc.LakeService;
 import com.starrocks.server.GlobalStateMgr;
-import com.starrocks.system.Backend;
+import com.starrocks.system.ComputeNode;
 import com.starrocks.thrift.TAgentResult;
 import com.starrocks.thrift.TInternalScanRange;
 import com.starrocks.thrift.TNetworkAddress;
@@ -124,18 +124,16 @@ private Status makeSnapshots() {
                 TNetworkAddress address = location.getServer();
                 String host = address.getHostname();
                 int port = address.getPort();
-                Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackendWithBePort(host, port);
-                if (backend == null) {
+                ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNodeWithBePort(host, port);
+                if (!GlobalStateMgr.getCurrentSystemInfo().checkNodeAvailable(node)) {
                     return Status.CANCELLED;
                 }
-                long backendId = backend.getId();
-                if (!GlobalStateMgr.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
-                    return Status.CANCELLED;
-                }
-                this.job.setBeStartTime(backendId, backend.getLastStartTime());
+
+                long nodeId = node.getId();
+                this.job.setBeStartTime(nodeId, node.getLastStartTime());
                 Status status;
                 if (job.exportLakeTable()) {
-                    status = lockTabletMetadata(internalScanRange, backend);
+                    status = lockTabletMetadata(internalScanRange, node);
                 } else {
                     status = makeSnapshot(internalScanRange, address);
                 }
@@ -147,7 +145,7 @@ private Status makeSnapshots() {
         return Status.OK;
     }
 
-    private Status lockTabletMetadata(TInternalScanRange internalScanRange, Backend backend) {
+    private Status lockTabletMetadata(TInternalScanRange internalScanRange, ComputeNode backend) {
         try {
             LakeService lakeService = BrpcProxy.getLakeService(backend.getHost(), backend.getBrpcPort());
             LockTabletMetadataRequest request = new LockTabletMetadataRequest();
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/ExportCheckerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/ExportCheckerTest.java
index 4c315dc3cc059..1de09b09be0a4 100644
--- a/fe/fe-core/src/test/java/com/starrocks/load/ExportCheckerTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/load/ExportCheckerTest.java
@@ -17,6 +17,7 @@
 import com.starrocks.common.UserException;
 import com.starrocks.load.ExportJob.JobState;
 import com.starrocks.system.Backend;
+import com.starrocks.system.ComputeNode;
 import com.starrocks.system.SystemInfoService;
 import mockit.Mock;
 import mockit.MockUp;
@@ -44,7 +45,7 @@ public synchronized void cancel(ExportFailMsg.CancelType type, String msg) throw
 
         new MockUp<SystemInfoService>() {
             @Mock
-            public Backend getBackend(long backendId) {
+            public ComputeNode getBackendOrComputeNode(long backendId) {
                 return be;
             }
         };
@@ -79,7 +80,7 @@ public Backend getBackend(long backendId) {
 
         new MockUp<SystemInfoService>() {
             @Mock
-            public Backend getBackend(long backendId) {
+            public ComputeNode getBackendOrComputeNode(long backendId) {
                 return null;
             }
         };
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/ExportPendingTaskTest.java b/fe/fe-core/src/test/java/com/starrocks/load/ExportPendingTaskTest.java
new file mode 100644
index 0000000000000..bfb3ad2054bf6
--- /dev/null
+++ b/fe/fe-core/src/test/java/com/starrocks/load/ExportPendingTaskTest.java
@@ -0,0 +1,90 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load;
+
+import com.starrocks.common.Status;
+import com.starrocks.system.ComputeNode;
+import com.starrocks.system.SystemInfoService;
+import com.starrocks.task.ExportPendingTask;
+import com.starrocks.thrift.TInternalScanRange;
+import com.starrocks.thrift.TNetworkAddress;
+import com.starrocks.thrift.TScanRange;
+import com.starrocks.thrift.TScanRangeLocation;
+import com.starrocks.thrift.TScanRangeLocations;
+import com.starrocks.thrift.TStatusCode;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+public class ExportPendingTaskTest {
+
+    @Test
+    public void testMakeSnapshots(@Mocked ExportJob job) throws NoSuchMethodException,
+            InvocationTargetException, IllegalAccessException {
+
+        ComputeNode node = new ComputeNode(4L, "127.0.0.1", 12345);
+
+        new MockUp<SystemInfoService>() {
+            @Mock
+            public ComputeNode getBackendOrComputeNode(long backendId) {
+                return node;
+            }
+
+            @Mock
+            public ComputeNode getBackendOrComputeNodeWithBePort(String host, int bePort) {
+                return node;
+            }
+        };
+
+        TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress("127.0.0.1", 12346));
+        scanRangeLocations.addToLocations(scanRangeLocation);
+        TScanRange scanRange = new TScanRange();
+        scanRange.setInternal_scan_range(new TInternalScanRange());
+        scanRangeLocations.setScan_range(scanRange);
+
+        new Expectations() {
+            {
+                job.getTabletLocations();
+                result = Collections.singletonList(scanRangeLocations);
+                minTimes = 0;
+
+                job.exportLakeTable();
+                result = true;
+                minTimes = 0;
+            }
+        };
+
+        ExportPendingTask task = new ExportPendingTask(job);
+
+        Method method = ExportPendingTask.class.getDeclaredMethod("makeSnapshots");
+        method.setAccessible(true);
+
+        Status status = (Status) method.invoke(task);
+        Assert.assertEquals(Status.CANCELLED, status);
+
+        node.setAlive(true);
+        status = (Status) method.invoke(task);
+        Assert.assertEquals(TStatusCode.CANCELLED, status.getErrorCode());
+    }
+}
+
diff --git a/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java
index 67c2d7ff7751b..c51793ec21916 100644
--- a/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java
@@ -124,12 +124,17 @@ public void testGetBackendOrComputeNode() {
         Backend be = new Backend(10001, "host1", 1000);
         service.addBackend(be);
         ComputeNode cn = new ComputeNode(10002, "host2", 1000);
+        cn.setBePort(1001);
         service.addComputeNode(cn);
 
         Assert.assertEquals(be, service.getBackendOrComputeNode(be.getId()));
         Assert.assertEquals(cn, service.getBackendOrComputeNode(cn.getId()));
         Assert.assertNull(service.getBackendOrComputeNode(/* Not Exist */ 100));
 
+        Assert.assertEquals(cn, service.getBackendOrComputeNodeWithBePort("host2", 1001));
+        Assert.assertFalse(service.checkNodeAvailable(cn));
+        Assert.assertFalse(service.checkNodeAvailable(be));
+
         List<ComputeNode> nodes = service.backendAndComputeNodeStream().collect(Collectors.toList());
         Assert.assertEquals(2, nodes.size());
         Assert.assertEquals(be, nodes.get(0));