Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-3340 layered over ARTEMIS-2716 - activation sequence tracking to protect the journal #3646

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
eefe3ae
ARTEMIS-2716 Pluggable Quorum API + Curator RI
franz1981 Apr 14, 2020
3fbf1a9
ARTEMIS-2716 Reducing dependencies from activation types
franz1981 Feb 15, 2021
30e1e13
ARTEMIS-2716 Adding new policies and activation
franz1981 Feb 19, 2021
d56d320
ARTEMIS-2716 Adding single-pair smoke-test
franz1981 Apr 27, 2021
cb45546
ARTEMIS-2716 Porting replication integration tests
franz1981 Apr 29, 2021
a64d4ef
ARTEMIS-2716 adding doc
franz1981 May 31, 2021
f8525fc
ARTEMIS-2716 adding EXPERIMENTAL hints for users
franz1981 Jun 29, 2021
169daf3
ARTEMIS-3340 adding mutable fail-fast long
franz1981 Jun 28, 2021
3658d60
ARTEMIS-3340 - implement sequential activation tracking for the pluga…
gtully Jul 7, 2021
e5fb9e6
ARTEMIS-2716 - add peer or multi primary activation test
gtully Jul 15, 2021
02783be
ARTEMIS-2716 - speed up failback backup behaviour by ignoring existin…
gtully Jul 21, 2021
ab9bb5c
ARTEMIS-3340 Implemented reusable curator primitive abstraction (#2)
franz1981 Jul 22, 2021
1be303b
ARTEMIS-2716 - perculate test with send/receive to verify journal, va…
gtully Jul 22, 2021
6638c6c
ARTEMIS-3340 - use coordinated-id in place of peer-node-id for primar…
gtully Jul 23, 2021
ee99c38
ARTEMIS-3340 - use read of activation sequence to detect stale replic…
gtully Jul 23, 2021
70932b6
ARTEMIS-3340 - arrange leases and locks and longs in namespace based …
gtully Jul 28, 2021
c01a7d7
ARTEMIS-3340 Implemented reusable curator primitive abstraction
franz1981 Jul 21, 2021
38e7e42
ARTEMIS-3340 Changed docs + config + activation logic
franz1981 Jul 27, 2021
2941cd7
ARTEMIS-3340 Changed docs + config + activation logic
franz1981 Jul 27, 2021
14fbdb5
ARTEMIS-3340 - update smoke tests to refect removed config
gtully Jul 29, 2021
32c3a1b
ARTEMIS-3340 - check for live no longer configurable, remove test
gtully Jul 29, 2021
2f7930f
ARTEMIS-3340 - rebase with audit logger changes
gtully Jul 29, 2021
c331df4
ARTEMIS-3340 Fixing ensureSequentialAccessToNodeData
franz1981 Jul 29, 2021
c54d6fc
ARTEMIS-3340 fixing in-memory view of activation sequence while self-…
franz1981 Jul 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.airlift.airline.Command;
import io.airlift.airline.Option;
Expand Down Expand Up @@ -71,6 +71,7 @@ public static void setEmbedded(boolean embedded) {
public Object execute(ActionContext context) throws Exception {
super.execute(context);

AtomicReference<Throwable> serverActivationFailed = new AtomicReference<>();
try {
BrokerDTO broker = getBrokerDTO();
ActiveMQSecurityManager securityManager = SecurityManagerFactory.create(broker.security);
Expand Down Expand Up @@ -110,8 +111,7 @@ public void deActivate() {
server = BrokerFactory.createServer(broker.server, securityManager, activateCallback);

server.createComponents();
AtomicBoolean serverActivationFailed = new AtomicBoolean(false);
server.getServer().registerActivationFailureListener(exception -> serverActivationFailed.set(true));
server.getServer().registerActivationFailureListener(exception -> serverActivationFailed.set(exception));
server.start();
server.getServer().addExternalComponent(managementContext, false);

Expand All @@ -126,14 +126,16 @@ public void deActivate() {
server.getServer().addExternalComponent(component, true);
assert component.isStarted();
}

if (serverActivationFailed.get()) {
stop();
}
} catch (Throwable t) {
t.printStackTrace();
serverActivationFailed.set(t);
}

if (serverActivationFailed.get() != null) {
stop();
return serverActivationFailed.get();
}

return new Pair<>(managementContext, server.getServer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2843,4 +2843,12 @@ static void isAutoCreated(Object source) {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601747, value = "User {0} is getting auto-created property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void isAutoCreated(String user, Object source, Object... args);

static void getActivationSequence(Object source) {
BASE_LOGGER.getActivationSequence(getCaller(), source);
}

@LogMessage(level = Logger.Level.INFO)
@Message(id = 601748, value = "User {0} is getting activation sequence on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getActivationSequence(String user, Object source, Object... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ private boolean isExpectedThread(Thread thread) {
} else if (threadName.contains("ObjectCleanerThread")) {
// Required since upgrade to Netty 4.1.22 maybe because https://github.com/netty/netty/commit/739e70398ccb6b11ffa97c6b5f8d55e455a2165e
return true;
} else if (threadName.contains("RMI TCP")) {
return true;
} else if (threadName.contains("RMI Scheduler")) {
return true;
} else if (threadName.contains("RMI RenewClean")) {
return true;
} else if (threadName.contains("Signal Dispatcher")) {
return true;
} else if (threadName.contains("ForkJoinPool.commonPool")) {
return true;
} else if (threadName.contains("GC Daemon")) {
return true;
} else {
for (StackTraceElement element : thread.getStackTrace()) {
if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ public static String getDefaultHapolicyBackupStrategy() {
// the directory to store the journal files in
private static String DEFAULT_JOURNAL_DIR = "data/journal";

// the directory to store the data files in
private static String DEFAULT_DATA_DIR = "data";

// true means that the journal directory will be created
private static boolean DEFAULT_CREATE_JOURNAL_DIR = true;

Expand Down Expand Up @@ -627,6 +630,8 @@ public static String getDefaultHapolicyBackupStrategy() {

public static final String DEFAULT_TEMPORARY_QUEUE_NAMESPACE = "";

private static final String DEFAULT_DISTRIBUTED_PRIMITIVE_MANAGER_CLASS_NAME = "org.apache.activemq.artemis.quorum.zookeeper.CuratorDistributedPrimitiveManager";

// Number of concurrent workers for a core bridge
public static int DEFAULT_BRIDGE_CONCURRENCY = 1;

Expand Down Expand Up @@ -938,6 +943,13 @@ public static String getDefaultJournalDir() {
return DEFAULT_JOURNAL_DIR;
}

/**
* the directory to store the journal files in
*/
public static String getDefaultDataDir() {
return DEFAULT_DATA_DIR;
}

/**
* true means that the journal directory will be created
*/
Expand Down Expand Up @@ -1721,6 +1733,10 @@ public static String getDefaultTemporaryQueueNamespace() {
return DEFAULT_TEMPORARY_QUEUE_NAMESPACE;
}

public static String getDefaultDistributedPrimitiveManagerClassName() {
return DEFAULT_DISTRIBUTED_PRIMITIVE_MANAGER_CLASS_NAME;
}

public static int getDefaultBridgeConcurrency() {
return DEFAULT_BRIDGE_CONCURRENCY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ public interface ActiveMQServerControl {
@Attribute(desc = "Node ID of this server")
String getNodeID();


/**
* Returns the current activation sequence number of this server.
* <br>
* When replicated, peers may coordinate activation with this monotonic sequence
*/
@Attribute(desc = "Activation sequence of this server instance")
long getActivationSequence();

/**
* Returns the management notification address of this server.
* <br>
Expand Down
11 changes: 11 additions & 0 deletions artemis-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,17 @@
<version>${project.version}</version>
<classifier>javadoc</classifier>
</dependency>
<!-- quorum -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-quorum-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-quorum-ri</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
9 changes: 9 additions & 0 deletions artemis-distribution/src/main/assembly/dep.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
<include>org.apache.activemq.rest:artemis-rest</include>
<include>org.apache.qpid:qpid-jms-client</include>
<include>io.micrometer:micrometer-core</include>
<!-- quorum -->
<include>org.apache.activemq:artemis-quorum-api</include>
<include>org.apache.activemq:artemis-quorum-ri</include>

<!-- dependencies -->
<include>jakarta.jms:jakarta.jms-api</include>
Expand Down Expand Up @@ -97,6 +100,12 @@
<include>com.sun.xml.bind:jaxb-impl</include>
<include>jakarta.activation:jakarta.activation-api</include>
<include>jakarta.security.auth.message:jakarta.security.auth.message-api</include>
<!-- quorum -->
<include>org.apache.curator:curator-recipes</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>org.apache.zookeeper:zookeeper</include>
<include>org.apache.zookeeper:zookeeper-jute</include>
</includes>
<!--excludes>
<exclude>org.apache.activemq:artemis-website</exclude>
Expand Down
1 change: 1 addition & 0 deletions artemis-features/src/main/resources/features.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<!--bundle dependency="true">mvn:io.micrometer/micrometer-core/${version.micrometer}</bundle-->

<bundle>mvn:org.apache.activemq/activemq-artemis-native/${activemq-artemis-native-version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-quorum-api/${pom.version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-server-osgi/${pom.version}</bundle>
</feature>

Expand Down
41 changes: 41 additions & 0 deletions artemis-quorum-api/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>2.18.0-SNAPSHOT</version>
</parent>

<artifactId>artemis-quorum-api</artifactId>
<packaging>bundle</packaging>
<name>ActiveMQ Artemis Quorum API</name>

<properties>
<activemq.basedir>${project.basedir}/..</activemq.basedir>
</properties>

<dependencies>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_core</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.quorum;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public interface DistributedLock extends AutoCloseable {

String getLockId();

boolean isHeldByCaller() throws UnavailableStateException;

boolean tryLock() throws UnavailableStateException, InterruptedException;

default boolean tryLock(long timeout, TimeUnit unit) throws UnavailableStateException, InterruptedException {
// it doesn't make sense to be super fast
final long TARGET_FIRE_PERIOD_NS = TimeUnit.MILLISECONDS.toNanos(250);
if (timeout < 0) {
throw new IllegalArgumentException("timeout cannot be negative");
}
Objects.requireNonNull(unit);
if (timeout == 0) {
return tryLock();
}
final Thread currentThread = Thread.currentThread();
final long timeoutNs = unit.toNanos(timeout);
final long start = System.nanoTime();
final long deadline = start + timeoutNs;
long expectedNextFireTime = start;
while (!currentThread.isInterrupted()) {
long parkNs = expectedNextFireTime - System.nanoTime();
while (parkNs > 0) {
LockSupport.parkNanos(parkNs);
if (currentThread.isInterrupted()) {
throw new InterruptedException();
}
final long now = System.nanoTime();
parkNs = expectedNextFireTime - now;
}
if (tryLock()) {
return true;
}
final long now = System.nanoTime();
final long remainingTime = deadline - now;
if (remainingTime <= 0) {
return false;
}
if (remainingTime < TARGET_FIRE_PERIOD_NS) {
expectedNextFireTime = now;
} else {
expectedNextFireTime += TARGET_FIRE_PERIOD_NS;
}
}
throw new InterruptedException();
}

void unlock() throws UnavailableStateException;

void addListener(UnavailableLockListener listener);

void removeListener(UnavailableLockListener listener);

@FunctionalInterface
interface UnavailableLockListener {

void onUnavailableLockEvent();
}

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.quorum;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public interface DistributedPrimitiveManager extends AutoCloseable {

static DistributedPrimitiveManager newInstanceOf(String className, Map<String, String> properties) throws Exception {
return (DistributedPrimitiveManager) Class.forName(className).getDeclaredConstructor(Map.class).newInstance(properties);
}

@FunctionalInterface
interface UnavailableManagerListener {

void onUnavailableManagerEvent();
}

void addUnavailableManagerListener(UnavailableManagerListener listener);

void removeUnavailableManagerListener(UnavailableManagerListener listener);

boolean start(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException;

void start() throws InterruptedException, ExecutionException;

boolean isStarted();

void stop();

DistributedLock getDistributedLock(String lockId) throws InterruptedException, ExecutionException, TimeoutException;

MutableLong getMutableLong(String mutableLongId) throws InterruptedException, ExecutionException, TimeoutException;

@Override
default void close() {
stop();
}
}