Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericRecord;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -87,12 +87,12 @@ public TestRawTripPayload preCombine(TestRawTripPayload another) {
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException {
public Option<GenericRecord> combineAndGetUpdateValue(GenericRecord oldRec, Schema schema) throws IOException {
return this.getInsertValue(schema);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
public Option<GenericRecord> getInsertValue(Schema schema) throws IOException {
if (isDeleted) {
return Option.empty();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericRecord;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -55,12 +55,12 @@ public HoodieJsonPayload preCombine(HoodieJsonPayload another) {
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException {
public Option<GenericRecord> combineAndGetUpdateValue(GenericRecord oldRec, Schema schema) throws IOException {
return getInsertValue(schema);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
public Option<GenericRecord> getInsertValue(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
return Option.of(jsonConverter.convert(getJsonData(), schema));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

/**
* Empty payload used for deletions.
Expand All @@ -41,12 +40,12 @@ public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another) {
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
public Option<GenericRecord> combineAndGetUpdateValue(GenericRecord currentValue, Schema schema) {
return Option.empty();
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) {
public Option<GenericRecord> getInsertValue(Schema schema) {
return Option.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

Expand Down Expand Up @@ -56,12 +55,12 @@ public HoodieAvroPayload preCombine(HoodieAvroPayload another) {
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
public Option<GenericRecord> combineAndGetUpdateValue(GenericRecord currentValue, Schema schema) throws IOException {
return getInsertValue(schema);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
public Option<GenericRecord> getInsertValue(Schema schema) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -50,14 +50,14 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
* @param schema Schema used for record
* @return new combined/merged value to be written back to storage. EMPTY to skip writing this record.
*/
Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException;
Option<GenericRecord> combineAndGetUpdateValue(GenericRecord currentValue, Schema schema) throws IOException;

/**
* Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. Called when writing a
* new value for the given HoodieKey, wherein there is no existing record in storage to be combined against. (i.e
* insert) Return EMPTY to skip writing this record.
*/
Option<IndexedRecord> getInsertValue(Schema schema) throws IOException;
Option<GenericRecord> getInsertValue(Schema schema) throws IOException;

/**
* This method can be used to extract some metadata from HoodieRecordPayload. The metadata is passed to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

Expand Down Expand Up @@ -58,14 +57,14 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
public Option<GenericRecord> combineAndGetUpdateValue(GenericRecord currentValue, Schema schema) throws IOException {

Option<IndexedRecord> recordOption = getInsertValue(schema);
Option<GenericRecord> recordOption = getInsertValue(schema);
if (!recordOption.isPresent()) {
return Option.empty();
}

GenericRecord genericRecord = (GenericRecord) recordOption.get();
GenericRecord genericRecord = recordOption.get();
// combining strategy here trivially ignores currentValue on disk and writes this record
Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
Expand All @@ -76,7 +75,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
public Option<GenericRecord> getInsertValue(Schema schema) throws IOException {
return recordBytes.length == 0 ? Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

Expand Down Expand Up @@ -54,12 +53,12 @@ public HoodieRecordPayload preCombine(HoodieRecordPayload another) {
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
public Option<GenericRecord> combineAndGetUpdateValue(GenericRecord currentValue, Schema schema) throws IOException {
return getInsertValue(schema);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
public Option<GenericRecord> getInsertValue(Schema schema) throws IOException {
return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

Expand Down Expand Up @@ -54,14 +53,13 @@ public AWSDmsAvroPayload(Option<GenericRecord> record) {
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
throws IOException {
IndexedRecord insertValue = getInsertValue(schema).get();
boolean delete = false;
if (insertValue instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertValue;
delete = record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D");
}
public Option<GenericRecord> combineAndGetUpdateValue(
GenericRecord currentValue,
Schema schema
) throws IOException {
final GenericRecord insertValue = getInsertValue(schema).get();
final boolean delete = insertValue.get(OP_FIELD) != null
&& insertValue.get(OP_FIELD).toString().equalsIgnoreCase("D");

return delete ? Option.empty() : Option.of(insertValue);
}
Expand Down