Add nested fields support for MOR tables #234

Merged 1 commit on Aug 16, 2017
@@ -113,4 +113,9 @@ public static List<IndexedRecord> generateEvolvedTestRecords(int from, int limit
throws IOException, URISyntaxException {
return toRecords(getSimpleSchema(), getEvolvedSchema(), from, limit);
}

public static Schema getComplexEvolvedSchema() throws IOException {
return new Schema.Parser()
.parse(SchemaTestUtil.class.getResourceAsStream("/complex-test-evolved.avro"));
}
}
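For reference, a minimal sketch of how the new helper is consumed by the tests later in this PR (illustrative only; the com.uber.hoodie.common.util package for SchemaTestUtil is assumed from the surrounding hunks):

import org.apache.avro.Schema;

import com.uber.hoodie.common.util.SchemaTestUtil;

public class ComplexSchemaSketch {
  public static void main(String[] args) throws Exception {
    // Parse the complex-test-evolved.avro resource added below in this PR.
    Schema schema = SchemaTestUtil.getComplexEvolvedSchema();
    // The schema nests a map of records (tags), a record (testNestedRecord)
    // and an array of strings (stringArray), each wrapped in a ["null", ...] union.
    System.out.println(schema.getField("tags").schema());
    System.out.println(schema.getField("testNestedRecord").schema());
    System.out.println(schema.getField("stringArray").schema());
  }
}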
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2016,2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.util;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonMethod;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@JsonIgnoreProperties(ignoreUnknown = true)
@SuppressWarnings({"unused", "FieldCanBeLocal", "MismatchedQueryAndUpdateOfCollection"})
public class TestRecord implements Serializable {
class TestMapItemRecord implements Serializable {
private String item1;
private String item2;

TestMapItemRecord(String item1, String item2) {
this.item1 = item1;
this.item2 = item2;
}
}

class TestNestedRecord implements Serializable {
private boolean isAdmin;
private String userId;

TestNestedRecord(boolean isAdmin, String userId) {
this.isAdmin = isAdmin;
this.userId = userId;
}
}

private String _hoodie_commit_time;
private String _hoodie_record_key;
private String _hoodie_partition_path;
private String _hoodie_file_name;
private String _hoodie_commit_seqno;

private String field1;
private String field2;
private String name;
private Integer favoriteIntNumber;
private Long favoriteNumber;
private Float favoriteFloatNumber;
private Double favoriteDoubleNumber;
private Map<String, TestMapItemRecord> tags;
private TestNestedRecord testNestedRecord;
private String[] stringArray;

public TestRecord(String commitTime, int recordNumber, String fileId) {
this._hoodie_commit_time = commitTime;
this._hoodie_record_key = "key" + recordNumber;
this._hoodie_partition_path = commitTime;
this._hoodie_file_name = fileId;
this._hoodie_commit_seqno = commitTime + recordNumber;

String commitTimeSuffix = "@" + commitTime;
int commitHashCode = commitTime.hashCode();

this.field1 = "field" + recordNumber;
this.field2 = "field" + recordNumber + commitTimeSuffix;
this.name = "name" + recordNumber;
this.favoriteIntNumber = recordNumber + commitHashCode;
this.favoriteNumber = (long)(recordNumber + commitHashCode);
this.favoriteFloatNumber = (float)((recordNumber + commitHashCode) / 1024.0);
this.favoriteDoubleNumber = (recordNumber + commitHashCode) / 1024.0;
this.tags = new HashMap<>();
this.tags.put("mapItem1", new TestMapItemRecord("item" + recordNumber, "item" + recordNumber + commitTimeSuffix));
this.tags.put("mapItem2", new TestMapItemRecord("item2" + recordNumber, "item2" + recordNumber + commitTimeSuffix));
this.testNestedRecord = new TestNestedRecord(false, "UserId" + recordNumber + commitTimeSuffix);
this.stringArray = new String[]{"stringArray0" + commitTimeSuffix, "stringArray1" + commitTimeSuffix};
}

public String toJsonString() throws IOException {
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(this);
}
}
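To show how this POJO feeds the rest of the change, here is a small sketch that serializes a TestRecord to JSON and converts it into an Avro GenericRecord, mirroring generateAvroRecordFromJson further down; the MercifulJsonConverter import comes from the later hunk, while the SchemaTestUtil package is assumed (the PR's test also appends the _hoodie_* metadata fields to the schema first):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.common.util.TestRecord;

public class TestRecordToAvroSketch {
  public static void main(String[] args) throws Exception {
    // Build a record for commit "100", serialize it to JSON, then convert the JSON
    // into a GenericRecord against the complex evolved schema.
    TestRecord record = new TestRecord("100", 1, "fileid0");
    Schema schema = SchemaTestUtil.getComplexEvolvedSchema();
    GenericRecord avroRecord = new MercifulJsonConverter(schema).convert(record.toJsonString());
    System.out.println(avroRecord);
  }
}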
17 changes: 17 additions & 0 deletions hoodie-common/src/test/resources/complex-test-evolved.avro
@@ -0,0 +1,17 @@
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "field1", "type": ["null", "string"], "default": null},
{"name": "field2", "type": ["null", "string"], "default": null},
{"name": "name", "type": ["null", "string"], "default": null},
{"name": "favoriteIntNumber", "type": ["null", "int"], "default": null},
{"name": "favoriteNumber", "type": ["null", "long"], "default": null},
{"name": "favoriteFloatNumber", "type": ["null", "float"], "default": null},
{"name": "favoriteDoubleNumber", "type": ["null", "double"], "default": null},
{"name": "tags", "type": ["null", {"values": ["null", {"fields": [{"default": null, "type": ["null", "string"], "name": "item1"}, {"default": null, "type": ["null", "string"], "name": "item2"} ], "type": "record", "name": "tagsMapItems"} ], "type": "map"} ], "default": null},
{"default": null, "name": "testNestedRecord", "type": ["null", {"fields": [{"default": null, "name": "isAdmin", "type": ["null", "boolean"] }, {"default": null, "name": "userId", "type": ["null", "string"] } ], "name": "notes", "type": "record"}]},
{"default": null, "name": "stringArray", "type": ["null", {"items": "string", "type": "array"}]}
]
}
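Every field above is declared as a ["null", ...] union, so code that walks this schema (such as avroToArrayWritable in the next hunk) has to unwrap the non-null branch before descending into the map, record and array types. A small sketch using only the standard Avro API, assuming the resource is on the classpath:

import java.io.InputStream;
import java.util.List;

import org.apache.avro.Schema;

public class UnwrapUnionsSketch {
  // Return the non-null branch of a ["null", T] union, or the schema itself otherwise.
  static Schema nonNull(Schema s) {
    if (s.getType() != Schema.Type.UNION) {
      return s;
    }
    List<Schema> types = s.getTypes();
    return types.get(0).getType() == Schema.Type.NULL ? types.get(1) : types.get(0);
  }

  public static void main(String[] args) throws Exception {
    InputStream in = UnwrapUnionsSketch.class.getResourceAsStream("/complex-test-evolved.avro");
    Schema user = new Schema.Parser().parse(in);

    Schema tags = nonNull(user.getField("tags").schema());               // map
    Schema tagsValue = nonNull(tags.getValueType());                     // record tagsMapItems
    Schema nested = nonNull(user.getField("testNestedRecord").schema()); // record notes
    Schema array = nonNull(user.getField("stringArray").schema());       // array of string

    System.out.println(tags.getType() + " of " + tagsValue.getName());
    System.out.println(nested.getName() + ": " + nested.getFields());
    System.out.println(array.getElementType());
  }
}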
@@ -55,6 +55,8 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;
@@ -250,15 +252,25 @@ public static Writable avroToArrayWritable(Object value, Schema schema) {
case ENUM:
return new Text(value.toString());
case ARRAY:
- Writable[] values2 = new Writable[schema.getFields().size()];
+ GenericArray arrayValue = (GenericArray) value;
+ Writable[] values2 = new Writable[arrayValue.size()];
int index2 = 0;
- for (Object obj : (GenericArray) value) {
+ for (Object obj : arrayValue) {
values2[index2++] = avroToArrayWritable(obj, schema.getElementType());
}
return new ArrayWritable(Writable.class, values2);
case MAP:
- // TODO(vc): Need to add support for complex types
- return NullWritable.get();
+ Map mapValue = (Map) value;
+ Writable[] values3 = new Writable[mapValue.size()];
+ int index3 = 0;
+ for (Object entry : mapValue.entrySet()) {
+ Map.Entry mapEntry = (Map.Entry) entry;
+ Writable[] mapValues = new Writable[2];
+ mapValues[0] = new Text(mapEntry.getKey().toString());
+ mapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
+ values3[index3++] = new ArrayWritable(Writable.class, mapValues);
+ }
+ return new ArrayWritable(Writable.class, values3);
case UNION:
List<Schema> types = schema.getTypes();
if (types.size() != 2) {
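To clarify the layout produced by the new MAP branch above: each map entry becomes a two-element ArrayWritable of [key, value], and the map itself becomes an ArrayWritable of those pairs (the shape the asserts in the realtime reader test below rely on). A minimal standalone sketch of building and walking that structure with plain Hadoop Writables:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MapWritableLayoutSketch {
  public static void main(String[] args) {
    // Same shape as the MAP branch: map -> [[key, value], [key, value], ...]
    ArrayWritable entry1 = new ArrayWritable(Writable.class,
        new Writable[]{new Text("mapItem1"), new Text("item1-value")});
    ArrayWritable entry2 = new ArrayWritable(Writable.class,
        new Writable[]{new Text("mapItem2"), new Text("item2-value")});
    ArrayWritable mapAsWritable = new ArrayWritable(Writable.class, new Writable[]{entry1, entry2});

    // Walking it back out recovers the original key/value pairs.
    Map<String, String> recovered = new HashMap<>();
    for (Writable w : mapAsWritable.get()) {
      Writable[] kv = ((ArrayWritable) w).get();
      recovered.put(kv[0].toString(), kv[1].toString());
    }
    System.out.println(recovered); // e.g. {mapItem1=item1-value, mapItem2=item2-value}
  }
}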
@@ -16,9 +16,11 @@

package com.uber.hoodie.hadoop;

import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TestRecord;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -115,10 +117,10 @@ public static File prepareParquetDataset(TemporaryFolder basePath, Schema schema

}

- private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) {
+ private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException {
List<GenericRecord> records = new ArrayList<>(numberOfRecords);
for(int i=0;i<numberOfRecords;i++) {
- records.add(generateAvroRecord(schema, i, commitTime, fileId));
+ records.add(generateAvroRecordFromJson(schema, i, commitTime, fileId));
}
return records;
}
@@ -136,6 +138,13 @@ public static GenericRecord generateAvroRecord(Schema schema, int recordNumber,
return record;
}

public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber,
String commitTime, String fileId) throws IOException {
TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
MercifulJsonConverter converter = new MercifulJsonConverter(schema);
return converter.convert(record.toJsonString());
}

public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit,
int totalNumberOfRecords, int numberOfRecordsToUpdate,
String newCommit) throws IOException {
@@ -34,6 +34,11 @@
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -73,7 +78,7 @@ private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, St
.overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build();
List<IndexedRecord> records = new ArrayList<>();
for(int i=0; i < numberOfRecords; i++) {
- records.add(InputFormatTestUtil.generateAvroRecord(schema, i, newCommit, "fileid0"));
+ records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
}
Schema writeSchema = records.get(0).getSchema();
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
@@ -134,4 +139,119 @@ public void testReader() throws Exception {
}
}

@Test
public void testReaderWithNestedAndComplexSchema() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
HoodieTestUtils.initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
int numberOfRecords = 100;
int numberOfLogRecords = numberOfRecords / 2;
File partitionDir = InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, numberOfRecords, commitTime);
InputFormatTestUtil.commit(basePath, commitTime);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());

// update files or generate new log file
String newCommitTime = "101";
HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, numberOfLogRecords);
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);

//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+ "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);

//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<Void, ArrayWritable> reader =
new MapredParquetInputFormat().
getRecordReader(new FileSplit(split.getPath(), 0,
FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null);
JobConf jobConf = new JobConf();
List<Schema.Field> fields = schema.getFields();

String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");

// validate record reader compaction
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);

// use reader to read base Parquet File and log file, merge in flight and return latest commit
// here the first 50 records should be updated, see above
Void key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
int numRecordsRead = 0;
while (recordReader.next(key, value)) {
int currentRecordNo = numRecordsRead;
++numRecordsRead;
Writable[] values = value.get();
String recordCommitTime;
//check if the record written is with latest commit, here "101"
if (numRecordsRead > numberOfLogRecords) {
recordCommitTime = commitTime;
} else {
recordCommitTime = newCommitTime;
}
String recordCommitTimeSuffix = "@" + recordCommitTime;

Assert.assertEquals(values[0].toString(), recordCommitTime);
key = recordReader.createKey();
value = recordReader.createValue();

// Assert type STRING
Assert.assertEquals("test value for field: field1", values[5].toString(), "field" + currentRecordNo);
Assert.assertEquals("test value for field: field2",values[6].toString(), "field" + currentRecordNo + recordCommitTimeSuffix);
Assert.assertEquals("test value for field: name", values[7].toString(), "name" + currentRecordNo);

// Assert type INT
IntWritable intWritable = (IntWritable)values[8];
Assert.assertEquals("test value for field: favoriteIntNumber", intWritable.get(), currentRecordNo + recordCommitTime.hashCode());

// Assert type LONG
LongWritable longWritable = (LongWritable)values[9];
Assert.assertEquals("test value for field: favoriteNumber", longWritable.get(), currentRecordNo + recordCommitTime.hashCode());

// Assert type FLOAT
FloatWritable floatWritable = (FloatWritable)values[10];
Assert.assertEquals("test value for field: favoriteFloatNumber", floatWritable.get(), (float)((currentRecordNo + recordCommitTime.hashCode()) / 1024.0), 0);

// Assert type DOUBLE
DoubleWritable doubleWritable = (DoubleWritable)values[11];
Assert.assertEquals("test value for field: favoriteDoubleNumber", doubleWritable.get(), (currentRecordNo + recordCommitTime.hashCode()) / 1024.0, 0);

// Assert type MAP
ArrayWritable mapItem = (ArrayWritable)values[12];
Writable[] mapItemValues = mapItem.get();
ArrayWritable mapItemValue1 = (ArrayWritable)mapItemValues[0];
ArrayWritable mapItemValue2 = (ArrayWritable)mapItemValues[1];
Assert.assertEquals("test value for field: tags", mapItemValue1.get()[0].toString(), "mapItem1");
Assert.assertEquals("test value for field: tags", mapItemValue2.get()[0].toString(), "mapItem2");
ArrayWritable mapItemValue1value = (ArrayWritable)mapItemValue1.get()[1];
ArrayWritable mapItemValue2value = (ArrayWritable)mapItemValue2.get()[1];
Assert.assertEquals("test value for field: tags", mapItemValue1value.get().length, 2);
Assert.assertEquals("test value for field: tags", mapItemValue2value.get().length, 2);
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1", mapItemValue1value.get()[0].toString(), "item" + currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1", mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2", mapItemValue1value.get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2", mapItemValue2value.get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix);

// Assert type RECORD
ArrayWritable recordItem = (ArrayWritable)values[13];
Writable[] nestedRecord = recordItem.get();
Assert.assertEquals("test value for field: testNestedRecord.isAdmin", ((BooleanWritable)nestedRecord[0]).get(), false);
Assert.assertEquals("test value for field: testNestedRecord.userId", nestedRecord[1].toString(), "UserId" + currentRecordNo + recordCommitTimeSuffix);

// Assert type ARRAY
ArrayWritable arrayValue = (ArrayWritable)values[14];
Writable[] arrayValues = arrayValue.get();
for (int i = 0; i < arrayValues.length; i++) {
Assert.assertEquals("test value for field: stringArray", arrayValues[i].toString(), "stringArray" + i + recordCommitTimeSuffix);
}
}
}
}
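A note on the hard-coded indices values[5] through values[14] asserted above: they follow from the five _hoodie_* metadata columns occupying positions 0 through 4 of the writer schema (values[0] is the commit time). An illustrative helper, not part of the PR, could resolve these positions from the schema instead:

// Illustrative only: resolve a projected column index from the writer schema rather than
// hard-coding it. With the five metadata fields in front, "field1" resolves to 5, "tags" to 12,
// "testNestedRecord" to 13 and "stringArray" to 14, matching the asserts above.
private static int columnIndex(org.apache.avro.Schema writerSchema, String fieldName) {
  return writerSchema.getField(fieldName).pos();
}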