Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2879,6 +2879,27 @@ public boolean isAutoStartSparder() {
return Boolean.parseBoolean(this.getOptional("kylin.query.auto-sparder-context", "false"));
}

/**
* Sparder is considered unavailable when the check task is unresponsive for more than this time
*/
public int getSparderCanaryErrorResponseMs() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can see any short but clear comments here for Kylin user.

return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-error-response-ms", "3000"));
}

/**
* The maximum number of restart sparder when sparder is not available
*/
public int getThresholdToRestartSparder() {
return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-threshold-to-restart-spark", "3"));
}

/**
* Time period between two sparder health checks
*/
public int getSparderCanaryPeriodMinutes() {
return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
}

// ============================================================================
// Spark with Kerberos
// ============================================================================
Expand Down
7 changes: 7 additions & 0 deletions kylin-spark-project/kylin-spark-query/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-spark-engine</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.query.monitor;

import org.apache.kylin.common.KylinConfig;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.KylinSparkEnv;
import org.apache.spark.sql.SparderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SparderContextCanary {
private static final Logger logger = LoggerFactory.getLogger(SparderContextCanary.class);
private static volatile boolean isStarted = false;

private static final int THRESHOLD_TO_RESTART_SPARK = KylinConfig.getInstanceFromEnv().getThresholdToRestartSparder();
private static final int PERIOD_MINUTES = KylinConfig.getInstanceFromEnv().getSparderCanaryPeriodMinutes();

private static volatile int errorAccumulated = 0;
private static volatile long lastResponseTime = -1;
private static volatile boolean sparderRestarting = false;

private SparderContextCanary() {
}

public static int getErrorAccumulated() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add @SuppressWarnings("unused") to suppress warning in IDE, or remove this method.

return errorAccumulated;
}

@SuppressWarnings("unused")
public long getLastResponseTime() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here~

return lastResponseTime;
}

@SuppressWarnings("unused")
public boolean isSparderRestarting() {
return sparderRestarting;
}

public static void init() {
if (!isStarted) {
synchronized (SparderContextCanary.class) {
if (!isStarted) {
isStarted = true;
logger.info("Start monitoring Sparder");
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(SparderContextCanary::monitor,
PERIOD_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES);
}
}
}
}

public static boolean isError() {
return errorAccumulated >= THRESHOLD_TO_RESTART_SPARK;
}

public static void monitor() {
try {
long startTime = System.currentTimeMillis();
// check sparder context
if (!SparderContext.isSparkAvailable()) {
logger.info("Sparder is unavailable, need to restart immediately.");
errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
} else {
try {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparderContext.getSparkSession().sparkContext());
jsc.setLocalProperty("spark.scheduler.pool", "vip_tasks");

long t = System.currentTimeMillis();
long ret = numberCount(jsc).get(KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(),
TimeUnit.MILLISECONDS);
logger.info("SparderContextCanary numberCount returned successfully with value {}, takes {} ms.", ret,
(System.currentTimeMillis() - t));
// reset errorAccumulated once good context is confirmed
errorAccumulated = 0;
} catch (TimeoutException te) {
errorAccumulated++;
logger.error("SparderContextCanary numberCount timeout, didn't return in {} ms, error {} times.",
KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(), errorAccumulated);
} catch (ExecutionException ee) {
logger.error("SparderContextCanary numberCount occurs exception, need to restart immediately.", ee);
errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
} catch (Exception e) {
errorAccumulated++;
logger.error("SparderContextCanary numberCount occurs exception.", e);
}
}

lastResponseTime = System.currentTimeMillis() - startTime;
logger.debug("Sparder context errorAccumulated:{}", errorAccumulated);

if (isError()) {
sparderRestarting = true;
try {
// Take repair action if error accumulated exceeds threshold
logger.warn("Repairing sparder context");
if ("true".equals(System.getProperty("spark.local"))) {
SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
} else {
SparderContext.restartSpark();
}
} catch (Throwable th) {
logger.error("Restart sparder context failed.", th);
}
sparderRestarting = false;
}
} catch (Throwable th) {
logger.error("Error when monitoring Sparder.", th);
}
}

// for canary
private static JavaFutureAction<Long> numberCount(JavaSparkContext jsc) {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add(i);
}

return jsc.parallelize(list).countAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference

import org.apache.commons.io.FileUtils
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.query.monitor.SparderContextCanary
import org.apache.kylin.spark.classloader.ClassLoaderUtils
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
Expand Down Expand Up @@ -194,6 +195,11 @@ object SparderContext extends Logging {
logInfo("Initializing Spark, waiting for done.")
initializingThread.join()
}

if (System.getProperty("spark.local") ne "true") {
//monitor sparder
SparderContextCanary.init()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.query.monitor;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.TempMetadataBuilder;
import org.apache.kylin.engine.spark.LocalWithSparkSessionTest;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.spark.sql.KylinSparkEnv;
import org.apache.spark.sql.SparderContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SparderContextCanaryTest extends LocalWithSparkSessionTest {
@Override
@Before
public void setup() throws SchedulerException {
super.setup();
SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
}

@After
public void after() {
super.after();
}

@Test
public void testSparderKilled() {
// first check should be good
Boolean ss = SparderContext.isSparkAvailable();
Assert.assertTrue(SparderContext.isSparkAvailable());

// stop sparder and check again, the sparder context should auto-restart
SparderContext.getSparkSession().stop();
Assert.assertFalse(SparderContext.isSparkAvailable());

SparderContextCanary.monitor();

Assert.assertTrue(SparderContext.isSparkAvailable());

SparderContextCanary.monitor();
Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated());
}

@Test
public void testSparderTimeout() {
// first check should be GOOD
Assert.assertTrue(SparderContext.isSparkAvailable());

// set kylin.canary.sqlcontext-error-response-ms to 1
// And SparkContextCanary numberCount will timeout
Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated());
System.setProperty("kylin.canary.sparder-context-error-response-ms", "1");
SparderContextCanary.monitor();

// errorAccumulated increase
Assert.assertEquals(1, SparderContextCanary.getErrorAccumulated());

// reach threshold to restart spark. Reset errorAccumulated.
SparderContextCanary.monitor();
Assert.assertEquals(2, SparderContextCanary.getErrorAccumulated());
SparderContextCanary.monitor();
Assert.assertEquals(3, SparderContextCanary.getErrorAccumulated());

Assert.assertTrue(SparderContext.isSparkAvailable());

System.clearProperty("kylin.canary.sparder-context-error-response-ms");

}

public void createTestMetadata() {
String tempMetadataDir = TempMetadataBuilder.prepareNLocalTempMetadata();
KylinConfig.setKylinConfigForLocalTest(tempMetadataDir);
getTestConfig().setProperty("kylin.query.security.acl-tcr-enabled", "false");
}
}