
[GOBBLIN-278] Fix sending lineage event for KafkaSource
Closes #2131 from zxcware/lineage
zxcware authored and htran1 committed Oct 10, 2017
1 parent bd17f13 commit d2e4354
Showing 8 changed files with 92 additions and 51 deletions.
File 1 of 8:
@@ -20,6 +20,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -499,6 +500,30 @@ public void removeProp(String key) {
}
}

/**
* Remove all properties with a certain key prefix
*
* @param prefix key prefix
*/
public void removePropsWithPrefix(String prefix) {
this.specProperties.entrySet().removeIf(entry -> ((String) entry.getKey()).startsWith(prefix));

Properties newCommonProperties = null;
for (Object key: this.commonProperties.keySet()) {
if (((String)key).startsWith(prefix)) {
if (newCommonProperties == null) {
newCommonProperties = new Properties();
newCommonProperties.putAll(this.commonProperties);
}
newCommonProperties.remove(key);
}
}

if (newCommonProperties != null) {
this.commonProperties = newCommonProperties;
}
}

/**
* @deprecated Use {@link #getProp(String)}
*/
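The new removePropsWithPrefix helper drops every property whose key starts with the given prefix, and it copies the shared common properties before touching them so other states backed by the same Properties object are unaffected. A minimal usage sketch (the lineage key names below are illustrative):

State state = new State();
state.setProp("gobblin.lineage.dataset.urn", "my_topic");
state.setProp("gobblin.lineage.branch.0.some.attribute", "value");
state.setProp("writer.output.dir", "/tmp/out");

state.removePropsWithPrefix("gobblin.lineage");

// Only the non-lineage property is left.
Assert.assertEquals(state.getPropertyNames().size(), 1);
Assert.assertTrue(state.contains("writer.output.dir"));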
File 2 of 8:
@@ -367,6 +367,20 @@ public boolean contains(String key) {
return super.contains(key) || this.workUnit.contains(key) || this.jobState.contains(key);
}

@Override
public void removeProp(String key) {
super.removeProp(key);
this.workUnit.removeProp(key);
this.jobState.removeProp(key);
}

@Override
public void removePropsWithPrefix(String prefix) {
super.removePropsWithPrefix(prefix);
this.workUnit.removePropsWithPrefix(prefix);
this.jobState.removePropsWithPrefix(prefix);
}

/**
* Get the {@link org.apache.gobblin.source.workunit.Extract} associated with the {@link WorkUnit}.
*
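A WorkUnitState resolves lookups against its own properties, then the underlying WorkUnit, then the JobState (see contains above), so these overrides have to remove a key at all three levels; otherwise a property removed at the task level would still be visible through the fallback chain. A rough sketch of the intended effect (constructor usage and keys are illustrative):

WorkUnit workUnit = WorkUnit.createEmpty();
workUnit.setProp("gobblin.lineage.dataset.urn", "my_topic");

WorkUnitState workUnitState = new WorkUnitState(workUnit);
workUnitState.setProp("gobblin.lineage.branch.0.some.attribute", "value");

workUnitState.removePropsWithPrefix("gobblin.lineage");

// Neither the task-level copy nor the work-unit fallback is visible any more.
Assert.assertFalse(workUnitState.contains("gobblin.lineage.dataset.urn"));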
File 3 of 8:
@@ -49,10 +49,10 @@

@Slf4j
public class LineageInfo {
public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
public static final String BRANCH_ID_METADATA_KEY = "branchId";
private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + ".";
public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn";
private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";

@Getter
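With this change every lineage key lives under the gobblin.lineage namespace: LINEAGE_DATASET_URN now resolves to "gobblin.lineage.dataset.urn" instead of the bare "lineage.dataset.urn", and branch-scoped attributes fall under "gobblin.lineage.branch.". That shared prefix is what lets SafeDatasetCommit (further down) clear all lineage state with a single removePropsWithPrefix call. A sketch of how the keys land on a work unit (the exact key generated by setDatasetLineageAttribute is an assumption about LineageInfo's internals):

workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, "my_topic");
// -> gobblin.lineage.dataset.urn = my_topic
LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, "broker1:9092");
// -> stored under some gobblin.lineage.* key for the broker list

// Later, everything lineage-related can be dropped in one shot:
workUnit.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);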
File 4 of 8:
@@ -81,4 +81,16 @@ public void run() {
Assert.fail("Concurrency test failed with first exception: " + ExceptionUtils.getFullStackTrace(this.exceptions.poll()));
}
}

@Test
public void testRemovePropsWithPrefix() {
final State state = new State();
final String prefix = "prefix";
for (int i = 0; i < 10; i++) {
state.setProp("prefix." + i, i);
}
Assert.assertTrue(state.getPropertyNames().size() == 10);
state.removePropsWithPrefix(prefix);
Assert.assertTrue(state.getPropertyNames().size() == 0);
}
}
File 5 of 8:
@@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import org.apache.gobblin.lineage.LineageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -132,6 +133,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
private Extract.TableType tableType;
private String extractNamespace;
private boolean isFullExtract;
private String kafkaBrokers;
private boolean shouldEnableDatasetStateStore;
private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
private Set<String> topicsToProcess;
@@ -172,7 +174,7 @@ public List<WorkUnit> getWorkunits(SourceState state) {
extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
}
isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);

kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);

@@ -538,17 +540,18 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets
}

WorkUnit workUnit = WorkUnit.create(extract);
if (topicSpecificState.isPresent()) {
workUnit.addAll(topicSpecificState.get());
}

workUnit.setProp(TOPIC_NAME, partition.getTopicName());
addDatasetUrnOptionally(workUnit);
workUnit.setProp(PARTITION_ID, partition.getId());
workUnit.setProp(LEADER_ID, partition.getLeader().getId());
workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());

// Add lineage info
workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);

LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));

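Because the work-unit packer no longer stamps lineage information on its MultiWorkUnit groups (see the next file), KafkaSource now attaches it directly to every topic-partition work unit: the topic name becomes the lineage dataset URN and the broker list, read once in getWorkunits, is recorded as a dataset-level lineage attribute. A rough picture of the lineage-relevant properties on one such work unit (the broker attribute's generated key is left to LineageInfo and is an assumption here):

// For topic "my_topic", partition 3, brokers "broker1:9092,broker2:9092":
workUnit.getProp(KafkaSource.TOPIC_NAME);               // my_topic
workUnit.getProp(LineageInfo.LINEAGE_DATASET_URN);      // my_topic  (gobblin.lineage.dataset.urn)
// plus a gobblin.lineage.* entry for the broker list, added via
// LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers)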
File 6 of 8:
@@ -24,10 +24,8 @@

import com.google.common.collect.Lists;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -70,27 +68,21 @@ public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
double avgGroupSize = totalEstDataSize / numContainers / getPreGroupingSizeFactor(this.state);

List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(entry.getValue());
for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
if (estimatedDataSizeForTopic < avgGroupSize) {

// If the total estimated size of a topic is smaller than group size, put all partitions of this
// topic in a single group.
MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
mwuGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, entry.getKey());
addWorkUnitsToMultiWorkUnit(entry.getValue(), mwuGroup);
addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
mwuGroups.add(mwuGroup);
} else {
// Use best-fit-decreasing to group workunits for a topic into multiple groups.
mwuGroups.addAll(bestFitDecreasingBinPacking(entry.getKey(), entry.getValue(), avgGroupSize));
mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, avgGroupSize));
}
}

// Add common lineage information
for (MultiWorkUnit multiWorkUnit: mwuGroups) {
LineageInfo.setDatasetLineageAttribute(multiWorkUnit, ConfigurationKeys.KAFKA_BROKERS, this.state.getProp(ConfigurationKeys.KAFKA_BROKERS, ""));
}

List<WorkUnit> groups = squeezeMultiWorkUnits(mwuGroups);
return worstFitDecreasingBinPacking(groups, numContainers);
}
@@ -111,7 +103,7 @@ private static double getPreGroupingSizeFactor(State state) {
* Group {@link WorkUnit}s into groups. Each group is a {@link MultiWorkUnit}. Each group has a capacity of
* avgGroupSize. If there's a single {@link WorkUnit} whose size is larger than avgGroupSize, it forms a group itself.
*/
private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, List<WorkUnit> workUnits, double avgGroupSize) {
private static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {

// Sort workunits by data size desc
Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
@@ -123,7 +115,6 @@ private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, List<WorkUnit> workUnits, double avgGroupSize) {
addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
} else {
bestGroup = MultiWorkUnit.createEmpty();
bestGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, topic);
addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
}
pQueue.add(bestGroup);
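For reference, best-fit-decreasing bin packing sorts the items by size in descending order and then places each item into the bin that is already fullest yet still has room for it, opening a new bin when nothing fits (so an item larger than the capacity ends up in a bin of its own, matching the comment above bestFitDecreasingBinPacking). A self-contained sketch on plain doubles rather than Gobblin's WorkUnit/MultiWorkUnit types:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class BestFitDecreasing {
  /** Packs item sizes into bins of the given capacity; an oversized item gets a bin of its own. */
  static List<List<Double>> pack(List<Double> sizes, double capacity) {
    List<Double> sorted = new ArrayList<>(sizes);
    sorted.sort(Collections.reverseOrder());          // largest items first

    List<List<Double>> bins = new ArrayList<>();
    List<Double> loads = new ArrayList<>();           // running load of each bin

    for (double size : sorted) {
      int best = -1;
      double bestLoad = -1;
      for (int i = 0; i < bins.size(); i++) {
        double load = loads.get(i);
        // Best fit: the fullest bin that can still take this item.
        if (load + size <= capacity && load > bestLoad) {
          best = i;
          bestLoad = load;
        }
      }
      if (best < 0) {                                 // nothing fits -> open a new bin
        bins.add(new ArrayList<Double>());
        loads.add(0.0);
        best = bins.size() - 1;
      }
      bins.get(best).add(size);
      loads.set(best, loads.get(best) + size);
    }
    return bins;
  }
}

For example, pack(Arrays.asList(7.0, 5.0, 4.0, 3.0, 1.0), 10.0) yields two bins, [7, 3] and [5, 4, 1]. The packer in this diff keeps its candidate groups in a priority queue and only probes the head rather than scanning every group, but the spirit is the same: largest work units first, fill existing groups before opening new ones.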
File 7 of 8:
@@ -81,25 +81,11 @@ public enum SizeEstimatorType {

protected final AbstractSource<?, ?> source;
protected final SourceState state;
protected final Extract.TableType tableType;
protected final String extractNameSpace;
protected final boolean isFullExtract;
protected final KafkaWorkUnitSizeEstimator sizeEstimator;

protected KafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState state) {
this.source = source;
this.state = state;
if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
String tableTypeStr = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
KafkaSource.DEFAULT_TABLE_TYPE.toString());
tableType = Extract.TableType.valueOf(tableTypeStr);
extractNameSpace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
} else {
// To be compatible, reject table type and namespace configuration keys as previous implementation
tableType = KafkaSource.DEFAULT_TABLE_TYPE;
extractNameSpace = KafkaSource.DEFAULT_NAMESPACE_NAME;
}
isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
this.sizeEstimator = getWorkUnitSizeEstimator();
}

@@ -227,13 +213,18 @@ protected WorkUnit squeezeMultiWorkUnit(MultiWorkUnit multiWorkUnit) {
List<KafkaPartition> partitions = getPartitionsFromMultiWorkUnit(multiWorkUnit);
Preconditions.checkArgument(!partitions.isEmpty(), "There must be at least one partition in the multiWorkUnit");

Extract extract = this.source.createExtract(tableType, extractNameSpace, partitions.get(0).getTopicName());
if (isFullExtract) {
extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
}
WorkUnit workUnit = WorkUnit.create(extract, interval);
// Squeeze all partitions from the multiWorkUnit into one of its work units; any one will do
WorkUnit workUnit = multiWorkUnit.getWorkUnits().get(0);
// Update interval
workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
workUnit.setWatermarkInterval(interval);
// Remove the original partition information
workUnit.removeProp(KafkaSource.PARTITION_ID);
workUnit.removeProp(KafkaSource.LEADER_ID);
workUnit.removeProp(KafkaSource.LEADER_HOSTANDPORT);
// Add combined partitions information
populateMultiPartitionWorkUnit(partitions, workUnit);
workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, multiWorkUnit.getProp(ESTIMATED_WORKUNIT_SIZE));
LOG.info(String.format("Created MultiWorkUnit for partitions %s", partitions));
return workUnit;
}
@@ -243,9 +234,7 @@ protected WorkUnit squeezeMultiWorkUnit(MultiWorkUnit multiWorkUnit) {
*/
private static void populateMultiPartitionWorkUnit(List<KafkaPartition> partitions, WorkUnit workUnit) {
Preconditions.checkArgument(!partitions.isEmpty(), "There should be at least one partition");
workUnit.setProp(KafkaSource.TOPIC_NAME, partitions.get(0).getTopicName());
GobblinMetrics.addCustomTagToState(workUnit, new Tag<>("kafkaTopic", partitions.get(0).getTopicName()));
workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partitions.get(0).getTopicName());
for (int i = 0; i < partitions.size(); i++) {
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i), partitions.get(i).getId());
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i),
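The previous squeeze built a brand-new Extract and WorkUnit, which is exactly why the lineage properties added in KafkaSource never reached the published task states. Reusing the first work unit of the group preserves the topic, extract and gobblin.lineage.* properties; only the single-partition fields and watermarks are stripped and replaced with the combined interval and indexed per-partition keys. Roughly, for a squeezed unit covering two partitions (the indexed key layout follows KafkaUtils.getPartitionPropName and is shown only schematically):

// Carried over from the group's first work unit:
//   KafkaSource.TOPIC_NAME, the extract settings, gobblin.lineage.dataset.urn and other lineage attributes
// Removed: PARTITION_ID, LEADER_ID, LEADER_HOSTANDPORT and the original low/high watermarks
// Re-added per squeezed partition:
workUnit.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0));   // first partition id
workUnit.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 1));   // second partition id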
File 8 of 8:
@@ -37,18 +37,18 @@
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.lineage.LineageException;
import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.publisher.CommitSequencePublisher;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.NoopPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.source.extractor.JobCommitPolicy;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;


@@ -57,7 +57,7 @@
* {@link DataPublisher#publish(Collection)}. This class is thread-safe if and only if the implementation of
* {@link DataPublisher} used is also thread-safe.
*/
@AllArgsConstructor
@RequiredArgsConstructor
@Slf4j
final class SafeDatasetCommit implements Callable<Void> {

@@ -71,13 +71,17 @@ final class SafeDatasetCommit implements Callable<Void> {
private final boolean isMultithreaded;
private final JobContext jobContext;

private MetricContext metricContext;

@Override
public Void call()
throws Exception {
if (this.datasetState.getState() == JobState.RunningState.COMMITTED) {
log.info(this.datasetUrn + " has been committed.");
return null;
}
metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);

finalizeDatasetStateBeforeCommit(this.datasetState);
Class<? extends DataPublisher> dataPublisherClass;
try (Closer closer = Closer.create()) {
@@ -159,6 +163,7 @@ public Void call()
} finally {
try {
finalizeDatasetState(datasetState, datasetUrn);
submitLineageEvent(datasetState.getTaskStates());
if (commitSequenceBuilder.isPresent()) {
buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
datasetState.setState(JobState.RunningState.COMMITTED);
@@ -182,9 +187,8 @@ private void submitLineageEvent(Collection<TaskState> states) {
}

TaskState oneWorkUnitState = states.iterator().next();
if (oneWorkUnitState.contains(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE) && oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE).equals(
NoopPublisher.class.getName())) {
// if no publisher configured, each task should be responsible for sending lineage event.
if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
// Do nothing if the dataset is not configured with lineage info
return;
}

@@ -194,14 +198,18 @@ private void submitLineageEvent(Collection<TaskState> states) {
Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
for (Collection<State> dataState: datasetStates) {
Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class),
LineageInfo.LINEAGE_NAME_SPACE).build();
EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
for (LineageInfo info: branchLineages) {
submitter.submit(info.getId(), info.getLineageMetaData());
}
}
} catch (LineageException e) {
log.error("Lineage event submission failed due to: " + e.toString());
} finally {
for (TaskState taskState: states) {
// Remove lineage info from the state to avoid sending duplicate lineage events in the next run
taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
}
}
}

@@ -222,7 +230,6 @@ private void commitDataset(Collection<TaskState> taskStates, DataPublisher publisher) {

try {
publisher.publish(taskStates);
submitLineageEvent(taskStates);
} catch (Throwable t) {
log.error("Failed to commit dataset", t);
setTaskFailureException(taskStates, t);
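Taken together, lineage submission now runs once per dataset in the finally block of call(), keyed off whether the task states actually carry gobblin.lineage.dataset.urn, and the lineage properties are scrubbed afterwards so a later run of the same job does not re-emit identical events. A condensed sketch of that flow (not the verbatim method; the shared metricContext is assumed to have been initialized earlier in call()):

private void submitLineageEvent(Collection<TaskState> states) {
  if (states.isEmpty() || !states.iterator().next().contains(LineageInfo.LINEAGE_DATASET_URN)) {
    return;   // nothing was configured with lineage info
  }
  try {
    for (Collection<State> statesOfOneDataset : LineageInfo.aggregateByDatasetUrn(states).values()) {
      EventSubmitter submitter =
          new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
      for (LineageInfo info : LineageInfo.load(statesOfOneDataset, LineageInfo.Level.All)) {
        submitter.submit(info.getId(), info.getLineageMetaData());
      }
    }
  } catch (LineageException e) {
    log.error("Lineage event submission failed due to: " + e);
  } finally {
    // Strip lineage properties so the next run does not send duplicate events.
    for (TaskState taskState : states) {
      taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
    }
  }
}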
