Skip to content

Commit

Permalink
[GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWorkUnitPacker
Browse files Browse the repository at this point in the history
Closes #2815 from sv2000/metricContextPacker
  • Loading branch information
sv2000 authored and suvasude committed Nov 21, 2019
1 parent e41fd94 commit ae62669
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 10 deletions.
Expand Up @@ -263,7 +263,7 @@ public String apply(KafkaTopic topic) {
//determine the number of mappers
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state);
KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;

import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +36,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
Expand All @@ -46,6 +46,7 @@
import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;


/**
Expand Down Expand Up @@ -368,18 +369,23 @@ protected static List<List<KafkaPartition>> getMultiWorkUnitPartitions(MultiWork
}

/**
 * Creates a {@link KafkaWorkUnitPacker} for the given source and state without a
 * {@link MetricContext}. Delegates to the three-argument overload with an absent context.
 *
 * @param source the source the packer operates on
 * @param state  the job state used to resolve the packer type
 * @return a {@link KafkaWorkUnitPacker} instance
 */
public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state) {
  // No metric context available at this call site; pass an explicitly absent Optional.
  Optional<MetricContext> absentContext = Optional.absent();
  return getInstance(source, state, absentContext);
}

/**
 * Creates a {@link KafkaWorkUnitPacker}, honoring the packer type configured under
 * {@code KAFKA_WORKUNIT_PACKER_TYPE} when present, and falling back to
 * {@code DEFAULT_PACKER_TYPE} otherwise.
 *
 * @param source        the source the packer operates on
 * @param state         the job state holding the packer-type configuration
 * @param metricContext optional {@link MetricContext} forwarded to the packer
 * @return a {@link KafkaWorkUnitPacker} instance
 * @throws IllegalArgumentException if a configured packer type does not match any
 *         {@link PackerType} enum constant
 */
public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state,
    Optional<MetricContext> metricContext) {
  // Guard clause: no explicit configuration means the default packer type is used.
  if (!state.contains(KAFKA_WORKUNIT_PACKER_TYPE)) {
    return getInstance(DEFAULT_PACKER_TYPE, source, state, metricContext);
  }
  String configuredType = state.getProp(KAFKA_WORKUNIT_PACKER_TYPE);
  Optional<PackerType> resolvedType = Enums.getIfPresent(PackerType.class, configuredType);
  // A configured value that maps to no enum constant is a hard configuration error.
  if (!resolvedType.isPresent()) {
    throw new IllegalArgumentException("WorkUnit packer type " + configuredType + " not found");
  }
  return getInstance(resolvedType.get(), source, state, metricContext);
}

public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state) {
public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state, Optional<MetricContext> metricContext) {
switch (packerType) {
case SINGLE_LEVEL:
return new KafkaSingleLevelWorkUnitPacker(source, state);
Expand All @@ -388,7 +394,11 @@ public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSou
case CUSTOM:
Preconditions.checkArgument(state.contains(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE));
String className = state.getProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE);
return GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitPacker.class, className, source, state);
try {
return (KafkaWorkUnitPacker) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), source, state, metricContext);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
default:
throw new IllegalArgumentException("WorkUnit packer type " + packerType + " not found");
}
Expand Down
Expand Up @@ -20,14 +20,15 @@
import java.util.List;
import java.util.Map;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.WorkUnit;

import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE;
import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE;
import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE;
Expand Down
Expand Up @@ -137,7 +137,7 @@ public Map<String, CheckpointableWatermark> getCommittedWatermarks(Class<? exten
return committed;
}

Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
/**
 * Returns all {@link CheckpointableWatermarkState} entries stored under this store's name.
 * Reads every entry from the backing state store via {@code _stateStore.getAll(_storeName)}.
 *
 * @return an {@link Iterable} over all committed watermark states
 * @throws IOException if the underlying state store read fails
 */
public Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
  return _stateStore.getAll(_storeName);
}

Expand Down

0 comments on commit ae62669

Please sign in to comment.