Skip to content

Commit

Permalink
[improve](txn insert) txn insert support write to one table many times
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Mar 28, 2024
1 parent 496befd commit bde92ed
Show file tree
Hide file tree
Showing 18 changed files with 1,838 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,14 @@ && isLocal(tabletMeta.getStorageMedium())) {
+ "clear it from backend [{}]", transactionId, backendId);
}
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
TableCommitInfo tableCommitInfo
= transactionState.getTableCommitInfo(tabletMeta.getTableId());
TableCommitInfo tableCommitInfo;
if (transactionState.getSubTransactionStates().isEmpty()) {
tableCommitInfo = transactionState.getTableCommitInfo(
tabletMeta.getTableId());
} else {
tableCommitInfo = transactionState.getSubTxnIdToTableCommitInfo()
.get(transactionId);
}
PartitionCommitInfo partitionCommitInfo = tableCommitInfo == null
? null : tableCommitInfo.getPartitionCommitInfo(partitionId);
if (partitionCommitInfo != null) {
Expand All @@ -280,43 +286,52 @@ && isLocal(tabletMeta.getStorageMedium())) {
// this transaction's status can not to be VISIBLE, and this publish task of
// this replica of this tablet on this backend need retry publish success to
// make transaction VISIBLE when last publish failed.
Map<Long, PublishVersionTask> publishVersionTask =
Map<Long, List<PublishVersionTask>> publishVersionTask =
transactionState.getPublishVersionTasks();
PublishVersionTask task = publishVersionTask.get(backendId);
if (task != null && task.isFinished()) {
List<Long> errorTablets = task.getErrorTablets();
if (errorTablets != null) {
for (int i = 0; i < errorTablets.size(); i++) {
if (tabletId == errorTablets.get(i)) {
TableCommitInfo tableCommitInfo
= transactionState.getTableCommitInfo(
List<PublishVersionTask> tasks = publishVersionTask.get(backendId);
for (PublishVersionTask task : tasks) {
if (task != null && task.isFinished()) {
List<Long> errorTablets = task.getErrorTablets();
if (errorTablets != null) {
for (int i = 0; i < errorTablets.size(); i++) {
if (tabletId == errorTablets.get(i)) {
TableCommitInfo tableCommitInfo;
if (transactionState.getSubTransactionStates().isEmpty()) {
tableCommitInfo
= transactionState.getTableCommitInfo(
tabletMeta.getTableId());
PartitionCommitInfo partitionCommitInfo =
tableCommitInfo == null ? null :
tableCommitInfo.getPartitionCommitInfo(partitionId);
if (partitionCommitInfo != null) {
TPartitionVersionInfo versionInfo
= new TPartitionVersionInfo(
} else {
tableCommitInfo =
transactionState.getSubTxnIdToTableCommitInfo()
.get(transactionId);
}
PartitionCommitInfo partitionCommitInfo =
tableCommitInfo == null ? null :
tableCommitInfo.getPartitionCommitInfo(
partitionId);
if (partitionCommitInfo != null) {
TPartitionVersionInfo versionInfo
= new TPartitionVersionInfo(
tabletMeta.getPartitionId(),
partitionCommitInfo.getVersion(), 0);
synchronized (transactionsToPublish) {
ListMultimap<Long, TPartitionVersionInfo> map
= transactionsToPublish.get(
transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
transactionsToPublish.put(
transactionState.getDbId(), map);
synchronized (transactionsToPublish) {
ListMultimap<Long, TPartitionVersionInfo> map
= transactionsToPublish.get(
transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
transactionsToPublish.put(
transactionState.getDbId(), map);
}
map.put(transactionId, versionInfo);
}
map.put(transactionId, versionInfo);
}
break;
}
break;
}
}
}
}

}
}
} // end for txn id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.SubTransactionState;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
import org.apache.doris.transaction.TransactionIdGenerator;
Expand Down Expand Up @@ -702,6 +703,12 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
}

@Override
public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
List<SubTransactionState> subTransactionStates, long timeoutMillis) throws UserException {
throw new UnsupportedOperationException("commitAndPublishTransaction is not supported in cloud");
}

@Override
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
Expand Down Expand Up @@ -1231,4 +1238,19 @@ public void replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 op
throws Exception {
throw new Exception(NOT_SUPPORTED_MSG);
}

@Override
public void addSubTransaction(long dbId, long transactionId, long subTransactionId) {
throw new UnsupportedOperationException("addSubTransaction is not supported in cloud");
}

@Override
public void removeSubTransaction(long dbId, long subTransactionId) {
throw new UnsupportedOperationException("removeSubTransaction is not supported in cloud");
}

@Override
public void cleanSubTransactions(long dbId, long transactionId) {
throw new UnsupportedOperationException("cleanSubTransactions is not supported in cloud");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
}
OlapTable olapTable = (OlapTable) targetTableIf;
// the insertCtx contains some variables to adjust SinkNode
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx);
insertExecutor = ctx.isTxnModel() ? new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx)
: new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx);
boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
Expand All @@ -68,9 +67,9 @@
* Insert executor for olap table
*/
public class OlapInsertExecutor extends AbstractInsertExecutor {
protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
protected long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;

/**
Expand All @@ -88,22 +87,10 @@ public long getTxnId() {
@Override
public void beginTransaction() {
try {
if (ctx.isTxnModel()) {
TransactionEntry txnEntry = ctx.getTxnEntry();
// check the same label with begin
if (this.labelName != null && !this.labelName.equals(txnEntry.getLabel())) {
throw new AnalysisException("Transaction insert expect label " + txnEntry.getLabel()
+ ", but got " + this.labelName);
}
txnEntry.beginTransaction(database, table);
this.txnId = txnEntry.getTransactionId();
this.labelName = txnEntry.getLabel();
} else {
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
}
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
}
Expand Down Expand Up @@ -158,10 +145,15 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys
throw new AnalysisException(e.getMessage(), e);
}
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
if (!ctx.isTxnModel()) {
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
} else {
// TODO

}
state.addTableIndexes((OlapTable) table);
if (physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate((OlapTable) table);
}
Expand All @@ -175,12 +167,6 @@ protected void beforeExec() {

@Override
protected void onComplete() throws UserException {
if (ctx.isTxnModel()) {
TransactionEntry txnEntry = ctx.getTxnEntry();
txnEntry.addCommitInfos((Table) table, coordinator.getCommitInfos());
return;
}

if (ctx.getState().getStateType() == MysqlStateType.ERR) {
try {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.transaction.TransactionEntry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Optional;

/**
* Insert executor for olap table with transaction model
*/
public class OlapTxnInsertExecutor extends OlapInsertExecutor {
private static final Logger LOG = LogManager.getLogger(OlapTxnInsertExecutor.class);

public OlapTxnInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
}

public long getTxnId() {
return txnId;
}

@Override
public void beginTransaction() {
try {
TransactionEntry txnEntry = ctx.getTxnEntry();
// check the same label with begin
if (this.labelName != null && !this.labelName.equals(txnEntry.getLabel())) {
throw new AnalysisException("Transaction insert expect label " + txnEntry.getLabel()
+ ", but got " + this.labelName);
}
this.txnId = txnEntry.beginTransaction(database, table);
this.labelName = txnEntry.getLabel();
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
}
}

@Override
protected void beforeExec() {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}, txn_model=true", labelName, queryId, txnId);
}

@Override
protected void onComplete() {
TransactionEntry txnEntry = ctx.getTxnEntry();
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
if (txnId != INVALID_TXN_ID) {
txnEntry.removeTable((Table) table);
}
} else {
txnEntry.addTabletCommitInfos(txnId, (Table) table, coordinator.getCommitInfos());
}
}

@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
String queryId = DebugUtil.printId(ctx.queryId());
// if any throwable being thrown during insert operation, first we should abort this txn
LOG.warn("insert [{}] with query id {} failed, url={}", labelName, queryId, coordinator.getTrackingUrl(), t);
if (txnId != INVALID_TXN_ID) {
ctx.getTxnEntry().removeTable((Table) table);
Env.getCurrentGlobalTransactionMgr().removeSubTransaction(table.getDatabase().getId(), txnId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,10 @@ private void updateLoadCounters(Map<String, String> newLoadCounters) {
private void updateCommitInfos(List<TTabletCommitInfo> commitInfos) {
lock.lock();
try {
this.commitInfos.addAll(commitInfos);
// in pipelinex, the commit info may be duplicate, so we remove the duplicate ones
Map<Pair<Long, Long>, TTabletCommitInfo> commitInfoMap = Maps.newHashMap();
commitInfos.forEach(info -> commitInfoMap.put(Pair.of(info.getBackendId(), info.getTabletId()), info));
this.commitInfos.addAll(commitInfoMap.values());
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -35,6 +36,7 @@ public class PublishVersionTask extends AgentTask {
private static final Logger LOG = LogManager.getLogger(PublishVersionTask.class);

private long transactionId;
@Getter
private List<TPartitionVersionInfo> partitionVersionInfos;

/**
Expand Down
Loading

0 comments on commit bde92ed

Please sign in to comment.