[GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster (#3331)

* [GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster

* add unit test

* address comments
ZihanLi58 committed Jul 19, 2021
1 parent 07e76fc commit 08db23e
Showing 3 changed files with 18 additions and 3 deletions.
GobblinMCEProducer.java
@@ -66,6 +66,7 @@
public abstract class GobblinMCEProducer implements Closeable {

public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
public static final String GMCE_CLUSTER_NAME = "GobblinMCE.cluster.name";
public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
private static final String DATASET_ORIGIN_KEY = "dataset.origin";
@@ -112,7 +113,7 @@ private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceB
.setDataOrigin(DataOrigin.valueOf(origin))
.setNativeName(state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR))
.build());
gmceBuilder.setCluster(ClustersNames.getInstance().getClusterName());
gmceBuilder.setCluster(state.getProp(GMCE_CLUSTER_NAME, ClustersNames.getInstance().getClusterName()));
//retention job does not have job.id
gmceBuilder.setFlowId(
state.getProp(AbstractJob.JOB_ID, new Configuration().get(ConfigurationKeys.AZKABAN_FLOW_ID)));
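The producer change above means the cluster name stamped on each GMCE is no longer hard-wired to the local cluster: it is read from the new GobblinMCE.cluster.name property and only falls back to ClustersNames when the property is unset. A minimal sketch of the producer-side override (the property key and fallback come from the diff; the cluster name "cluster-a" and the surrounding state wiring are illustrative):

// Illustrative producer-side configuration; imports and producer construction omitted.
State state = new State();
// New knob from this commit: stamp emitted GMCEs with an explicit cluster name.
state.setProp(GobblinMCEProducer.GMCE_CLUSTER_NAME, "cluster-a");
// setBasicInformationForGMCE then resolves the cluster as
//   state.getProp(GMCE_CLUSTER_NAME, ClustersNames.getInstance().getClusterName()),
// so jobs that never set the property keep the old behavior.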
GobblinMCEWriter.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -81,6 +82,7 @@
public class GobblinMCEWriter implements DataWriter<GenericRecord> {
public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = "default.hive.registration.policy";
public static final String FORCE_HIVE_DATABASE_NAME = "force.hive.database.name";
public static final String ACCEPTED_CLUSTER_NAMES = "accepted.cluster.names";
public static final String METADATA_REGISTRATION_THREADS = "metadata.registration.threads";
public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS = "metadata.parallel.runner.timeout.mills";
public static final String HIVE_PARTITION_NAME = "hive.partition.name";
@@ -91,6 +93,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
List<MetadataWriter> metadataWriters;
Map<String, OperationType> tableOperationTypeMap;
Map<String, OperationType> datasetOperationTypeMap;
Set<String> acceptedClusters;
protected State state;
private final ParallelRunner parallelRunner;
private int parallelRunnerTimeoutMills;
@@ -103,6 +106,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
newSpecsMaps = new HashMap<>();
oldSpecsMaps = new HashMap<>();
metadataWriters = new ArrayList<>();
acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
state = properties;
for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) {
metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, className, state)));
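On the consuming side, the writer now builds an accepted-cluster set from accepted.cluster.names, defaulting to the local cluster name so existing pipelines behave exactly as before. A small sketch of that construction, assuming the usual comma-separated handling of State.getPropAsSet (cluster names are illustrative):

// Sketch of the accepted-cluster set the constructor above creates.
State properties = new State();
properties.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "cluster-a,cluster-b");
// Falls back to the local cluster name when the property is absent.
Set<String> acceptedClusters = properties.getPropAsSet(
    GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());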
@@ -183,8 +187,8 @@ public Descriptor getDataDescriptor() {
@Override
public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws IOException {
GenericRecord genericRecord = recordEnvelope.getRecord();
//filter out the events that not for this cluster
if (!genericRecord.get("cluster").equals(ClustersNames.getInstance().getClusterName())) {
//filter out the events that not emitted by accepted clusters
if (!acceptedClusters.contains(genericRecord.get("cluster"))) {
return;
}
// Use schema from record to avoid issue when schema evolution
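With that set in place, the per-record filter in writeEnvelope becomes a membership check instead of an equality test against the local cluster, so one pipeline can register metadata for GMCEs emitted by any accepted cluster and silently drop the rest. A standalone sketch of the check (record construction and the downstream MetadataWriter calls are omitted):

// Mirror of the new filter: skip any GMCE whose "cluster" field is not accepted.
Object cluster = genericRecord.get("cluster");
if (!acceptedClusters.contains(cluster)) {
  // Emitted by a cluster this pipeline does not consume; ignore the record.
  return;
}
// ...otherwise continue to the registered metadata writers as before...

Note that the comparison is against the raw record field, so the values configured in accepted.cluster.names must match exactly the strings the producer writes into the cluster field.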
IcebergMetadataWriterTest.java
@@ -85,6 +85,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
private String dbName = "hivedb";

private GobblinMCEWriter gobblinMCEWriter;
private GobblinMCEWriter gobblinMCEWriterWithAcceptClusters;

GobblinMetadataChangeEvent gmce;
static File tmpDir;
@@ -96,6 +97,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
@AfterClass
public void clean() throws Exception {
gobblinMCEWriter.close();
gobblinMCEWriterWithAcceptClusters.close();
FileUtils.forceDeleteOnExit(tmpDir);
}
@BeforeClass
@@ -142,6 +144,8 @@ public void setUp() throws Exception {
TestHiveRegistrationPolicyForIceberg.class.getName());
state.setProp("use.data.path.as.table.location", true);
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
state.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "randomCluster");
gobblinMCEWriterWithAcceptClusters = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
((IcebergMetadataWriter) gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
HiveMetastoreTest.catalog);
_avroPartitionSchema =
@@ -150,6 +154,12 @@

@Test ( priority = 0 )
public void testWriteAddFileGMCE() throws IOException {
gobblinMCEWriterWithAcceptClusters.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(10L))));
//Test when accept clusters does not contain the gmce cluster, we will skip
Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 0);
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
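The new test covers the negative path: a second writer configured with accepted.cluster.names set to "randomCluster" must skip the same GMCE, so no table shows up in the Iceberg catalog, while the default writer still processes it. Putting the two knobs together, a cross-cluster deployment could be wired roughly as follows (cluster names, builder wiring, and exception handling are illustrative, not part of the patch):

// Producer job emitting GMCEs on behalf of "cluster-a".
State producerState = new State();
producerState.setProp(GobblinMCEProducer.GMCE_CLUSTER_NAME, "cluster-a");

// Metadata pipeline on "cluster-b" that should also consume cluster-a's events.
State writerState = new State();
writerState.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "cluster-a,cluster-b");
GobblinMCEWriter writer = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), writerState);
// writeEnvelope now drops any GMCE whose cluster field is not in the accepted set.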
