Skip to content

Commit

Permalink
[GOBBLIN-334] Implement SharedResourceFactory for LineageInfo
Browse files Browse the repository at this point in the history
Closes apache#2187 from zxcware/share
  • Loading branch information
zxcware authored and autumnust committed Jan 9, 2018
1 parent 5817cce commit 6719050
Show file tree
Hide file tree
Showing 16 changed files with 275 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.broker;

import org.apache.gobblin.broker.iface.SharedResourceKey;


/**
 * A {@link SharedResourceKey} that is identified by nothing but a string name.
 *
 * <p>Two keys are equal when they are of exactly the same class and carry equal names; a {@code null}
 * name is only equal to another {@code null} name and hashes to {@code 0}.
 */
public class StringNameSharedResourceKey implements SharedResourceKey {
  private final String name;

  /**
   * @param name the name backing this key; stored as given, {@code null} is not rejected
   */
  public StringNameSharedResourceKey(String name) {
    this.name = name;
  }

  /**
   * @return the name this key was constructed with, unchanged
   */
  @Override
  public String toConfigurationKey() {
    return this.name;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (o == null) {
      return false;
    }
    // Exact class match (not instanceof), so subclasses never compare equal to this class.
    if (o.getClass() != getClass()) {
      return false;
    }

    String otherName = ((StringNameSharedResourceKey) o).name;
    if (this.name == null) {
      return otherName == null;
    }
    return this.name.equals(otherName);
  }

  @Override
  public int hashCode() {
    if (this.name == null) {
      return 0;
    }
    return this.name.hashCode();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.Extract;

import lombok.Getter;
import lombok.Setter;


/**
Expand All @@ -67,6 +70,9 @@ public class SourceState extends State {
@Getter
private final List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList();

@Getter @Setter
private SharedResourcesBroker<GobblinScopeTypes> broker;

/**
* Default constructor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.gobblin.dataset;

import org.apache.gobblin.configuration.State;

import com.typesafe.config.Config;

/**
* A factory that creates an instance of {@link DatasetResolver}.
*
* <p>NOTE(review): this listing comes from a diff hunk without +/- markers, so removed and added lines
* may appear side by side. The {@code State} overload (and possibly the constants) may not exist in
* the committed file; confirm against the repository before relying on them.
*/
public interface DatasetResolverFactory {
// Presumably the configuration namespace for this factory; TODO confirm against callers.
String NAMESPACE = "DatasetResolverFactory";
// Evaluates to "DatasetResolverFactory.class"; presumably the key naming the factory class to load; TODO confirm.
String CLASS = NAMESPACE + "." + "class";

/**
* Create a {@link DatasetResolver} instance from a {@link State}.
*/
DatasetResolver createResolver(State state);
/**
* Create a {@link DatasetResolver} instance from a typesafe {@link Config}.
*
* @param config the configuration to create the resolver from
* @return the created {@link DatasetResolver}
*/
DatasetResolver createResolver(Config config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
public class NoopDatasetResolver implements DatasetResolver {
public static final NoopDatasetResolver INSTANCE = new NoopDatasetResolver();
public static final String FACTORY = "NOOP";

private NoopDatasetResolver() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetConstants;
Expand Down Expand Up @@ -109,6 +110,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
protected final int parallelRunnerThreads;
protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
protected final Optional<LineageInfo> lineageInfo;

/* Each partition in each branch may have separate metadata. The metadata mergers are responsible
* for aggregating this information from all workunits so it can be published.
Expand Down Expand Up @@ -144,6 +146,15 @@ public BaseDataPublisher(State state)
conf.set(key, this.getState().getProp(key));
}

// Extract LineageInfo from state
if (state instanceof SourceState) {
lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker());
} else if (state instanceof WorkUnitState) {
lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable());
} else {
lineageInfo = Optional.absent();
}

this.numBranches = this.getState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
this.shouldRetry = this.getState().getPropAsBoolean(PUBLISH_RETRY_ENABLED, false);

Expand Down Expand Up @@ -284,7 +295,9 @@ public void close()

/**
* Creates the destination {@link DatasetDescriptor} for the given branch via
* {@link #createDestinationDescriptor(WorkUnitState, int)} and passes it, together with the branch id
* and the state, to {@code putDestination}.
*
* @param state the work unit state handed to {@code putDestination}
* @param branchId the branch whose destination descriptor is created
*/
private void addLineageInfo(WorkUnitState state, int branchId) {
DatasetDescriptor destination = createDestinationDescriptor(state, branchId);
// NOTE(review): this static call and the guarded instance call below look like the before/after
// lines of one diff hunk; presumably only the instance call remains in the committed file. Confirm.
LineageInfo.putDestination(destination, branchId, state);
// lineageInfo is an Optional: when it is absent, nothing is passed to the instance.
if (this.lineageInfo.isPresent()) {
this.lineageInfo.get().putDestination(destination, branchId, state);
}
}

protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.extractor.partition.Partition;
Expand Down Expand Up @@ -93,6 +92,8 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
*/
public static final Integer CURRENT_WORK_UNIT_STATE_VERSION = 3;

protected Optional<LineageInfo> lineageInfo;

/** A class that encapsulates a source entity (aka dataset) to be processed */
@Data
public static final class SourceEntity {
Expand Down Expand Up @@ -168,6 +169,7 @@ public int hashCode() {
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
initLogger(state);
lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

List<WorkUnit> workUnits = Lists.newArrayList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,22 @@

import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;

import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metadata.MetadataMerger;
import org.apache.gobblin.metadata.types.GlobalMetadata;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsWriterMetrics;
Expand Down Expand Up @@ -528,8 +536,9 @@ public void testWithPartitionKey() throws IOException {
public void testPublishSingleTask()
throws IOException {
WorkUnitState state = buildTaskState(1);
LineageInfo lineageInfo = LineageInfo.getLineageInfo(state.getTaskBroker()).get();
DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
LineageInfo.setSource(source, state);
lineageInfo.setSource(source, state);
BaseDataPublisher publisher = new BaseDataPublisher(state);
publisher.publishData(state);
Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination"));
Expand All @@ -541,9 +550,10 @@ public void testPublishMultiTasks()
throws IOException {
WorkUnitState state1 = buildTaskState(2);
WorkUnitState state2 = buildTaskState(2);
LineageInfo lineageInfo = LineageInfo.getLineageInfo(state1.getTaskBroker()).get();
DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
LineageInfo.setSource(source, state1);
LineageInfo.setSource(source, state2);
lineageInfo.setSource(source, state1);
lineageInfo.setSource(source, state2);
BaseDataPublisher publisher = new BaseDataPublisher(state1);
publisher.publishData(ImmutableList.of(state1, state2));
Assert.assertTrue(state1.contains("gobblin.event.lineage.branch.0.destination"));
Expand Down Expand Up @@ -623,7 +633,16 @@ private State buildDefaultState(int numBranches)
}

private WorkUnitState buildTaskState(int numBranches) {
WorkUnitState state = new WorkUnitState();
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
.createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker
.newSubscopedBuilder(new JobScopeInstance("LineageEventTest", String.valueOf(System.currentTimeMillis())))
.build();
SharedResourcesBroker<GobblinScopeTypes> taskBroker = jobBroker
.newSubscopedBuilder(new TaskScopeInstance("LineageEventTestTask" + String.valueOf(System.currentTimeMillis())))
.build();

WorkUnitState state = new WorkUnitState(WorkUnit.createEmpty(), new State(), taskBroker);

state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "namespace");
state.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {

public MetricContext metricContext;

protected Optional<LineageInfo> lineageInfo;

/**
* <ul>
* Does the following:
Expand All @@ -139,6 +141,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
public List<WorkUnit> getWorkunits(final SourceState state) {

this.metricContext = Instrumented.getMetricContext(state, CopySource.class);
this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

try {

Expand Down Expand Up @@ -320,8 +323,10 @@ private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
* a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected
* to be set by the same logic
*/
if (copyableFile.getSourceDataset() != null && copyableFile.getDestinationDataset() != null) {
LineageInfo.setSource(copyableFile.getSourceDataset(), workUnit);
if (lineageInfo.isPresent() &&
copyableFile.getSourceDataset() != null &&
copyableFile.getDestinationDataset() != null) {
lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.publisher;


import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
import java.io.IOException;
Expand Down Expand Up @@ -81,6 +82,7 @@ public boolean isThreadSafe() {
private final FileSystem fs;
protected final EventSubmitter eventSubmitter;
protected final RecoveryHelper recoveryHelper;
protected final Optional<LineageInfo> lineageInfo;

/**
* Build a new {@link CopyDataPublisher} from {@link State}. The constructor expects the following to be set in the
Expand All @@ -93,6 +95,15 @@ public boolean isThreadSafe() {
*/
public CopyDataPublisher(State state) throws IOException {
super(state);
// Extract LineageInfo from state
if (state instanceof SourceState) {
lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker());
} else if (state instanceof WorkUnitState) {
lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable());
} else {
lineageInfo = Optional.absent();
}

String uri = this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
this.fs = FileSystem.get(URI.create(uri), WriterUtils.getFsConfiguration(state));

Expand Down Expand Up @@ -214,7 +225,9 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) {
fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
}
LineageInfo.putDestination(copyableFile.getDestinationDataset(), 0, wus);
if (lineageInfo.isPresent()) {
lineageInfo.get().putDestination(copyableFile.getDestinationDataset(), 0, wus);
}
}
if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
datasetOriginTimestamp = copyableFile.getOriginTimestamp();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.metrics.broker;

import org.apache.gobblin.broker.EmptyKey;
import org.apache.gobblin.broker.ResourceInstance;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.ConfigView;
import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.broker.iface.ScopedConfigView;
import org.apache.gobblin.broker.iface.SharedResourceFactory;
import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;


/**
 * A {@link SharedResourceFactory} that produces {@link LineageInfo} instances for a
 * {@link SharedResourcesBroker}. The factory is keyed by {@link EmptyKey} and reports
 * {@link GobblinScopeTypes#JOB} as its auto scope.
 */
public class LineageInfoFactory implements SharedResourceFactory<LineageInfo, EmptyKey, GobblinScopeTypes> {
  public static final String FACTORY_NAME = "lineageInfo";

  /**
   * @return {@link #FACTORY_NAME}
   */
  @Override
  public String getName() {
    return LineageInfoFactory.FACTORY_NAME;
  }

  /**
   * Builds a new {@link LineageInfo} from the config carried by {@code config} and wraps it in a
   * {@link ResourceInstance}. The {@code broker} argument is not used.
   */
  @Override
  public SharedResourceFactoryResponse<LineageInfo> createResource(
      SharedResourcesBroker<GobblinScopeTypes> broker,
      ScopedConfigView<GobblinScopeTypes, EmptyKey> config)
      throws NotConfiguredException {
    LineageInfo lineageInfo = new LineageInfo(config.getConfig());
    return new ResourceInstance<>(lineageInfo);
  }

  /**
   * @return always {@link GobblinScopeTypes#JOB}, regardless of the broker or config passed in
   */
  @Override
  public GobblinScopeTypes getAutoScope(
      SharedResourcesBroker<GobblinScopeTypes> broker,
      ConfigView<GobblinScopeTypes, EmptyKey> config) {
    return GobblinScopeTypes.JOB;
  }
}

0 comments on commit 6719050

Please sign in to comment.