Skip to content

Commit

Permalink
[improve](txn insert) txn insert support write to one table many times (
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi authored and ByteYue committed May 15, 2024
1 parent b45b79a commit a877143
Show file tree
Hide file tree
Showing 45 changed files with 4,505 additions and 753 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ public final class FeMetaVersion {
public static final int VERSION_129 = 129;

public static final int VERSION_130 = 130;

// for java-udtf add a bool field to write
public static final int VERSION_131 = 131;

// For transaction insert
public static final int VERSION_132 = 132;

// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_131;
public static final int VERSION_CURRENT = VERSION_132;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,83 +254,8 @@ && isLocal(tabletMeta.getStorageMedium())) {

// check if should clear transactions
if (backendTabletInfo.isSetTransactionIds()) {
List<Long> transactionIds = backendTabletInfo.getTransactionIds();
GlobalTransactionMgrIface transactionMgr = Env.getCurrentGlobalTransactionMgr();
for (Long transactionId : transactionIds) {
TransactionState transactionState
= transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
if (transactionState == null
|| transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
synchronized (transactionsToClear) {
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
}
if (LOG.isDebugEnabled()) {
LOG.debug("transaction id [{}] is not valid any more, "
+ "clear it from backend [{}]", transactionId, backendId);
}
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
TableCommitInfo tableCommitInfo
= transactionState.getTableCommitInfo(tabletMeta.getTableId());
PartitionCommitInfo partitionCommitInfo = tableCommitInfo == null
? null : tableCommitInfo.getPartitionCommitInfo(partitionId);
if (partitionCommitInfo != null) {
TPartitionVersionInfo versionInfo
= new TPartitionVersionInfo(tabletMeta.getPartitionId(),
partitionCommitInfo.getVersion(), 0);
synchronized (transactionsToPublish) {
ListMultimap<Long, TPartitionVersionInfo> map
= transactionsToPublish.get(transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
transactionsToPublish.put(transactionState.getDbId(), map);
}
map.put(transactionId, versionInfo);
}
}
} else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
// for some reasons, transaction pushlish succeed replica num less than quorum,
// this transaction's status can not to be VISIBLE, and this publish task of
// this replica of this tablet on this backend need retry publish success to
// make transaction VISIBLE when last publish failed.
Map<Long, PublishVersionTask> publishVersionTask =
transactionState.getPublishVersionTasks();
PublishVersionTask task = publishVersionTask.get(backendId);
if (task != null && task.isFinished()) {
List<Long> errorTablets = task.getErrorTablets();
if (errorTablets != null) {
for (int i = 0; i < errorTablets.size(); i++) {
if (tabletId == errorTablets.get(i)) {
TableCommitInfo tableCommitInfo
= transactionState.getTableCommitInfo(
tabletMeta.getTableId());
PartitionCommitInfo partitionCommitInfo =
tableCommitInfo == null ? null :
tableCommitInfo.getPartitionCommitInfo(partitionId);
if (partitionCommitInfo != null) {
TPartitionVersionInfo versionInfo
= new TPartitionVersionInfo(
tabletMeta.getPartitionId(),
partitionCommitInfo.getVersion(), 0);
synchronized (transactionsToPublish) {
ListMultimap<Long, TPartitionVersionInfo> map
= transactionsToPublish.get(
transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
transactionsToPublish.put(
transactionState.getDbId(), map);
}
map.put(transactionId, versionInfo);
}
}
break;
}
}
}
}

}
}
handleBackendTransactions(backendId, backendTabletInfo.getTransactionIds(), tabletId,
tabletMeta, transactionsToPublish, transactionsToClear);
} // end for txn id

// update replicase's version count
Expand Down Expand Up @@ -386,6 +311,82 @@ && isLocal(tabletMeta.getStorageMedium())) {
tabletRecoveryMap.size(), (end - start));
}

private void handleBackendTransactions(long backendId, List<Long> transactionIds, long tabletId,
TabletMeta tabletMeta, Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear) {
GlobalTransactionMgrIface transactionMgr = Env.getCurrentGlobalTransactionMgr();
long partitionId = tabletMeta.getPartitionId();
for (Long transactionId : transactionIds) {
TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
if (transactionState == null || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
synchronized (transactionsToClear) {
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
}
if (LOG.isDebugEnabled()) {
LOG.debug("transaction id [{}] is not valid any more, clear it from backend [{}]",
transactionId, backendId);
}
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
publishPartition(transactionState, transactionId, tabletMeta, partitionId, transactionsToPublish);
} else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
// for some reasons, transaction pushlish succeed replica num less than quorum,
// this transaction's status can not to be VISIBLE, and this publish task of
// this replica of this tablet on this backend need retry publish success to
// make transaction VISIBLE when last publish failed.
Map<Long, List<PublishVersionTask>> publishVersionTask = transactionState.getPublishVersionTasks();
List<PublishVersionTask> tasks = publishVersionTask.get(backendId);
for (PublishVersionTask task : tasks) {
if (task != null && task.isFinished()) {
List<Long> errorTablets = task.getErrorTablets();
if (errorTablets != null) {
for (int i = 0; i < errorTablets.size(); i++) {
if (tabletId == errorTablets.get(i)) {
publishPartition(transactionState, transactionId, tabletMeta, partitionId,
transactionsToPublish);
break;
}
}
}
}
}
}
}
}

// the transactionId may be sub transaction id or transaction id
private TPartitionVersionInfo generatePartitionVersionInfoWhenReport(TransactionState transactionState,
long transactionId, TabletMeta tabletMeta, long partitionId) {
TableCommitInfo tableCommitInfo;
if (transactionState.getSubTransactionStates() == null) {
tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId());
} else {
tableCommitInfo = transactionState.getTableCommitInfoBySubTxnId(transactionId);
}
if (tableCommitInfo != null && tableCommitInfo.getPartitionCommitInfo(partitionId) != null) {
PartitionCommitInfo partitionCommitInfo = tableCommitInfo.getPartitionCommitInfo(partitionId);
return new TPartitionVersionInfo(tabletMeta.getPartitionId(),
partitionCommitInfo.getVersion(), 0);
}
return null;
}

private void publishPartition(TransactionState transactionState, long transactionId, TabletMeta tabletMeta,
long partitionId, Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish) {
TPartitionVersionInfo versionInfo = generatePartitionVersionInfoWhenReport(transactionState,
transactionId, tabletMeta, partitionId);
if (versionInfo != null) {
synchronized (transactionsToPublish) {
ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(
transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
transactionsToPublish.put(transactionState.getDbId(), map);
}
map.put(transactionId, versionInfo);
}
}
}

public Long getTabletIdByReplica(long replicaId) {
long stamp = readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.SubTransactionState;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
import org.apache.doris.transaction.TransactionIdGenerator;
Expand Down Expand Up @@ -739,6 +740,12 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
}

@Override
public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
List<SubTransactionState> subTransactionStates, long timeoutMillis) throws UserException {
throw new UnsupportedOperationException("commitAndPublishTransaction is not supported in cloud");
}

@Override
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
Expand Down Expand Up @@ -1301,4 +1308,14 @@ public void replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 op
throws Exception {
throw new Exception(NOT_SUPPORTED_MSG);
}

@Override
public void addSubTransaction(long dbId, long transactionId, long subTransactionId) {
throw new UnsupportedOperationException("addSubTransaction is not supported in cloud");
}

@Override
public void removeSubTransaction(long dbId, long subTransactionId) {
throw new UnsupportedOperationException("removeSubTransaction is not supported in cloud");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,7 @@ public void readFields(DataInput in) throws IOException {
}
case OperationType.OP_UPSERT_TRANSACTION_STATE:
case OperationType.OP_DELETE_TRANSACTION_STATE: {
data = new TransactionState();
((TransactionState) data).readFields(in);
data = TransactionState.read(in);
isRead = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void process(Database targetDb, OlapTable targetTbl, List<String> partiti
// must call this to make sure we only handle the tablet in the mIndex we saw here.
// table may be under schema change or rollup, and the newly created tablets will not be checked later,
// to make sure that the delete transaction can be done successfully.
txnState.addTableIndexes(targetTbl);
deleteJob.addTableIndexes(txnState);
idToDeleteJob.put(txnId, deleteJob);
deleteJob.dispatch();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public enum DeleteState {

// jobId(listenerId). use in beginTransaction to callback function
private final long id;
protected long transactionId;
protected static final long INVALID_TXN_ID = -1L;
protected long transactionId = INVALID_TXN_ID;
protected String label;
private final Set<Long> totalTablets;
private final Set<Long> quorumTablets;
Expand Down Expand Up @@ -739,4 +740,8 @@ private List<String> getDeleteCondString(List<Predicate> conditions) {
return deleteConditions;
}
}

protected void addTableIndexes(TransactionState state) {
state.addTableIndexes(targetTbl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
Expand All @@ -34,10 +35,13 @@

public class EtlStatus implements Writable {
public static final String DEFAULT_TRACKING_URL = FeConstants.null_string;

@SerializedName(value = "s")
private TEtlState state;
@SerializedName(value = "tu")
private String trackingUrl;
@SerializedName(value = "st")
private Map<String, String> stats;
@SerializedName(value = "c")
private Map<String, String> counters;
// not persist
private Map<String, Long> fileMap;
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/load/FailMsg.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand All @@ -36,7 +38,9 @@ public enum CancelType {
TXN_UNKNOWN // cancelled because txn status is unknown
}

@SerializedName(value = "ct")
private CancelType cancelType;
@SerializedName(value = "m")
private String msg = "";

public FailMsg() {
Expand Down
Loading

0 comments on commit a877143

Please sign in to comment.