Skip to content

Commit

Permalink
Add atomic semaphore and Semaphore implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 11, 2018
1 parent 4a32fe3 commit cdd7ea0
Show file tree
Hide file tree
Showing 28 changed files with 1,441 additions and 507 deletions.
6 changes: 6 additions & 0 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -40,6 +40,7 @@
import io.atomix.core.profile.Profile; import io.atomix.core.profile.Profile;
import io.atomix.core.profile.ProfileConfig; import io.atomix.core.profile.ProfileConfig;
import io.atomix.core.queue.DistributedQueue; import io.atomix.core.queue.DistributedQueue;
import io.atomix.core.semaphore.AtomicSemaphore;
import io.atomix.core.semaphore.DistributedSemaphore; import io.atomix.core.semaphore.DistributedSemaphore;
import io.atomix.core.set.DistributedSet; import io.atomix.core.set.DistributedSet;
import io.atomix.core.set.DistributedTreeSet; import io.atomix.core.set.DistributedTreeSet;
Expand Down Expand Up @@ -454,6 +455,11 @@ public DistributedSemaphore getSemaphore(String name) {
return primitives.getSemaphore(name); return primitives.getSemaphore(name);
} }


@Override
public AtomicSemaphore getAtomicSemaphore(String name) {
return primitives.getAtomicSemaphore(name);
}

@Override @Override
public <E> WorkQueue<E> getWorkQueue(String name) { public <E> WorkQueue<E> getWorkQueue(String name) {
return primitives.getWorkQueue(name); return primitives.getWorkQueue(name);
Expand Down
32 changes: 32 additions & 0 deletions core/src/main/java/io/atomix/core/PrimitivesService.java
Expand Up @@ -66,6 +66,9 @@
import io.atomix.core.queue.DistributedQueue; import io.atomix.core.queue.DistributedQueue;
import io.atomix.core.queue.DistributedQueueBuilder; import io.atomix.core.queue.DistributedQueueBuilder;
import io.atomix.core.queue.DistributedQueueType; import io.atomix.core.queue.DistributedQueueType;
import io.atomix.core.semaphore.AtomicSemaphore;
import io.atomix.core.semaphore.AtomicSemaphoreBuilder;
import io.atomix.core.semaphore.AtomicSemaphoreType;
import io.atomix.core.semaphore.DistributedSemaphore; import io.atomix.core.semaphore.DistributedSemaphore;
import io.atomix.core.semaphore.DistributedSemaphoreBuilder; import io.atomix.core.semaphore.DistributedSemaphoreBuilder;
import io.atomix.core.semaphore.DistributedSemaphoreType; import io.atomix.core.semaphore.DistributedSemaphoreType;
Expand Down Expand Up @@ -601,6 +604,27 @@ default DistributedSemaphoreBuilder semaphoreBuilder(String name, PrimitiveProto
return primitiveBuilder(name, DistributedSemaphoreType.instance(), protocol); return primitiveBuilder(name, DistributedSemaphoreType.instance(), protocol);
} }


/**
* Creates a new DistributedSemaphoreBuilder.
*
* @param name the primitive name
* @return distributed semaphore builder
*/
default AtomicSemaphoreBuilder atomicSemaphoreBuilder(String name) {
return primitiveBuilder(name, AtomicSemaphoreType.instance());
}

/**
* Creates a new DistributedSemaphoreBuilder.
*
* @param name the primitive name
* @param protocol the primitive protocol
* @return distributed semaphore builder
*/
default AtomicSemaphoreBuilder atomicSemaphoreBuilder(String name, PrimitiveProtocol protocol) {
return primitiveBuilder(name, AtomicSemaphoreType.instance(), protocol);
}

/** /**
* Creates a new WorkQueueBuilder. * Creates a new WorkQueueBuilder.
* *
Expand Down Expand Up @@ -837,6 +861,14 @@ default TransactionBuilder transactionBuilder() {
*/ */
DistributedSemaphore getSemaphore(String name); DistributedSemaphore getSemaphore(String name);


/**
* Creates a new DistributedSemaphore.
*
* @param name the primitive name
* @return DistributedSemaphore
*/
AtomicSemaphore getAtomicSemaphore(String name);

/** /**
* Creates a new WorkQueueBuilder. * Creates a new WorkQueueBuilder.
* *
Expand Down
Expand Up @@ -57,6 +57,8 @@
import io.atomix.core.multiset.DistributedMultisetType; import io.atomix.core.multiset.DistributedMultisetType;
import io.atomix.core.queue.DistributedQueue; import io.atomix.core.queue.DistributedQueue;
import io.atomix.core.queue.DistributedQueueType; import io.atomix.core.queue.DistributedQueueType;
import io.atomix.core.semaphore.AtomicSemaphore;
import io.atomix.core.semaphore.AtomicSemaphoreType;
import io.atomix.core.semaphore.DistributedSemaphore; import io.atomix.core.semaphore.DistributedSemaphore;
import io.atomix.core.semaphore.DistributedSemaphoreType; import io.atomix.core.semaphore.DistributedSemaphoreType;
import io.atomix.core.set.DistributedSet; import io.atomix.core.set.DistributedSet;
Expand Down Expand Up @@ -263,6 +265,11 @@ public DistributedSemaphore getSemaphore(String name) {
return getPrimitive(name, DistributedSemaphoreType.instance(), configService.getConfig(name)); return getPrimitive(name, DistributedSemaphoreType.instance(), configService.getConfig(name));
} }


@Override
public AtomicSemaphore getAtomicSemaphore(String name) {
return getPrimitive(name, AtomicSemaphoreType.instance(), configService.getConfig(name));
}

@Override @Override
public <E> WorkQueue<E> getWorkQueue(String name) { public <E> WorkQueue<E> getWorkQueue(String name) {
return getPrimitive(name, WorkQueueType.instance(), configService.getConfig(name)); return getPrimitive(name, WorkQueueType.instance(), configService.getConfig(name));
Expand Down
145 changes: 145 additions & 0 deletions core/src/main/java/io/atomix/core/semaphore/AsyncAtomicSemaphore.java
@@ -0,0 +1,145 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.core.semaphore;

import io.atomix.primitive.AsyncPrimitive;
import io.atomix.primitive.DistributedPrimitive;
import io.atomix.utils.time.Version;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;


/**
* Distributed implementation of {@link java.util.concurrent.Semaphore}.
*/
public interface AsyncAtomicSemaphore extends AsyncPrimitive {

/**
* Acquires a permit from this semaphore.
*
* @return future to be completed once the permit has been acquired
*/
CompletableFuture<Version> acquire();

/**
* Acquires the given number of permits from this semaphore.
*
* @return future to be completed once the permits has been acquired
*/
CompletableFuture<Version> acquire(int permits);

/**
* Acquires a permit, if one is available and returns immediately.
*
* @return future to be completed with a boolean indicating whether the permit was acquired
*/
CompletableFuture<Optional<Version>> tryAcquire();

/**
* Acquires the given number of permits, if they are available and returns immediately.
*
* @param permits permits to acquire
* @return future to be completed with a boolean indicating whether the permits was acquired
*/
CompletableFuture<Optional<Version>> tryAcquire(int permits);

/**
* Acquires a permit from this semaphore if one becomes available within the given waiting time.
*
* @param timeout the maximum time to wait for a permit
* @return future to be completed with a boolean indicating whether the permit was acquired
*/
CompletableFuture<Optional<Version>> tryAcquire(Duration timeout);

/**
* Acquires the given number of permits, if they are available within the given waiting time.
*
* @param permits permits to acquire
* @param timeout the maximum time to wait for a permit
* @return future to be completed with a boolean indicating whether the permits was acquired
*/
CompletableFuture<Optional<Version>> tryAcquire(int permits, Duration timeout);

/**
* Releases a permit.
*
* @return future to be completed once the permit has been released
*/
CompletableFuture<Void> release();

/**
* Releases the given number of permits.
*
* @param permits permits to release
* @return future to be completed once the permits has been released
*/
CompletableFuture<Void> release(int permits);

/**
* Query the current number of permits available.
*
* @return a future for available permits
*/
CompletableFuture<Integer> availablePermits();

/**
* Acquires and returns all permits that are immediately available.
* If the initial permits is negative, this will set available permits to 0,
* and return a negative number.
* If a positive number is returned, the acquired permits will be recorded.
* If the Client disconnects, these permits will be automatically released.
*
* @return the future complete with number of permits acquired
*/
CompletableFuture<Integer> drainPermits();

/**
* Increases the number of available permits by the indicated
* amount. This method differs from {@code release} in that it does not
* effect the amount of permits this caller has acquired.
*
* @param permits the number of permits to add
* @return the future complete with available permits after increase
*/
CompletableFuture<Integer> increasePermits(int permits);

/**
* Shrinks the number of available permits by the indicated reduction.
* This method differs from {@code acquire} in that it does not block
* waiting for permits to become available and can be reduced to negative.
*
* @param permits the number of permits to remove
* @return the future complete with available permits after increase
*/
CompletableFuture<Integer> reducePermits(int permits);

/**
* Query the waiting queue status.
*
* @return the future complete with waiting queue status
*/
CompletableFuture<QueueStatus> queueStatus();

@Override
default AtomicSemaphore sync() {
return sync(Duration.ofMillis(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS));
}

@Override
AtomicSemaphore sync(Duration operationTimeout);
}
Expand Up @@ -17,10 +17,8 @@


import io.atomix.primitive.AsyncPrimitive; import io.atomix.primitive.AsyncPrimitive;
import io.atomix.primitive.DistributedPrimitive; import io.atomix.primitive.DistributedPrimitive;
import io.atomix.utils.time.Version;


import java.time.Duration; import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;




Expand All @@ -34,37 +32,37 @@ public interface AsyncDistributedSemaphore extends AsyncPrimitive {
* *
* @return future to be completed once the permit has been acquired * @return future to be completed once the permit has been acquired
*/ */
CompletableFuture<Version> acquire(); CompletableFuture<Void> acquire();


/** /**
* Acquires the given number of permits from this semaphore. * Acquires the given number of permits from this semaphore.
* *
* @return future to be completed once the permits has been acquired * @return future to be completed once the permits has been acquired
*/ */
CompletableFuture<Version> acquire(int permits); CompletableFuture<Void> acquire(int permits);


/** /**
* Acquires a permit, if one is available and returns immediately. * Acquires a permit, if one is available and returns immediately.
* *
* @return future to be completed with a boolean indicating whether the permit was acquired * @return future to be completed with a boolean indicating whether the permit was acquired
*/ */
CompletableFuture<Optional<Version>> tryAcquire(); CompletableFuture<Boolean> tryAcquire();


/** /**
* Acquires the given number of permits, if they are available and returns immediately. * Acquires the given number of permits, if they are available and returns immediately.
* *
* @param permits permits to acquire * @param permits permits to acquire
* @return future to be completed with a boolean indicating whether the permits was acquired * @return future to be completed with a boolean indicating whether the permits was acquired
*/ */
CompletableFuture<Optional<Version>> tryAcquire(int permits); CompletableFuture<Boolean> tryAcquire(int permits);


/** /**
* Acquires a permit from this semaphore if one becomes available within the given waiting time. * Acquires a permit from this semaphore if one becomes available within the given waiting time.
* *
* @param timeout the maximum time to wait for a permit * @param timeout the maximum time to wait for a permit
* @return future to be completed with a boolean indicating whether the permit was acquired * @return future to be completed with a boolean indicating whether the permit was acquired
*/ */
CompletableFuture<Optional<Version>> tryAcquire(Duration timeout); CompletableFuture<Boolean> tryAcquire(Duration timeout);


/** /**
* Acquires the given number of permits, if they are available within the given waiting time. * Acquires the given number of permits, if they are available within the given waiting time.
Expand All @@ -73,7 +71,7 @@ public interface AsyncDistributedSemaphore extends AsyncPrimitive {
* @param timeout the maximum time to wait for a permit * @param timeout the maximum time to wait for a permit
* @return future to be completed with a boolean indicating whether the permits was acquired * @return future to be completed with a boolean indicating whether the permits was acquired
*/ */
CompletableFuture<Optional<Version>> tryAcquire(int permits, Duration timeout); CompletableFuture<Boolean> tryAcquire(int permits, Duration timeout);


/** /**
* Releases a permit. * Releases a permit.
Expand Down Expand Up @@ -116,7 +114,7 @@ public interface AsyncDistributedSemaphore extends AsyncPrimitive {
* @param permits the number of permits to add * @param permits the number of permits to add
* @return the future complete with available permits after increase * @return the future complete with available permits after increase
*/ */
CompletableFuture<Integer> increase(int permits); CompletableFuture<Integer> increasePermits(int permits);


/** /**
* Shrinks the number of available permits by the indicated reduction. * Shrinks the number of available permits by the indicated reduction.
Expand All @@ -126,7 +124,7 @@ public interface AsyncDistributedSemaphore extends AsyncPrimitive {
* @param permits the number of permits to remove * @param permits the number of permits to remove
* @return the future complete with available permits after increase * @return the future complete with available permits after increase
*/ */
CompletableFuture<Integer> reduce(int permits); CompletableFuture<Integer> reducePermits(int permits);


/** /**
* Query the waiting queue status. * Query the waiting queue status.
Expand Down

0 comments on commit cdd7ea0

Please sign in to comment.