[GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (#3517)

* [GOBBLIN-1655] Update completion watermark for quiet tables during iceberg registration

* [GOBBLIN-1657] Update completion watermark on change_property GMCE

* Added test case to check watermark update on change_property

Co-authored-by: Vikram Bohra <vbohra@vbohra-mn1.linkedin.biz>
vikrambohra and Vikram Bohra committed Jun 7, 2022
1 parent e814aaf commit ef18c482b241378f8912fa1944914167a61f147c
Showing 4 changed files with 103 additions and 34 deletions.
@@ -45,6 +45,8 @@ public class KafkaAuditCountVerifier {
public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
private static final double DEFAULT_THRESHOLD = 0.999;
public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX + "complete.on.no.counts";
private final boolean returnCompleteOnNoCounts;

private final AuditCountClient auditCountClient;
private final String srcTier;
@@ -67,6 +69,7 @@ public KafkaAuditCountVerifier(State state, AuditCountClient client) {
state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
this.srcTier = state.getProp(SOURCE_TIER);
this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
this.returnCompleteOnNoCounts = state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
}

/**
@@ -119,6 +122,10 @@ private double getCompletenessPercentage(String datasetName, long beginInMillis,
Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
log.info(String.format("Found empty counts map for %s, returning complete", datasetName));
return 1.0;
}
double percent = -1;
if (!countsByTier.containsKey(this.srcTier)) {
throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, beginInMillis, endInMillis));
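
The new complete-on-no-counts flag is opt-in and defaults to false. Below is a minimal sketch of wiring it up through job state so that datasets with no audit counts (typically quiet topics) are reported as complete; it assumes an existing AuditCountClient instance, assumes SOURCE_TIER is exposed like the other keys, and the tier values and package paths are illustrative rather than taken from this diff.

    // Hypothetical configuration sketch (package paths assumed from the Gobblin source tree)
    // import org.apache.gobblin.configuration.State;
    // import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
    State state = new State();
    state.setProp(KafkaAuditCountVerifier.SOURCE_TIER, "producer");        // placeholder tier
    state.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, "gobblin");     // placeholder tier
    state.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);    // treat "no counts" as complete
    KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(state, auditCountClient);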
@@ -98,7 +98,10 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
// There'll be only one dummy file here. This file is parsed for DB and table name calculation.
newFiles = computeDummyFile(state);
if (!newFiles.isEmpty()) {
log.info("Dummy file: " + newFiles.keySet().iterator().next());
this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
} else {
log.info("No dummy file created. Not sending GMCE");
}
} else {
this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
@@ -312,6 +312,11 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
return;
}
}
if(tableMetadata.completenessEnabled) {
tableMetadata.completionWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
}

computeCandidateSchema(gmce, tid, tableSpec);
tableMetadata.ensureTxnInit();
tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
@@ -322,12 +327,6 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
//compute topic name
if(!tableMetadata.newProperties.get().containsKey(TOPIC_NAME_KEY) &&
tableMetadata.dataOffsetRange.isPresent() && !tableMetadata.dataOffsetRange.get().isEmpty()) {
String topicPartition = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topicPartition.substring(0, topicPartition.lastIndexOf("-")));
}
break;
}
case rewrite_files: {
@@ -411,6 +410,9 @@ private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
String nativeName = tableMetadata.datasetName;
String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);
tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topic);
}
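
For illustration, the substring above derives the topic from the last path segment of the dataset name; the value below is taken from the tests later in this diff, not something this hunk asserts.

    // Hypothetical walk-through of the topic derivation in updateTableProperty
    String nativeName = "testDB/testIcebergTable";
    String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);  // "testIcebergTable"
    // TOPIC_NAME_KEY is then set to "testIcebergTable", which is why the test assertions
    // below expect that value rather than "testTopic".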

/**
@@ -692,8 +694,6 @@ private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata ta
StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);

if(tableMetadata.newPartitionColumnEnabled && gmce.getOperationType() == OperationType.add_files) {
tableMetadata.prevCompletenessWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
// Assumes first partition value to be partitioned by date
// TODO Find better way to determine a partition value
String datepartition = partition.get(0, null);
@@ -722,8 +722,7 @@ private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata ta
private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {
table = addPartitionToIcebergTable(table, newPartitionColumn, newPartitionColumnType);
PartitionSpec partitionSpec = table.spec();
long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, prevCompletenessWatermark);
int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, tableMetadata.completionWatermark);
List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
partitionValues.add(String.valueOf(late));
return IcebergUtils.getPartition(partitionSpec.partitionType(), partitionValues);
@@ -790,28 +789,33 @@ public void flush(String dbName, String tableName) throws IOException {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
if (tableMetadata.completenessEnabled) {
String topicName = props.get(TOPIC_NAME_KEY);
if(topicName == null) {
log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
TOPIC_NAME_KEY, dbName, tableName));
} else {
long newCompletenessWatermark =
computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
if(newCompletenessWatermark > tableMetadata.prevCompletenessWatermark) {
log.info(String.format("Updating %s for %s.%s to %s", COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
tableMetadata.newCompletenessWatermark = newCompletenessWatermark;
}
}
checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
// Check and update completion watermark when there are no files to be registered, typically for quiet topics
// The logic is to check the window immediately after the previous completion watermark and advance the
// watermark if the audit check passes (which, with the verifier's complete-on-no-counts flag, covers quiet windows)
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
log.info(String.format("Checking kafka audit for %s on change_property ", topic));
SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
ZonedDateTime prevWatermarkDT =
Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
} else {
log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
tableMetadata.completionWatermark, topic));
}
}

//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
@@ -835,7 +839,6 @@ public void flush(String dbName, String tableName) throws IOException {
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);

//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
@@ -850,7 +853,7 @@ public void flush(String dbName, String tableName) throws IOException {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);

//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark, tableMetadata.newCompletenessWatermark);
tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
} else {
log.info("There's no transaction initiated for the table {}", tid.toString());
@@ -869,6 +872,30 @@ public void reset(String dbName, String tableName) throws IOException {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}

/**
* Update TableMetadata with the new completion watermark upon a successful audit check
* @param tableMetadata metadata of table
* @param topic topic name
* @param timestamps Sorted set in reverse order of timestamps to check audit counts for
* @param props table properties map
*/
private void checkAndUpdateCompletenessWatermark(TableMetadata tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
Map<String, String> props) {
if (topic == null) {
log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s",
TOPIC_NAME_KEY, tableMetadata.table.get().name()));
return;
}
long newCompletenessWatermark =
computeCompletenessWatermark(topic, timestamps, tableMetadata.completionWatermark);
if (newCompletenessWatermark > tableMetadata.completionWatermark) {
log.info(String.format("Updating %s for %s to %s", COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
newCompletenessWatermark));
props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
tableMetadata.completionWatermark = newCompletenessWatermark;
}
}

/**
* NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit counts match
* for that window (i.e. it is set to the beginning of the next window)
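
To make the window semantics concrete, here is an illustrative single-window check at HOUR granularity, reusing names from the flush() hunk above; it is not the body of computeCompletenessWatermark, "verifier" stands in for the writer's KafkaAuditCountVerifier, and the timezone comes from the tests rather than this hunk.

    // Hypothetical example: previous completion watermark at 2020-09-16T10:00 PT
    ZonedDateTime prev = Instant.ofEpochMilli(completionWatermark)
        .atZone(ZoneId.of("America/Los_Angeles"));                                       // t1 = 10:00
    ZonedDateTime windowEnd = TimeIterator.inc(prev, TimeIterator.Granularity.HOUR, 1);  // t2 = 11:00
    if (verifier.isComplete(topic, prev.toInstant().toEpochMilli(),
        windowEnd.toInstant().toEpochMilli())) {
      // audit counts match for [t1, t2], so the watermark is set to t2,
      // i.e. the beginning of the next window
      completionWatermark = windowEnd.toInstant().toEpochMilli();
    }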
@@ -1085,8 +1112,7 @@ private class TableMetadata {
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
long prevCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());

@Setter
@@ -1131,7 +1157,7 @@ void ensureTxnInit() {
}
}

void reset(Map<String, String> props, Long lowWaterMark, long newCompletionWatermark) {
void reset(Map<String, String> props, Long lowWaterMark) {
this.lastProperties = Optional.of(props);
this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
this.transaction = Optional.absent();
@@ -1148,8 +1174,6 @@ void reset(Map<String, String> props, Long lowWaterMark, long newCompletionWater
this.newProperties = Optional.absent();
this.lowestGMCEEmittedTime = Long.MAX_VALUE;
this.lowWatermark = Optional.of(lowWaterMark);
this.prevCompletenessWatermark = newCompletionWatermark;
this.newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
this.datePartitions.clear();
}
}
@@ -423,12 +423,12 @@ public void testWriteAddFileGMCECompleteness() throws IOException {

// Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
Mockito.when(verifier.isComplete("testTopic", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
//completeness watermark = "2020-09-16-10"
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));

@@ -475,7 +475,7 @@ public void testWriteAddFileGMCECompleteness() throws IOException {
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(60L))));

Mockito.when(verifier.isComplete("testTopic", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis1));
@@ -486,6 +486,41 @@ public void testWriteAddFileGMCECompleteness() throws IOException {

}

@Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"}, groups={"icebergMetadataWriterTest"})
public void testChangePropertyGMCECompleteness() throws IOException {

Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY));
long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1);
File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
gmce.setOldFilePrefixes(null);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
.setFilePath(hourlyFile2.toString())
.setFileFormat("avro")
.setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
.build()));
gmce.setOperationType(OperationType.change_property);
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "6000-7000").build());
GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(65L))));

KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
Mockito.when(verifier.isComplete("testIcebergTable", watermark, expectedWatermark)).thenReturn(true);
((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();

table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-7000");
Assert.assertEquals(table.spec().fields().get(1).name(), "late");
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(expectedWatermark));

}

private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);
