diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 93dff1b3677..2cf0817441a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -787,7 +787,7 @@ public int getJobRetry() { } // retry interval in milliseconds - public int getJobRetryInterval(){ + public int getJobRetryInterval() { return Integer.parseInt(getOptional("kylin.job.retry-interval", "30000")); } @@ -873,11 +873,11 @@ public Map getSourceEngines() { return r; } - public boolean enableHiveDdlQuote(){ + public boolean enableHiveDdlQuote() { return Boolean.parseBoolean(getOptional("kylin.source.hive.quote-enabled", TRUE)); } - public String getQuoteCharacter(){ + public String getQuoteCharacter() { return getOptional("kylin.source.quote.character", "`"); } @@ -1386,12 +1386,11 @@ public boolean isUseLocalClasspathEnabled() { return Boolean.parseBoolean(getOptional("kylin.engine.mr.use-local-classpath", TRUE)); } - /** * different version hive use different UNION style * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union */ - public String getHiveUnionStyle(){ + public String getHiveUnionStyle() { return getOptional("kylin.hive.union.style", "UNION"); } @@ -1430,7 +1429,7 @@ public boolean isSparkSanityCheckEnabled() { public boolean isSparkFactDistinctEnable() { return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false")); } - + // ============================================================================ // ENGINE.LIVY // ============================================================================ @@ -2192,4 +2191,24 @@ public boolean isStreamingStandAloneMode() { public String getLocalStorageImpl() { return getOptional("kylin.stream.settled.storage", null); } + + public int getWarningSegmentNum() { + return Integer.parseInt(getOptional("kylin.tool.health-check.warning-segment-num", "-1")); + } + + public int getWarningCubeExpansionRate() { + return Integer.parseInt(getOptional("kylin.tool.health-check.warning-cube-expansion-rate", "5")); + } + + public int getExpansionCheckMinCubeSizeInGb() { + return Integer.parseInt(getOptional("kylin.tool.health-check.expansion-check.min-cube-size-gb", "500")); + } + + public int getStaleCubeThresholdInDays() { + return Integer.parseInt(getOptional("kylin.tool.health-check.stale-cube-threshold-days", "100")); + } + + public int getStaleJobThresholdInDays() { + return Integer.parseInt(getOptional("kylin.tool.health-check.stale-job-threshold-days", "30")); + } } diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java index 02a1fa3576f..2cf734e12ca 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java @@ -18,6 +18,9 @@ package org.apache.kylin.common.util; +import org.slf4j.helpers.FormattingTuple; +import org.slf4j.helpers.MessageFormatter; + /** * A Logger that remembers all the logged message. * @@ -41,6 +44,12 @@ public void log(String message) { } } + @Override + public void log(String message, Object... arguments) { + FormattingTuple ft = MessageFormatter.arrayFormat(message, arguments); + log(ft.getMessage()); + } + public String getBufferedLog() { return buffer.toString(); } diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Logger.java b/core-common/src/main/java/org/apache/kylin/common/util/Logger.java index 323da88eebd..eafb9694eae 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Logger.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Logger.java @@ -24,4 +24,6 @@ */ public interface Logger { public void log(String message); + + public void log(String message, Object... var2); } diff --git a/core-common/src/main/java/org/apache/kylin/common/util/SoutLogger.java b/core-common/src/main/java/org/apache/kylin/common/util/SoutLogger.java index 0ed03a79e1a..09c0f49b3f2 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/SoutLogger.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/SoutLogger.java @@ -17,6 +17,9 @@ */ package org.apache.kylin.common.util; +import org.slf4j.helpers.FormattingTuple; +import org.slf4j.helpers.MessageFormatter; + /** */ public class SoutLogger implements Logger { @@ -25,4 +28,10 @@ public class SoutLogger implements Logger { public void log(String message) { System.out.println(message); } + + @Override + public void log(String message, Object... arguments) { + FormattingTuple ft = MessageFormatter.arrayFormat(message, arguments); + log(ft.getMessage()); + } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index db9c0950af6..474c9739d5e 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -220,6 +220,10 @@ public CubeInstance getCubeByUuid(String uuid) { } } + public List getErrorCubes() { + return crud.getLoadFailedEntities(); + } + /** * Get related Cubes by cubedesc name. By default, the desc name will be * translated into upper case. diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java index 46024ca6f16..bb1b374715e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java @@ -23,6 +23,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.kylin.common.persistence.JsonSerializer; @@ -48,6 +49,7 @@ abstract public class CachedCrudAssist { final private SingleValueCache cache; private boolean checkCopyOnWrite; + final private List loadErrors = new ArrayList<>(); public CachedCrudAssist(ResourceStore store, String resourceRootPath, Class entityType, SingleValueCache cache) { @@ -118,6 +120,7 @@ public void reloadAll() throws IOException { logger.debug("Reloading " + entityType.getSimpleName() + " from " + store.getReadableResourcePath(resRootPath)); cache.clear(); + loadErrors.clear(); List paths = store.collectResourceRecursively(resRootPath, resPathSuffix); for (String path : paths) { @@ -125,7 +128,7 @@ public void reloadAll() throws IOException { } logger.debug("Loaded " + cache.size() + " " + entityType.getSimpleName() + "(s) out of " + paths.size() - + " resource"); + + " resource with " + loadErrors.size() + " errors"); } public T reload(String resourceName) { @@ -141,10 +144,15 @@ private T reloadQuietlyAt(String path) { return reloadAt(path); } catch (Exception ex) { logger.error("Error loading " + entityType.getSimpleName() + " at " + path, ex); + loadErrors.add(path); return null; } } + public List getLoadFailedEntities() { + return loadErrors; + } + public T reloadAt(String path) { try { T entity = store.getResource(path, serializer); diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java index c1ffbf794ad..dfef15047c9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java @@ -106,7 +106,7 @@ private class DataModelSyncListener extends Broadcaster.Listener { public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { //clean up the current project's table desc TableMetadataManager.getInstance(config).resetProjectSpecificTableDesc(project); - + logger.info("Update models in project: " + project); try (AutoLock lock = modelMapLock.lockForWrite()) { for (String model : ProjectManager.getInstance(config).getProject(project).getModels()) { crud.reloadQuietly(model); @@ -130,6 +130,10 @@ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, } } + public List getErrorModels() { + return crud.getLoadFailedEntities(); + } + private Class getDataModelImplClass() { try { String cls = StringUtil.noBlank(config.getDataModelImpl(), DataModelDesc.class.getName()); diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java new file mode 100644 index 00000000000..ecca373176f --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.job; + +import com.google.common.collect.Lists; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.BufferedLogger; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.MailService; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.execution.CheckpointExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.model.DataModelManager; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Locale; + +public class KylinHealthCheckJob extends AbstractApplication { + private static final Logger logger = LoggerFactory.getLogger(KylinHealthCheckJob.class); + + @SuppressWarnings("static-access") + private static final Option OPTION_FIX = OptionBuilder.withArgName("fix").hasArg().isRequired(false) + .withDescription("Fix the unhealth cube").create("fix"); + + public static void main(String[] args) throws Exception { + new KylinHealthCheckJob().execute(args); + } + + final KylinConfig config; + final BufferedLogger reporter = new BufferedLogger(logger); + final CubeManager cubeManager; + + public KylinHealthCheckJob() { + this(KylinConfig.getInstanceFromEnv()); + } + + public KylinHealthCheckJob(KylinConfig config) { + this.config = config; + this.cubeManager = CubeManager.getInstance(config); + } + + @Override + protected Options getOptions() { + Options options = new Options(); + // TODO: Support to fix the unhealthy cube automatically + options.addOption(OPTION_FIX); + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); + checkCubeHealth(); + } + + private void checkCubeHealth() throws Exception { + CubeManager cubeManager = CubeManager.getInstance(config); + + List cubes = cubeManager.listAllCubes(); + checkErrorMeta(); + + // Check if the cubeid data exist for later cube merge + checkSegmentHDFSPath(cubes); + + // Check if the hbase table exits or online + checkHBaseTables(cubes); + + // Check if there are holes in cube + // TODO: check if there are overlaps in segments of cube + checkCubeHoles(cubes); + + // Check if there are too many segments + checkTooManySegments(cubes); + + // Check if there are stale metadata + checkStaleSegments(cubes); + + // Disable/Delete the out-of-date cube + checkOutOfDateCube(cubes); + + // Check data expand rate + checkDataExpansionRate(cubes); + + // Check auto merge param + checkCubeDescParams(cubes); + + // ERROR history stopped build job + checkStoppedJob(); + + sendMail(reporter.getBufferedLog()); + } + + private void sendMail(String content) { + logger.info("Send Kylin cluster report"); + String subject = "Kylin Cluster Health Resport of " + config.getClusterName() + " on " + + new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT).format(new Date()); + List users = Lists.newArrayList(config.getAdminDls()); + new MailService(config).sendMail(users, subject, content, false); + } + + private void checkErrorMeta() { + reporter.log("## Checking metadata"); + + CubeManager cubeManager = CubeManager.getInstance(config); + for (String cube : cubeManager.getErrorCubes()) { + reporter.log("Error loading CubeDesc at " + cube); + } + + DataModelManager modelManager = DataModelManager.getInstance(config); + for (String model : modelManager.getErrorModels()) { + reporter.log("Error loading DataModelDesc at " + model); + } + } + + private void checkStoppedJob() throws Exception { + reporter.log("## Cleanup stopped job"); + int staleJobThresholdInDays = config.getStaleJobThresholdInDays(); + long outdatedJobTimeCut = System.currentTimeMillis() - 1L * staleJobThresholdInDays * 24 * 60 * 60 * 1000; + ExecutableDao executableDao = ExecutableDao.getInstance(config); + // discard all expired ERROR or STOPPED jobs + List allExecutable = executableDao.getJobs(); + for (ExecutablePO executable : allExecutable) { + long lastModified = executable.getLastModified(); + String jobStatus = executableDao.getJobOutput(executable.getUuid()).getStatus(); + if (lastModified < outdatedJobTimeCut && (ExecutableState.ERROR.toString().equals(jobStatus) + || ExecutableState.STOPPED.toString().equals(jobStatus))) { + // ExecutableManager.getInstance(config).discardJob(executable.getId()); + if (executable.getType().equals(CubingJob.class.getName()) + || executable.getType().equals(CheckpointExecutable.class.getName())) { + reporter.log("Should discard job: {}, which in ERROR/STOPPED state for {} days", executable.getId(), + staleJobThresholdInDays); + } else { + logger.warn("Unknown out of date job: {} with type: {}, which in ERROR/STOPPED state for {} days", + executable.getId(), executable.getType(), staleJobThresholdInDays); + } + } + } + } + + private void checkSegmentHDFSPath(List cubes) throws IOException { + reporter.log("## Fix missing HDFS path of segments"); + FileSystem defaultFs = HadoopUtil.getWorkingFileSystem(); + for (CubeInstance cube : cubes) { + for (CubeSegment segment : cube.getSegments()) { + String jobUuid = segment.getLastBuildJobID(); + if (jobUuid != null && jobUuid.equals("") == false) { + String path = JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobUuid); + if (!defaultFs.exists(new Path(path))) { + reporter.log( + "Project: {} cube: {} segment: {} cube id data: {} don't exist and need to rebuild it", + cube.getProject(), cube.getName(), segment, path); + reporter.log( + "The rebuild url: -d '{\"startTime\":'{}', \"endTime\":'{}', \"buildType\":\"REFRESH\"}' /kylin/api/cubes/{}/build", + segment.getTSRange().start, segment.getTSRange().end, cube.getName()); + } + } + } + } + } + + private void checkHBaseTables(List cubes) throws IOException { + reporter.log("## Checking HBase Table of segments"); + HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseConfiguration.create()); + for (CubeInstance cube : cubes) { + for (CubeSegment segment : cube.getSegments()) { + if (segment.getStatus() != SegmentStatusEnum.NEW) { + String tableName = segment.getStorageLocationIdentifier(); + if ((!hbaseAdmin.tableExists(tableName)) || (!hbaseAdmin.isTableEnabled(tableName))) { + reporter.log("HBase table: {} not exist for segment: {}, project: {}", tableName, segment, + cube.getProject()); + reporter.log( + "The rebuild url: -d '{\"startTime\":'{}', \"endTime\":'{}', \"buildType\":\"REFRESH\"}' /kylin/api/cubes/{}/build", + segment.getTSRange().start, segment.getTSRange().end, cube.getName()); + } + } + } + } + } + + private void checkCubeHoles(List cubes) { + reporter.log("## Checking holes of Cubes"); + for (CubeInstance cube : cubes) { + if (cube.isReady()) { + List holes = cubeManager.calculateHoles(cube.getName()); + if (holes.size() > 0) { + reporter.log("{} holes in cube: {}, project: {}", holes.size(), cube.getName(), cube.getProject()); + } + } + } + } + + private void checkTooManySegments(List cubes) { + reporter.log("## Checking too many segments of Cubes"); + int warningSegmentNum = config.getWarningSegmentNum(); + if (warningSegmentNum < 0) { + return; + } + for (CubeInstance cube : cubes) { + if (cube.getSegments().size() >= warningSegmentNum) { + reporter.log("Too many segments: {} for cube: {}, project: {}, please merge the segments", + cube.getSegments().size(), cube.getName(), cube.getProject()); + } + } + } + + private void checkStaleSegments(List cubes) { + for (CubeInstance cube : cubes) { + for (CubeSegment segment : cube.getSegments()) { + if (segment.getInputRecordsSize() == 0) { + // TODO: add stale segment to report + logger.info("Segment: {} in project: {} may be stale", segment, cube.getProject()); + } + } + } + } + + private void checkOutOfDateCube(List cubes) { + reporter.log("## Checking out-of-date Cubes"); + int staleCubeThresholdInDays = config.getStaleCubeThresholdInDays(); + long outdatedCubeTimeCut = System.currentTimeMillis() - 1L * staleCubeThresholdInDays * 24 * 60 * 60 * 1000; + for (CubeInstance cube : cubes) { + long lastTime = cube.getLastModified(); + logger.info("Cube {} last modified time: {}, {}", cube.getName(), new Date(lastTime), + cube.getDescriptor().getNotifyList()); + if (lastTime < outdatedCubeTimeCut) { + if (cube.isReady()) { + reporter.log( + "Ready Cube: {} in project: {} is not built more then {} days, maybe it can be disabled", + cube.getName(), cube.getProject(), staleCubeThresholdInDays); + } else { + reporter.log( + "Disabled Cube: {} in project: {} is not built more then {} days, maybe it can be deleted", + cube.getName(), cube.getProject(), staleCubeThresholdInDays); + } + } + } + } + + private void checkDataExpansionRate(List cubes) { + int warningExpansionRate = config.getWarningCubeExpansionRate(); + int expansionCheckMinCubeSizeInGb = config.getExpansionCheckMinCubeSizeInGb(); + for (CubeInstance cube : cubes) { + long sizeRecordSize = cube.getInputRecordSizeBytes(); + if (sizeRecordSize > 0) { + long cubeDataSize = cube.getSizeKB() * 1024; + double expansionRate = cubeDataSize / sizeRecordSize; + if (sizeRecordSize > 1L * expansionCheckMinCubeSizeInGb * 1024 * 1024 * 1024) { + if (expansionRate > warningExpansionRate) { + logger.info("Cube: {} in project: {} with too large expansion rate: {}, cube data size: {}G", + cube.getName(), cube.getProject(), expansionRate, cubeDataSize / 1024 / 1024 / 1024); + } + } + } + } + } + + private void checkCubeDescParams(List cubes) { + for (CubeInstance cube : cubes) { + CubeDesc desc = cube.getDescriptor(); + long[] autoMergeTS = desc.getAutoMergeTimeRanges(); + if (autoMergeTS == null || autoMergeTS.length == 0) { + logger.info("Cube: {} in project: {} with no auto merge params", cube.getName(), cube.getProject()); + } + // long volatileRange = desc.getVolatileRange(); + long retentionRange = desc.getRetentionRange(); + if (retentionRange == 0) { + logger.info("Cube: {} in project: {} with no retention params", cube.getName(), cube.getProject()); + } + // queue params + } + } +} diff --git a/tool-assembly/pom.xml b/tool-assembly/pom.xml index 21e44a23f25..62dd153e45e 100644 --- a/tool-assembly/pom.xml +++ b/tool-assembly/pom.xml @@ -99,6 +99,7 @@ commons-io:commons-io commons-lang:commons-lang org.apache.commons:commons-lang3 + org.apache.commons:commons-email com.google.guava:guava org.apache.kylin:* org.springframework.security:spring-security-core diff --git a/tool/src/main/java/org/apache/kylin/tool/KylinHealthCheckJob.java b/tool/src/main/java/org/apache/kylin/tool/KylinHealthCheckJob.java new file mode 100644 index 00000000000..419fe691448 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/KylinHealthCheckJob.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.tool; + +import java.io.IOException; + +public class KylinHealthCheckJob { + public static void main(String[] args) throws IOException { + org.apache.kylin.rest.job.KylinHealthCheckJob cli = new org.apache.kylin.rest.job.KylinHealthCheckJob(); + cli.execute(args); + } +}