Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.PublishVersionDaemon;

Expand Down Expand Up @@ -566,6 +567,8 @@ public class Env {

private final SplitSourceManager splitSourceManager;

private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr;

private final List<String> forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids);

// if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it.
Expand Down Expand Up @@ -814,6 +817,7 @@ public Env(boolean isCheckpointCatalog) {
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager();
this.splitSourceManager = new SplitSourceManager();
this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr();
}

public static void destroyCheckpoint() {
Expand Down Expand Up @@ -6535,6 +6539,10 @@ public SplitSourceManager getSplitSourceManager() {
return splitSourceManager;
}

public GlobalExternalTransactionInfoMgr getGlobalExternalTransactionInfoMgr() {
return globalExternalTransactionInfoMgr;
}

public StatisticsJobAppender getStatisticsJobAppender() {
return statisticsJobAppender;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
* The derived class should implement the abstract method for certain type of target table
*/
public abstract class AbstractInsertExecutor {
protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class);

protected long jobId;
protected final ConnectContext ctx;
protected final Coordinator coordinator;
Expand All @@ -62,6 +64,7 @@ public abstract class AbstractInsertExecutor {
protected String errMsg = "";
protected Optional<InsertCommandContext> insertCtx;
protected final boolean emptyInsert;
protected long txnId = INVALID_TXN_ID;

/**
* Constructor
Expand Down Expand Up @@ -93,7 +96,9 @@ public String getLabelName() {
return labelName;
}

public abstract long getTxnId();
public long getTxnId() {
return txnId;
}

/**
* begin transaction if necessary
Expand All @@ -108,7 +113,7 @@ public String getLabelName() {
/**
* Do something before exec
*/
protected abstract void beforeExec();
protected abstract void beforeExec() throws UserException;

/**
* Do something after exec finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@
* Insert executor for base external table
*/
public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor {
protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class);
protected long txnId = INVALID_TXN_ID;
protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
protected final TransactionManager transactionManager;
protected final String catalogName;
Expand All @@ -70,16 +68,6 @@ public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table,
}
}

@Override
public long getTxnId() {
return txnId;
}

/**
* collect commit infos from BEs
*/
protected abstract void setCollectCommitInfoFunc();

/**
* At this time, FE has successfully collected all commit information from BEs.
* Before commit this txn, commit information need to be analyzed and processed.
Expand All @@ -94,7 +82,6 @@ public long getTxnId() {
@Override
public void beginTransaction() {
txnId = transactionManager.begin();
setCollectCommitInfoFunc();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,7 @@ public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
}

@Override
public void setCollectCommitInfoFunc() {
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
}

@Override
protected void beforeExec() {
protected void beforeExec() throws UserException {
// check params
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
Preconditions.checkArgument(insertCtx.isPresent(), "insert context must be present");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table,
}

@Override
public void setCollectCommitInfoFunc() {
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
}

@Override
protected void beforeExec() {
protected void beforeExec() throws UserException {
String dbName = ((IcebergExternalTable) table).getDbName();
String tbName = table.getName();
SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
executor.setProfileType(ProfileType.LOAD);
// We exposed @StmtExecutor#cancel as a unified entry point for statement interruption,
// so we need to set this here
insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId());
executor.setCoord(insertExecutor.getCoordinator());
return insertExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink p
// do nothing
}

@Override
protected void setCollectCommitInfoFunc() {
// do nothing
}

@Override
protected void doBeforeCommit() throws UserException {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@
* Insert executor for olap table
*/
public class OlapInsertExecutor extends AbstractInsertExecutor {
protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class);
protected long txnId = INVALID_TXN_ID;
protected TransactionStatus txnStatus = TransactionStatus.ABORTED;

/**
Expand All @@ -85,11 +83,6 @@ public OlapInsertExecutor(ConnectContext ctx, Table table,
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

@Override
public long getTxnId() {
return txnId;
}

@Override
public void beginTransaction() {
if (isGroupCommitHttpStream()) {
Expand Down
32 changes: 12 additions & 20 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlCommand;
Expand Down Expand Up @@ -93,8 +95,6 @@
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFragmentInstanceReport;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineFragmentParams;
Expand Down Expand Up @@ -250,12 +250,6 @@ public class Coordinator implements CoordInterface {
private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
private final List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList();

// Collect all hivePartitionUpdates obtained from be
Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;

// Collect all icebergCommitData obtained from be
Consumer<List<TIcebergCommitData>> icebergCommitDataFunc;

// Input parameter
private long jobId = -1; // job which this task belongs to
private TUniqueId queryId;
Expand Down Expand Up @@ -486,6 +480,10 @@ public long getTxnId() {
return txnId;
}

public void setTxnId(long txnId) {
this.txnId = txnId;
}

public String getLabel() {
return label;
}
Expand Down Expand Up @@ -2378,14 +2376,6 @@ private void updateScanRangeNumByScanRange(TScanRangeParams param) {
// TODO: more ranges?
}

public void setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc) {
this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
}

public void setIcebergCommitDataFunc(Consumer<List<TIcebergCommitData>> icebergCommitDataFunc) {
this.icebergCommitDataFunc = icebergCommitDataFunc;
}

// update job progress from BE
public void updateFragmentExecStatus(TReportExecStatusParams params) {
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
Expand Down Expand Up @@ -2438,11 +2428,13 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
if (params.isSetHivePartitionUpdates()) {
((HMSTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
.updateHivePartitionUpdates(params.getHivePartitionUpdates());
}
if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
if (params.isSetIcebergCommitDatas()) {
((IcebergTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
.updateIcebergCommitData(params.getIcebergCommitDatas());
}

if (ctx.done) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.operations.ExternalMetadataOps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public abstract class AbstractExternalTransactionManager<T extends Transaction> implements TransactionManager {
private static final Logger LOG = LogManager.getLogger(AbstractExternalTransactionManager.class);
private final Map<Long, T> transactions = new ConcurrentHashMap<>();
protected final ExternalMetadataOps ops;

public AbstractExternalTransactionManager(ExternalMetadataOps ops) {
this.ops = ops;
}

abstract T createTransaction();

@Override
public long begin() {
long id = Env.getCurrentEnv().getNextId();
T transaction = createTransaction();
transactions.put(id, transaction);
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().putTxnById(id, transaction);
return id;
}

@Override
public void commit(long id) throws UserException {
getTransactionWithException(id).commit();
transactions.remove(id);
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
}

@Override
public void rollback(long id) {
try {
getTransactionWithException(id).rollback();
} catch (TransactionNotFoundException e) {
LOG.warn(e.getMessage(), e);
} finally {
transactions.remove(id);
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
}
}

@Override
public Transaction getTransaction(long id) throws UserException {
return getTransactionWithException(id);
}

private Transaction getTransactionWithException(long id) throws TransactionNotFoundException {
Transaction txn = transactions.get(id);
if (txn == null) {
throw new TransactionNotFoundException("Can't find transaction for " + id);
}
return txn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GlobalExternalTransactionInfoMgr {
public Map<Long, Transaction> idToTxn = new ConcurrentHashMap<>();

public Transaction getTxnById(long txnId) {
if (idToTxn.containsKey(txnId)) {
return idToTxn.get(txnId);
}
throw new RuntimeException("Can't find txn for " + txnId);
}

public void putTxnById(long txnId, Transaction txn) {
if (idToTxn.containsKey(txnId)) {
throw new RuntimeException("Duplicate txnId for " + txnId);
}
idToTxn.put(txnId, txn);
}

public void removeTxnById(long txnId) {
idToTxn.remove(txnId);
}
}
Loading