diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index b07cdd3c5c6fa7..fc955cad388522 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -45,7 +45,6 @@ import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.planner.StreamLoadPlanner; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.transaction.AbortTransactionException; @@ -213,9 +212,7 @@ public RoutineLoadJob(Long id, String name, String clusterName, long dbId, long this.clusterName = clusterName; this.dbId = dbId; this.tableId = tableId; - this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser()) - .append(ConnectContext.get().getRemoteIP()) - .append(id).append(System.currentTimeMillis()).toString().hashCode(); + this.authCode = 0; } protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { @@ -595,7 +592,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti .filter(entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); if (!routineLoadTaskInfoOptional.isPresent()) { throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed" - + " while task " + txnState.getLabel() + "has been aborted "); + + " while task " + txnState.getLabel() + " has been aborted "); } } finally { readUnlock(); diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index 8524bf72259aa7..14a37479b96a34 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -719,7 +719,7 @@ public void rollEditLog() { */ private synchronized void logEdit(short op, Writable writable) { if (this.getNumEditStreams() == 0) { - LOG.error("Fatal Error : no editLog stream"); + LOG.error("Fatal Error : no editLog stream", new Exception()); throw new Error("Fatal Error : no editLog stream"); } diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index e078a6ace1f8f9..7a380c9ce245a8 100644 --- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -89,9 +89,9 @@ public static Catalog createTestCatalog() throws InstantiationException, Illegal Backend backend1 = createBackend(testBackendId1, "host1", 123, 124, 125); Backend backend2 = createBackend(testBackendId2, "host1", 123, 124, 125); Backend backend3 = createBackend(testBackendId3, "host1", 123, 124, 125); - catalog.getCurrentSystemInfo().addBackend(backend1); - catalog.getCurrentSystemInfo().addBackend(backend2); - catalog.getCurrentSystemInfo().addBackend(backend3); + Catalog.getCurrentSystemInfo().addBackend(backend1); + Catalog.getCurrentSystemInfo().addBackend(backend2); + Catalog.getCurrentSystemInfo().addBackend(backend3); catalog.initDefaultCluster(); Database db = createSimpleDb(testDbId1, testTableId1, testPartitionId1, testIndexId1, testTabletId1, testStartVersion, testStartVersionHash); diff --git a/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java index dcb74d4ac25fb7..62d5e3937b07e0 100644 --- a/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java +++ b/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java @@ -21,6 +21,7 @@ import org.apache.doris.alter.SchemaChangeJob; import org.apache.doris.cluster.Cluster; import org.apache.doris.persist.EditLog; +import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.transaction.TransactionState; import java.util.HashMap; @@ -85,6 +86,11 @@ public void logStartSchemaChange(SchemaChangeJob schemaChangeJob) { public void logFinishingSchemaChange(SchemaChangeJob schemaChangeJob) { } + @Mock + public void logOpRoutineLoadJob(RoutineLoadOperation operation) { + + } + public TransactionState getTransaction(long transactionId) { return allTransactionState.get(transactionId); } diff --git a/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java b/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java index 8c4eca059d93ce..0457937892d1e4 100644 --- a/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java +++ b/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java @@ -24,6 +24,8 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.UserException; import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; @@ -71,6 +73,8 @@ public void testNormalPlan() throws UserException { TStreamLoadPutRequest request = new TStreamLoadPutRequest(); request.setTxnId(1); request.setLoadId(new TUniqueId(2, 3)); + request.setFileType(TFileType.FILE_STREAM); + request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN); StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, StreamLoadTask.fromTStreamLoadPutRequest(request)); planner.plan(); diff --git a/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java index 87093a6d4ef2b6..928b8f151e30c3 100644 --- a/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java +++ b/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java @@ -37,6 +37,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TStreamLoadPutRequest; @@ -72,6 +73,7 @@ public class StreamLoadScanNodeTest { TStreamLoadPutRequest getBaseRequest() { TStreamLoadPutRequest request = new TStreamLoadPutRequest(); request.setFileType(TFileType.FILE_STREAM); + request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN); return request; } @@ -251,7 +253,6 @@ public void testColumnsNormal() throws UserException, UserException { }; TStreamLoadPutRequest request = getBaseRequest(); - request.setFileType(TFileType.FILE_LOCAL); request.setColumns("k1,k2,v1, v2=k2"); StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, StreamLoadTask.fromTStreamLoadPutRequest(request)); @@ -300,7 +301,7 @@ public void testHllColumnsNormal() throws UserException { }; TStreamLoadPutRequest request = getBaseRequest(); - request.setFileType(TFileType.FILE_LOCAL); + request.setFileType(TFileType.FILE_STREAM); request.setColumns("k1,k2, v1=hll_hash(k2)"); StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, StreamLoadTask.fromTStreamLoadPutRequest(request)); @@ -330,21 +331,27 @@ public void testHllColumnsNoHllHash() throws UserException { } } - new Expectations() {{ - catalog.getFunction((Function) any, (Function.CompareMode) any); - result = new ScalarFunction(new FunctionName("hll_hash1"), Lists.newArrayList(), Type.BIGINT, false); - }}; + new Expectations() { + { + catalog.getFunction((Function) any, (Function.CompareMode) any); + result = new ScalarFunction(new FunctionName("hll_hash1"), Lists.newArrayList(), Type.BIGINT, false); + minTimes = 0; + } + }; new Expectations() { { dstTable.getColumn("k1"); result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get(); + minTimes = 0; dstTable.getColumn("k2"); result = null; + minTimes = 0; dstTable.getColumn("v1"); result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get(); + minTimes = 0; } }; @@ -522,15 +529,19 @@ public void testWhereBad() throws UserException, UserException { { dstTable.getColumn("k1"); result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get(); + minTimes = 0; dstTable.getColumn("k2"); result = columns.stream().filter(c -> c.getName().equals("k2")).findFirst().get(); + minTimes = 0; dstTable.getColumn("v1"); result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get(); + minTimes = 0; dstTable.getColumn("v2"); result = columns.stream().filter(c -> c.getName().equals("v2")).findFirst().get(); + minTimes = 0; } }; diff --git a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index c7e7609dc78650..a8f2b93320d741 100644 --- a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -527,7 +527,6 @@ public void testUse() throws Exception { EasyMock.expect(useStmt.getDatabase()).andReturn("testDb").anyTimes(); EasyMock.expect(useStmt.getRedirectStatus()).andReturn(RedirectStatus.NO_FORWARD).anyTimes(); EasyMock.expect(useStmt.getClusterName()).andReturn("testCluster").anyTimes(); - EasyMock.replay(useStmt); Symbol symbol = new Symbol(0, useStmt); @@ -535,9 +534,6 @@ public void testUse() throws Exception { EasyMock.expect(parser.parse()).andReturn(symbol).anyTimes(); EasyMock.replay(parser); - PowerMock.expectNew(SqlParser.class, EasyMock.isA(SqlScanner.class), EasyMock.anyString()).andReturn(parser); - PowerMock.replay(SqlParser.class); - StmtExecutor executor = new StmtExecutor(ctx, ""); executor.execute(); diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 65f34a5513cf2b..2a9de39bd4b1e8 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -24,10 +24,8 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; -import org.apache.doris.catalog.Database; import org.apache.doris.catalog.FakeCatalog; import org.apache.doris.catalog.FakeEditLog; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; @@ -44,11 +42,10 @@ import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadTaskInfo; import org.apache.doris.meta.MetaContext; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.persist.EditLog; import org.apache.doris.thrift.TKafkaRLTaskProgress; import org.apache.doris.thrift.TLoadSourceType; import org.apache.doris.thrift.TRLTaskTxnCommitAttachment; -import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; @@ -62,13 +59,12 @@ import org.junit.Test; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import mockit.Deencapsulation; -import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; @@ -84,7 +80,6 @@ public class GlobalTransactionMgrTest { private String transactionSource = "localfe"; - @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { @@ -114,7 +109,7 @@ public void testBeginTransaction() throws LabelAlreadyUsedException, AnalysisExc CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND); - TransactionState transactionState = fakeEditLog.getTransaction(transactionId); + TransactionState transactionState = masterTransMgr.getTransactionState(transactionId); assertNotNull(transactionState); assertEquals(transactionId, transactionState.getTransactionId()); assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus()); @@ -127,7 +122,6 @@ public void testBeginTransactionWithSameLabel() throws LabelAlreadyUsedException BeginTransactionException { FakeCatalog.setCatalog(masterCatalog); long transactionId = 0; - Throwable throwable = null; try { transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, @@ -138,7 +132,7 @@ public void testBeginTransactionWithSameLabel() throws LabelAlreadyUsedException } catch (LabelAlreadyUsedException e) { e.printStackTrace(); } - TransactionState transactionState = fakeEditLog.getTransaction(transactionId); + TransactionState transactionState = masterTransMgr.getTransactionState(transactionId); assertNotNull(transactionState); assertEquals(transactionId, transactionState.getTransactionId()); assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus()); @@ -239,7 +233,7 @@ public void testCommitTransactionWithOneFailed() throws MetaNotFoundException, transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId2, transTablets); - transactionState = fakeEditLog.getTransaction(transactionId2); + transactionState = masterTransMgr.getTransactionState(transactionId2); // check status is prepare, because the commit failed assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus()); // check replica version @@ -305,23 +299,30 @@ public void testCommitTransactionWithOneFailed() throws MetaNotFoundException, @Test public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tabletCommitInfo, - @Injectable Database database, - @Injectable KafkaTaskInfo routineLoadTaskInfo, - @Injectable TResourceInfo tResourceInfo, - @Injectable OlapTable olapTable, - @Mocked Catalog catalog, - @Mocked ConnectContext connectContext, - @Mocked KafkaConsumer kafkaConsumer) + @Mocked KafkaConsumer kafkaConsumer, + @Mocked EditLog editLog) throws MetaNotFoundException, TransactionException, DdlException { - List tabletCommitInfoList = new ArrayList<>(); - tabletCommitInfoList.add(tabletCommitInfo); + FakeCatalog.setCatalog(masterCatalog); + + TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); + TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId2); + TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId3); + List transTablets = Lists.newArrayList(); + transTablets.add(tabletCommitInfo1); + transTablets.add(tabletCommitInfo2); + transTablets.add(tabletCommitInfo3); KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic"); List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); + Map partitionIdToOffset = Maps.newHashMap(); + partitionIdToOffset.put(1, 0L); + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", partitionIdToOffset); + Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); - TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1"); + TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId()); transactionState.setTransactionStatus(TransactionStatus.PREPARE); - Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob); + masterTransMgr.getListenerRegistry().register(routineLoadJob); + // Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob); Map idToTransactionState = Maps.newHashMap(); idToTransactionState.put(1L, transactionState); Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10); @@ -348,53 +349,43 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet RoutineLoadManager routineLoadManager = new RoutineLoadManager(); routineLoadManager.addRoutineLoadJob(routineLoadJob); - new Expectations() { - { - catalog.getDb(1L); - result = database; - routineLoadTaskInfo.getId(); - result = "label"; - catalog.getRoutineLoadManager(); - result = routineLoadManager; - database.getTable(anyLong); - result = olapTable; - routineLoadTaskInfo.getJobId(); - result = Deencapsulation.getField(routineLoadJob, "id"); - routineLoadTaskInfo.getPartitions(); - result = Lists.newArrayList().add(1); - } - }; Deencapsulation.setField(masterTransMgr, "idToTransactionState", idToTransactionState); - masterTransMgr.commitTransaction(1L, 1L, tabletCommitInfoList, txnCommitAttachment); + masterTransMgr.commitTransaction(1L, 1L, transTablets, txnCommitAttachment); - Assert.assertEquals(Integer.valueOf(100), Deencapsulation.getField(routineLoadJob, "currentTotalNum")); - Assert.assertEquals(Integer.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorNum")); - Assert.assertEquals(Long.valueOf(10L), ((KafkaProgress) routineLoadJob.getProgress()).getPartitionIdToOffset().get(1)); + Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows")); + Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows")); + Assert.assertEquals(Long.valueOf(11L), ((KafkaProgress) routineLoadJob.getProgress()).getPartitionIdToOffset().get(1)); // todo(ml): change to assert queue -// Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size()); -// Assert.assertNotEquals("label", routineLoadManager.getNeedScheduleTasksQueue().peek().getId()); - + // Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size()); + // Assert.assertNotEquals("label", routineLoadManager.getNeedScheduleTasksQueue().peek().getId()); } @Test public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommitInfo tabletCommitInfo, - @Injectable Database database, - @Injectable KafkaTaskInfo routineLoadTaskInfo, - @Injectable TResourceInfo tResourceInfo, - @Injectable OlapTable olapTable, - @Mocked Catalog catalog, - @Mocked ConnectContext connectContext, - @Mocked KafkaConsumer kafkaConsumer) + @Mocked EditLog editLog, + @Mocked KafkaConsumer kafkaConsumer) throws TransactionException, MetaNotFoundException, DdlException { - List tabletCommitInfoList = new ArrayList<>(); - tabletCommitInfoList.add(tabletCommitInfo); + + FakeCatalog.setCatalog(masterCatalog); + + TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); + TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId2); + TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId3); + List transTablets = Lists.newArrayList(); + transTablets.add(tabletCommitInfo1); + transTablets.add(tabletCommitInfo2); + transTablets.add(tabletCommitInfo3); KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic"); List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); + Map partitionIdToOffset = Maps.newHashMap(); + partitionIdToOffset.put(1, 0L); + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", partitionIdToOffset); + Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); - TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1"); + TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId()); transactionState.setTransactionStatus(TransactionStatus.PREPARE); - Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob); + masterTransMgr.getListenerRegistry().register(routineLoadJob); Map idToTransactionState = Maps.newHashMap(); idToTransactionState.put(1L, transactionState); Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10); @@ -421,24 +412,15 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi RoutineLoadManager routineLoadManager = new RoutineLoadManager(); routineLoadManager.addRoutineLoadJob(routineLoadJob); - new Expectations() { - { - catalog.getDb(1L); - result = database; - routineLoadTaskInfo.getId(); - result = "label"; - database.getTable(anyLong); - result = olapTable; - } - }; Deencapsulation.setField(masterTransMgr, "idToTransactionState", idToTransactionState); - masterTransMgr.commitTransaction(1L, 1L, tabletCommitInfoList, txnCommitAttachment); + masterTransMgr.commitTransaction(1L, 1L, transTablets, txnCommitAttachment); - Assert.assertEquals(Integer.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalNum")); - Assert.assertEquals(Integer.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorNum")); - Assert.assertEquals(Long.valueOf(10L), ((KafkaProgress) routineLoadJob.getProgress()).getPartitionIdToOffset().get(1)); + Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows")); + Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows")); + Assert.assertEquals(Long.valueOf(11L), + ((KafkaProgress) routineLoadJob.getProgress()).getPartitionIdToOffset().get(1)); // todo(ml): change to assert queue -// Assert.assertEquals(0, routineLoadManager.getNeedScheduleTasksQueue().size()); + // Assert.assertEquals(0, routineLoadManager.getNeedScheduleTasksQueue().size()); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } @@ -560,7 +542,7 @@ public void testFinishTransactionWithOneFailed() throws MetaNotFoundException, transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId2, transTablets); - transactionState = fakeEditLog.getTransaction(transactionId2); + transactionState = masterTransMgr.getTransactionState(transactionId2); // check status is prepare, because the commit failed assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus()); @@ -621,7 +603,7 @@ public void testDeleteTransaction() throws LabelAlreadyUsedException, CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND); - TransactionState transactionState = fakeEditLog.getTransaction(transactionId); + TransactionState transactionState = masterTransMgr.getTransactionState(transactionId); assertNotNull(transactionState); assertEquals(transactionId, transactionState.getTransactionId()); assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus());