Hoodie Event callbacks #251

Merged: 1 commit, Oct 2, 2017
40 changes: 30 additions & 10 deletions hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
@@ -24,6 +24,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Status of a write operation.
@@ -47,16 +49,34 @@ public class WriteStatus implements Serializable {
private long totalRecords = 0;
private long totalErrorRecords = 0;

public void markSuccess(HoodieRecord record) {
writtenRecords.add(record);
totalRecords++;
}

public void markFailure(HoodieRecord record, Throwable t) {
failedRecords.add(record);
errors.put(record.getKey(), t);
totalRecords++;
totalErrorRecords++;
/**
* Mark the write as a success, optionally using the given parameters to calculate
* some aggregate metrics. This method is not meant to cache the passed arguments,
* since WriteStatus objects are collected by the Spark driver.
*
* @param record deflated {@code HoodieRecord} containing the information that uniquely identifies it.
* @param optionalRecordMetadata optional metadata related to the data contained in the {@link HoodieRecord} before deflation.
*/
public void markSuccess(HoodieRecord record,
Optional<Map<String, String>> optionalRecordMetadata) {
writtenRecords.add(record);
totalRecords++;
}

/**
* Mark the write as a failure, optionally using the given parameters to calculate
* some aggregate metrics. This method is not meant to cache the passed arguments,
* since WriteStatus objects are collected by the Spark driver.
*
* @param record deflated {@code HoodieRecord} containing the information that uniquely identifies it.
* @param optionalRecordMetadata optional metadata related to the data contained in the {@link HoodieRecord} before deflation.
*/
public void markFailure(HoodieRecord record, Throwable t,
Optional<Map<String, String>> optionalRecordMetadata) {
failedRecords.add(record);
errors.put(record.getKey(), t);
totalRecords++;
totalErrorRecords++;
}

public String getFileId() {
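The Optional metadata parameter exists so that a custom WriteStatus can fold per-record information into a small aggregate instead of retaining the records or metadata maps, which matters because these objects are collected by the Spark driver. Below is a minimal sketch of such a subclass; the class name and the "eventCount" metadata key are illustrative only, and it assumes the public no-arg constructor that the reflection-based instantiation in the handles further down relies on. The MetadataMergeWriteStatus class at the bottom of this diff is the in-tree version of the same idea.

```java
import java.util.Map;
import java.util.Optional;

import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;

// Illustrative only (not part of this PR): tallies a single numeric metric from
// the per-record metadata instead of caching records or metadata maps.
public class EventCountWriteStatus extends WriteStatus {

  private long eventCount = 0;

  @Override
  public void markSuccess(HoodieRecord record, Optional<Map<String, String>> recordMetadata) {
    super.markSuccess(record, recordMetadata);
    // "eventCount" is a hypothetical metadata key supplied by the record payload.
    recordMetadata.ifPresent(m ->
        eventCount += Long.parseLong(m.getOrDefault("eventCount", "0")));
  }

  public long getEventCount() {
    return eventCount;
  }
}
```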
HoodieWriteConfig.java
@@ -18,6 +18,7 @@


import com.google.common.base.Preconditions;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.index.HoodieIndex;
@@ -54,6 +55,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();

private HoodieWriteConfig(Properties props) {
super(props);
@@ -106,6 +109,10 @@ public StorageLevel getWriteStatusStorageLevel() {
return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL));
}

public String getWriteStatusClassName() {
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
}

/**
* compaction properties
**/
@@ -363,6 +370,11 @@ public Builder withAssumeDatePartitioning(boolean assumeDatePartitioning) {
return this;
}

public Builder withWriteStatusClass(Class<? extends WriteStatus> writeStatusClass) {
props.setProperty(HOODIE_WRITE_STATUS_CLASS_PROP, writeStatusClass.getName());
return this;
}

public HoodieWriteConfig build() {
HoodieWriteConfig config = new HoodieWriteConfig(props);
// Check for mandatory properties
@@ -383,6 +395,8 @@ public HoodieWriteConfig build() {
HOODIE_AUTO_COMMIT_PROP, DEFAULT_HOODIE_AUTO_COMMIT);
setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);

// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,
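Hooking a custom class up is then a one-liner against the new builder method, which sets hoodie.writestatus.class; the default added in build() keeps plain WriteStatus when nothing is configured. A hedged sketch: newBuilder() and withPath() are assumed from the existing HoodieWriteConfig builder API, the base path is a placeholder, and EventCountWriteStatus is the illustrative subclass from the earlier sketch.

```java
// Sketch only: builder calls other than withWriteStatusClass() are assumptions.
String basePath = "/tmp/hoodie/sample-table";  // placeholder path
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withWriteStatusClass(EventCountWriteStatus.class)
    .build();
// If hoodie.writestatus.class is left unset, build() falls back to
// com.uber.hoodie.WriteStatus, so existing jobs are unaffected.
```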
HoodieAppendHandle.java
@@ -29,13 +29,13 @@
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieAppendException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -45,10 +45,8 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
* IO Operation to append data onto an existing file.
@@ -74,7 +72,7 @@ public HoodieAppendHandle(HoodieWriteConfig config,
String fileId,
Iterator<HoodieRecord<T>> recordItr) {
super(config, commitTime, hoodieTable);
WriteStatus writeStatus = new WriteStatus();
WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
Member (@vinothchandar):

If you are going to rely on a constructor being called via reflection, then making WriteStatus an interface is not necessary. At least with the subclassing route, you will be forcing the subclass to implement at least one constructor with the same signature as the base class.

Was this part of @prazanna's suggestion earlier on? (The comment history is not there now.)

Contributor Author:

@vinothchandar Yes, it was part of @prazanna's suggestion earlier; the rebase has hidden the comment. OK, I'll retain WriteStatus as a class, like before.
writeStatus.setStat(new HoodieDeltaWriteStat());
this.writeStatus = writeStatus;
this.fileId = fileId;
@@ -128,6 +126,7 @@ private void init(Iterator<HoodieRecord<T>> recordItr) {
}

private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
Optional recordMetadata = hoodieRecord.getData().getMetadata();
try {
Optional<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(schema);

@@ -145,11 +144,11 @@ private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
}

hoodieRecord.deflate();
writeStatus.markSuccess(hoodieRecord);
writeStatus.markSuccess(hoodieRecord, recordMetadata);
return avroRecord;
} catch (Exception e) {
logger.error("Error writing record " + hoodieRecord, e);
writeStatus.markFailure(hoodieRecord, e);
writeStatus.markFailure(hoodieRecord, e, recordMetadata);
}
return Optional.empty();
}
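The review thread above turns on how ReflectionUtils.loadClass instantiates the configured class. That utility is not part of this diff; if it resolves the class by name and invokes the no-arg constructor, roughly as in the sketch below, then every custom WriteStatus subclass has to keep a public no-arg constructor, which is the constraint the reviewer is pointing out.

```java
// Assumed shape of ReflectionUtils.loadClass; the real utility is not shown in
// this diff, so treat this purely as an illustration of the constraint.
final class ReflectionSketch {
  @SuppressWarnings("unchecked")
  static <T> T loadClass(String fqcn) {
    try {
      return (T) Class.forName(fqcn).newInstance();
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Could not load class " + fqcn, e);
    }
  }
}
```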
HoodieCreateHandle.java
@@ -23,6 +23,7 @@
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieInsertException;
import com.uber.hoodie.io.storage.HoodieStorageWriter;
@@ -50,7 +51,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime,
HoodieTable<T> hoodieTable, String partitionPath) {
super(config, commitTime, hoodieTable);
this.status = new WriteStatus();
this.status = ReflectionUtils.loadClass(config.getWriteStatusClassName());
status.setFileId(UUID.randomUUID().toString());
status.setPartitionPath(partitionPath);

@@ -89,6 +90,7 @@ public boolean canWrite(HoodieRecord record) {
* @param record
*/
public void write(HoodieRecord record) {
Optional recordMetadata = record.getData().getMetadata();
try {
Optional<IndexedRecord> avroRecord = record.getData().getInsertValue(schema);

@@ -100,13 +102,12 @@ public void write(HoodieRecord record) {
} else {
recordsDeleted++;
}

record.deflate();
status.markSuccess(record);
status.markSuccess(record, recordMetadata);
} catch (Throwable t) {
// Not throwing exception from here, since we don't want to fail the entire job
// for a single record
status.markFailure(record, t);
status.markFailure(record, t, recordMetadata);
logger.error("Error writing record " + record, t);
}
}
HoodieMergeHandle.java
@@ -17,6 +17,7 @@
package com.uber.hoodie.io;

import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
@@ -66,7 +67,7 @@ public HoodieMergeHandle(HoodieWriteConfig config,
* Load the new incoming records in a map, and extract the old file path.
*/
private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
WriteStatus writeStatus = new WriteStatus();
WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
writeStatus.setStat(new HoodieWriteStat());
this.writeStatus = writeStatus;
this.keyToNewRecords = new HashMap<>();
@@ -129,6 +130,7 @@ private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {


private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Optional<IndexedRecord> indexedRecord) {
Optional recordMetadata = hoodieRecord.getData().getMetadata();
try {
if(indexedRecord.isPresent()) {
storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord);
@@ -139,11 +141,11 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Optional<IndexedRecord> indexedRecord) {
}

hoodieRecord.deflate();
writeStatus.markSuccess(hoodieRecord);
writeStatus.markSuccess(hoodieRecord, recordMetadata);
return true;
} catch (Exception e) {
logger.error("Error writing record "+ hoodieRecord, e);
writeStatus.markFailure(hoodieRecord, e);
writeStatus.markFailure(hoodieRecord, e, recordMetadata);
}
return false;
}
Merge-on-read client test
@@ -19,6 +19,7 @@
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieMergeOnReadTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus;
import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
@@ -224,7 +225,26 @@ public void testSimpleInsertAndUpdate() throws Exception {
assertEquals("Must contain 200 records", 200, readClient.readSince("000").count());
}

// Check that record-level metadata is aggregated properly at the end of the write.
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

String newCommitTime = "001";
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
Map<String, String> allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(statuses);
assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
// For metadata key InputRecordCount_1506582000 the value is 2 for each record, so the sum should be 2 * records.size()
assertEquals(String.valueOf(2 * records.size()), allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
}

@Test
public void testSimpleInsertAndDelete() throws Exception {
HoodieWriteConfig cfg = getConfig();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
TestRawTripPayload.java
@@ -18,9 +18,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;

import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.io.IOUtils;
@@ -91,6 +96,15 @@ public String getPartitionPath() {
}
}

@Override
public Optional<Map<String, String>> getMetadata() {
// Let's assume we want to count the number of input row change events
// that are processed. Let the time-bucket for this row change event be 1506582000.
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put("InputRecordCount_1506582000", "2");
return Optional.of(metadataMap);
}

public String getRowKey() {
return rowKey;
}
@@ -120,4 +134,58 @@ private String unCompressData(byte[] data) throws IOException {
IOUtils.copy(iis, sw);
return sw.toString();
}

/**
* A custom {@link WriteStatus} that merges the metadata key-value maps passed to
* {@code WriteStatus.markSuccess()} and {@code WriteStatus.markFailure()}.
*/
public static class MetadataMergeWriteStatus extends WriteStatus {
private Map<String, String> mergedMetadataMap = new HashMap<>();

@Override
public void markSuccess(HoodieRecord record, Optional<Map<String, String>> recordMetadata) {
super.markSuccess(record, recordMetadata);
if(recordMetadata.isPresent()) {
mergeMetadataMaps(recordMetadata.get(), mergedMetadataMap);
}
}

@Override
public void markFailure(HoodieRecord record, Throwable t,
Optional<Map<String, String>> recordMetadata) {
super.markFailure(record, t, recordMetadata);
if(recordMetadata.isPresent()) {
mergeMetadataMaps(recordMetadata.get(), mergedMetadataMap);
}
}

public static Map<String, String> mergeMetadataForWriteStatuses(List<WriteStatus> writeStatuses) {
Map<String, String> allWriteStatusMergedMetadataMap = new HashMap<>();
for (WriteStatus writeStatus : writeStatuses) {
MetadataMergeWriteStatus.mergeMetadataMaps(
((MetadataMergeWriteStatus)writeStatus).getMergedMetadataMap(),
allWriteStatusMergedMetadataMap);
}
return allWriteStatusMergedMetadataMap;
}

private static void mergeMetadataMaps(Map<String, String> mergeFromMap, Map<String, String> mergeToMap) {
for (Entry<String, String> entry : mergeFromMap.entrySet()) {
String key = entry.getKey();
if(!mergeToMap.containsKey(key)) {
mergeToMap.put(key, "0");
}
mergeToMap.put(key, addStrsAsInt(entry.getValue(), mergeToMap.get(key)));
}
}

private Map<String, String> getMergedMetadataMap() {
return mergedMetadataMap;
}

private static String addStrsAsInt(String a, String b) {
return String.valueOf(Integer.parseInt(a) + Integer.parseInt(b));
}
}
}
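One piece this excerpt does not show is the payload-side change. The @Override on getMetadata() in TestRawTripPayload implies that HoodieRecordPayload (or whatever payload type it implements) gains a metadata accessor; its signature is inferred from the override, and the Optional.empty() default is only an assumption about how existing payload implementations would stay source-compatible.

```java
import java.util.Map;
import java.util.Optional;

// Hedged sketch, not taken from the diff: the presumed shape of the payload-side
// addition that the TestRawTripPayload override satisfies. The interface name is
// simplified for illustration.
interface PayloadMetadataSketch {
  default Optional<Map<String, String>> getMetadata() {
    // Payloads with nothing to report keep the default.
    return Optional.empty();
  }
}
```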