From 3a204b2d0a7f16230c6d45bb4b2d22cb56fc8bd8 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Mon, 4 Jun 2018 20:54:45 -0700 Subject: [PATCH] Update AtomicCounter implementation to use service proxies. --- .../core/counter/AtomicCounterType.java | 4 +- .../counter/impl/AtomicCounterOperations.java | 191 ------------------ .../core/counter/impl/AtomicCounterProxy.java | 49 ++--- .../counter/impl/AtomicCounterService.java | 184 +++++------------ .../impl/DefaultAtomicCounterService.java | 117 +++++++++++ .../core/generator/AtomicIdGeneratorType.java | 4 +- ...a => DefaultAtomicCounterServiceTest.java} | 26 +-- 7 files changed, 186 insertions(+), 389 deletions(-) delete mode 100644 core/src/main/java/io/atomix/core/counter/impl/AtomicCounterOperations.java create mode 100644 core/src/main/java/io/atomix/core/counter/impl/DefaultAtomicCounterService.java rename core/src/test/java/io/atomix/core/counter/impl/{AtomicCounterServiceTest.java => DefaultAtomicCounterServiceTest.java} (60%) diff --git a/core/src/main/java/io/atomix/core/counter/AtomicCounterType.java b/core/src/main/java/io/atomix/core/counter/AtomicCounterType.java index 212274db3e..f2d417aba6 100644 --- a/core/src/main/java/io/atomix/core/counter/AtomicCounterType.java +++ b/core/src/main/java/io/atomix/core/counter/AtomicCounterType.java @@ -17,7 +17,7 @@ import io.atomix.core.counter.impl.AtomicCounterProxyBuilder; import io.atomix.core.counter.impl.AtomicCounterResource; -import io.atomix.core.counter.impl.AtomicCounterService; +import io.atomix.core.counter.impl.DefaultAtomicCounterService; import io.atomix.primitive.PrimitiveManagementService; import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.resource.PrimitiveResource; @@ -49,7 +49,7 @@ public String name() { @Override public PrimitiveService newService(ServiceConfig config) { - return new AtomicCounterService(); + return new DefaultAtomicCounterService(); } @Override diff --git a/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterOperations.java b/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterOperations.java deleted file mode 100644 index 50a14d6de5..0000000000 --- a/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterOperations.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.atomix.core.counter.impl; - -import io.atomix.primitive.operation.OperationId; -import io.atomix.primitive.operation.OperationType; -import io.atomix.utils.serializer.KryoNamespace; -import io.atomix.utils.serializer.KryoNamespaces; - -/** - * {@link io.atomix.core.counter.AtomicCounter} operations. - *

- * WARNING: Do not refactor enum values. Only add to them. - * Changing values risk breaking the ability to backup/restore/upgrade clusters. - */ -public enum AtomicCounterOperations implements OperationId { - SET(OperationType.COMMAND), - COMPARE_AND_SET(OperationType.COMMAND), - INCREMENT_AND_GET(OperationType.COMMAND), - GET_AND_INCREMENT(OperationType.COMMAND), - DECREMENT_AND_GET(OperationType.COMMAND), - GET_AND_DECREMENT(OperationType.COMMAND), - ADD_AND_GET(OperationType.COMMAND), - GET_AND_ADD(OperationType.COMMAND), - GET(OperationType.QUERY); - - private final OperationType type; - - AtomicCounterOperations(OperationType type) { - this.type = type; - } - - @Override - public String id() { - return name(); - } - - @Override - public OperationType type() { - return type; - } - - public static final KryoNamespace NAMESPACE = KryoNamespace.builder() - .register(KryoNamespaces.BASIC) - .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID) - .register(Get.class) - .register(Set.class) - .register(CompareAndSet.class) - .register(AddAndGet.class) - .register(GetAndAdd.class) - .build(AtomicCounterOperations.class.getSimpleName()); - - /** - * Abstract value command. - */ - public abstract static class ValueOperation { - } - - /** - * Get query. - */ - public static class Get extends ValueOperation { - } - - /** - * Set command. - */ - public static class Set extends ValueOperation { - private Long value; - - public Set() { - } - - public Set(Long value) { - this.value = value; - } - - /** - * Returns the command value. - * - * @return The command value. - */ - public Long value() { - return value; - } - - @Override - public String toString() { - return String.format("%s[value=%s]", getClass().getSimpleName(), value); - } - } - - /** - * Compare and set command. - */ - public static class CompareAndSet extends ValueOperation { - private Long expect; - private Long update; - - public CompareAndSet() { - } - - public CompareAndSet(Long expect, Long update) { - this.expect = expect; - this.update = update; - } - - /** - * Returns the expected value. - * - * @return The expected value. - */ - public Long expect() { - return expect; - } - - /** - * Returns the updated value. - * - * @return The updated value. - */ - public Long update() { - return update; - } - - @Override - public String toString() { - return String.format("%s[expect=%s, update=%s]", getClass().getSimpleName(), expect, update); - } - } - - /** - * Delta command. - */ - public abstract static class DeltaOperation extends ValueOperation { - private long delta; - - public DeltaOperation() { - } - - public DeltaOperation(long delta) { - this.delta = delta; - } - - /** - * Returns the delta. - * - * @return The delta. - */ - public long delta() { - return delta; - } - } - - /** - * Get and add command. - */ - public static class GetAndAdd extends DeltaOperation { - public GetAndAdd() { - } - - public GetAndAdd(long delta) { - super(delta); - } - } - - /** - * Add and get command. - */ - public static class AddAndGet extends DeltaOperation { - public AddAndGet() { - } - - public AddAndGet(long delta) { - super(delta); - } - } -} \ No newline at end of file diff --git a/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterProxy.java b/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterProxy.java index bc92aa0816..0b6f5d86fb 100644 --- a/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterProxy.java +++ b/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterProxy.java @@ -17,12 +17,8 @@ import io.atomix.core.counter.AsyncAtomicCounter; import io.atomix.core.counter.AtomicCounter; -import io.atomix.core.counter.impl.AtomicCounterOperations.AddAndGet; -import io.atomix.core.counter.impl.AtomicCounterOperations.CompareAndSet; -import io.atomix.core.counter.impl.AtomicCounterOperations.GetAndAdd; -import io.atomix.core.counter.impl.AtomicCounterOperations.Set; +import io.atomix.primitive.AbstractAsyncPrimitiveProxy; import io.atomix.primitive.PrimitiveRegistry; -import io.atomix.primitive.AbstractAsyncPrimitive; import io.atomix.primitive.proxy.PrimitiveProxy; import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespaces; @@ -31,82 +27,61 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; -import static io.atomix.core.counter.impl.AtomicCounterOperations.ADD_AND_GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.COMPARE_AND_SET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.DECREMENT_AND_GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_ADD; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_DECREMENT; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_INCREMENT; -import static io.atomix.core.counter.impl.AtomicCounterOperations.INCREMENT_AND_GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.SET; - /** * Atomix counter implementation. */ -public class AtomicCounterProxy extends AbstractAsyncPrimitive implements AsyncAtomicCounter { +public class AtomicCounterProxy extends AbstractAsyncPrimitiveProxy implements AsyncAtomicCounter { private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder() .register(KryoNamespaces.BASIC) - .register(AtomicCounterOperations.NAMESPACE) .build()); public AtomicCounterProxy(PrimitiveProxy proxy, PrimitiveRegistry registry) { - super(proxy, registry); - } - - @Override - protected Serializer serializer() { - return SERIALIZER; - } - - private long nullOrZero(Long value) { - return value != null ? value : 0; + super(AtomicCounterService.class, proxy, registry); } @Override public CompletableFuture get() { - return this.invokeBy(getPartitionKey(), GET).thenApply(this::nullOrZero); + return applyBy(getPartitionKey(), service -> service.get()); } @Override public CompletableFuture set(long value) { - return this.invokeBy(getPartitionKey(), SET, new Set(value)); + return acceptBy(getPartitionKey(), service -> service.set(value)); } @Override public CompletableFuture compareAndSet(long expectedValue, long updateValue) { - return this.invokeBy(getPartitionKey(), COMPARE_AND_SET, - new CompareAndSet(expectedValue, updateValue)); + return applyBy(getPartitionKey(), service -> service.compareAndSet(expectedValue, updateValue)); } @Override public CompletableFuture addAndGet(long delta) { - return this.invokeBy(getPartitionKey(), ADD_AND_GET, new AddAndGet(delta)); + return applyBy(getPartitionKey(), service -> service.addAndGet(delta)); } @Override public CompletableFuture getAndAdd(long delta) { - return this.invokeBy(getPartitionKey(), GET_AND_ADD, new GetAndAdd(delta)); + return applyBy(getPartitionKey(), service -> service.getAndAdd(delta)); } @Override public CompletableFuture incrementAndGet() { - return this.invokeBy(getPartitionKey(), INCREMENT_AND_GET); + return applyBy(getPartitionKey(), service -> service.incrementAndGet()); } @Override public CompletableFuture getAndIncrement() { - return this.invokeBy(getPartitionKey(), GET_AND_INCREMENT); + return applyBy(getPartitionKey(), service -> service.getAndIncrement()); } @Override public CompletableFuture decrementAndGet() { - return this.invokeBy(getPartitionKey(), DECREMENT_AND_GET); + return applyBy(getPartitionKey(), service -> service.decrementAndGet()); } @Override public CompletableFuture getAndDecrement() { - return this.invokeBy(getPartitionKey(), GET_AND_DECREMENT); + return applyBy(getPartitionKey(), service -> service.getAndDecrement()); } @Override diff --git a/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterService.java b/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterService.java index 4ab05b5530..140973422d 100644 --- a/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterService.java +++ b/core/src/main/java/io/atomix/core/counter/impl/AtomicCounterService.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-present Open Networking Foundation + * Copyright 2015-present Open Networking Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,177 +15,89 @@ */ package io.atomix.core.counter.impl; -import io.atomix.core.counter.AtomicCounterType; -import io.atomix.core.counter.impl.AtomicCounterOperations.AddAndGet; -import io.atomix.core.counter.impl.AtomicCounterOperations.CompareAndSet; -import io.atomix.core.counter.impl.AtomicCounterOperations.GetAndAdd; -import io.atomix.core.counter.impl.AtomicCounterOperations.Set; -import io.atomix.primitive.service.AbstractPrimitiveService; -import io.atomix.primitive.service.BackupInput; -import io.atomix.primitive.service.BackupOutput; -import io.atomix.primitive.service.Commit; -import io.atomix.primitive.service.ServiceExecutor; -import io.atomix.utils.serializer.KryoNamespace; -import io.atomix.utils.serializer.KryoNamespaces; -import io.atomix.utils.serializer.Serializer; - -import java.util.Objects; - -import static io.atomix.core.counter.impl.AtomicCounterOperations.ADD_AND_GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.COMPARE_AND_SET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.DECREMENT_AND_GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_ADD; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_DECREMENT; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET_AND_INCREMENT; -import static io.atomix.core.counter.impl.AtomicCounterOperations.INCREMENT_AND_GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.SET; +import io.atomix.primitive.operation.Command; +import io.atomix.primitive.operation.Query; /** - * Atomix long state. + * Atomic counter service. */ -public class AtomicCounterService extends AbstractPrimitiveService { - private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder() - .register(KryoNamespaces.BASIC) - .register(AtomicCounterOperations.NAMESPACE) - .build()); - - private Long value = 0L; - - public AtomicCounterService() { - super(AtomicCounterType.instance()); - } - - @Override - public Serializer serializer() { - return SERIALIZER; - } - - @Override - protected void configure(ServiceExecutor executor) { - executor.register(SET, this::set); - executor.register(GET, this::get); - executor.register(COMPARE_AND_SET, this::compareAndSet); - executor.register(INCREMENT_AND_GET, this::incrementAndGet); - executor.register(GET_AND_INCREMENT, this::getAndIncrement); - executor.register(DECREMENT_AND_GET, this::decrementAndGet); - executor.register(GET_AND_DECREMENT, this::getAndDecrement); - executor.register(ADD_AND_GET, this::addAndGet); - executor.register(GET_AND_ADD, this::getAndAdd); - } - - @Override - public void backup(BackupOutput writer) { - writer.writeLong(value); - } - - @Override - public void restore(BackupInput reader) { - value = reader.readLong(); - } +public interface AtomicCounterService { /** - * Handles a set commit. + * Atomically increment by one and return the updated value. * - * @param commit the commit to handle + * @return updated value */ - protected void set(Commit commit) { - value = commit.value().value(); - } + @Command + long incrementAndGet(); /** - * Handles a get commit. + * Atomically decrement by one and return the updated value. * - * @param commit the commit to handle - * @return counter value + * @return updated value */ - protected Long get(Commit commit) { - return value; - } + @Command + long decrementAndGet(); /** - * Handles a compare and set commit. + * Atomically increment by one and return the previous value. * - * @param commit the commit to handle - * @return counter value + * @return previous value */ - protected boolean compareAndSet(Commit commit) { - if (Objects.equals(value, commit.value().expect())) { - value = commit.value().update(); - return true; - } - return false; - } + @Command + long getAndIncrement(); /** - * Handles an increment and get commit. + * Atomically decrement by one and return the previous value. * - * @param commit the commit to handle - * @return counter value + * @return previous value */ - protected long incrementAndGet(Commit commit) { - Long oldValue = value; - value = oldValue + 1; - return value; - } + @Command + long getAndDecrement(); /** - * Handles a get and increment commit. + * Atomically adds the given value to the current value. * - * @param commit the commit to handle - * @return counter value + * @param delta the value to add + * @return previous value */ - protected long getAndIncrement(Commit commit) { - Long oldValue = value; - value = oldValue + 1; - return oldValue; - } + @Command + long getAndAdd(long delta); /** - * Handles a decrement and get commit. + * Atomically adds the given value to the current value. * - * @param commit the commit to handle - * @return counter value + * @param delta the value to add + * @return updated value */ - protected long decrementAndGet(Commit commit) { - Long oldValue = value; - value = oldValue - 1; - return value; - } + @Command + long addAndGet(long delta); /** - * Handles a get and decrement commit. + * Atomically sets the given value to the current value. * - * @param commit the commit to handle - * @return counter value + * @param value the value to set */ - protected long getAndDecrement(Commit commit) { - Long oldValue = value; - value = oldValue - 1; - return oldValue; - } + @Command + void set(long value); /** - * Handles an add and get commit. + * Atomically sets the given counter to the updated value if the current value is the expected value, otherwise + * no change occurs. * - * @param commit the commit to handle - * @return counter value + * @param expectedValue the expected current value of the counter + * @param updateValue the new value to be set + * @return true if the update occurred and the expected value was equal to the current value, false otherwise */ - protected long addAndGet(Commit commit) { - Long oldValue = value; - value = oldValue + commit.value().delta(); - return value; - } + @Command + boolean compareAndSet(long expectedValue, long updateValue); /** - * Handles a get and add commit. + * Returns the current value of the counter without modifying it. * - * @param commit the commit to handle - * @return counter value + * @return current value */ - protected long getAndAdd(Commit commit) { - Long oldValue = value; - value = oldValue + commit.value().delta(); - return oldValue; - } -} \ No newline at end of file + @Query + long get(); + +} diff --git a/core/src/main/java/io/atomix/core/counter/impl/DefaultAtomicCounterService.java b/core/src/main/java/io/atomix/core/counter/impl/DefaultAtomicCounterService.java new file mode 100644 index 0000000000..f7eed5c850 --- /dev/null +++ b/core/src/main/java/io/atomix/core/counter/impl/DefaultAtomicCounterService.java @@ -0,0 +1,117 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.core.counter.impl; + +import io.atomix.core.counter.AtomicCounterType; +import io.atomix.primitive.service.AbstractPrimitiveService; +import io.atomix.primitive.service.BackupInput; +import io.atomix.primitive.service.BackupOutput; +import io.atomix.utils.serializer.KryoNamespace; +import io.atomix.utils.serializer.KryoNamespaces; +import io.atomix.utils.serializer.Serializer; + +import java.util.Objects; + +/** + * Atomix long state. + */ +public class DefaultAtomicCounterService extends AbstractPrimitiveService implements AtomicCounterService { + private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder() + .register(KryoNamespaces.BASIC) + .build()); + + private long value; + + public DefaultAtomicCounterService() { + super(AtomicCounterType.instance()); + } + + @Override + public Serializer serializer() { + return SERIALIZER; + } + + @Override + public void backup(BackupOutput writer) { + writer.writeLong(value); + } + + @Override + public void restore(BackupInput reader) { + value = reader.readLong(); + } + + @Override + public void set(long value) { + this.value = value; + } + + @Override + public long get() { + return value; + } + + @Override + public boolean compareAndSet(long expect, long update) { + if (Objects.equals(value, expect)) { + value = update; + return true; + } + return false; + } + + @Override + public long incrementAndGet() { + Long oldValue = value; + value = oldValue + 1; + return value; + } + + @Override + public long getAndIncrement() { + Long oldValue = value; + value = oldValue + 1; + return oldValue; + } + + @Override + public long decrementAndGet() { + Long oldValue = value; + value = oldValue - 1; + return value; + } + + @Override + public long getAndDecrement() { + Long oldValue = value; + value = oldValue - 1; + return oldValue; + } + + @Override + public long addAndGet(long delta) { + Long oldValue = value; + value = oldValue + delta; + return value; + } + + @Override + public long getAndAdd(long delta) { + Long oldValue = value; + value = oldValue + delta; + return oldValue; + } +} \ No newline at end of file diff --git a/core/src/main/java/io/atomix/core/generator/AtomicIdGeneratorType.java b/core/src/main/java/io/atomix/core/generator/AtomicIdGeneratorType.java index de694023ef..c3c843a06f 100644 --- a/core/src/main/java/io/atomix/core/generator/AtomicIdGeneratorType.java +++ b/core/src/main/java/io/atomix/core/generator/AtomicIdGeneratorType.java @@ -15,7 +15,7 @@ */ package io.atomix.core.generator; -import io.atomix.core.counter.impl.AtomicCounterService; +import io.atomix.core.counter.impl.DefaultAtomicCounterService; import io.atomix.core.generator.impl.AtomicIdGeneratorResource; import io.atomix.core.generator.impl.DelegatingAtomicIdGeneratorBuilder; import io.atomix.primitive.PrimitiveManagementService; @@ -49,7 +49,7 @@ public String name() { @Override public PrimitiveService newService(ServiceConfig config) { - return new AtomicCounterService(); + return new DefaultAtomicCounterService(); } @Override diff --git a/core/src/test/java/io/atomix/core/counter/impl/AtomicCounterServiceTest.java b/core/src/test/java/io/atomix/core/counter/impl/DefaultAtomicCounterServiceTest.java similarity index 60% rename from core/src/test/java/io/atomix/core/counter/impl/AtomicCounterServiceTest.java rename to core/src/test/java/io/atomix/core/counter/impl/DefaultAtomicCounterServiceTest.java index a64623331e..a9c46e7029 100644 --- a/core/src/test/java/io/atomix/core/counter/impl/AtomicCounterServiceTest.java +++ b/core/src/test/java/io/atomix/core/counter/impl/DefaultAtomicCounterServiceTest.java @@ -15,46 +15,30 @@ */ package io.atomix.core.counter.impl; -import io.atomix.core.counter.impl.AtomicCounterOperations.Set; import io.atomix.primitive.service.impl.DefaultBackupInput; import io.atomix.primitive.service.impl.DefaultBackupOutput; -import io.atomix.primitive.service.impl.DefaultCommit; -import io.atomix.primitive.session.PrimitiveSession; import io.atomix.storage.buffer.Buffer; import io.atomix.storage.buffer.HeapBuffer; import org.junit.Test; -import static io.atomix.core.counter.impl.AtomicCounterOperations.GET; -import static io.atomix.core.counter.impl.AtomicCounterOperations.SET; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; /** * Counter service test. */ -public class AtomicCounterServiceTest { +public class DefaultAtomicCounterServiceTest { @Test public void testSnapshot() throws Exception { - AtomicCounterService service = new AtomicCounterService(); - service.set(new DefaultCommit<>( - 2, - SET, - new Set(1L), - mock(PrimitiveSession.class), - System.currentTimeMillis())); + DefaultAtomicCounterService service = new DefaultAtomicCounterService(); + service.set(1); Buffer buffer = HeapBuffer.allocate(); service.backup(new DefaultBackupOutput(buffer, service.serializer())); - service = new AtomicCounterService(); + service = new DefaultAtomicCounterService(); service.restore(new DefaultBackupInput(buffer.flip(), service.serializer())); - long value = service.get(new DefaultCommit<>( - 2, - GET, - null, - mock(PrimitiveSession.class), - System.currentTimeMillis())); + long value = service.get(); assertEquals(1, value); } }