Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GOBBLIN-970: Pass metric context from the KafkaSource to the KafkaWorkUnitPacker #2815

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -263,7 +263,7 @@ public String apply(KafkaTopic topic) {
//determine the number of mappers
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state);
KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;

import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +36,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
Expand All @@ -46,6 +46,7 @@
import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;


/**
Expand Down Expand Up @@ -368,18 +369,23 @@ protected static List<List<KafkaPartition>> getMultiWorkUnitPartitions(MultiWork
}

public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the original function to make the change backward compatible?

return getInstance(source, state, Optional.absent());
}

/**
 * Creates a {@link KafkaWorkUnitPacker} from the given source and state, threading an
 * optional {@link MetricContext} through to the concrete packer.
 *
 * <p>The packer type is read from {@code KAFKA_WORKUNIT_PACKER_TYPE} when present in the
 * state; otherwise {@code DEFAULT_PACKER_TYPE} is used.
 *
 * @param source        the Kafka source requesting the packer
 * @param state         the job {@link SourceState} holding packer configuration
 * @param metricContext optional metric context forwarded to the packer
 * @return a packer instance of the configured (or default) type
 * @throws IllegalArgumentException if the configured packer type string does not name a
 *         {@link PackerType} constant
 */
public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state,
    Optional<MetricContext> metricContext) {
  // Guard clause: fall back to the default packer type when none is configured.
  if (!state.contains(KAFKA_WORKUNIT_PACKER_TYPE)) {
    return getInstance(DEFAULT_PACKER_TYPE, source, state, metricContext);
  }
  String configuredType = state.getProp(KAFKA_WORKUNIT_PACKER_TYPE);
  Optional<PackerType> resolvedType = Enums.getIfPresent(PackerType.class, configuredType);
  if (!resolvedType.isPresent()) {
    throw new IllegalArgumentException("WorkUnit packer type " + configuredType + " not found");
  }
  return getInstance(resolvedType.get(), source, state, metricContext);
}

public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state) {
public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state, Optional<MetricContext> metricContext) {
switch (packerType) {
case SINGLE_LEVEL:
return new KafkaSingleLevelWorkUnitPacker(source, state);
Expand All @@ -388,7 +394,11 @@ public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSou
case CUSTOM:
Preconditions.checkArgument(state.contains(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE));
String className = state.getProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE);
return GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitPacker.class, className, source, state);
try {
return (KafkaWorkUnitPacker) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), source, state, metricContext);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
default:
throw new IllegalArgumentException("WorkUnit packer type " + packerType + " not found");
}
Expand Down
Expand Up @@ -20,14 +20,15 @@
import java.util.List;
import java.util.Map;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.WorkUnit;

import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE;
import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE;
import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE;
Expand Down
Expand Up @@ -137,7 +137,7 @@ public Map<String, CheckpointableWatermark> getCommittedWatermarks(Class<? exten
return committed;
}

Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
/**
 * Returns all committed watermark states held in the backing state store under this
 * instance's store name.
 *
 * <p>Visibility was widened to {@code public} in this change so callers outside the
 * package can enumerate committed watermarks.
 *
 * @return an {@link Iterable} over every {@link CheckpointableWatermarkState} in the store
 * @throws IOException if the underlying state store fails to read
 */
public Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
return _stateStore.getAll(_storeName);
}

Expand Down