Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.transaction.AbortTransactionException;
Expand Down Expand Up @@ -213,9 +212,7 @@ public RoutineLoadJob(Long id, String name, String clusterName, long dbId, long
this.clusterName = clusterName;
this.dbId = dbId;
this.tableId = tableId;
this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
.append(ConnectContext.get().getRemoteIP())
.append(id).append(System.currentTimeMillis()).toString().hashCode();
this.authCode = 0;
}

protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
Expand Down Expand Up @@ -595,7 +592,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
.filter(entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
if (!routineLoadTaskInfoOptional.isPresent()) {
throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed"
+ " while task " + txnState.getLabel() + "has been aborted ");
+ " while task " + txnState.getLabel() + " has been aborted ");
}
} finally {
readUnlock();
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ public void rollEditLog() {
*/
private synchronized void logEdit(short op, Writable writable) {
if (this.getNumEditStreams() == 0) {
LOG.error("Fatal Error : no editLog stream");
LOG.error("Fatal Error : no editLog stream", new Exception());
throw new Error("Fatal Error : no editLog stream");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public static Catalog createTestCatalog() throws InstantiationException, Illegal
Backend backend1 = createBackend(testBackendId1, "host1", 123, 124, 125);
Backend backend2 = createBackend(testBackendId2, "host1", 123, 124, 125);
Backend backend3 = createBackend(testBackendId3, "host1", 123, 124, 125);
catalog.getCurrentSystemInfo().addBackend(backend1);
catalog.getCurrentSystemInfo().addBackend(backend2);
catalog.getCurrentSystemInfo().addBackend(backend3);
Catalog.getCurrentSystemInfo().addBackend(backend1);
Catalog.getCurrentSystemInfo().addBackend(backend2);
Catalog.getCurrentSystemInfo().addBackend(backend3);
catalog.initDefaultCluster();
Database db = createSimpleDb(testDbId1, testTableId1, testPartitionId1, testIndexId1, testTabletId1,
testStartVersion, testStartVersionHash);
Expand Down
6 changes: 6 additions & 0 deletions fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.alter.SchemaChangeJob;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.transaction.TransactionState;

import java.util.HashMap;
Expand Down Expand Up @@ -85,6 +86,11 @@ public void logStartSchemaChange(SchemaChangeJob schemaChangeJob) {
public void logFinishingSchemaChange(SchemaChangeJob schemaChangeJob) {
}

@Mock
public void logOpRoutineLoadJob(RoutineLoadOperation operation) {

}

public TransactionState getTransaction(long transactionId) {
return allTransactionState.get(transactionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.UserException;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;

Expand Down Expand Up @@ -71,6 +73,8 @@ public void testNormalPlan() throws UserException {
TStreamLoadPutRequest request = new TStreamLoadPutRequest();
request.setTxnId(1);
request.setLoadId(new TUniqueId(2, 3));
request.setFileType(TFileType.FILE_STREAM);
request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
planner.plan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest getBaseRequest() {
TStreamLoadPutRequest request = new TStreamLoadPutRequest();
request.setFileType(TFileType.FILE_STREAM);
request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
return request;
}

Expand Down Expand Up @@ -251,7 +253,6 @@ public void testColumnsNormal() throws UserException, UserException {
};

TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_LOCAL);
request.setColumns("k1,k2,v1, v2=k2");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
Expand Down Expand Up @@ -300,7 +301,7 @@ public void testHllColumnsNormal() throws UserException {
};

TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_LOCAL);
request.setFileType(TFileType.FILE_STREAM);
request.setColumns("k1,k2, v1=hll_hash(k2)");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
Expand Down Expand Up @@ -330,21 +331,27 @@ public void testHllColumnsNoHllHash() throws UserException {
}
}

new Expectations() {{
catalog.getFunction((Function) any, (Function.CompareMode) any);
result = new ScalarFunction(new FunctionName("hll_hash1"), Lists.newArrayList(), Type.BIGINT, false);
}};
new Expectations() {
{
catalog.getFunction((Function) any, (Function.CompareMode) any);
result = new ScalarFunction(new FunctionName("hll_hash1"), Lists.newArrayList(), Type.BIGINT, false);
minTimes = 0;
}
};

new Expectations() {
{
dstTable.getColumn("k1");
result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
minTimes = 0;

dstTable.getColumn("k2");
result = null;
minTimes = 0;

dstTable.getColumn("v1");
result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get();
minTimes = 0;
}
};

Expand Down Expand Up @@ -522,15 +529,19 @@ public void testWhereBad() throws UserException, UserException {
{
dstTable.getColumn("k1");
result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
minTimes = 0;

dstTable.getColumn("k2");
result = columns.stream().filter(c -> c.getName().equals("k2")).findFirst().get();
minTimes = 0;

dstTable.getColumn("v1");
result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get();
minTimes = 0;

dstTable.getColumn("v2");
result = columns.stream().filter(c -> c.getName().equals("v2")).findFirst().get();
minTimes = 0;
}
};

Expand Down
4 changes: 0 additions & 4 deletions fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -527,17 +527,13 @@ public void testUse() throws Exception {
EasyMock.expect(useStmt.getDatabase()).andReturn("testDb").anyTimes();
EasyMock.expect(useStmt.getRedirectStatus()).andReturn(RedirectStatus.NO_FORWARD).anyTimes();
EasyMock.expect(useStmt.getClusterName()).andReturn("testCluster").anyTimes();

EasyMock.replay(useStmt);

Symbol symbol = new Symbol(0, useStmt);
SqlParser parser = EasyMock.createMock(SqlParser.class);
EasyMock.expect(parser.parse()).andReturn(symbol).anyTimes();
EasyMock.replay(parser);

PowerMock.expectNew(SqlParser.class, EasyMock.isA(SqlScanner.class), EasyMock.anyString()).andReturn(parser);
PowerMock.replay(SqlParser.class);

StmtExecutor executor = new StmtExecutor(ctx, "");
executor.execute();

Expand Down
Loading