[GOBBLIN-278] Fix sending lineage event for KafkaSource #2131

Closed
wants to merge 2 commits into from
@@ -20,6 +20,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -499,6 +500,30 @@ public void removeProp(String key) {
}
}

/**
* Remove all properties with a certain keyPrefix
*
* @param prefix key prefix
*/
public void removePropsWithPrefix(String prefix) {
this.specProperties.entrySet().removeIf(entry -> ((String) entry.getKey()).startsWith(prefix));

Properties newCommonProperties = null;
for (Object key: this.commonProperties.keySet()) {
if (((String)key).startsWith(prefix)) {
if (newCommonProperties == null) {
newCommonProperties = new Properties();
newCommonProperties.putAll(this.commonProperties);
}
newCommonProperties.remove(key);
}
}

if (newCommonProperties != null) {
this.commonProperties = newCommonProperties;
}
}
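As an aside (not part of this diff), a minimal usage sketch of the new method, assuming only the existing State API (setProp, contains, getPropertyNames):

State state = new State();
state.setProp("gobblin.lineage.dataset.urn", "my-topic");      // matches the prefix
state.setProp("gobblin.lineage.branch.0.name", "branch0");     // matches the prefix
state.setProp("writer.output.dir", "/tmp/out");                // does not match

// Removes matching keys from both the spec-specific and the shared (common) properties;
// the common Properties object is copied first, presumably because it can be shared
// across State instances and should not be mutated in place.
state.removePropsWithPrefix("gobblin.lineage");

// Expected: only "writer.output.dir" remains, i.e.
// state.getPropertyNames().size() == 1 and state.contains("writer.output.dir") is true.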

/**
* @deprecated Use {@link #getProp(String)}
*/
@@ -367,6 +367,20 @@ public boolean contains(String key) {
return super.contains(key) || this.workUnit.contains(key) || this.jobState.contains(key);
}

@Override
public void removeProp(String key) {
super.removeProp(key);
this.workUnit.removeProp(key);
this.jobState.removeProp(key);
}

@Override
public void removePropsWithPrefix(String prefix) {
super.removePropsWithPrefix(prefix);
this.workUnit.removePropsWithPrefix(prefix);
this.jobState.removePropsWithPrefix(prefix);
}
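Context for the two overrides above: per the contains() override at the top of this hunk, a WorkUnitState resolves keys through three layers (its own properties, the WorkUnit, the JobState), so a removal has to cascade through all of them or the key would still be visible. A rough sketch of the behaviour this buys (illustrative only; the single-argument WorkUnitState constructor is assumed available):

// Keys set on the underlying WorkUnit (as KafkaSource does for lineage) are visible
// through the WorkUnitState...
workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, "my-topic");
WorkUnitState workUnitState = new WorkUnitState(workUnit);
// workUnitState.contains(LineageInfo.LINEAGE_DATASET_URN) -> true (falls through to the WorkUnit)

// ...so clearing the lineage namespace must also clear the WorkUnit (and JobState) copies,
// which is exactly what the override delegates to:
workUnitState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
// workUnitState.contains(LineageInfo.LINEAGE_DATASET_URN) -> false in every layer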

/**
* Get the {@link org.apache.gobblin.source.workunit.Extract} associated with the {@link WorkUnit}.
*
@@ -49,10 +49,10 @@

@Slf4j
public class LineageInfo {
public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
public static final String BRANCH_ID_METADATA_KEY = "branchId";
private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + ".";
public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn";
private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";

@Getter
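One practical effect of the constant reshuffle above: every lineage attribute now lives under the gobblin.lineage prefix, so a single removePropsWithPrefix(LINEAGE_NAME_SPACE) call can wipe them all, which SafeDatasetCommit relies on later in this PR. A sketch of the resulting keys (the exact key written by setDatasetLineageAttribute is an assumption, since its body is not part of this diff):

// LINEAGE_DATASET_URN now resolves to "gobblin.lineage.dataset.urn"
workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, "my-topic");

// Dataset-level attributes are presumably stored under the same prefix, e.g. something like
// "gobblin.lineage.kafka.brokers" (assumed key shape; only the prefix is visible in this diff)
LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, "broker1:9092");

// Both disappear with one prefix-based removal:
workUnit.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);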
@@ -81,4 +81,16 @@ public void run() {
Assert.fail("Concurrency test failed with first exception: " + ExceptionUtils.getFullStackTrace(this.exceptions.poll()));
}
}

@Test
public void testRemovePropsWithPrefix() {
final State state = new State();
final String prefix = "prefix";
for (int i = 0; i < 10; i++) {
state.setProp("prefix." + i, i);
}
Assert.assertTrue(state.getPropertyNames().size() == 10);
state.removePropsWithPrefix(prefix);
Assert.assertTrue(state.getPropertyNames().size() == 0);
}
}
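The new test exercises the case where every key matches the prefix. A complementary check, sketched here rather than proposed as part of the PR, would assert that keys outside the prefix survive:

@Test
public void testRemovePropsWithPrefixKeepsUnrelatedKeys() {
  final State state = new State();
  state.setProp("prefix.removed", "1");   // matches, should be removed
  state.setProp("other.kept", "2");       // does not match, should survive
  state.removePropsWithPrefix("prefix");
  Assert.assertEquals(state.getPropertyNames().size(), 1);
  Assert.assertTrue(state.contains("other.kept"));
}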
@@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import org.apache.gobblin.lineage.LineageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -132,6 +133,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
private Extract.TableType tableType;
private String extractNamespace;
private boolean isFullExtract;
private String kafkaBrokers;
private boolean shouldEnableDatasetStateStore;
private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
private Set<String> topicsToProcess;
@@ -172,7 +174,7 @@ public List<WorkUnit> getWorkunits(SourceState state) {
extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
}
isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);

kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);

@@ -538,17 +540,18 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets
}

WorkUnit workUnit = WorkUnit.create(extract);
if (topicSpecificState.isPresent()) {
Contributor: Why is this removed?

Contributor Author: This is duplicate logic to line 239.

workUnit.addAll(topicSpecificState.get());
}

workUnit.setProp(TOPIC_NAME, partition.getTopicName());
addDatasetUrnOptionally(workUnit);
workUnit.setProp(PARTITION_ID, partition.getId());
workUnit.setProp(LEADER_ID, partition.getLeader().getId());
workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());

// Add lineage info
workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);

LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));

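For orientation (not part of the diff), this is roughly what each per-partition work unit now carries and how a consumer could read it back; the exact broker-list key produced by setDatasetLineageAttribute is assumed, and only its gobblin.lineage prefix is visible here:

// Sketch: lineage-related properties on a work unit built by getWorkUnitForTopicPartition.
String topic = workUnit.getProp(KafkaSource.TOPIC_NAME);                 // e.g. "my-topic"
String lineageUrn = workUnit.getProp(LineageInfo.LINEAGE_DATASET_URN);   // same topic name
// The broker list is attached under the lineage namespace; list whatever is there generically:
for (String key : workUnit.getPropertyNames()) {
  if (key.startsWith(LineageInfo.LINEAGE_NAME_SPACE)) {
    System.out.println(key + " = " + workUnit.getProp(key));
  }
}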
@@ -24,10 +24,8 @@

import com.google.common.collect.Lists;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -70,27 +68,21 @@ public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int num
double avgGroupSize = totalEstDataSize / numContainers / getPreGroupingSizeFactor(this.state);

List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(entry.getValue());
for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
if (estimatedDataSizeForTopic < avgGroupSize) {

// If the total estimated size of a topic is smaller than group size, put all partitions of this
// topic in a single group.
MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
mwuGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, entry.getKey());
addWorkUnitsToMultiWorkUnit(entry.getValue(), mwuGroup);
addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
mwuGroups.add(mwuGroup);
} else {
// Use best-fit-decreasing to group workunits for a topic into multiple groups.
mwuGroups.addAll(bestFitDecreasingBinPacking(entry.getKey(), entry.getValue(), avgGroupSize));
mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, avgGroupSize));
}
}

// Add common lineage information
for (MultiWorkUnit multiWorkUnit: mwuGroups) {
LineageInfo.setDatasetLineageAttribute(multiWorkUnit, ConfigurationKeys.KAFKA_BROKERS, this.state.getProp(ConfigurationKeys.KAFKA_BROKERS, ""));
}

List<WorkUnit> groups = squeezeMultiWorkUnits(mwuGroups);
return worstFitDecreasingBinPacking(groups, numContainers);
}
@@ -111,7 +103,7 @@ private static double getPreGroupingSizeFactor(State state) {
* Group {@link WorkUnit}s into groups. Each group is a {@link MultiWorkUnit}. Each group has a capacity of
* avgGroupSize. If there's a single {@link WorkUnit} whose size is larger than avgGroupSize, it forms a group itself.
*/
private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, List<WorkUnit> workUnits, double avgGroupSize) {
private static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {

// Sort workunits by data size desc
Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
@@ -123,7 +115,6 @@ private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, Lis
addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
} else {
bestGroup = MultiWorkUnit.createEmpty();
bestGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, topic);
addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
}
pQueue.add(bestGroup);
@@ -81,25 +81,11 @@ public enum SizeEstimatorType {

protected final AbstractSource<?, ?> source;
protected final SourceState state;
protected final Extract.TableType tableType;
protected final String extractNameSpace;
protected final boolean isFullExtract;
protected final KafkaWorkUnitSizeEstimator sizeEstimator;

protected KafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState state) {
this.source = source;
this.state = state;
if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
String tableTypeStr = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
KafkaSource.DEFAULT_TABLE_TYPE.toString());
tableType = Extract.TableType.valueOf(tableTypeStr);
extractNameSpace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
} else {
// To be compatible, reject table type and namespace configuration keys as previous implementation
tableType = KafkaSource.DEFAULT_TABLE_TYPE;
extractNameSpace = KafkaSource.DEFAULT_NAMESPACE_NAME;
}
isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
this.sizeEstimator = getWorkUnitSizeEstimator();
}

@@ -227,13 +213,18 @@ protected WorkUnit squeezeMultiWorkUnit(MultiWorkUnit multiWorkUnit) {
List<KafkaPartition> partitions = getPartitionsFromMultiWorkUnit(multiWorkUnit);
Preconditions.checkArgument(!partitions.isEmpty(), "There must be at least one partition in the multiWorkUnit");

Extract extract = this.source.createExtract(tableType, extractNameSpace, partitions.get(0).getTopicName());
if (isFullExtract) {
extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
}
WorkUnit workUnit = WorkUnit.create(extract, interval);
// Squeeze all partitions from the multiWorkUnit into one of the work units, which can be any one
WorkUnit workUnit = multiWorkUnit.getWorkUnits().get(0);
Contributor: Is this to preserve properties from the workunit by using an existing workunit instead of creating a new one? Will removing the properties below affect other places that access this workunit?

Would it be safer and sufficient to create a new workunit and copy the properties over from the first work unit?

Contributor Author:

> Is this to preserve properties from the workunit by using an existing workunit instead of creating a new one?

Yes.

> Will removing the properties below affect other places that access this workunit?

No, because these properties are either ignored or replaced by the configurations set immediately below.

> Would it be safer and sufficient to create a new workunit and copy the properties over from the first work unit?

No. We do need to remove those properties; otherwise the logic won't be the same as before. All work units in the same multi work unit have the same properties except those that are removed here and the extract id.

Contributor: I meant copy then remove from the copy. But if it is safe then you can remove directly.
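For comparison, the "copy then remove from the copy" variant the reviewer suggests would look roughly like the sketch below (WorkUnit.copyOf is assumed as the copy helper); the merged change keeps the in-place version since, per the author, the removed keys are immediately overwritten or ignored.

// Reviewer's alternative (sketch only): clone the first work unit, strip the clone,
// and leave the originals inside the MultiWorkUnit untouched.
WorkUnit first = multiWorkUnit.getWorkUnits().get(0);
WorkUnit squeezed = WorkUnit.copyOf(first);    // assumed copy helper
squeezed.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
squeezed.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
squeezed.setWatermarkInterval(interval);
squeezed.removeProp(KafkaSource.PARTITION_ID);
squeezed.removeProp(KafkaSource.LEADER_ID);
squeezed.removeProp(KafkaSource.LEADER_HOSTANDPORT);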

// Update interval
workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
workUnit.setWatermarkInterval(interval);
// Remove the original partition information
workUnit.removeProp(KafkaSource.PARTITION_ID);
workUnit.removeProp(KafkaSource.LEADER_ID);
workUnit.removeProp(KafkaSource.LEADER_HOSTANDPORT);
// Add combined partitions information
populateMultiPartitionWorkUnit(partitions, workUnit);
workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, multiWorkUnit.getProp(ESTIMATED_WORKUNIT_SIZE));
LOG.info(String.format("Created MultiWorkUnit for partitions %s", partitions));
return workUnit;
}
@@ -243,9 +234,7 @@ protected WorkUnit squeezeMultiWorkUnit(MultiWorkUnit multiWorkUnit) {
*/
private static void populateMultiPartitionWorkUnit(List<KafkaPartition> partitions, WorkUnit workUnit) {
Preconditions.checkArgument(!partitions.isEmpty(), "There should be at least one partition");
workUnit.setProp(KafkaSource.TOPIC_NAME, partitions.get(0).getTopicName());
GobblinMetrics.addCustomTagToState(workUnit, new Tag<>("kafkaTopic", partitions.get(0).getTopicName()));
workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partitions.get(0).getTopicName());
for (int i = 0; i < partitions.size(); i++) {
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i), partitions.get(i).getId());
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i),
@@ -37,18 +37,18 @@
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.lineage.LineageException;
import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.publisher.CommitSequencePublisher;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.NoopPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.source.extractor.JobCommitPolicy;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;


@@ -57,7 +57,7 @@
* {@link DataPublisher#publish(Collection)}. This class is thread-safe if and only if the implementation of
* {@link DataPublisher} used is also thread-safe.
*/
@AllArgsConstructor
@RequiredArgsConstructor
@Slf4j
final class SafeDatasetCommit implements Callable<Void> {

@@ -71,13 +71,17 @@ final class SafeDatasetCommit implements Callable<Void> {
private final boolean isMultithreaded;
private final JobContext jobContext;

private MetricContext metricContext;

@Override
public Void call()
throws Exception {
if (this.datasetState.getState() == JobState.RunningState.COMMITTED) {
log.info(this.datasetUrn + " have been committed.");
return null;
}
metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);

finalizeDatasetStateBeforeCommit(this.datasetState);
Class<? extends DataPublisher> dataPublisherClass;
try (Closer closer = Closer.create()) {
@@ -159,6 +163,7 @@ public Void call()
} finally {
try {
finalizeDatasetState(datasetState, datasetUrn);
submitLineageEvent(datasetState.getTaskStates());
if (commitSequenceBuilder.isPresent()) {
buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
datasetState.setState(JobState.RunningState.COMMITTED);
@@ -182,9 +187,8 @@ private void submitLineageEvent(Collection<TaskState> states) {
}

TaskState oneWorkUnitState = states.iterator().next();
if (oneWorkUnitState.contains(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE) && oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE).equals(
NoopPublisher.class.getName())) {
// if no publisher configured, each task should be responsible for sending lineage event.
if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
// Do nothing if the dataset is not configured with lineage info
return;
}

@@ -194,14 +198,18 @@ private void submitLineageEvent(Collection<TaskState> states) {
Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
for (Collection<State> dataState: datasetStates) {
Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class),
LineageInfo.LINEAGE_NAME_SPACE).build();
EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
for (LineageInfo info: branchLineages) {
submitter.submit(info.getId(), info.getLineageMetaData());
}
}
} catch (LineageException e) {
log.error ("Lineage event submission failed due to :" + e.toString());
} finally {
for (TaskState taskState: states) {
// Remove lineage info from the state to avoid sending duplicate lineage events in the next run
taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
}
}
}
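The finally block above is what prevents duplicate events: once the gobblin.lineage.* keys are stripped from every task state, any later invocation short-circuits at the LINEAGE_DATASET_URN check near the top of this method. Condensed, the intended flow is (illustrative only):

// First call: the gating key is present, one lineage event is submitted per dataset URN,
// then the whole lineage namespace is removed from each task state.
submitLineageEvent(datasetState.getTaskStates());

// A later call over the same (or persisted) task states finds no LINEAGE_DATASET_URN
// and returns early, so no duplicate lineage event is emitted.
submitLineageEvent(datasetState.getTaskStates());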

@@ -222,7 +230,6 @@ private void commitDataset(Collection<TaskState> taskStates, DataPublisher publi

try {
publisher.publish(taskStates);
submitLineageEvent(taskStates);
} catch (Throwable t) {
log.error("Failed to commit dataset", t);
setTaskFailureException(taskStates, t);