Skip to content

Commit

Permalink
ARTEMIS-2046 Fixing issues with JournalStorageManager.stop in replica…
Browse files Browse the repository at this point in the history
…tion, JDBC and shared storage
  • Loading branch information
clebertsuconic committed Aug 21, 2018
1 parent f1dfc72 commit 63e6cd9
Show file tree
Hide file tree
Showing 11 changed files with 407 additions and 39 deletions.
Expand Up @@ -249,6 +249,12 @@ public ActiveMQException createException(String msg) {
public ActiveMQException createException(String msg) {
return new ActiveMQNullRefException(msg);
}
},
SHUTDOWN_ERROR(219) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQShutdownException(msg);
}
};

private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
Expand Down
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;

/**
* An operation failed because the component it was issued against is shut down
* or is not in a usable (started / loaded) state.
* <p>
* Always carries the type {@link ActiveMQExceptionType#SHUTDOWN_ERROR}.
*/
public final class ActiveMQShutdownException extends ActiveMQException {

/**
* Creates a shutdown exception of type {@link ActiveMQExceptionType#SHUTDOWN_ERROR} without a message.
*/
public ActiveMQShutdownException() {
super(ActiveMQExceptionType.SHUTDOWN_ERROR);
}

/**
* Creates a shutdown exception of type {@link ActiveMQExceptionType#SHUTDOWN_ERROR}.
*
* @param msg the detail message passed to the {@link ActiveMQException} constructor
*/
public ActiveMQShutdownException(String msg) {
super(ActiveMQExceptionType.SHUTDOWN_ERROR, msg);
}
}
Expand Up @@ -1500,9 +1500,11 @@ public void commit(final Xid xid, final boolean onePhase) throws XAException {

XAException xaException = null;
if (onePhase) {
logger.debug("Throwing oneFase RMFAIL on xid=" + xid, t);
//we must return XA_RMFAIL
xaException = new XAException(XAException.XAER_RMFAIL);
} else {
logger.debug("Throwing twoFase Retry on xid=" + xid, t);
// Any error on commit -> RETRY
// We can't rollback a Prepared TX by definition
xaException = new XAException(XAException.XA_RETRY);
Expand Down Expand Up @@ -1753,7 +1755,7 @@ public void rollback(final Xid xid) throws XAException {
} catch (XAException xae) {
throw xae;
} catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || e.getType() == ActiveMQExceptionType.SHUTDOWN_ERROR) {
// Unblocked on failover
throw new XAException(XAException.XA_RETRY);
}
Expand Down
Expand Up @@ -31,7 +31,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
Expand Down Expand Up @@ -334,19 +336,19 @@ public void run() {
}


private void checkStatus() {
private void checkStatus() throws Exception {
checkStatus(null);
}

private void checkStatus(IOCompletion callback) {
private void checkStatus(IOCompletion callback) throws Exception {
if (!started) {
if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
throw new IllegalStateException("JDBCJournal is not loaded");
throw new ActiveMQShutdownException("JDBCJournal is not loaded");
}

if (failed.get()) {
if (callback != null) callback.onError(-1, "JDBC Journal failed");
throw new IllegalStateException("JDBCJournal Failed");
throw new ActiveMQException("JDBCJournal Failed");
}
}

Expand Down Expand Up @@ -388,7 +390,7 @@ private void appendRecord(JDBCJournalRecord record) throws Exception {
if (callback != null) callback.waitCompletion();
}

private synchronized void addTxRecord(JDBCJournalRecord record) {
private synchronized void addTxRecord(JDBCJournalRecord record) throws Exception {

if (logger.isTraceEnabled()) {
logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
Expand Down Expand Up @@ -823,6 +824,9 @@ public void run() {
usedFile);
}
result.set(true);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
Expand Down Expand Up @@ -882,7 +886,10 @@ public void run() {
}

result.set(true);
} catch (Exception e) {
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendUpdateRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
logger.error("appendUpdateRecord:" + e, e);
Expand Down Expand Up @@ -933,7 +940,10 @@ record = records.remove(id);
record.delete(usedFile);
}
result.set(true);
} catch (Exception e) {
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendDeleteRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendDeleteRecord:" + e, e);
} finally {
Expand Down Expand Up @@ -993,7 +1003,7 @@ public void run() {
}

tx.addPositive(usedFile, id, addRecord.getEncodeSize());
} catch (Exception e) {
} catch (Throwable e) {
logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(null, tx, e);
} finally {
Expand Down Expand Up @@ -1031,9 +1041,9 @@ public void run() {
}
}

private void checkJournalIsLoaded() {
private void checkJournalIsLoaded() throws Exception {
if (state != JournalState.LOADED && state != JournalState.SYNCING) {
throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
}
}

Expand Down Expand Up @@ -1085,7 +1095,7 @@ public void run() {
}

tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
} catch ( Exception e ) {
} catch (Throwable e ) {
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition(null, tx, e );
} finally {
Expand Down Expand Up @@ -1132,7 +1142,7 @@ public void run() {
}

tx.addNegative(usedFile, id);
} catch (Exception e) {
} catch (Throwable e) {
logger.error("appendDeleteRecordTransactional:" + e, e);
setErrorCondition(null, tx, e);
} finally {
Expand Down Expand Up @@ -1185,7 +1195,10 @@ public void run() {
}

tx.prepare(usedFile);
} catch (Exception e) {
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
setErrorCondition(callback, tx, e);
Expand Down Expand Up @@ -1267,6 +1280,9 @@ public void run() {


tx.commit(usedFile);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendCommitRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendCommitRecord:" + e, e);
Expand Down Expand Up @@ -1317,6 +1333,9 @@ public void run() {
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);

tx.rollback(usedFile);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendRollbackRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendRollbackRecord:" + e, e);
Expand Down Expand Up @@ -2360,10 +2379,10 @@ public synchronized void stop() throws Exception {
return;
}

setJournalState(JournalState.STOPPED);

flush();

setJournalState(JournalState.STOPPED);

if (providedIOThreadPool == null) {
threadPool.shutdown();

Expand Down Expand Up @@ -2681,6 +2700,8 @@ private JournalFile appendRecord(final JournalInternalRecord encoder,
final JournalTransaction tx,
final IOCallback parameterCallback) throws Exception {

checkJournalIsLoaded();

final IOCallback callback;

final int size = encoder.getEncodeSize();
Expand Down
Expand Up @@ -126,8 +126,10 @@
*/
public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {

private static final int CRITICAL_PATHS = 1;
private static final int CRITICAL_STORE = 0;
protected static final int CRITICAL_PATHS = 3;
protected static final int CRITICAL_STORE = 0;
protected static final int CRITICAL_STOP = 1;
protected static final int CRITICAL_STOP_2 = 2;

private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);

Expand Down Expand Up @@ -405,6 +407,16 @@ public void readUnLock() {
leaveCritical(CRITICAL_STORE);
}

/**
* Calls {@code lock()} on the write lock of {@code storageManagerLock}.
* <p>
* For internal use and the testsuite; don't use it outside of tests.
* NOTE(review): presumably every call has to be balanced by {@code writeUnlock()} - confirm against callers.
*/
public void writeLock() {
storageManagerLock.writeLock().lock();
}

/**
* Calls {@code unlock()} on the write lock of {@code storageManagerLock}.
* <p>
* For internal use and the testsuite; don't use it outside of tests.
*/
public void writeUnlock() {
storageManagerLock.writeLock().unlock();
}

@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
readLock();
Expand Down
Expand Up @@ -229,9 +229,21 @@ private void cleanupIncompleteFiles() throws Exception {
}

@Override
public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
// Delegates the actual work to internalStop, bracketed by the CRITICAL_STOP critical path.
try {
enterCritical(CRITICAL_STOP);
// The monitor is taken here, inside the critical path, instead of through a synchronized method modifier.
synchronized (this) {
// The flag returned by internalStop is not used beyond this point: either way control reaches the finally block.
if (internalStop(ioCriticalError, sendFailover))
return;
}
} finally {
// Runs on every exit path of the try block, including when internalStop throws.
leaveCritical(CRITICAL_STOP);
}
}

private boolean internalStop(boolean ioCriticalError, boolean sendFailover) throws Exception {
if (!started) {
return;
return true;
}

if (!ioCriticalError) {
Expand All @@ -255,30 +267,41 @@ public void run() {
// that's ok
}

// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
// and we want to ensure a stop here just in case
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse != null) {
if (sendFailover) {
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
if (token != null) {
try {
token.waitCompletion(5000);
} catch (Exception e) {
// ignore it
enterCritical(CRITICAL_STOP_2);
storageManagerLock.writeLock().lock();
try {

// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
// and we want to ensure a stop here just in case
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse != null) {
if (sendFailover) {
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
if (token != null) {
try {
token.waitCompletion(5000);
} catch (Exception e) {
// ignore it
}
}
}
// we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown
// while the backup will never receive them
replicatorInUse.stop(false);
}
replicatorInUse.stop();
}
bindingsJournal.stop();
bindingsJournal.stop();

messageJournal.stop();
messageJournal.stop();

journalLoaded = false;
journalLoaded = false;

started = false;
started = false;
} finally {
storageManagerLock.writeLock().unlock();
leaveCritical(CRITICAL_STOP_2);
}
return false;
}

/**
Expand Down
Expand Up @@ -150,6 +150,20 @@ public synchronized void registerJournal(final byte id, final Journal journal) {
journals[id] = journal;
}

/**
* Sets the {@code started} flag to {@code false}; nothing else is stopped or released by this call.
* <p>
* Intended for tests only. Do not rely on it elsewhere, as this API is not guaranteed to be kept in future versions.
*/
public void pause() {
started = false;
}

/**
* Sets the {@code started} flag to {@code true}; nothing else is started or initialized by this call.
* <p>
* Intended for tests only. Do not rely on it elsewhere, as this API is not guaranteed to be kept in future versions.
*/
public void resume() {
started = true;
}

@Override
public void handlePacket(final Packet packet) {
if (logger.isTraceEnabled()) {
Expand Down

0 comments on commit 63e6cd9

Please sign in to comment.