From bf9d2d1dcf877232c237a77427c28fc7e00060b6 Mon Sep 17 00:00:00 2001 From: rupengwang Date: Fri, 10 Apr 2020 17:11:52 +0800 Subject: [PATCH] #155 fix class loader problem when query --- build/deploy/context.xml | 2 +- build/script/download-tomcat.sh | 31 +---------- .../main/resources/kylin-defaults.properties | 4 ++ .../kylin-spark-classloader/pom.xml | 18 +++++++ .../spark/classloader/ClassLoaderUtils.java | 2 +- .../classloader/DebugTomcatClassLoader.java | 4 +- .../spark/classloader/KylinItClassLoader.java | 12 ++--- .../classloader/KylinItSparkClassLoader.java | 42 +++++++-------- .../spark/classloader/SparkClassLoader.java | 51 +++++++++---------- .../spark/classloader/TomcatClassLoader.java | 30 +++++------ .../org/apache/spark/sql/SparderContext.scala | 26 ++++++++-- 11 files changed, 113 insertions(+), 109 deletions(-) diff --git a/build/deploy/context.xml b/build/deploy/context.xml index 4650d276c7a..46349ab8213 100644 --- a/build/deploy/context.xml +++ b/build/deploy/context.xml @@ -33,6 +33,6 @@ --> - + diff --git a/build/script/download-tomcat.sh b/build/script/download-tomcat.sh index 205373feea2..bc93a594ffb 100755 --- a/build/script/download-tomcat.sh +++ b/build/script/download-tomcat.sh @@ -72,33 +72,6 @@ fi echo "version ${version}" export version -cp tomcat-ext/target/kylin-tomcat-ext-${version}.jar build/tomcat/lib/kylin-tomcat-ext-${version}.jar -chmod 644 build/tomcat/lib/kylin-tomcat-ext-${version}.jar +cp kylin-spark-project/kylin-spark-classloader/target/kylin-spark-classloader-${version}.jar build/tomcat/lib/kylin-spark-classloader-${version}.jar +chmod 644 build/tomcat/lib/kylin-spark-classloader-${version}.jar -# add ROOT application -mkdir -p build/tomcat/webapps/ROOT/WEB-INF/ -cat > build/tomcat/webapps/ROOT/index.html < - - - - -EOL - -cat > build/tomcat/webapps/ROOT/WEB-INF/web.xml < - - - ROOT - - - /index.html - - - - -EOL diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 4341e68a103..9cfb8f82dc9 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -342,6 +342,10 @@ kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false kylin.engine.spark-conf-mergedict.spark.executor.memory=6G kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2 +### Spark conf overwrite for query engine +kylin.query.spark-conf.spark.executor.cores=5 +kylin.query.spark-conf.spark.executor.instances=4 + # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime #kylin.engine.spark-conf.spark.yarn.archive=hdfs://namenode:8020/kylin/spark/spark-libs.jar #kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec diff --git a/kylin-spark-project/kylin-spark-classloader/pom.xml b/kylin-spark-project/kylin-spark-classloader/pom.xml index 9d1bec9452e..ebf69a62e56 100644 --- a/kylin-spark-project/kylin-spark-classloader/pom.xml +++ b/kylin-spark-project/kylin-spark-classloader/pom.xml @@ -1,4 +1,22 @@ + + diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java index 34abc54e695..d544059f838 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/DebugTomcatClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/DebugTomcatClassLoader.java index b1421757f97..8e6de5a62cd 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/DebugTomcatClassLoader.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/DebugTomcatClassLoader.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -101,7 +101,7 @@ public Class loadClass(String name, boolean resolve) throws ClassNotFoundExce throw new ClassNotFoundException(); } - if (name.startsWith("io.kyligence.kap.ext")) { + if (name.startsWith("org.apache.kylin.spark.classloader")) { return parent.loadClass(name); } if (sparkClassLoader.classNeedPreempt(name)) { diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItClassLoader.java index abe8f404999..c156ba91e9e 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItClassLoader.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItClassLoader.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.kylin.spark.classloader; import java.io.File; @@ -34,7 +35,7 @@ public class KylinItClassLoader extends URLClassLoader { "com.sun.", "launcher.", "javax.", "org.ietf", "java", "org.omg", "org.w3c", "org.xml", "sunw.", // logging "org.slf4j", "org.apache.commons.logging", "org.apache.log4j", "sun", "org.apache.catalina", - "org.apache.tomcat",}; + "org.apache.tomcat", }; private static final String[] THIS_CL_PRECEDENT_CLASS = new String[] {"io.kyligence", "org.apache.kylin", "org.apache.calcite"}; private static final String[] CODE_GEN_CLASS = new String[] {"org.apache.spark.sql.catalyst.expressions.Object"}; @@ -78,9 +79,9 @@ public void init() { e.printStackTrace(); } } - String spark_home = System.getenv("SPARK_HOME"); + String sparkHome = System.getenv("SPARK_HOME"); try { - File sparkJar = findFile(spark_home + "/jars", "spark-yarn_.*.jar"); + File sparkJar = findFile(sparkHome + "/jars", "spark-yarn_.*.jar"); addURL(sparkJar.toURI().toURL()); addURL(new File("../examples/test_case_data/sandbox").toURI().toURL()); } catch (MalformedURLException e) { @@ -94,7 +95,7 @@ public Class loadClass(String name, boolean resolve) throws ClassNotFoundExce if (isCodeGen(name)) { throw new ClassNotFoundException(); } - if (name.startsWith("io.kyligence.kap.ext")) { + if (name.startsWith("org.apache.kylin.spark.classloader")) { return parent.loadClass(name); } if (isThisCLPrecedent(name)) { @@ -124,7 +125,6 @@ public Class loadClass(String name, boolean resolve) throws ClassNotFoundExce return clasz; } } - //交换位置 为了让codehua 被父类加载 if (isParentCLPrecedent(name)) { logger.debug("Skipping exempt class " + name + " - delegating directly to parent"); return parent.loadClass(name); diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItSparkClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItSparkClassLoader.java index 7e15f3385aa..d8bb5a68838 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItSparkClassLoader.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/KylinItSparkClassLoader.java @@ -1,25 +1,19 @@ /* - * Copyright (C) 2016 Kyligence Inc. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://kyligence.io + * http://www.apache.org/licenses/LICENSE-2.0 * - * This software is the confidential and proprietary information of - * Kyligence Inc. ("Confidential Information"). You shall not disclose - * such Confidential Information and shall use it only in accordance - * with the terms of the license agreement you entered into with - * Kyligence Inc. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.kylin.spark.classloader; @@ -73,15 +67,15 @@ protected KylinItSparkClassLoader(ClassLoader parent) throws IOException { } public void init() throws MalformedURLException { - String spark_home = System.getenv("SPARK_HOME"); - if (spark_home == null) { - spark_home = System.getProperty("SPARK_HOME"); - if (spark_home == null) { + String sparkHome = System.getenv("SPARK_HOME"); + if (sparkHome == null) { + sparkHome = System.getProperty("SPARK_HOME"); + if (sparkHome == null) { throw new RuntimeException( "Spark home not found; set it explicitly or use the SPARK_HOME environment variable."); } } - File file = new File(spark_home + "/jars"); + File file = new File(sparkHome + "/jars"); File[] jars = file.listFiles(); for (File jar : jars) { addURL(jar.toURI().toURL()); diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java index be69bd64aef..b105a26f244 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,23 +18,20 @@ package org.apache.kylin.spark.classloader; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; -import java.nio.file.Files; -import java.nio.file.Paths; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.HashSet; import java.util.Set; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.kylin.spark.classloader.ClassLoaderUtils.findFile; public class SparkClassLoader extends URLClassLoader { //preempt these classes from parent @@ -61,25 +58,25 @@ public class SparkClassLoader extends URLClassLoader { private static Logger logger = LoggerFactory.getLogger(SparkClassLoader.class); static { - String sparkclassloader_spark_cl_preempt_classes = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES"); - if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_classes)) { - SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkclassloader_spark_cl_preempt_classes, ","); + String sparkClassLoaderSparkClPreemptClasses = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES"); + if (!StringUtils.isEmpty(sparkClassLoaderSparkClPreemptClasses)) { + SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkClassLoaderSparkClPreemptClasses, ","); } - String sparkclassloader_spark_cl_preempt_files = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES"); - if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_files)) { - SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkclassloader_spark_cl_preempt_files, ","); + String sparkClassLoaderSparkClPreemptFiles = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES"); + if (!StringUtils.isEmpty(sparkClassLoaderSparkClPreemptFiles)) { + SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkClassLoaderSparkClPreemptFiles, ","); } - String sparkclassloader_this_cl_precedent_classes = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES"); - if (!StringUtils.isEmpty(sparkclassloader_this_cl_precedent_classes)) { - THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_this_cl_precedent_classes, ","); + String sparkClassLoaderThisClPrecedentClasses = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES"); + if (!StringUtils.isEmpty(sparkClassLoaderThisClPrecedentClasses)) { + THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkClassLoaderThisClPrecedentClasses, ","); } - String sparkclassloader_parent_cl_precedent_classes = System + String sparkClassLoaderParentClPrecedentClasses = System .getenv("SPARKCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES"); - if (!StringUtils.isEmpty(sparkclassloader_parent_cl_precedent_classes)) { - PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_parent_cl_precedent_classes, ","); + if (!StringUtils.isEmpty(sparkClassLoaderParentClPrecedentClasses)) { + PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkClassLoaderParentClPrecedentClasses, ","); } try { @@ -111,20 +108,20 @@ protected SparkClassLoader(ClassLoader parent) throws IOException { } public void init() throws MalformedURLException { - String spark_home = System.getenv("SPARK_HOME"); - if (spark_home == null) { - spark_home = System.getProperty("SPARK_HOME"); - if (spark_home == null) { + String sparkHome = System.getenv("SPARK_HOME"); + if (sparkHome == null) { + sparkHome = System.getProperty("SPARK_HOME"); + if (sparkHome == null) { throw new RuntimeException( "Spark home not found; set it explicitly or use the SPARK_HOME environment variable."); } } - File file = new File(spark_home + "/jars"); + File file = new File(sparkHome + "/jars"); File[] jars = file.listFiles(); for (File jar : jars) { addURL(jar.toURI().toURL()); } - if (System.getenv("KYLIN_HOME") != null) { + /*if (System.getenv("KYLIN_HOME") != null) { // for prod String kylin_home = System.getenv("KYLIN_HOME"); File sparkJar = findFile(kylin_home + "/lib", "kylin-udf-.*-SNAPSHOT.jar"); @@ -139,7 +136,7 @@ public void init() throws MalformedURLException { // for debugtomcat logger.info("Add kylin UDF classes to spark classloader"); addURL(new File("../udf/target/classes").toURI().toURL()); - } + }*/ } diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java index 4ad51a94c4a..f826356224e 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -47,21 +47,21 @@ public class TomcatClassLoader extends ParallelWebappClassLoader { private static final Set wontFindClasses = new HashSet<>(); static { - String tomcatclassloader_parent_cl_precedent_classes = System + String tomcatClassLoaderParentClPrecedentClasses = System .getenv("TOMCATCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES"); - if (!StringUtils.isEmpty(tomcatclassloader_parent_cl_precedent_classes)) { - PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_parent_cl_precedent_classes, ","); + if (!StringUtils.isEmpty(tomcatClassLoaderParentClPrecedentClasses)) { + PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatClassLoaderParentClPrecedentClasses, ","); } - String tomcatclassloader_this_cl_precedent_classes = System + String tomcatClassLoaderThisClPrecedentClasses = System .getenv("TOMCATCLASSLOADER_THIS_CL_PRECEDENT_CLASSES"); - if (!StringUtils.isEmpty(tomcatclassloader_this_cl_precedent_classes)) { - THIS_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_this_cl_precedent_classes, ","); + if (!StringUtils.isEmpty(tomcatClassLoaderThisClPrecedentClasses)) { + THIS_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatClassLoaderThisClPrecedentClasses, ","); } - String tomcatclassloader_codegen_classes = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES"); - if (!StringUtils.isEmpty(tomcatclassloader_codegen_classes)) { - CODEGEN_CLASSES = StringUtils.split(tomcatclassloader_codegen_classes, ","); + String tomcatClassLoaderCodegenClasses = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES"); + if (!StringUtils.isEmpty(tomcatClassLoaderCodegenClasses)) { + CODEGEN_CLASSES = StringUtils.split(tomcatClassLoaderCodegenClasses, ","); } wontFindClasses.add("Class"); @@ -98,19 +98,19 @@ public TomcatClassLoader(ClassLoader parent) throws IOException { } public void init() { - String spark_home = System.getenv("SPARK_HOME"); - if (spark_home == null || spark_home.isEmpty()) { + String sparkHome = System.getenv("SPARK_HOME"); + if (sparkHome == null || sparkHome.isEmpty()) { throw new RuntimeException("Error found spark home."); } try { // SparkContext use spi to match deploy mode // otherwise SparkContext init fail ,can not find yarn deploy mode - File yarnJar = findFile(spark_home + "/jars", "spark-yarn.*.jar"); + File yarnJar = findFile(sparkHome + "/jars", "spark-yarn.*.jar"); addURL(yarnJar.toURI().toURL()); // jersey in spark will attempt find @Path class file in current classloader. // Not possible to delegate to spark loader // otherwise spark web ui executors tab can not render - File coreJar = findFile(spark_home + "/jars", "spark-core.*.jar"); + File coreJar = findFile(sparkHome + "/jars", "spark-core.*.jar"); addURL(coreJar.toURI().toURL()); } catch (MalformedURLException e) { e.printStackTrace(); @@ -130,7 +130,7 @@ public Class loadClass(String name, boolean resolve) throws ClassNotFoundExce throw new ClassNotFoundException(); } // class loaders should conform to global's - if (name.startsWith("io.kyligence.kap.ext")) { + if (name.startsWith("org.apache.kylin.spark.classloader")) { return parent.loadClass(name); } // if spark CL needs preempt diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala index 7170dba4671..579691f3123 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala @@ -18,12 +18,15 @@ package org.apache.spark.sql +import java.io.File import java.lang.{Boolean => JBoolean, String => JString} +import java.nio.file.Paths import org.apache.kylin.query.runtime.plans.QueryToExecutionIDCache import org.apache.spark.memory.MonitorEnv import org.apache.spark.util.Utils import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.KylinSession._ import org.apache.kylin.query.UdfManager import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin @@ -31,9 +34,14 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import java.util.concurrent.atomic.AtomicReference +import org.apache.hadoop.security.UserGroupInformation +import org.apache.kylin.common.KylinConfig +import org.apache.kylin.spark.classloader.ClassLoaderUtils import org.apache.spark.{SparkConf, SparkContext, SparkEnv} import org.apache.spark.sql.execution.datasource.KylinSourceStrategy +import scala.collection.JavaConverters._ + // scalastyle:off object SparderContext extends Logging { @volatile @@ -90,7 +98,7 @@ object SparderContext extends Logging { } def getTotalCore: Int = { - val sparkConf = getSparkSession.sparkContext.getConf + val sparkConf = initSparkConf(getSparkSession.sparkContext.getConf) if (sparkConf.get("spark.master").startsWith("local")) { return 1 } @@ -130,8 +138,6 @@ object SparderContext extends Logging { SparkSession.builder .appName("sparder-sql-context") .master("yarn-client") - //if user defined other master in kylin.properties, - // it will get overwrite later in org.apache.spark.sql.KylinSession.KylinBuilder.initSparkConf .withExtensions { ext => ext.injectPlannerStrategy(_ => KylinSourceStrategy) } @@ -170,6 +176,18 @@ object SparderContext extends Logging { } } + private lazy val conf: KylinConfig = KylinConfig.getInstanceFromEnv + + def initSparkConf(sparkConf: SparkConf): SparkConf = { + //add spark configuration from kylin.properties + conf.getSparkConf.asScala.foreach { + case (k, v) => + sparkConf.set(k, v) + } + sparkConf + } + + def registerListener(sc: SparkContext): Unit = { val sparkListener = new SparkListener { @@ -212,7 +230,7 @@ object SparderContext extends Logging { def withClassLoad[T](body: => T): T = { // val originClassLoad = Thread.currentThread().getContextClassLoader // fixme aron - // Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader) + Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader) val t = body // Thread.currentThread().setContextClassLoader(originClassLoad) t