Skip to content

Commit

Permalink
Update AtomicCounter implementation to use service proxies.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 7, 2018
1 parent 2214242 commit 3a204b2
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 389 deletions.
Expand Up @@ -17,7 +17,7 @@


import io.atomix.core.counter.impl.AtomicCounterProxyBuilder; import io.atomix.core.counter.impl.AtomicCounterProxyBuilder;
import io.atomix.core.counter.impl.AtomicCounterResource; import io.atomix.core.counter.impl.AtomicCounterResource;
import io.atomix.core.counter.impl.AtomicCounterService; import io.atomix.core.counter.impl.DefaultAtomicCounterService;
import io.atomix.primitive.PrimitiveManagementService; import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.resource.PrimitiveResource; import io.atomix.primitive.resource.PrimitiveResource;
Expand Down Expand Up @@ -49,7 +49,7 @@ public String name() {


@Override @Override
public PrimitiveService newService(ServiceConfig config) { public PrimitiveService newService(ServiceConfig config) {
return new AtomicCounterService(); return new DefaultAtomicCounterService();
} }


@Override @Override
Expand Down

This file was deleted.

Expand Up @@ -17,12 +17,8 @@


import io.atomix.core.counter.AsyncAtomicCounter; import io.atomix.core.counter.AsyncAtomicCounter;
import io.atomix.core.counter.AtomicCounter; import io.atomix.core.counter.AtomicCounter;
import io.atomix.core.counter.impl.AtomicCounterOperations.AddAndGet; import io.atomix.primitive.AbstractAsyncPrimitiveProxy;
import io.atomix.core.counter.impl.AtomicCounterOperations.CompareAndSet;
import io.atomix.core.counter.impl.AtomicCounterOperations.GetAndAdd;
import io.atomix.core.counter.impl.AtomicCounterOperations.Set;
import io.atomix.primitive.PrimitiveRegistry; import io.atomix.primitive.PrimitiveRegistry;
import io.atomix.primitive.AbstractAsyncPrimitive;
import io.atomix.primitive.proxy.PrimitiveProxy; import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
Expand All @@ -31,82 +27,61 @@
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import static io.atomix.core.counter.impl.AtomicCounterOperations.ADD_AND_GET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.COMPARE_AND_SET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.DECREMENT_AND_GET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.GET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_ADD;
import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_DECREMENT;
import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_INCREMENT;
import static io.atomix.core.counter.impl.AtomicCounterOperations.INCREMENT_AND_GET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.SET;

/** /**
* Atomix counter implementation. * Atomix counter implementation.
*/ */
public class AtomicCounterProxy extends AbstractAsyncPrimitive<AsyncAtomicCounter> implements AsyncAtomicCounter { public class AtomicCounterProxy extends AbstractAsyncPrimitiveProxy<AsyncAtomicCounter, AtomicCounterService> implements AsyncAtomicCounter {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder() private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder()
.register(KryoNamespaces.BASIC) .register(KryoNamespaces.BASIC)
.register(AtomicCounterOperations.NAMESPACE)
.build()); .build());


public AtomicCounterProxy(PrimitiveProxy proxy, PrimitiveRegistry registry) { public AtomicCounterProxy(PrimitiveProxy proxy, PrimitiveRegistry registry) {
super(proxy, registry); super(AtomicCounterService.class, proxy, registry);
}

@Override
protected Serializer serializer() {
return SERIALIZER;
}

private long nullOrZero(Long value) {
return value != null ? value : 0;
} }


@Override @Override
public CompletableFuture<Long> get() { public CompletableFuture<Long> get() {
return this.<Long>invokeBy(getPartitionKey(), GET).thenApply(this::nullOrZero); return applyBy(getPartitionKey(), service -> service.get());
} }


@Override @Override
public CompletableFuture<Void> set(long value) { public CompletableFuture<Void> set(long value) {
return this.invokeBy(getPartitionKey(), SET, new Set(value)); return acceptBy(getPartitionKey(), service -> service.set(value));
} }


@Override @Override
public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) { public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
return this.invokeBy(getPartitionKey(), COMPARE_AND_SET, return applyBy(getPartitionKey(), service -> service.compareAndSet(expectedValue, updateValue));
new CompareAndSet(expectedValue, updateValue));
} }


@Override @Override
public CompletableFuture<Long> addAndGet(long delta) { public CompletableFuture<Long> addAndGet(long delta) {
return this.invokeBy(getPartitionKey(), ADD_AND_GET, new AddAndGet(delta)); return applyBy(getPartitionKey(), service -> service.addAndGet(delta));
} }


@Override @Override
public CompletableFuture<Long> getAndAdd(long delta) { public CompletableFuture<Long> getAndAdd(long delta) {
return this.invokeBy(getPartitionKey(), GET_AND_ADD, new GetAndAdd(delta)); return applyBy(getPartitionKey(), service -> service.getAndAdd(delta));
} }


@Override @Override
public CompletableFuture<Long> incrementAndGet() { public CompletableFuture<Long> incrementAndGet() {
return this.invokeBy(getPartitionKey(), INCREMENT_AND_GET); return applyBy(getPartitionKey(), service -> service.incrementAndGet());
} }


@Override @Override
public CompletableFuture<Long> getAndIncrement() { public CompletableFuture<Long> getAndIncrement() {
return this.invokeBy(getPartitionKey(), GET_AND_INCREMENT); return applyBy(getPartitionKey(), service -> service.getAndIncrement());
} }


@Override @Override
public CompletableFuture<Long> decrementAndGet() { public CompletableFuture<Long> decrementAndGet() {
return this.invokeBy(getPartitionKey(), DECREMENT_AND_GET); return applyBy(getPartitionKey(), service -> service.decrementAndGet());
} }


@Override @Override
public CompletableFuture<Long> getAndDecrement() { public CompletableFuture<Long> getAndDecrement() {
return this.invokeBy(getPartitionKey(), GET_AND_DECREMENT); return applyBy(getPartitionKey(), service -> service.getAndDecrement());
} }


@Override @Override
Expand Down

0 comments on commit 3a204b2

Please sign in to comment.