Skip to content

Commit

Permalink
KYLIN-4612 support job status writing to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
bigxiaochu committed Jul 8, 2020
1 parent c02bb74 commit 65107a5
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
5 changes: 5 additions & 0 deletions core-common/pom.xml
Expand Up @@ -73,6 +73,11 @@
<artifactId>jsch</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</dependency>

<!-- Env & Test -->
<dependency>
<groupId>junit</groupId>
Expand Down
Expand Up @@ -2670,4 +2670,12 @@ public String getIntersectFilterOrSeparator() {
/**
 * Returns the default time filter for the web UI.
 *
 * @return the value of {@code kylin.web.default-time-filter}, defaulting to 2
 */
public int getDefaultTimeFilter() {
    final String configured = getOptional("kylin.web.default-time-filter", "2");
    return Integer.parseInt(configured);
}

/**
 * Whether job status changes should also be published to Kafka.
 *
 * @return the value of {@code kylin.engine.job-status.write.kafka}; disabled by default
 */
public boolean jobStatusWriteKafka() {
    final String enabled = getOptional("kylin.engine.job-status.write.kafka", FALSE);
    return Boolean.parseBoolean(enabled);
}

/**
 * Returns the Kafka producer configuration used for job-status publishing,
 * collected from all properties under the {@code kylin.engine.job-status.kafka.} prefix.
 *
 * @return the matching properties as a key/value map
 */
public Map<String, String> getJobStatusKafkaConfig() {
    final String prefix = "kylin.engine.job-status.kafka.";
    return getPropertiesByPrefix(prefix);
}
}
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.common.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kylin.common.KylinConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;

/**
 * Singleton Kafka producer used to publish Kylin job-status and event messages
 * to the topic configured under {@code kylin.engine.job-status.kafka.topic.name}.
 *
 * <p>Thread-safety: {@link #init()} is synchronized because it mutates the shared
 * static producer/config/topic state; {@code KafkaProducer} itself is thread-safe,
 * so {@link #sendMessage(String)} needs no extra locking.
 */
public class KafkaMsgProducer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMsgProducer.class);

    private static Producer<String, String> producer;
    private static Map<String, String> kafkaConfig;
    private static String topicName;

    // Producer tuning defaults; user-supplied config (copied in init()) may override any of them.
    private static final Properties BASE_PROPERTIES = new Properties();
    static {
        BASE_PROPERTIES.put(ProducerConfig.ACKS_CONFIG, "-1");
        BASE_PROPERTIES.put(ProducerConfig.RETRIES_CONFIG, 3);
        BASE_PROPERTIES.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        BASE_PROPERTIES.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        BASE_PROPERTIES.put(ProducerConfig.BATCH_SIZE_CONFIG, 10000);
        BASE_PROPERTIES.put("max.in.flight.requests.per.connection", 1);
    }

    private KafkaMsgProducer() {
        init();
    }

    /** Initialization-on-demand holder: the instance is created lazily and thread-safely. */
    private static class BasicProducerHolder {
        private static final KafkaMsgProducer INSTANCE = new KafkaMsgProducer();
    }

    /**
     * @return the shared producer instance, creating it on first use
     */
    public static KafkaMsgProducer getInstance() {
        return BasicProducerHolder.INSTANCE;
    }

    /**
     * Lazily initializes the Kafka config, the producer and the target topic name.
     * Synchronized because it reads and writes shared static fields and is publicly
     * callable outside the constructor.
     */
    public synchronized void init() {
        if (kafkaConfig == null) {
            kafkaConfig = KylinConfig.getInstanceFromEnv().getJobStatusKafkaConfig();
        }
        if (producer == null) {
            // Start from the tuning defaults, then let the user-supplied config
            // (bootstrap.servers, serializers, ...) override them. Copying the whole
            // map also avoids the NPE the old code hit when "bootstrap.servers" was
            // missing and null was put into a Properties (Hashtable rejects null).
            Properties effective = new Properties();
            effective.putAll(BASE_PROPERTIES);
            for (Map.Entry<String, String> entry : kafkaConfig.entrySet()) {
                effective.put(entry.getKey(), entry.getValue());
            }
            producer = new KafkaProducer<>(effective);
        }
        if (topicName == null) {
            topicName = kafkaConfig.get("topic.name");
        }
    }

    /**
     * Publishes a job-status message to the configured topic.
     *
     * @param message the serialized job-status payload
     */
    public void sendJobStatusMessage(String message) {
        sendMessage(message);
    }

    /**
     * Publishes an event message to the configured topic.
     *
     * @param message the serialized event payload
     */
    public void sendEventMessage(String message) {
        sendMessage(message);
    }

    // Fire-and-forget send; failures are logged asynchronously, never propagated,
    // so a Kafka outage cannot break job execution.
    private void sendMessage(final String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
        producer.send(record, (recordMetadata, exception) -> {
            if (null != exception) {
                logger.error("kafka send message error.", exception);
            }
        });
    }

}
Expand Up @@ -32,8 +32,10 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.kafka.KafkaMsgProducer;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutableOutputPO;
import org.apache.kylin.job.dao.ExecutablePO;
Expand Down Expand Up @@ -65,11 +67,15 @@ static ExecutableManager newInstance(KylinConfig config) throws IOException {

private final KylinConfig config;
private final ExecutableDao executableDao;
private KafkaMsgProducer kafkaMsgProducer;

/**
 * Creates the manager for the given Kylin configuration, wiring up the DAO and,
 * when {@code kylin.engine.job-status.write.kafka} is enabled, the Kafka producer
 * used to publish job-status changes.
 *
 * @param config the active Kylin configuration
 */
private ExecutableManager(KylinConfig config) {
    // Parameterized SLF4J logging instead of string concatenation.
    logger.info("Using metadata url: {}", config);
    this.config = config;
    this.executableDao = ExecutableDao.getInstance(config);
    // Only construct the producer when publishing is enabled; otherwise the
    // field stays null and must not be dereferenced.
    if (config.jobStatusWriteKafka()) {
        this.kafkaMsgProducer = KafkaMsgProducer.getInstance();
    }
}

private static ExecutablePO parse(AbstractExecutable executable) {
Expand Down Expand Up @@ -477,12 +483,64 @@ public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String,
}
executableDao.updateJobOutput(jobOutput);
logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);

//write status to kafka
if (config.jobStatusWriteKafka()) {
AbstractExecutable executable = getJob(jobId);
if (executable == null) {
return;
}
if (executable instanceof DefaultChainedExecutable) {
StringBuffer result = new StringBuffer();

DefaultChainedExecutable job = (DefaultChainedExecutable)executable;

result.append("{");

result.append("\"jobId\":\"" + job.getId() + "\",");
result.append("\"jobName\":\"" + job.getName() + "\",");
result.append("\"status\":\"" + parseToJobStatus(job.getStatus()).name() + "\",");
result.append("\"subTaskSize\": \"" + job.getTasks().size() + "\",");

result.append("\"subTasks\":[");
job.getTasks().forEach(item -> {
result.append("{");
result.append("\"jobId\":\"" + item.getId() + "\",");
result.append("\"jobName\":\"" + item.getName() + "\",");
result.append("\"status\":\"" + parseToJobStatus(item.getStatus()).name() + "\"");
result.append("},");
});
String resultStr = result.substring(0, result.length() - 1);
resultStr += "]}";

kafkaMsgProducer.sendJobStatusMessage(resultStr);
}
}
} catch (PersistentException e) {
logger.error("error change job:" + jobId + " to " + newStatus);
throw new RuntimeException(e);
}
}

/**
 * Maps an internal {@link ExecutableState} to the externally visible {@link JobStatusEnum}
 * used in the job-status messages published to Kafka.
 *
 * @param state the internal execution state
 * @return the corresponding public job status
 * @throws IllegalStateException if the state has no public mapping
 */
private JobStatusEnum parseToJobStatus(ExecutableState state) {
    switch (state) {
        case READY:
            return JobStatusEnum.PENDING;
        case RUNNING:
            return JobStatusEnum.RUNNING;
        case ERROR:
            return JobStatusEnum.ERROR;
        case DISCARDED:
            return JobStatusEnum.DISCARDED;
        case SUCCEED:
            return JobStatusEnum.FINISHED;
        case STOPPED:
            return JobStatusEnum.STOPPED;
        default:
            // IllegalStateException is still unchecked (a RuntimeException subtype),
            // so existing catch blocks keep working, but the intent is clearer.
            throw new IllegalStateException("invalid state:" + state);
    }
}

/**
 * Reloads all executable metadata by delegating to {@link ExecutableDao#reloadAll()}.
 *
 * @throws IOException if the underlying metadata store cannot be read
 */
public void reloadAll() throws IOException {
executableDao.reloadAll();
}
Expand Down

0 comments on commit 65107a5

Please sign in to comment.