Skip to content

Commit

Permalink
support delete same table twice; make fe meta compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Apr 7, 2024
1 parent f81ccfe commit 4aa4514
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ public final class FeMetaVersion {
public static final int VERSION_129 = 129;

public static final int VERSION_130 = 130;
// For transaction insert
public static final int VERSION_131 = 131;

// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_130;
public static final int VERSION_CURRENT = VERSION_131;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,7 @@ public void readFields(DataInput in) throws IOException {
}
case OperationType.OP_UPSERT_TRANSACTION_STATE:
case OperationType.OP_DELETE_TRANSACTION_STATE: {
data = new TransactionState();
((TransactionState) data).readFields(in);
data = TransactionState.read(in);
isRead = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public enum DeleteState {

// jobId(listenerId). use in beginTransaction to callback function
private final long id;
protected long transactionId;
protected static final long INVALID_TXN_ID = -1L;
protected long transactionId = INVALID_TXN_ID;
protected String label;
private final Set<Long> totalTablets;
private final Set<Long> quorumTablets;
Expand Down
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.load;

import org.apache.doris.catalog.Env;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.transaction.TabletCommitInfo;
Expand All @@ -41,8 +42,7 @@ public TxnDeleteJob(long id, long transactionId, String label, Map<Long, Short>
@Override
public long beginTxn() throws Exception {
TransactionEntry txnEntry = ConnectContext.get().getTxnEntry();
txnEntry.beginTransaction(targetTbl.getDatabase(), targetTbl);
this.transactionId = txnEntry.getTransactionId();
this.transactionId = txnEntry.beginTransaction(targetTbl.getDatabase(), targetTbl);
this.label = txnEntry.getLabel();
return this.transactionId;
}
Expand All @@ -51,7 +51,7 @@ public long beginTxn() throws Exception {
public String commit() throws Exception {
List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos();
TransactionEntry txnEntry = ConnectContext.get().getTxnEntry();
txnEntry.addCommitInfos(targetTbl,
txnEntry.addTabletCommitInfos(transactionId, targetTbl,
tabletCommitInfos.stream().map(c -> new TTabletCommitInfo(c.getTabletId(), c.getBackendId()))
.collect(Collectors.toList()));

Expand All @@ -63,5 +63,9 @@ public String commit() throws Exception {

@Override
public void cancel(String reason) {
if (transactionId != INVALID_TXN_ID) {
ConnectContext.get().getTxnEntry().removeTable(targetTbl);
Env.getCurrentGlobalTransactionMgr().removeSubTransaction(targetTbl.getDatabase().getId(), transactionId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ private void checkCommitStatus(List<Table> tableList, TransactionState transacti
}

List<MaterializedIndex> allIndices;
if (transactionState.getLoadedTblIndexes().isEmpty()) {
if (transactionState.getLoadedTblIndexes().isEmpty()
|| transactionState.getLoadedTblIndexes().get(tableId) == null) {
allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
} else {
allIndices = Lists.newArrayList();
Expand Down Expand Up @@ -2508,7 +2509,8 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat

// TODO should use sub transaction load indexes
List<MaterializedIndex> allIndices;
if (transactionState.getLoadedTblIndexes().isEmpty()) {
if (transactionState.getLoadedTblIndexes().isEmpty()
|| transactionState.getLoadedTblIndexes().get(tableId) == null) {
allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
} else {
allIndices = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,7 @@ public void write(DataOutput out) throws IOException {
public void readFields(DataInput in) throws IOException {
int numTransactions = in.readInt();
for (int i = 0; i < numTransactions; ++i) {
TransactionState transactionState = new TransactionState();
transactionState.readFields(in);
TransactionState transactionState = TransactionState.read(in);
try {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(transactionState.getDbId());
dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -732,46 +731,21 @@ public void prolongPublishTimeout() {

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(transactionId);
Text.writeString(out, label);
out.writeLong(dbId);
out.writeInt(idToTableCommitInfos.size());
for (TableCommitInfo info : idToTableCommitInfos.values()) {
info.write(out);
}
out.writeInt(txnCoordinator.sourceType.value());
Text.writeString(out, txnCoordinator.ip);
out.writeInt(transactionStatus.value());
out.writeInt(sourceType.value());
out.writeLong(prepareTime);
out.writeLong(preCommitTime);
out.writeLong(commitTime);
out.writeLong(finishTime);
Text.writeString(out, reason);
out.writeInt(errorReplicas.size());
for (long errorReplciaId : errorReplicas) {
out.writeLong(errorReplciaId);
}

if (txnCommitAttachment == null) {
out.writeBoolean(false);
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

public static TransactionState read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_131) {
TransactionState transactionState = new TransactionState();
transactionState.readFields(in);
return transactionState;
} else {
out.writeBoolean(true);
txnCommitAttachment.write(out);
}
out.writeLong(callbackId);
out.writeLong(timeoutMs);
out.writeInt(tableIdList.size());
for (Long aLong : tableIdList) {
out.writeLong(aLong);
}
out.writeLong(subTxnIdToTableCommitInfo.size());
for (Entry<Long, TableCommitInfo> entry : subTxnIdToTableCommitInfo.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, TransactionState.class);
}
}

@Deprecated
public void readFields(DataInput in) throws IOException {
transactionId = in.readLong();
label = Text.readString(in);
Expand Down Expand Up @@ -806,13 +780,6 @@ public void readFields(DataInput in) throws IOException {
for (int i = 0; i < tableListSize; i++) {
tableIdList.add(in.readLong());
}
// TODO compatible
long subTxnSize = in.readLong();
for (int i = 0; i < subTxnSize; i++) {
long tableId = in.readLong();
TableCommitInfo info = TableCommitInfo.read(in);
subTxnIdToTableCommitInfo.put(tableId, info);
}
}

public Map<Long, Long> getTableIdToTotalNumDeltaRows() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0

// 2. Read objects from file
DataInputStream in = new DataInputStream(new FileInputStream(file));
TransactionState readTransactionState = new TransactionState();
readTransactionState.readFields(in);
TransactionState readTransactionState = TransactionState.read(in);

Assert.assertEquals(transactionState.getCoordinator().ip, readTransactionState.getCoordinator().ip);
in.close();
Expand Down
8 changes: 4 additions & 4 deletions regression-test/data/insert_p0/txn_insert.out
Original file line number Diff line number Diff line change
Expand Up @@ -964,20 +964,20 @@
-- !select39 --
1 a 100

-- !select27 --
-- !select40 --
1 2000-01-01 1 1 1.0
3 2000-01-03 3 3 3.0

-- !select28 --
-- !select41 --
2 2000-01-20 20 20 20.0
3 2000-01-30 30 30 30.0
4 2000-01-04 4 4 4.0
6 2000-01-10 10 10 10.0

-- !select29 --
-- !select42 --
3 2000-01-03 3 3 3.0

-- !select30 --
-- !select43 --
1 2000-01-01 1 1 1.0
2 2000-01-02 2 2 2.0
3 2000-01-03 3 3 3.0
Expand Down
25 changes: 13 additions & 12 deletions regression-test/suites/insert_p0/txn_insert.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -391,17 +391,12 @@ suite("txn_insert") {
sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """
sql """ update ${ut_table}_1 set score = 101 where id = 1; """
sql """ commit; """
<<<<<<< HEAD
sql "sync"
order_qt_select25 """select * from ${ut_table}_1 """
order_qt_select26 """select * from ${ut_table}_2 """
=======
order_qt_select38 """select * from ${ut_table}_1 """
order_qt_select39 """select * from ${ut_table}_2 """
>>>>>>> ebe2ce6f92 (fix)
}

// 8. delete from using and delete from stmt
// 12. delete from using and delete from stmt
if (use_nereids_planner) {
for (def ta in ["txn_insert_dt1", "txn_insert_dt2", "txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) {
sql """ drop table if exists ${ta} """
Expand Down Expand Up @@ -485,19 +480,25 @@ suite("txn_insert") {
where txn_insert_dt4.id = txn_insert_dt2.id;
"""
sql """
delete from txn_insert_dt2 where id = 1 or id = 5;
delete from txn_insert_dt2 where id = 1;
"""
sql """
delete from txn_insert_dt5 partition(p_20000102) where id = 1 or id = 5;
delete from txn_insert_dt2 where id = 5;
"""
sql """
delete from txn_insert_dt5 partition(p_20000102) where id = 1;
"""
sql """
delete from txn_insert_dt5 partition(p_20000102) where id = 5;
"""
sql """ commit """
sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, '10', 10.0) """
sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, '10', 10.0) """
sql "sync"
order_qt_select27 """select * from txn_insert_dt1 """
order_qt_select28 """select * from txn_insert_dt2 """
order_qt_select29 """select * from txn_insert_dt4 """
order_qt_select30 """select * from txn_insert_dt5 """
order_qt_select40 """select * from txn_insert_dt1 """
order_qt_select41 """select * from txn_insert_dt2 """
order_qt_select42 """select * from txn_insert_dt4 """
order_qt_select43 """select * from txn_insert_dt5 """
}
}
}

0 comments on commit 4aa4514

Please sign in to comment.