KYLIN-2296 support cube level kafka config overwrite
yiming187 committed Dec 22, 2016
1 parent 78a5917 commit 6429817
Showing 9 changed files with 48 additions and 29 deletions.
KylinConfigBase.java
@@ -496,6 +496,14 @@ public String getFlatHiveTableClusterByDictColumn() {
return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
}

+// ============================================================================
+// SOURCE.KAFKA
+// ============================================================================
+
+public Map<String, String> getKafkaConfigOverride() {
+return getPropertiesByPrefix("kylin.source.kafka.config-override.");
+}
+
// ============================================================================
// STORAGE.HBASE
// ============================================================================
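The new getter above is the core of the change. A minimal sketch of how a cube-level override is expected to surface through it, assuming getPropertiesByPrefix() returns the matching entries with the prefix stripped (the keys and values below are hypothetical, not from the commit):

    // Illustration only. Suppose kylin.properties (or a cube-level override
    // of it) contains:
    //   kylin.source.kafka.config-override.session.timeout.ms=20000
    //   kylin.source.kafka.config-override.receive.buffer.bytes=1048576
    // Then, under the prefix-stripping assumption, the map below should hold
    //   {session.timeout.ms=20000, receive.buffer.bytes=1048576}.
    Map<String, String> override = KylinConfig.getInstanceFromEnv().getKafkaConfigOverride();
    for (Map.Entry<String, String> e : override.entrySet()) {
        System.out.println(e.getKey() + "=" + e.getValue());
    }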
CubeController.java
@@ -650,7 +650,7 @@ public GeneralResponse initStartOffsets(@PathVariable String cubeName) {

final GeneralResponse response = new GeneralResponse();
try {
-final Map<Integer, Long> startOffsets = KafkaClient.getCurrentOffsets(cubeInstance);
+final Map<Integer, Long> startOffsets = KafkaClient.getLatestOffsets(cubeInstance);
CubeDesc desc = cubeInstance.getDescriptor();
desc.setPartitionOffsetStart(startOffsets);
cubeService.getCubeDescManager().updateCubeDesc(desc);
KafkaSource.java
@@ -89,9 +89,9 @@ public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePar
}
}

-final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
-final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-final String topic = kafakaConfig.getTopic();
+final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
+final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+final String topic = kafkaConfig.getTopic();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
@@ -107,7 +107,7 @@ public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePar

if (result.getEndOffset() == Long.MAX_VALUE) {
logger.debug("Seek end offsets from topic");
-Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
+Map<Integer, Long> latestOffsets = KafkaClient.getLatestOffsets(cube);
logger.debug("The end offsets are " + latestOffsets);

for (Integer partitionId : latestOffsets.keySet()) {
KafkaConsumerProperties.java
@@ -75,7 +75,7 @@ private static File getKafkaConsumerFile(String path) {
return new File(path, KAFKA_CONSUMER_FILE);
}

-public static Properties getProperties(Configuration configuration) {
+public static Properties extractKafkaConfigToProperties(Configuration configuration) {
Set<String> configNames = new HashSet<String>();
try {
configNames = ConsumerConfig.configNames();
@@ -109,7 +109,7 @@ private Properties loadKafkaConsumerProperties() {
FileInputStream is = new FileInputStream(propFile);
Configuration conf = new Configuration();
conf.addResource(is);
-properties.putAll(getProperties(conf));
+properties.putAll(extractKafkaConfigToProperties(conf));
IOUtils.closeQuietly(is);

File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
@@ -118,7 +118,7 @@
Properties propOverride = new Properties();
Configuration oconf = new Configuration();
oconf.addResource(ois);
-properties.putAll(getProperties(oconf));
+properties.putAll(extractKafkaConfigToProperties(oconf));
IOUtils.closeQuietly(ois);
}
} catch (IOException e) {
@@ -151,7 +151,7 @@ private File getKafkaConsumerFile() {
return getKafkaConsumerFile(path);
}

-public Properties getProperties() {
+public Properties extractKafkaConfigToProperties() {
Properties prop = new Properties();
prop.putAll(this.properties);
return prop;
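For readers tracking the rename, a minimal sketch of what extractKafkaConfigToProperties(Configuration) appears to do, pieced together from the fragments above (the ConsumerConfig.configNames() filter is visible in the diff; the loop is an assumption, not the committed code):

    // Sketch under stated assumptions: copy from a Hadoop Configuration only
    // the entries whose keys are valid Kafka consumer settings.
    public static Properties extractKafkaConfigToProperties(Configuration configuration) {
        Set<String> configNames = ConsumerConfig.configNames(); // legal consumer keys
        Properties result = new Properties();
        for (Map.Entry<String, String> entry : configuration) { // Configuration is iterable
            if (configNames.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

The rename matters: the old name getProperties() suggested a plain accessor, while the method actually filters a Hadoop Configuration down to Kafka consumer settings.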
KafkaFlatTableJob.java
@@ -18,10 +18,12 @@

package org.apache.kylin.source.kafka.hadoop;

-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
-import org.apache.kylin.source.kafka.util.KafkaClient;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;

import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -34,15 +36,14 @@
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
+import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;

/**
* Run a Hadoop Job to process the stream data in kafka;
* Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
@@ -103,6 +104,7 @@ public int run(String[] args) throws Exception {
job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
+appendKafkaOverrideProperties(KylinConfig.getInstanceFromEnv(), job.getConfiguration());
job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
@@ -157,6 +159,15 @@ private void setupMapper(CubeSegment cubeSeg) throws IOException {
job.setNumReduceTasks(0);
}

+private static void appendKafkaOverrideProperties(final KylinConfig kylinConfig, Configuration conf) {
+final Map<String, String> kafkaConfOverride = kylinConfig.getKafkaConfigOverride();
+if (kafkaConfOverride.isEmpty() == false) {
+for (String key : kafkaConfOverride.keySet()) {
+conf.set(key, kafkaConfOverride.get(key), "kafka");
+}
+}
+}

public static void main(String[] args) throws Exception {
KafkaFlatTableJob job = new KafkaFlatTableJob();
int exitCode = ToolRunner.run(job, args);
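Taken together, the edits in this file wire the cube-level overrides into the flat-table MR job. A hedged sketch of the intended flow with one hypothetical value (the names are from the diff; this snippet is illustration, not committed code):

    // 1. kylin.properties: kylin.source.kafka.config-override.session.timeout.ms=20000
    // 2. appendKafkaOverrideProperties() copies the stripped key into the job's
    //    Hadoop Configuration; Configuration.set(name, value, source) records
    //    "kafka" as the entry's origin for debugging.
    Configuration conf = job.getConfiguration();
    for (Map.Entry<String, String> e : KylinConfig.getInstanceFromEnv()
            .getKafkaConfigOverride().entrySet()) {
        conf.set(e.getKey(), e.getValue(), "kafka");
    }
    // 3. On the task side, extractKafkaConfigToProperties(conf) keeps only
    //    recognized consumer settings and hands them to the KafkaConsumer.
    Properties kafkaProps = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);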
KafkaInputFormat.java
@@ -67,7 +67,7 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
}
}

-Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
final List<InputSplit> splits = new ArrayList<InputSplit>();
try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
KafkaInputRecordReader.java
@@ -87,7 +87,7 @@ public void initialize(InputSplit split, Configuration conf) throws IOException,
}
String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);

-Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);

consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);

KafkaClient.java
@@ -96,11 +96,11 @@ public static long getLatestOffset(KafkaConsumer consumer, String topic, int par
return consumer.position(topicPartition);
}

-public static Map<Integer, Long> getCurrentOffsets(final CubeInstance cubeInstance) {
-final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) {
+final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());

-final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-final String topic = kafakaConfig.getTopic();
+final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+final String topic = kafkaConfig.getTopic();

Map<Integer, Long> startOffsets = Maps.newHashMap();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
@@ -115,10 +115,10 @@ public static Map<Integer, Long> getCurrentOffsets(final CubeInstan


public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
-final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());

-final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-final String topic = kafakaConfig.getTopic();
+final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+final String topic = kafkaConfig.getTopic();

Map<Integer, Long> startOffsets = Maps.newHashMap();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
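The rename from getCurrentOffsets to getLatestOffsets better matches what the helper returns. For context, a typical latest-offset lookup with a 0.10-era consumer API looks like the sketch below; the elided body of getLatestOffset above is assumed to assign and seek before reading the position:

    // Sketch, assuming a Kafka 0.10-style KafkaConsumer.
    TopicPartition topicPartition = new TopicPartition(topic, partitionId);
    consumer.assign(Collections.singletonList(topicPartition));
    consumer.seekToEnd(Collections.singletonList(topicPartition));
    long latestOffset = consumer.position(topicPartition); // the next offset to be written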
KafkaConsumerPropertiesTest.java
@@ -32,9 +32,9 @@ public void after() throws Exception {
@Test
public void testLoadKafkaProperties() {
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
-assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
-assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
-assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
+assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
+assertEquals("30000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
}

@Test
@@ -44,7 +44,7 @@ public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserC
conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
assertEquals("30000", conf.get("session.timeout.ms"));

-Properties prop = KafkaConsumerProperties.getProperties(conf);
+Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
assertEquals("30000", prop.getProperty("session.timeout.ms"));
}
}
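The updated assertions pin down the contract implied by the new name: only valid Kafka consumer settings survive extraction. A hedged companion example (not in the commit) that exercises the same filter directly on a Hadoop Configuration:

    Configuration conf = new Configuration();
    conf.set("session.timeout.ms", "30000"); // a valid ConsumerConfig name, kept
    conf.set("acks", "1");                   // a producer-only setting, expected to be dropped
    Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
    assertEquals("30000", prop.getProperty("session.timeout.ms"));
    assertFalse(prop.containsKey("acks"));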
