Skip to content

Commit

Permalink
ARTEMIS-3340 - implement sequential activation tracking for the pluga…
Browse files Browse the repository at this point in the history
…ble quorum replication policies
  • Loading branch information
gtully committed Jul 8, 2021
1 parent 550b08c commit 0f149b6
Show file tree
Hide file tree
Showing 33 changed files with 851 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.airlift.airline.Command;
import io.airlift.airline.Option;
Expand Down Expand Up @@ -71,6 +71,7 @@ public static void setEmbedded(boolean embedded) {
public Object execute(ActionContext context) throws Exception {
super.execute(context);

AtomicReference<Throwable> serverActivationFailed = new AtomicReference<>();
try {
BrokerDTO broker = getBrokerDTO();
ActiveMQSecurityManager securityManager = SecurityManagerFactory.create(broker.security);
Expand Down Expand Up @@ -110,8 +111,7 @@ public void deActivate() {
server = BrokerFactory.createServer(broker.server, securityManager, activateCallback);

server.createComponents();
AtomicBoolean serverActivationFailed = new AtomicBoolean(false);
server.getServer().registerActivationFailureListener(exception -> serverActivationFailed.set(true));
server.getServer().registerActivationFailureListener(exception -> serverActivationFailed.set(exception));
server.start();
server.getServer().addExternalComponent(managementContext, false);

Expand All @@ -126,14 +126,16 @@ public void deActivate() {
server.getServer().addExternalComponent(component, true);
assert component.isStarted();
}

if (serverActivationFailed.get()) {
stop();
}
} catch (Throwable t) {
t.printStackTrace();
serverActivationFailed.set(t);
}

if (serverActivationFailed.get() != null) {
stop();
return serverActivationFailed.get();
}

return new Pair<>(managementContext, server.getServer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2833,4 +2833,12 @@ static void isAutoCreated(Object source) {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601747, value = "User {0} is getting auto-created property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void isAutoCreated(String user, Object source, Object... args);

static void getActivationSequence(Object source) {
LOGGER.getActivationSequence(getCaller(), source);
}

@LogMessage(level = Logger.Level.INFO)
@Message(id = 601748, value = "User {0} is getting activation sequence on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getActivationSequence(String user, Object source, Object... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ public interface ActiveMQServerControl {
@Attribute(desc = "Node ID of this server")
String getNodeID();


/**
* Returns the current activation sequence number of this server.
* <br>
* When replicated, peers may coordinate activation with this monotonic sequence
*/
@Attribute(desc = "Activation sequence of this server instance")
long getActivationSequence();

/**
* Returns the management notification address of this server.
* <br>
Expand Down
10 changes: 10 additions & 0 deletions artemis-quorum-ri/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
Expand All @@ -59,6 +65,10 @@
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.MutableLong;
import org.apache.activemq.artemis.quorum.UnavailableStateException;

/**
* This is an implementation suitable to be used just on unit tests and it won't attempt
Expand Down Expand Up @@ -127,8 +129,55 @@ public DistributedLock getDistributedLock(String lockId) throws ExecutionExcepti
}

@Override
public MutableLong getMutableLong(String mutableLongId) throws InterruptedException, ExecutionException, TimeoutException {
// TODO
return null;
public MutableLong getMutableLong(final String mutableLongId) throws ExecutionException {
// use a lock file - but with a prefix
final FileDistributedLock fileDistributedLock = (FileDistributedLock) getDistributedLock("ML:" + mutableLongId);
return new MutableLong() {
@Override
public String getMutableLongId() {
return mutableLongId;
}

@Override
public long get() throws UnavailableStateException {
try {
return readLong(fileDistributedLock);
} catch (IOException e) {
throw new UnavailableStateException(e);
}
}

@Override
public void set(long value) throws UnavailableStateException {
try {
writeLong(fileDistributedLock, value);
} catch (IOException e) {
throw new UnavailableStateException(e);
}
}

@Override
public void close() {
fileDistributedLock.close();
}
};
}

private void writeLong(FileDistributedLock fileDistributedLock, long value) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
buffer.putLong(value);
buffer.flip();
if (fileDistributedLock.getChannel().position(0).write(buffer) == Long.BYTES) {
fileDistributedLock.getChannel().force(false);
}
}

private long readLong(FileDistributedLock fileDistributedLock) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
if (fileDistributedLock.getChannel().position(0).read(buffer, 0) != Long.BYTES) {
return 0;
}
buffer.flip();
return buffer.getLong();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,8 @@ public void close(boolean useCallback) {
public void close() {
close(true);
}

public FileChannel getChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.quorum.zookeeper;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -54,7 +55,7 @@ final class CuratorDistributedLock implements DistributedLock {
this.leaseVersion = null;
}

protected void onReconnected() {
void onReconnected() {
synchronized (manager) {
if (closed || unavailable) {
return;
Expand All @@ -73,7 +74,7 @@ protected void onReconnected() {
}
}

protected void onLost() {
void onLost() {
synchronized (manager) {
if (closed || unavailable) {
return;
Expand All @@ -87,12 +88,7 @@ protected void onLost() {
}
}

protected void onSuspended() {
synchronized (manager) {
if (closed || unavailable) {
return;
}
}
void onSuspended() {
}

@Override
Expand Down Expand Up @@ -138,7 +134,7 @@ public boolean tryLock() throws UnavailableStateException, InterruptedException
throw new UnavailableStateException(lockId + " lock state isn't available");
}
try {
final byte[] leaseVersion = UUID.randomUUID().toString().getBytes();
final byte[] leaseVersion = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
ipcSem.setNodeData(leaseVersion);
lease = ipcSem.acquire(0, TimeUnit.NANOSECONDS);
if (lease == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

final class CuratorMutableLong implements MutableLong {

// this is used to prevent deadlocks on close
private final CuratorDistributedPrimitiveManager manager;
private final String id;
private final DistributedAtomicLong atomicLong;
Expand All @@ -45,15 +44,16 @@ final class CuratorMutableLong implements MutableLong {
this.unavailable = false;
}

protected void onReconnected() {
void onReconnected() {
synchronized (manager) {
if (closed || unavailable) {
if (closed) {
return;
}
unavailable = false;
}
}

protected void onLost() {
void onLost() {
synchronized (manager) {
if (closed || unavailable) {
return;
Expand All @@ -62,12 +62,7 @@ protected void onLost() {
}
}

protected void onSuspended() {
synchronized (manager) {
if (closed || unavailable) {
return;
}
}
void onSuspended() {
}

private void checkNotClosed() {
Expand Down Expand Up @@ -131,9 +126,6 @@ public void close(boolean useCallback) {
if (useCallback) {
onClose.accept(this);
}
if (unavailable) {
return;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public void unlockedLockIsVisibleByDifferentManagers() throws ExecutionException
ownerManager.getDistributedLock("a").unlock();
Assert.assertFalse(observerManager.getDistributedLock("a").isHeldByCaller());
Assert.assertFalse(ownerManager.getDistributedLock("a").isHeldByCaller());
Assert.assertTrue(observerManager.getDistributedLock("a").tryLock());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public void setupEnv() throws Throwable {
}
testingServer = new TestingCluster(clusterSpecs);
testingServer.start();
// start waits for quorumPeer!=null but not that it has started...
Wait.waitFor(this::ensembleHasLeader);
connectString = testingServer.getConnectString();
super.setupEnv();
}
Expand Down Expand Up @@ -307,6 +309,10 @@ public void beNotifiedOfUnavailabilityWhileBlockedOnTimedLock() throws Exception
Assert.assertEquals(TRUE, unavailableTimedLock.get());
}

private boolean ensembleHasLeader() {
return testingServer.getServers().stream().filter(CuratorDistributedLockTest::isLeader).count() != 0;
}

private static boolean isLeader(TestingZooKeeperServer server) {
long leaderId = server.getQuorumPeer().getLeaderId();
long id = server.getQuorumPeer().getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4204,6 +4204,17 @@ public String getNodeID() {
return server.getNodeID() == null ? null : server.getNodeID().toString();
}

@Override
public long getActivationSequence() {
if (AuditLogger.isEnabled()) {
AuditLogger.getActivationSequence(this.server);
}
if (server.getNodeManager() != null) {
return server.getNodeManager().getNodeActivationSequence();
}
return 0;
}

@Override
public String getManagementNotificationAddress() {
if (AuditLogger.isEnabled()) {
Expand Down

0 comments on commit 0f149b6

Please sign in to comment.