Skip to content
Permalink
Browse files
Merge branch 'gerrit/stabilization-3b6982ce7f'
Change-Id: I2ea64b20d39dc9cb4e14f350424f0e4b2e038e2c
  • Loading branch information
mblow committed Jun 1, 2021
2 parents 276b65c + 7332722 commit b4f8ddd942d5d3178ed51738bf7b30bd46f4d6f5
Show file tree
Hide file tree
Showing 21 changed files with 78 additions and 102 deletions.
@@ -36,7 +36,6 @@
import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -46,6 +45,7 @@
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.http.api.IChannelClosedHandler;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.server.HttpServer;
@@ -434,6 +434,10 @@ protected synchronized void doStart(MetadataProvider metadataProvider) throws Hy
}
}

protected synchronized void doRecover(MetadataProvider metadataProvider) throws HyracksDataException {
doStart(metadataProvider);
}

private void cancelJob(Throwable th) {
cancelJobSafely(metadataProvider, th);
final WaitForStateSubscriber cancelSubscriber =
@@ -125,7 +125,7 @@ protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, Interr
try {
if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
listener.setState(ActivityState.RECOVERING);
listener.doStart(metadataProvider);
listener.doRecover(metadataProvider);
}
LOGGER.log(level, "Recovery completed successfully");
return null;
@@ -141,7 +141,7 @@ protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, Interr
releaseRecoveryLocks(metadataProvider);
}
} while (policy.retry(failure));
// Recovery task is essntially over now either through failure or through cancellation(stop)
// Recovery task is essentially over now either through failure or through cancellation(stop)
synchronized (listener) {
listener.notifyAll();
if (listener.getState() != ActivityState.TEMPORARILY_FAILED
@@ -76,7 +76,6 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.exceptions.WarningCollector;
@@ -3580,7 +3579,7 @@ private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFla
printer.print(jobId);
}
} catch (Exception e) {
if (ExceptionUtils.getRootCause(e) instanceof InterruptedException) {
if (org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e) instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
}
@@ -34,7 +34,6 @@
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.JobUtils;
@@ -182,7 +181,7 @@ private static void runWithRetryAfterInterrupt(Work work) throws Exception {
work.run();
done = true;
} catch (Exception e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
Throwable rootCause = org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e);
if (rootCause instanceof java.lang.InterruptedException) {
interruptedException = (InterruptedException) rootCause;
// clear the interrupted state from the thread
@@ -33,7 +33,6 @@
import java.util.concurrent.Future;
import java.util.function.Predicate;

import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest;
import org.apache.asterix.testframework.context.TestCaseContext;
@@ -43,6 +42,7 @@
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.junit.Assert;

public class CancellationTestExecutor extends TestExecutor {
@@ -29,7 +29,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.asterix.testframework.xml.ComparisonEnum;
@@ -38,6 +37,7 @@
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.logging.log4j.Level;
import org.junit.Assert;

@@ -20,9 +20,9 @@

import java.util.Collection;

import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.test.common.CancellationTestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -31,7 +31,6 @@
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILockManager;
import org.apache.asterix.common.transactions.ILogManager;
@@ -113,7 +112,7 @@ public void interruptedLogPageSwitch() throws Exception {
logManager.log(logRecord);
}
} catch (ACIDException e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
Throwable rootCause = org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e);
if (rootCause instanceof java.lang.InterruptedException) {
interrupted.set(true);
}
@@ -55,5 +55,10 @@ public void close() throws IOException {
public void fail(Throwable th) {
// No Op
}

@Override
public void open() throws HyracksDataException {
// No Op
}
}
}

This file was deleted.

@@ -26,7 +26,6 @@
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
@@ -129,7 +128,7 @@ public boolean handleException(Throwable th) {
if (in == null) {
return false;
}
Throwable root = ExceptionUtils.getRootCause(th);
Throwable root = org.apache.hyracks.api.util.ExceptionUtils.getRootCause(th);
if (root instanceof HyracksDataException) {
HyracksDataException r = (HyracksDataException) root;
boolean advance = false;
@@ -228,6 +228,7 @@ public void disableBlockingOperator() {
blockingOperatorDisabled = true;
}

@Override
public boolean isBlockingOperatorDisabled() {
return blockingOperatorDisabled;
}
@@ -32,7 +32,7 @@ public class MetadataConstants {
public static final int METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8 = 251;
public static final int DATAVERSE_NAME_TOTAL_LENGTH_LIMIT_UTF8 = METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8 * 4;
public static final Pattern METADATA_OBJECT_NAME_INVALID_CHARS =
Pattern.compile(SystemUtils.IS_OS_WINDOWS ? "[\u0000-\u001F\u007F\"*/:<>\\\\|+,.;=\\[\\]\n]" : "[\u0000/]");
Pattern.compile(SystemUtils.IS_OS_WINDOWS ? "[\u0000-\u001F\u007F\"*/:<>\\\\|+,;=\\[\\]\n]" : "[\u0000/]");

// Name of the dataverse the metadata lives in.
public static final DataverseName METADATA_DATAVERSE_NAME = DataverseName.createBuiltinDataverseName("Metadata");
@@ -191,6 +191,7 @@ public void open() throws HyracksDataException {
PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
}
frameOpCallback.open();
writer.open();
keySearchCmp =
BTreeUtils.getSearchMultiComparator(((ITreeIndex) index).getComparatorFactories(), frameTuple);
@@ -291,7 +291,13 @@ public void close() throws IOException {
public void fail(Throwable th) {
callback.fail(th);
}

@Override
public void open() throws HyracksDataException {
callback.open();
}
};
frameOpCallback.open();
} catch (Throwable e) { // NOSONAR: Re-thrown
throw HyracksDataException.create(e);
}
@@ -217,4 +217,6 @@ public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchem

public Map<String, Object> getConfig();

public boolean isBlockingOperatorDisabled();

}
@@ -44,6 +44,9 @@ public class IntroJoinInsideSubplanRule extends AbstractDecorrelationRule {
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
if (context.getMetadataProvider().isBlockingOperatorDisabled()) {
return false;
}
AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
if (op0.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
return false;
@@ -191,7 +191,7 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

@Override
public void flush() throws HyracksDataException {
writer.flush();
appender.flush(writer);
}

/**
@@ -23,9 +23,11 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IFormattedException;
import org.apache.hyracks.util.ThrowingFunction;

import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -158,4 +160,41 @@ public static <K, V> V computeIfAbsent(Map<K, V> map, K key, ThrowingFunction<K,
throw HyracksDataException.create(e.getCause());
}
}

// Gets the error message for the root cause of a given Throwable instance.
public static String getErrorMessage(Throwable th) {
Throwable cause = getRootCause(th);
return cause.getMessage();
}

/**
* Determines whether supplied exception contains a matching cause in its hierarchy, or is itself a match
*/
public static boolean matchingCause(Throwable e, Predicate<Throwable> test) {
Throwable current = e;
Throwable cause = e.getCause();
while (cause != null && cause != current) {
if (test.test(cause)) {
return true;
}
Throwable nextCause = current.getCause();
current = cause;
cause = nextCause;
}
return test.test(e);
}

/**
* Unwraps enclosed exceptions until a non-product exception is found, otherwise returns the root production
* exception
*/
public static Throwable unwrap(Throwable e) {
Throwable current = e;
Throwable cause = e.getCause();
while (cause != current && cause instanceof IFormattedException) {
current = cause;
cause = current.getCause();
}
return current;
}
}
@@ -40,4 +40,9 @@ public interface IFrameOperationCallback extends Closeable {
* @param th
*/
void fail(Throwable th);

/**
* Called when the task has opened for initialization.
*/
void open() throws HyracksDataException;
}

0 comments on commit b4f8ddd

Please sign in to comment.