Add nested fields support for MOR tables
jianxu authored and vinothchandar committed Aug 16, 2017
1 parent 6a3c94a commit b1cf097
Showing 6 changed files with 268 additions and 7 deletions.
@@ -113,4 +113,9 @@ public static List<IndexedRecord> generateEvolvedTestRecords(int from, int limit
throws IOException, URISyntaxException {
return toRecords(getSimpleSchema(), getEvolvedSchema(), from, limit);
}

public static Schema getComplexEvolvedSchema() throws IOException {
return new Schema.Parser()
.parse(SchemaTestUtil.class.getResourceAsStream("/complex-test-evolved.avro"));
}
}
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2016,2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.util;

import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonMethod;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@JsonIgnoreProperties(ignoreUnknown = true)
@SuppressWarnings({"unused", "FieldCanBeLocal", "MismatchedQueryAndUpdateOfCollection"})
public class TestRecord implements Serializable {
class TestMapItemRecord implements Serializable {
private String item1;
private String item2;

TestMapItemRecord(String item1, String item2) {
this.item1 = item1;
this.item2 = item2;
}
}

class TestNestedRecord implements Serializable {
private boolean isAdmin;
private String userId;

TestNestedRecord(boolean isAdmin, String userId) {
this.isAdmin = isAdmin;
this.userId = userId;
}
}

private String _hoodie_commit_time;
private String _hoodie_record_key;
private String _hoodie_partition_path;
private String _hoodie_file_name;
private String _hoodie_commit_seqno;

private String field1;
private String field2;
private String name;
private Integer favoriteIntNumber;
private Long favoriteNumber;
private Float favoriteFloatNumber;
private Double favoriteDoubleNumber;
private Map<String, TestMapItemRecord> tags;
private TestNestedRecord testNestedRecord;
private String[] stringArray;

public TestRecord(String commitTime, int recordNumber, String fileId) {
this._hoodie_commit_time = commitTime;
this._hoodie_record_key = "key" + recordNumber;
this._hoodie_partition_path = commitTime;
this._hoodie_file_name = fileId;
this._hoodie_commit_seqno = commitTime + recordNumber;

String commitTimeSuffix = "@" + commitTime;
int commitHashCode = commitTime.hashCode();

this.field1 = "field" + recordNumber;
this.field2 = "field" + recordNumber + commitTimeSuffix;
this.name = "name" + recordNumber;
this.favoriteIntNumber = recordNumber + commitHashCode;
this.favoriteNumber = (long)(recordNumber + commitHashCode);
this.favoriteFloatNumber = (float)((recordNumber + commitHashCode) / 1024.0);
this.favoriteDoubleNumber = (recordNumber + commitHashCode) / 1024.0;
this.tags = new HashMap<>();
this.tags.put("mapItem1", new TestMapItemRecord("item" + recordNumber, "item" + recordNumber + commitTimeSuffix));
this.tags.put("mapItem2", new TestMapItemRecord("item2" + recordNumber, "item2" + recordNumber + commitTimeSuffix));
this.testNestedRecord = new TestNestedRecord(false, "UserId" + recordNumber + commitTimeSuffix);
this.stringArray = new String[]{"stringArray0" + commitTimeSuffix, "stringArray1" + commitTimeSuffix};
}

public String toJsonString() throws IOException {
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(this);
}
}
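For orientation, a minimal sketch, not part of the commit, of how TestRecord could be exercised on its own; it assumes the class above is on the test classpath and simply prints the JSON that the new helpers later feed through MercifulJsonConverter.

package com.uber.hoodie.common.util;

import java.io.IOException;

// Hypothetical demo class, not part of this commit.
public class TestRecordJsonDemo {
  public static void main(String[] args) throws IOException {
    TestRecord record = new TestRecord("100", 0, "fileid0");
    // Prints a pretty-printed JSON object containing the _hoodie_* metadata columns,
    // the primitive fields, the "tags" map, the nested record and the string array.
    System.out.println(record.toJsonString());
  }
}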
17 changes: 17 additions & 0 deletions hoodie-common/src/test/resources/complex-test-evolved.avro
@@ -0,0 +1,17 @@
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "field1", "type": ["null", "string"], "default": null},
{"name": "field2", "type": ["null", "string"], "default": null},
{"name": "name", "type": ["null", "string"], "default": null},
{"name": "favoriteIntNumber", "type": ["null", "int"], "default": null},
{"name": "favoriteNumber", "type": ["null", "long"], "default": null},
{"name": "favoriteFloatNumber", "type": ["null", "float"], "default": null},
{"name": "favoriteDoubleNumber", "type": ["null", "double"], "default": null},
{"name": "tags", "type": ["null", {"values": ["null", {"fields": [{"default": null, "type": ["null", "string"], "name": "item1"}, {"default": null, "type": ["null", "string"], "name": "item2"} ], "type": "record", "name": "tagsMapItems"} ], "type": "map"} ], "default": null},
{"default": null, "name": "testNestedRecord", "type": ["null", {"fields": [{"default": null, "name": "isAdmin", "type": ["null", "boolean"] }, {"default": null, "name": "userId", "type": ["null", "string"] } ], "name": "notes", "type": "record"}]},
{"default": null, "name": "stringArray", "type": ["null", {"items": "string", "type": "array"}]}
]
}
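A short, hypothetical sketch of how this schema can be parsed and one nested value built with plain Avro. The file path and class name are assumptions; the field and record names come from the schema above. Note that "tags", "testNestedRecord" and "stringArray" are all wrapped in nullable unions, so the unions have to be unwrapped before the inner map/record schemas can be used.

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Hypothetical demo class, not part of this commit.
public class ComplexSchemaDemo {
  public static void main(String[] args) throws IOException {
    Schema user = new Schema.Parser()
        .parse(new File("hoodie-common/src/test/resources/complex-test-evolved.avro"));

    // "tags" is a union [null, map]; its values are a union [null, record tagsMapItems].
    Schema mapSchema = user.getField("tags").schema().getTypes().get(1);
    Schema itemSchema = mapSchema.getValueType().getTypes().get(1);

    GenericRecord item = new GenericData.Record(itemSchema);
    item.put("item1", "item0");
    item.put("item2", "item0@100");

    Map<String, GenericRecord> tags = new HashMap<>();
    tags.put("mapItem1", item);

    GenericRecord record = new GenericData.Record(user);
    record.put("name", "name0");
    record.put("tags", tags);
    System.out.println(record);
  }
}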
@@ -55,6 +55,8 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;
@@ -250,15 +252,25 @@ public static Writable avroToArrayWritable(Object value, Schema schema) {
case ENUM:
return new Text(value.toString());
case ARRAY:
- Writable[] values2 = new Writable[schema.getFields().size()];
+ GenericArray arrayValue = (GenericArray) value;
+ Writable[] values2 = new Writable[arrayValue.size()];
int index2 = 0;
- for (Object obj : (GenericArray) value) {
+ for (Object obj : arrayValue) {
values2[index2++] = avroToArrayWritable(obj, schema.getElementType());
}
return new ArrayWritable(Writable.class, values2);
case MAP:
- // TODO(vc): Need to add support for complex types
- return NullWritable.get();
+ Map mapValue = (Map) value;
+ Writable[] values3 = new Writable[mapValue.size()];
+ int index3 = 0;
+ for (Object entry : mapValue.entrySet()) {
+ Map.Entry mapEntry = (Map.Entry) entry;
+ Writable[] mapValues = new Writable[2];
+ mapValues[0] = new Text(mapEntry.getKey().toString());
+ mapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
+ values3[index3++] = new ArrayWritable(Writable.class, mapValues);
+ }
+ return new ArrayWritable(Writable.class, values3);
case UNION:
List<Schema> types = schema.getTypes();
if (types.size() != 2) {
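The MAP case added above flattens each Avro map entry into a two-element ArrayWritable of [key, value] and collects the entries in an outer ArrayWritable. Below is a standalone sketch of that layout using only Hadoop Writable types (class name hypothetical, not Hudi code); the test added later in this commit unpacks exactly this structure.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical demo class, not part of this commit.
public class MapWritableLayoutDemo {
  public static void main(String[] args) {
    Map<String, String> tags = new LinkedHashMap<>();
    tags.put("mapItem1", "item0");
    tags.put("mapItem2", "item20");

    // One [key, value] pair per map entry, wrapped in an outer ArrayWritable.
    Writable[] entries = new Writable[tags.size()];
    int i = 0;
    for (Map.Entry<String, String> e : tags.entrySet()) {
      Writable[] pair = new Writable[]{new Text(e.getKey()), new Text(e.getValue())};
      entries[i++] = new ArrayWritable(Writable.class, pair);
    }
    ArrayWritable mapWritable = new ArrayWritable(Writable.class, entries);

    // Reading it back mirrors the assertions in the new test below.
    ArrayWritable first = (ArrayWritable) mapWritable.get()[0];
    System.out.println(first.get()[0]); // mapItem1
    System.out.println(first.get()[1]); // item0
  }
}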
@@ -16,9 +16,11 @@

package com.uber.hoodie.hadoop;

import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TestRecord;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -115,10 +117,10 @@ public static File prepareParquetDataset(TemporaryFolder basePath, Schema schema

}

- private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) {
+ private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException {
List<GenericRecord> records = new ArrayList<>(numberOfRecords);
for(int i=0;i<numberOfRecords;i++) {
- records.add(generateAvroRecord(schema, i, commitTime, fileId));
+ records.add(generateAvroRecordFromJson(schema, i, commitTime, fileId));
}
return records;
}
@@ -136,6 +138,13 @@ public static GenericRecord generateAvroRecord(Schema schema, int recordNumber,
return record;
}

public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber,
String commitTime, String fileId) throws IOException {
TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
MercifulJsonConverter converter = new MercifulJsonConverter(schema);
return converter.convert(record.toJsonString());
}

public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit,
int totalNumberOfRecords, int numberOfRecordsToUpdate,
String newCommit) throws IOException {
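generateAvroRecordFromJson routes a TestRecord through MercifulJsonConverter so that map, nested-record and array fields end up populated, which the plain generateAvroRecord helper does not do. A hypothetical driver for that path follows; the import locations for SchemaTestUtil and HoodieAvroUtils are assumptions, while the method signatures come from this diff.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// NOTE: packages for SchemaTestUtil and HoodieAvroUtils are assumed here.
import com.uber.hoodie.avro.HoodieAvroUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.hadoop.InputFormatTestUtil;

// Hypothetical demo class, not part of this commit.
public class NestedRecordDemo {
  public static void main(String[] args) throws Exception {
    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
    GenericRecord record = InputFormatTestUtil.generateAvroRecordFromJson(schema, 0, "100", "fileid0");
    // Unlike generateAvroRecord, the nested map, record and array fields are populated.
    System.out.println(record.get("tags"));
    System.out.println(record.get("testNestedRecord"));
    System.out.println(record.get("stringArray"));
  }
}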
@@ -34,6 +34,11 @@
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -73,7 +78,7 @@ private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, St
.overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build();
List<IndexedRecord> records = new ArrayList<>();
for(int i=0; i < numberOfRecords; i++) {
- records.add(InputFormatTestUtil.generateAvroRecord(schema, i, newCommit, "fileid0"));
+ records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
}
Schema writeSchema = records.get(0).getSchema();
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
@@ -134,4 +139,119 @@ public void testReader() throws Exception {
}
}

@Test
public void testReaderWithNestedAndComplexSchema() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
HoodieTestUtils.initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
int numberOfRecords = 100;
int numberOfLogRecords = numberOfRecords / 2;
File partitionDir = InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, numberOfRecords, commitTime);
InputFormatTestUtil.commit(basePath, commitTime);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());

// update files or generate new log file
String newCommitTime = "101";
HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, numberOfLogRecords);
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);

//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+ "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);

//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<Void, ArrayWritable> reader =
new MapredParquetInputFormat().
getRecordReader(new FileSplit(split.getPath(), 0,
FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null);
JobConf jobConf = new JobConf();
List<Schema.Field> fields = schema.getFields();

String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");

// validate record reader compaction
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);

// use reader to read base Parquet File and log file, merge in flight and return latest commit
// here the first 50 records should be updated, see above
Void key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
int numRecordsRead = 0;
while (recordReader.next(key, value)) {
int currentRecordNo = numRecordsRead;
++numRecordsRead;
Writable[] values = value.get();
String recordCommitTime;
//check if the record written is with latest commit, here "101"
if (numRecordsRead > numberOfLogRecords) {
recordCommitTime = commitTime;
} else {
recordCommitTime = newCommitTime;
}
String recordCommitTimeSuffix = "@" + recordCommitTime;

Assert.assertEquals(values[0].toString(), recordCommitTime);
key = recordReader.createKey();
value = recordReader.createValue();
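// values[0..4] are the _hoodie_* metadata columns added by addMetadataFields, so the
// schema fields asserted below start at index 5 (field1) and run through index 14 (stringArray)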

// Assert type STRING
Assert.assertEquals("test value for field: field1", values[5].toString(), "field" + currentRecordNo);
Assert.assertEquals("test value for field: field2",values[6].toString(), "field" + currentRecordNo + recordCommitTimeSuffix);
Assert.assertEquals("test value for field: name", values[7].toString(), "name" + currentRecordNo);

// Assert type INT
IntWritable intWritable = (IntWritable)values[8];
Assert.assertEquals("test value for field: favoriteIntNumber", intWritable.get(), currentRecordNo + recordCommitTime.hashCode());

// Assert type LONG
LongWritable longWritable = (LongWritable)values[9];
Assert.assertEquals("test value for field: favoriteNumber", longWritable.get(), currentRecordNo + recordCommitTime.hashCode());

// Assert type FLOAT
FloatWritable floatWritable = (FloatWritable)values[10];
Assert.assertEquals("test value for field: favoriteFloatNumber", floatWritable.get(), (float)((currentRecordNo + recordCommitTime.hashCode()) / 1024.0), 0);

// Assert type DOUBLE
DoubleWritable doubleWritable = (DoubleWritable)values[11];
Assert.assertEquals("test value for field: favoriteDoubleNumber", doubleWritable.get(), (currentRecordNo + recordCommitTime.hashCode()) / 1024.0, 0);

// Assert type MAP
ArrayWritable mapItem = (ArrayWritable)values[12];
Writable[] mapItemValues = mapItem.get();
ArrayWritable mapItemValue1 = (ArrayWritable)mapItemValues[0];
ArrayWritable mapItemValue2 = (ArrayWritable)mapItemValues[1];
Assert.assertEquals("test value for field: tags", mapItemValue1.get()[0].toString(), "mapItem1");
Assert.assertEquals("test value for field: tags", mapItemValue2.get()[0].toString(), "mapItem2");
ArrayWritable mapItemValue1value = (ArrayWritable)mapItemValue1.get()[1];
ArrayWritable mapItemValue2value = (ArrayWritable)mapItemValue2.get()[1];
Assert.assertEquals("test value for field: tags", mapItemValue1value.get().length, 2);
Assert.assertEquals("test value for field: tags", mapItemValue2value.get().length, 2);
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1", mapItemValue1value.get()[0].toString(), "item" + currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1", mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2", mapItemValue1value.get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2", mapItemValue2value.get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix);

// Assert type RECORD
ArrayWritable recordItem = (ArrayWritable)values[13];
Writable[] nestedRecord = recordItem.get();
Assert.assertEquals("test value for field: testNestedRecord.isAdmin", ((BooleanWritable)nestedRecord[0]).get(), false);
Assert.assertEquals("test value for field: testNestedRecord.userId", nestedRecord[1].toString(), "UserId" + currentRecordNo + recordCommitTimeSuffix);

// Assert type ARRAY
ArrayWritable arrayValue = (ArrayWritable)values[14];
Writable[] arrayValues = arrayValue.get();
for (int i = 0; i < arrayValues.length; i++) {
Assert.assertEquals("test value for field: stringArray", arrayValues[i].toString(), "stringArray" + i + recordCommitTimeSuffix);
}
}
}
}
