Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
abbe246
IGNITE-24963 Wound wait hang debug wip
ascherbakoff Mar 4, 2026
e0ee492
IGNITE-24963 fix unlock path
ascherbakoff Mar 4, 2026
41a6a8d
IGNITE-24963 remove logging
ascherbakoff Mar 4, 2026
fc949b9
IGNITE-24963 bencnhmarks
ascherbakoff Mar 4, 2026
066aa81
IGNITE-24963 retry id
ascherbakoff Mar 4, 2026
33f6b05
IGNITE-24963 Debug hang
ascherbakoff Mar 4, 2026
0289ab0
IGNITE-24963 Working
ascherbakoff Mar 8, 2026
66f7d9d
IGNITE-24963 Working 2
ascherbakoff Mar 11, 2026
2a39eb3
IGNITE-24963 Cleanup for bench
ascherbakoff Mar 12, 2026
ddd0379
IGNITE-24963 Try for update
ascherbakoff Mar 12, 2026
ed34505
IGNITE-24963 Try for update fixed bug
ascherbakoff Mar 12, 2026
5c167f5
IGNITE-24963 Revert to S lock
ascherbakoff Mar 13, 2026
2d7aeb0
IGNITE-24963 Merged with main
ascherbakoff Mar 13, 2026
5dc98b4
IGNITE-24963 TPC-C benchmark runner node
ascherbakoff Mar 16, 2026
a215ae4
IGNITE-24963 Cleanup lock manager wip 2
ascherbakoff Mar 16, 2026
fc7dd7e
IGNITE-24963 Fixed lock manager tests
ascherbakoff Mar 16, 2026
0d796b2
IGNITE-24963 Use proper tx formatting
ascherbakoff Mar 16, 2026
8c8190f
IGNITE-24963 Optimized part inflights
ascherbakoff Mar 17, 2026
874a0d9
IGNITE-24963 Lock free decrement
ascherbakoff Mar 17, 2026
51a1008
IGNITE-24963 Revert runInTransaction
ascherbakoff Mar 17, 2026
10c70a2
IGNITE-24963 Try WD
ascherbakoff Mar 17, 2026
458deb6
IGNITE-24963 Post review fixes 1
ascherbakoff Mar 17, 2026
b21a66e
IGNITE-24963 Post review fixes 2
ascherbakoff Mar 19, 2026
c8d28ca
IGNITE-24963 Post review fixes 3
ascherbakoff Mar 20, 2026
4638fed
IGNITE-24963 Retry commits
ascherbakoff Mar 20, 2026
c7598a0
IGNITE-24963 Stabilize WD
ascherbakoff Mar 20, 2026
b02bbfe
IGNITE-24963 Fix coarse locks deadlock prevention
ascherbakoff Mar 20, 2026
89b9587
IGNITE-24963 Fix abandoned locks handling
ascherbakoff Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ static <T> T runInTransactionInternal(
try {
ret = clo.apply(tx);

tx.commit(); // Commit is retriable.

break;
} catch (Exception ex) {
addSuppressedToList(suppressed, ex);
Expand Down Expand Up @@ -98,19 +100,6 @@ static <T> T runInTransactionInternal(
}
}

try {
tx.commit();
} catch (Exception e) {
try {
// Try to roll back the tx in case it's not finished. Retry is not needed here due to the durable finish.
tx.rollback();
} catch (Exception re) {
e.addSuppressed(re);
}

throw e;
}

return ret;
}

Expand Down Expand Up @@ -158,6 +147,7 @@ static <T> CompletableFuture<T> runInTransactionAsyncInternal(
.thenCompose(tx -> {
try {
return clo.apply(tx)
.thenCompose(res -> tx.commitAsync().thenApply(ignored -> res))
.handle((res, e) -> {
if (e != null) {
return handleClosureException(
Expand All @@ -173,30 +163,11 @@ static <T> CompletableFuture<T> runInTransactionAsyncInternal(
} else {
return completedFuture(res);
}
})
.thenCompose(identity())
.thenApply(res -> new TxWithVal<>(tx, res));
}).thenCompose(identity());
} catch (Exception e) {
return handleClosureException(igniteTransactions, tx, clo, txOptions, startTimestamp, initialTimeout, sup, e)
.thenApply(res -> new TxWithVal<>(tx, res));
return handleClosureException(igniteTransactions, tx, clo, txOptions, startTimestamp, initialTimeout, sup, e);
}
})
// Transaction commit with rollback on failure, without retries.
// Transaction rollback on closure failure is implemented in closure retry logic.
.thenCompose(txWithVal ->
txWithVal.tx.commitAsync()
.handle((ignored, e) -> {
if (e == null) {
return completedFuture(null);
} else {
return txWithVal.tx.rollbackAsync()
// Rethrow commit exception.
.handle((ign, re) -> sneakyThrow(e));
}
})
.thenCompose(fut -> fut)
.thenApply(ignored -> txWithVal.val)
);
});
}

private static <T> CompletableFuture<T> handleClosureException(
Expand Down Expand Up @@ -347,14 +318,4 @@ private static long calcRemainingTime(long initialTimeout, long startTimestamp)
private static <E extends Throwable> E sneakyThrow(Throwable e) throws E {
throw (E) e;
}

private static class TxWithVal<T> {
private final Transaction tx;
private final T val;

private TxWithVal(Transaction tx, T val) {
this.tx = tx;
this.val = val;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public void testRetries(
}

boolean requiresEventualSuccess = closureFailureCount < Integer.MAX_VALUE
// Commit failure can't be retried.
&& commitFailureCount == 0
&& commitFailureCount < Integer.MAX_VALUE
&& (commitFailureCount == 0 || rollbackFailureCount < Integer.MAX_VALUE)
// Rollbacks should be retried until success or timeout, so the rollback must succeed before closure retry.
&& (closureFailureCount == 0 || rollbackFailureCount < Integer.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -702,6 +703,19 @@ public static boolean waitForCondition(BooleanSupplier cond, long sleepMillis, l
return false;
}

/**
 * Ensures that the given future stays incomplete for the given duration.
 *
 * <p>The future's {@code isDone()} state is polled every 10 milliseconds and is expected to remain {@code false}
 * for the whole {@code durationMillis} window.
 *
 * <p>NOTE(review): no explicit {@code atMost(...)} timeout is configured here, so Awaitility's default timeout applies;
 * confirm that it is large enough for the durations callers pass in.
 *
 * @param future The future that must not complete (normally, exceptionally or by cancellation) during the check.
 * @param durationMillis How long, in milliseconds, the future must remain incomplete.
 */
public static void ensureFutureNotCompleted(CompletableFuture<?> future, long durationMillis) {
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
.during(durationMillis, TimeUnit.MILLISECONDS)
.until(future::isDone, is(false));
}

/**
* Returns random byte array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public TxFinishReplicaRequestHandler(
* @return future result of the operation.
*/
public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
//LOG.info("DBG: handle finish " + request.txId() + " commit=" + request.commit());

Map<ZonePartitionId, PartitionEnlistment> enlistedGroups = asReplicationGroupIdToPartitionMap(request.groups());

UUID txId = request.txId();
Expand Down Expand Up @@ -223,10 +225,13 @@ private CompletableFuture<TransactionResult> finishAndCleanup(
List<EnlistedPartitionGroup> enlistedPartitionGroups = enlistedPartitions.entrySet().stream()
.map(entry -> new EnlistedPartitionGroup(entry.getKey(), entry.getValue().tableIds()))
.collect(toList());
// LOG.info("DBG: finishTx " + txId + " " + commit + " " + enlistedPartitionGroups.size());
return finishTransaction(enlistedPartitionGroups, txId, commit, commitTimestamp)
.thenCompose(txResult ->
txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId)
.thenApply(v -> txResult)
.thenCompose(txResult -> {
//LOG.info("DBG: done finishTx " + txId);
return txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId)
.thenApply(v -> txResult);
}
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.benchmark;

import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;

import java.io.File;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServer;
import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.Statement.StatementBuilder;
import org.apache.ignite.tx.Transaction;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;

/**
 * Extendable class to start a dedicated cluster node for TPC-C benchmark.
 *
 * <p>Running {@link #main(String[])} starts {@link #nodes()} server nodes on consecutive ports, initializes the cluster
 * and publishes the first started node through {@link #publicIgnite} and {@link #igniteImpl}. Subclasses can tune the
 * cluster by overriding the protected hooks ({@link #workDir()}, {@link #pageMemorySize()}, {@link #logPath()},
 * {@link #fsync()}, {@link #nodes()}, {@link #clusterConfiguration()}).
 */
public class TpccBenchmarkNodeRunner {
    /** Network port of the first node; node {@code i} listens on {@code BASE_PORT + i}. */
    private static final int BASE_PORT = 3344;

    /** Client connector port of the first node; node {@code i} uses {@code BASE_CLIENT_PORT + i}. */
    private static final int BASE_CLIENT_PORT = 10800;

    /** REST port of the first node; node {@code i} uses {@code BASE_REST_PORT + i}. */
    private static final int BASE_REST_PORT = 10300;

    /** Started servers, in start order. */
    private static final List<IgniteServer> igniteServers = new ArrayList<>();

    /** Public API of the first node that finished initialization. Set by {@link #startCluster()}. */
    protected static Ignite publicIgnite;

    /** Unwrapped implementation behind {@link #publicIgnite}. Set by {@link #startCluster()}. */
    protected static IgniteImpl igniteImpl;

    /**
     * Entry point: creates a runner and starts the cluster.
     *
     * @param args Command line arguments (not used).
     * @throws Exception If the cluster failed to start.
     */
    public static void main(String[] args) throws Exception {
        TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
        runner.startCluster();
    }

    /**
     * Returns the unwrapped node implementation by its start index.
     *
     * @param idx Zero-based index of the node in start order.
     * @return Node implementation.
     */
    public IgniteImpl node(int idx) {
        return unwrapIgniteImpl(igniteServers.get(idx).api());
    }

    /**
     * Starts {@link #nodes()} servers, initializes the cluster using the first node as the meta storage node
     * and waits for every node to complete initialization.
     *
     * @throws Exception If the working directory cannot be resolved or a node fails to start.
     */
    private void startCluster() throws Exception {
        Path workDir = workDir();

        // Every node discovers the cluster through the first node's address.
        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';

        // The four "{}" placeholders are filled per node below, in order:
        // network port, node finder address, client connector port, REST port.
        @Language("HOCON")
        String configTemplate = "ignite {\n"
                + " \"network\": {\n"
                + " \"port\":{},\n"
                + " \"nodeFinder\":{\n"
                + " \"netClusterNodes\": [ {} ]\n"
                + " }\n"
                + " },\n"
                + " storage.profiles: {"
                + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
                + " " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " + pageMemorySize() + " "
                + " },\n"
                + " clientConnector: { port:{} },\n"
                + " clientConnector.sendServerExceptionStackTraceToClient: true\n"
                + " rest.port: {},\n"
                + " raft.fsync = " + fsync() + ",\n"
                + " system.partitionsLogPath = \"" + logPath() + "\",\n"
                + " failureHandler.handler: {\n"
                + " type: \"" + StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
                + " tryStop: true,\n"
                + " timeoutMillis: 60000,\n" // 1 minute for graceful shutdown
                + " },\n"
                + "}";

        for (int i = 0; i < nodes(); i++) {
            int port = BASE_PORT + i;
            String nodeName = nodeName(port);

            String config = IgniteStringFormatter.format(configTemplate, port, connectNodeAddr,
                    BASE_CLIENT_PORT + i, BASE_REST_PORT + i);

            // Each node gets its own subdirectory of the working directory, named after the node.
            igniteServers.add(TestIgnitionManager.startWithProductionDefaults(nodeName, config, workDir.resolve(nodeName)));
        }

        String metaStorageNodeName = nodeName(BASE_PORT);

        InitParameters initParameters = InitParameters.builder()
                .metaStorageNodeNames(metaStorageNodeName)
                .clusterName("cluster")
                .clusterConfiguration(clusterConfiguration())
                .build();

        TestIgnitionManager.init(igniteServers.get(0), initParameters);

        for (IgniteServer node : igniteServers) {
            assertThat(node.waitForInitAsync(), willCompleteSuccessfully());

            // Only the first initialized node is published through the static fields.
            if (publicIgnite == null) {
                publicIgnite = node.api();
                igniteImpl = unwrapIgniteImpl(publicIgnite);
            }
        }
    }

    /**
     * Returns the cluster configuration passed to cluster initialization.
     *
     * @return Cluster configuration in HOCON format. This implementation returns an empty {@code ignite} section;
     *         the {@code @Nullable} contract allows overriding implementations to return {@code null}.
     */
    @Nullable
    protected String clusterConfiguration() {
        return "ignite {}";
    }

    /**
     * Builds a node name from its network port.
     *
     * @param port Node network port.
     * @return Node name in the form {@code node_<port>}.
     */
    protected static String nodeName(int port) {
        return "node_" + port;
    }

    /**
     * Returns the root working directory; each node uses a subdirectory named after the node.
     *
     * <p>NOTE(review): the default is a hard-coded, Windows-style location; override it on other environments.
     *
     * @return Root working directory.
     * @throws Exception If the directory cannot be resolved (not thrown by this implementation).
     */
    protected Path workDir() throws Exception {
        return new File("c:/work/tpcc").toPath();
    }

    /**
     * Returns the value written to the {@code sizeBytes} property of the default storage profile.
     *
     * @return Storage profile size in bytes.
     */
    protected int pageMemorySize() {
        return 2073741824;
    }

    /**
     * Returns the value written to the {@code system.partitionsLogPath} node property.
     *
     * @return Partitions log path; an empty string by default.
     */
    protected String logPath() {
        return "";
    }

    /**
     * Returns the value written to the {@code raft.fsync} node property.
     *
     * @return {@code false} by default.
     */
    protected boolean fsync() {
        return false;
    }

    /**
     * Returns the number of server nodes to start.
     *
     * @return Number of nodes; {@code 1} by default.
     */
    protected int nodes() {
        return 1;
    }

    /**
     * Prints every row of the {@code warehouse} table to the standard output using the published node.
     */
    protected void dumpWarehouse() {
        final String query = "select * from warehouse";
        // Include the query text itself: the message previously ended with a dangling colon.
        System.out.println("Executing the query: " + query);
        List<List<Object>> rows = sql(publicIgnite, null, null, null, query);
        for (List<Object> row : rows) {
            System.out.println("Row: " + row);
        }
    }

    /**
     * Executes an SQL query on the given node and returns all resulting rows.
     *
     * @param node Node to execute the query on.
     * @param tx Transaction to run the query in, or {@code null}.
     * @param schema Default schema for the statement, or {@code null} to leave it unset.
     * @param zoneId Time zone for the statement, or {@code null} to leave it unset.
     * @param query SQL query text.
     * @param args Query arguments.
     * @return All rows of the result set.
     */
    protected static List<List<Object>> sql(Ignite node, @Nullable Transaction tx, @Nullable String schema, @Nullable ZoneId zoneId,
            String query, Object... args) {
        IgniteSql sql = node.sql();
        StatementBuilder builder = sql.statementBuilder()
                .query(query);

        if (zoneId != null) {
            builder.timeZoneId(zoneId);
        }

        if (schema != null) {
            builder.defaultSchema(schema);
        }

        Statement statement = builder.build();
        // The result set is closed as soon as all rows have been collected.
        try (ResultSet<SqlRow> rs = sql.execute(tx, statement, args)) {
            return getAllResultSet(rs);
        }
    }
}
Loading