Skip to content

Commit

Permalink
GNITE-18744 Implement primary replica side leaseGrant handler (#1765)
Browse files Browse the repository at this point in the history
  • Loading branch information
denis-chudov authored and vldpyatkov committed Mar 27, 2023
1 parent 64b518e commit 9323ee9
Show file tree
Hide file tree
Showing 43 changed files with 1,991 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
/** A constant holding the maximum value a {@code HybridTimestamp} can have. */
public static final HybridTimestamp MAX_VALUE = new HybridTimestamp(Long.MAX_VALUE, Integer.MAX_VALUE);

/**
* Cluster cLock skew. The constant determines the undefined inclusive interval to compares timestamp from various nodes.
* TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
*/
private static final long CLOCK_SKEW = 7L;

/** Physical clock. */
private final long physical;

Expand Down Expand Up @@ -131,6 +137,42 @@ public int hashCode() {
return result;
}

/**
* Compares two timestamps with the clock skew.
* t1, t2 comparable if t1 is not contained on [t2 - CLOCK_SKEW; t2 + CLOCK_SKEW].
* TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
*
* @param anotherTimestamp Another timestamp.
* @return Result of comparison can be positive or negative, or {@code 0} if timestamps are not comparable.
*/
private int compareWithClockSkew(HybridTimestamp anotherTimestamp) {
if (getPhysical() - CLOCK_SKEW <= anotherTimestamp.getPhysical() && getPhysical() + CLOCK_SKEW >= anotherTimestamp.getPhysical()) {
return 0;
}

return compareTo(anotherTimestamp);
}

/**
* Defines whether this timestamp is strictly before the given one, taking the clock skew into account.
*
* @param anotherTimestamp Another timestamp.
* @return Whether this timestamp is before the given one or not.
*/
public boolean before(HybridTimestamp anotherTimestamp) {
return compareWithClockSkew(anotherTimestamp) < 0;
}

/**
* Defines whether this timestamp is strictly after the given one, taking the clock skew into account.
*
* @param anotherTimestamp Another timestamp.
* @return Whether this timestamp is after the given one or not.
*/
public boolean after(HybridTimestamp anotherTimestamp) {
return compareWithClockSkew(anotherTimestamp) > 0;
}

@Override
public int compareTo(HybridTimestamp other) {
if (this.physical == other.physical) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand Down Expand Up @@ -1005,4 +1007,52 @@ public static <T> Optional<T> findAny(Collection<T> collection, @Nullable Predic

return Optional.empty();
}

/**
* Retries operation until it succeeds or fails with exception that is different than the given.
*
* @param operation Operation.
* @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be stopped.
* @param executor Executor to make retry in.
* @return Future that is completed when operation is successful or failed with other exception than the given.
*/
public static <T> CompletableFuture<T> retryOperationUntilSuccess(
Supplier<CompletableFuture<T>> operation,
Function<Throwable, Boolean> stopRetryCondition,
Executor executor
) {
CompletableFuture<T> fut = new CompletableFuture<>();

retryOperationUntilSuccess(operation, stopRetryCondition, fut, executor);

return fut;
}

/**
* Retries operation until it succeeds or fails with exception that is different than the given.
*
* @param operation Operation.
* @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be stopped.
* @param executor Executor to make retry in.
* @param fut Future that is completed when operation is successful or failed with other exception than the given.
*/
public static <T> void retryOperationUntilSuccess(
Supplier<CompletableFuture<T>> operation,
Function<Throwable, Boolean> stopRetryCondition,
CompletableFuture<T> fut,
Executor executor
) {
operation.get()
.whenComplete((res, e) -> {
if (e == null) {
fut.complete(res);
} else {
if (stopRetryCondition.apply(e)) {
fut.completeExceptionally(e);
} else {
executor.execute(() -> retryOperationUntilSuccess(operation, stopRetryCondition, fut, executor));
}
}
});
}
}
29 changes: 29 additions & 0 deletions modules/placement-driver-api/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply from: "$rootDir/buildscripts/java-core.gradle"
apply from: "$rootDir/buildscripts/publishing.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"

dependencies {
annotationProcessor project(":ignite-network-annotation-processor")

implementation project(':ignite-core')
implementation project(':ignite-network-api')
}

description = 'ignite-placement-driver-api'
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.placementdriver.message;

import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;

/**
* Lease granted message.
*/
@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE)
public interface LeaseGrantedMessage extends PlacementDriverReplicaMessage {
@Marshallable
HybridTimestamp leaseStartTime();

@Marshallable
HybridTimestamp leaseExpirationTime();

boolean force();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.placementdriver.message;

import org.apache.ignite.network.annotations.Transferable;

/**
* Response for lease granted message.
*/
@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE_RESPONSE)
public interface LeaseGrantedMessageResponse extends PlacementDriverReplicaMessage {
boolean accepted();

String redirectProposal();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.placementdriver.message;

import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_NAME;
import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_TYPE;

import org.apache.ignite.network.annotations.MessageGroup;

/**
* Message group for placement driver messages.
*/
@MessageGroup(groupType = GROUP_TYPE, groupName = GROUP_NAME)
public interface PlacementDriverMessageGroup {
/** Placement driver message group type. */
short GROUP_TYPE = 11;

String GROUP_NAME = "PlacementDriverMessages";

short LEASE_GRANTED_MESSAGE = 0;

short LEASE_GRANTED_MESSAGE_RESPONSE = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.placementdriver.message;

import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Marshallable;

/**
* Placement driver replica messages.
*/
public interface PlacementDriverReplicaMessage extends NetworkMessage {
/**
* Gets a replication group id.
*
* @return Replication group id.
*/
@Marshallable
ReplicationGroupId groupId();
}
2 changes: 2 additions & 0 deletions modules/placement-driver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation project(':ignite-affinity')
implementation project(':ignite-vault')
implementation project(':ignite-rocksdb-common:')
implementation project(':ignite-replicator')

implementation libs.jetbrains.annotations

Expand All @@ -48,6 +49,7 @@ dependencies {
integrationTestImplementation project(':ignite-schema')
integrationTestImplementation project(':ignite-table')
integrationTestImplementation project(':ignite-affinity')
integrationTestImplementation project(':ignite-replicator')

integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-network')))
Expand Down
Loading

0 comments on commit 9323ee9

Please sign in to comment.