Skip to content

Commit

Permalink
Rename RaftLock classes to RaftDistributedLock* for consistency with …
Browse files Browse the repository at this point in the history
…interface names.
  • Loading branch information
kuujo committed Nov 7, 2017
1 parent 7416e2d commit 27ce928
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 93 deletions.
Expand Up @@ -33,7 +33,7 @@
import io.atomix.primitives.multimap.impl.RaftConsistentSetMultimapService;
import io.atomix.primitives.queue.impl.RaftWorkQueueService;
import io.atomix.primitives.tree.impl.RaftDocumentTreeService;
import io.atomix.primitives.value.impl.RaftValueService;
import io.atomix.primitives.value.impl.RaftAtomicValueService;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.protocol.messaging.RaftClientCommunicator;
import io.atomix.protocols.raft.service.RaftService;
Expand Down Expand Up @@ -74,7 +74,7 @@ public abstract class AtomixPartition implements Managed<AtomixPartition> {
.put(DistributedPrimitive.Type.COUNTER.name(), RaftCounterService::new)
.put(DistributedPrimitive.Type.LEADER_ELECTOR.name(), RaftLeaderElectorService::new)
.put(DistributedPrimitive.Type.WORK_QUEUE.name(), RaftWorkQueueService::new)
.put(Type.VALUE.name(), RaftValueService::new)
.put(Type.VALUE.name(), RaftAtomicValueService::new)
.put(DistributedPrimitive.Type.DOCUMENT_TREE.name(),
() -> new RaftDocumentTreeService(Ordering.NATURAL))
.put(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), Ordering.NATURAL),
Expand Down
Expand Up @@ -28,7 +28,7 @@
import io.atomix.primitives.leadership.impl.RaftLeaderElector;
import io.atomix.primitives.leadership.impl.TranscodingAsyncLeaderElector;
import io.atomix.primitives.lock.AsyncDistributedLock;
import io.atomix.primitives.lock.impl.RaftLock;
import io.atomix.primitives.lock.impl.RaftDistributedLock;
import io.atomix.primitives.map.AsyncAtomicCounterMap;
import io.atomix.primitives.map.AsyncConsistentMap;
import io.atomix.primitives.map.AsyncConsistentTreeMap;
Expand All @@ -45,7 +45,7 @@
import io.atomix.primitives.tree.impl.RaftDocumentTree;
import io.atomix.primitives.tree.impl.TranscodingAsyncDocumentTree;
import io.atomix.primitives.value.AsyncAtomicValue;
import io.atomix.primitives.value.impl.RaftValue;
import io.atomix.primitives.value.impl.RaftAtomicValue;
import io.atomix.primitives.value.impl.TranscodingAsyncAtomicValue;
import io.atomix.protocols.raft.RaftClient;
import io.atomix.protocols.raft.ReadConsistency;
Expand Down Expand Up @@ -224,7 +224,7 @@ public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {

@Override
public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
RaftValue value = new RaftValue(client.newProxyBuilder()
RaftAtomicValue value = new RaftAtomicValue(client.newProxyBuilder()
.withName(name)
.withServiceType(DistributedPrimitive.Type.VALUE.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
Expand Down Expand Up @@ -285,7 +285,7 @@ public <T> AsyncLeaderElector<T> newAsyncLeaderElector(String name, Serializer s

@Override
public AsyncDistributedLock newAsyncDistributedLock(String name, Duration lockTimeout) {
return new RaftLock(client.newProxyBuilder()
return new RaftDistributedLock(client.newProxyBuilder()
.withName(name)
.withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
Expand Down
Expand Up @@ -17,8 +17,8 @@

import io.atomix.primitives.impl.AbstractRaftPrimitive;
import io.atomix.primitives.lock.AsyncDistributedLock;
import io.atomix.primitives.lock.impl.RaftLockOperations.Lock;
import io.atomix.primitives.lock.impl.RaftLockOperations.Unlock;
import io.atomix.primitives.lock.impl.RaftDistributedLockOperations.Lock;
import io.atomix.primitives.lock.impl.RaftDistributedLockOperations.Unlock;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.serializer.Serializer;
import io.atomix.serializer.kryo.KryoNamespace;
Expand All @@ -32,27 +32,27 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static io.atomix.primitives.lock.impl.RaftLockOperations.LOCK;
import static io.atomix.primitives.lock.impl.RaftLockOperations.UNLOCK;
import static io.atomix.primitives.lock.impl.RaftDistributedLockOperations.LOCK;
import static io.atomix.primitives.lock.impl.RaftDistributedLockOperations.UNLOCK;

/**
* Raft lock.
*/
public class RaftLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
public class RaftDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(RaftLockOperations.NAMESPACE)
.register(RaftLockEvents.NAMESPACE)
.register(RaftDistributedLockOperations.NAMESPACE)
.register(RaftDistributedLockEvents.NAMESPACE)
.build());

private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
private final AtomicInteger id = new AtomicInteger();
private int lock;

public RaftLock(RaftProxy proxy) {
public RaftDistributedLock(RaftProxy proxy) {
super(proxy);
proxy.addEventListener(RaftLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
proxy.addEventListener(RaftLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
proxy.addEventListener(RaftDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
proxy.addEventListener(RaftDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
}

private void handleLocked(LockEvent event) {
Expand Down
Expand Up @@ -22,13 +22,13 @@
/**
* Raft value events.
*/
public enum RaftLockEvents implements EventType {
public enum RaftDistributedLockEvents implements EventType {
LOCK("lock"),
FAIL("fail");

private final String id;

RaftLockEvents(String id) {
RaftDistributedLockEvents(String id) {
this.id = id;
}

Expand All @@ -41,6 +41,6 @@ public String id() {
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50)
.register(LockEvent.class)
.register(byte[].class)
.build(RaftLockEvents.class.getSimpleName());
.build(RaftDistributedLockEvents.class.getSimpleName());

}
Expand Up @@ -25,14 +25,14 @@
/**
* Counter commands.
*/
public enum RaftLockOperations implements OperationId {
public enum RaftDistributedLockOperations implements OperationId {
LOCK("lock", OperationType.COMMAND),
UNLOCK("unlock", OperationType.COMMAND);

private final String id;
private final OperationType type;

RaftLockOperations(String id, OperationType type) {
RaftDistributedLockOperations(String id, OperationType type) {
this.id = id;
this.type = type;
}
Expand All @@ -52,7 +52,7 @@ public OperationType type() {
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(Lock.class)
.register(Unlock.class)
.build(RaftLockOperations.class.getSimpleName());
.build(RaftDistributedLockOperations.class.getSimpleName());

/**
* Abstract lock operation.
Expand Down
Expand Up @@ -15,8 +15,8 @@
*/
package io.atomix.primitives.lock.impl;

import io.atomix.primitives.lock.impl.RaftLockOperations.Lock;
import io.atomix.primitives.lock.impl.RaftLockOperations.Unlock;
import io.atomix.primitives.lock.impl.RaftDistributedLockOperations.Lock;
import io.atomix.primitives.lock.impl.RaftDistributedLockOperations.Unlock;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
Expand All @@ -35,17 +35,17 @@
import java.util.Queue;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.atomix.primitives.lock.impl.RaftLockOperations.LOCK;
import static io.atomix.primitives.lock.impl.RaftLockOperations.UNLOCK;
import static io.atomix.primitives.lock.impl.RaftDistributedLockOperations.LOCK;
import static io.atomix.primitives.lock.impl.RaftDistributedLockOperations.UNLOCK;

/**
* Raft atomic value service.
*/
public class RaftLockService extends AbstractRaftService {
public class RaftDistributedLockService extends AbstractRaftService {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(RaftLockOperations.NAMESPACE)
.register(RaftLockEvents.NAMESPACE)
.register(RaftDistributedLockOperations.NAMESPACE)
.register(RaftDistributedLockEvents.NAMESPACE)
.register(LockHolder.class)
.build());

Expand Down Expand Up @@ -78,7 +78,7 @@ public void install(SnapshotReader reader) {
queue.remove(holder);
RaftSession session = sessions().getSession(holder.session);
if (session != null && session.getState().active()) {
session.publish(RaftLockEvents.FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
session.publish(RaftDistributedLockEvents.FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
}
}));
}
Expand All @@ -105,9 +105,9 @@ protected void lock(Commit<Lock> commit) {
commit.index(),
commit.session().sessionId().id(),
0);
commit.session().publish(RaftLockEvents.LOCK, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
commit.session().publish(RaftDistributedLockEvents.LOCK, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
} else if (commit.value().timeout() == 0) {
commit.session().publish(RaftLockEvents.FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
commit.session().publish(RaftDistributedLockEvents.FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
} else if (commit.value().timeout() > 0) {
LockHolder holder = lock = new LockHolder(
commit.value().id(),
Expand All @@ -119,7 +119,7 @@ protected void lock(Commit<Lock> commit) {
timers.remove(commit.index());
queue.remove(holder);
if (commit.session().getState().active()) {
commit.session().publish(RaftLockEvents.FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
commit.session().publish(RaftDistributedLockEvents.FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
}
}));
} else {
Expand Down Expand Up @@ -152,7 +152,7 @@ protected void unlock(Commit<Unlock> commit) {
if (session == null || session.getState() == RaftSession.State.EXPIRED || session.getState() == RaftSession.State.CLOSED) {
lock = queue.poll();
} else {
session.publish(RaftLockEvents.LOCK, SERIALIZER::encode, new LockEvent(lock.id, commit.index()));
session.publish(RaftDistributedLockEvents.LOCK, SERIALIZER::encode, new LockEvent(lock.id, commit.index()));
break;
}
}
Expand All @@ -175,7 +175,7 @@ private void releaseSession(RaftSession session) {
if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED || lockSession.getState() == RaftSession.State.CLOSED) {
lock = queue.poll();
} else {
lockSession.publish(RaftLockEvents.LOCK, SERIALIZER::encode, new LockEvent(lock.id, lock.index));
lockSession.publish(RaftDistributedLockEvents.LOCK, SERIALIZER::encode, new LockEvent(lock.id, lock.index));
break;
}
}
Expand Down
Expand Up @@ -19,8 +19,8 @@
import io.atomix.primitives.impl.AbstractRaftPrimitive;
import io.atomix.primitives.value.AsyncAtomicValue;
import io.atomix.primitives.value.AtomicValueEventListener;
import io.atomix.primitives.value.impl.RaftValueOperations.CompareAndSet;
import io.atomix.primitives.value.impl.RaftValueOperations.GetAndSet;
import io.atomix.primitives.value.impl.RaftAtomicValueOperations.CompareAndSet;
import io.atomix.primitives.value.impl.RaftAtomicValueOperations.GetAndSet;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.serializer.Serializer;
import io.atomix.serializer.kryo.KryoNamespace;
Expand All @@ -29,26 +29,26 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static io.atomix.primitives.value.impl.RaftValueOperations.ADD_LISTENER;
import static io.atomix.primitives.value.impl.RaftValueOperations.COMPARE_AND_SET;
import static io.atomix.primitives.value.impl.RaftValueOperations.GET;
import static io.atomix.primitives.value.impl.RaftValueOperations.GET_AND_SET;
import static io.atomix.primitives.value.impl.RaftValueOperations.REMOVE_LISTENER;
import static io.atomix.primitives.value.impl.RaftValueOperations.SET;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.ADD_LISTENER;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.COMPARE_AND_SET;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.GET;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.GET_AND_SET;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.REMOVE_LISTENER;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.SET;

/**
* Atomix counter implementation.
*/
public class RaftValue extends AbstractRaftPrimitive implements AsyncAtomicValue<byte[]> {
public class RaftAtomicValue extends AbstractRaftPrimitive implements AsyncAtomicValue<byte[]> {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(RaftValueOperations.NAMESPACE)
.register(RaftValueEvents.NAMESPACE)
.register(RaftAtomicValueOperations.NAMESPACE)
.register(RaftAtomicValueEvents.NAMESPACE)
.build());

private final Set<AtomicValueEventListener<byte[]>> eventListeners = Sets.newConcurrentHashSet();

public RaftValue(RaftProxy proxy) {
public RaftAtomicValue(RaftProxy proxy) {
super(proxy);
}

Expand All @@ -59,7 +59,7 @@ public CompletableFuture<byte[]> get() {

@Override
public CompletableFuture<Void> set(byte[] value) {
return proxy.invoke(SET, SERIALIZER::encode, new RaftValueOperations.Set(value));
return proxy.invoke(SET, SERIALIZER::encode, new RaftAtomicValueOperations.Set(value));
}

@Override
Expand Down
Expand Up @@ -23,12 +23,12 @@
/**
* Raft value events.
*/
public enum RaftValueEvents implements EventType {
public enum RaftAtomicValueEvents implements EventType {
CHANGE("change");

private final String id;

RaftValueEvents(String id) {
RaftAtomicValueEvents(String id) {
this.id = id;
}

Expand All @@ -42,6 +42,6 @@ public String id() {
.register(AtomicValueEvent.class)
.register(AtomicValueEvent.Type.class)
.register(byte[].class)
.build(RaftValueEvents.class.getSimpleName());
.build(RaftAtomicValueEvents.class.getSimpleName());

}
Expand Up @@ -26,7 +26,7 @@
/**
* Counter commands.
*/
public enum RaftValueOperations implements OperationId {
public enum RaftAtomicValueOperations implements OperationId {
GET("get", OperationType.QUERY),
SET("set", OperationType.COMMAND),
COMPARE_AND_SET("compareAndSet", OperationType.COMMAND),
Expand All @@ -37,7 +37,7 @@ public enum RaftValueOperations implements OperationId {
private final String id;
private final OperationType type;

RaftValueOperations(String id, OperationType type) {
RaftAtomicValueOperations(String id, OperationType type) {
this.id = id;
this.type = type;
}
Expand All @@ -59,7 +59,7 @@ public OperationType type() {
.register(Set.class)
.register(CompareAndSet.class)
.register(GetAndSet.class)
.build(RaftValueOperations.class.getSimpleName());
.build(RaftAtomicValueOperations.class.getSimpleName());

/**
* Abstract value command.
Expand Down
Expand Up @@ -17,9 +17,9 @@

import com.google.common.collect.Sets;
import io.atomix.primitives.value.AtomicValueEvent;
import io.atomix.primitives.value.impl.RaftValueOperations.CompareAndSet;
import io.atomix.primitives.value.impl.RaftValueOperations.GetAndSet;
import io.atomix.primitives.value.impl.RaftValueOperations.Set;
import io.atomix.primitives.value.impl.RaftAtomicValueOperations.CompareAndSet;
import io.atomix.primitives.value.impl.RaftAtomicValueOperations.GetAndSet;
import io.atomix.primitives.value.impl.RaftAtomicValueOperations.Set;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
Expand All @@ -33,22 +33,22 @@
import java.util.Arrays;
import java.util.HashSet;

import static io.atomix.primitives.value.impl.RaftValueEvents.CHANGE;
import static io.atomix.primitives.value.impl.RaftValueOperations.ADD_LISTENER;
import static io.atomix.primitives.value.impl.RaftValueOperations.COMPARE_AND_SET;
import static io.atomix.primitives.value.impl.RaftValueOperations.GET;
import static io.atomix.primitives.value.impl.RaftValueOperations.GET_AND_SET;
import static io.atomix.primitives.value.impl.RaftValueOperations.REMOVE_LISTENER;
import static io.atomix.primitives.value.impl.RaftValueOperations.SET;
import static io.atomix.primitives.value.impl.RaftAtomicValueEvents.CHANGE;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.ADD_LISTENER;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.COMPARE_AND_SET;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.GET;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.GET_AND_SET;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.REMOVE_LISTENER;
import static io.atomix.primitives.value.impl.RaftAtomicValueOperations.SET;

/**
* Raft atomic value service.
*/
public class RaftValueService extends AbstractRaftService {
public class RaftAtomicValueService extends AbstractRaftService {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(RaftValueOperations.NAMESPACE)
.register(RaftValueEvents.NAMESPACE)
.register(RaftAtomicValueOperations.NAMESPACE)
.register(RaftAtomicValueEvents.NAMESPACE)
.build());

private byte[] value = new byte[0];
Expand Down

0 comments on commit 27ce928

Please sign in to comment.