
Initial pass at adding ORC to Iceberg. #12

Closed
wants to merge 5 commits

Conversation

Contributor

@omalley omalley commented Feb 23, 2018

Known problems:

  • Doesn't do schema evolution.
  • Doesn't include column size metrics.
  • Doesn't properly handle timestamp with timezone.
  • Doesn't do the schema mangling for partitions.

Contributor Author

omalley commented Feb 26, 2018

Other comments:

  • ORC reads/writes to/from VectorizedColumnBatches in groups of 1024 rows. I made the Spark interfaces implement an iterator of UnsafeRow, and I believe I have it set up so that there won't be any allocations in the inner loop (except for Decimals). A rough sketch of the batch-iteration pattern follows this list.
  • I extended the RandomData class to also generate InternalRows, which is a parent interface for UnsafeRow.
  • In RandomData I limited the year range for timestamps to 1970 ± 50 years. I got tired of debugging with dates 20,000 years in the future.
  • I added a comparison between InternalRow and Row in TestHelpers to deal with the data generator creating InternalRow instances.
  • I pulled out TestParquetWrite.Record to a top level class so that I could reuse it in TestOrcWrite.
  • Added a method on UpdateProperties to set the default file format.
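
A rough illustration of the batch-driven iteration from the first bullet. This is not the PR's reader/writer code; it is a hypothetical OrcRowIterator over a two-column (bigint, string) file, and it skips the null and isRepeating handling that real code needs:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

/** Hypothetical example: iterate a (bigint, string) ORC file one row at a time. */
public class OrcRowIterator implements Iterator<Object[]> {
  private final RecordReader rows;
  private final VectorizedRowBatch batch;  // reused for every group of up to 1024 rows
  private int nextRow = 0;

  public OrcRowIterator(Path path, Configuration conf) throws IOException {
    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    this.rows = reader.rows();
    this.batch = reader.getSchema().createRowBatch();  // default batch size is 1024
    this.batch.size = 0;
    // (closing the RecordReader is omitted for brevity)
  }

  @Override
  public boolean hasNext() {
    try {
      while (nextRow >= batch.size) {
        if (!rows.nextBatch(batch)) {  // refill; the batch's vectors are reused
          return false;
        }
        nextRow = 0;
      }
      return true;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public Object[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    LongColumnVector col0 = (LongColumnVector) batch.cols[0];
    BytesColumnVector col1 = (BytesColumnVector) batch.cols[1];
    String text = new String(col1.vector[nextRow], col1.start[nextRow],
        col1.length[nextRow], StandardCharsets.UTF_8);
    // The PR reuses a single UnsafeRow here to avoid per-row allocation;
    // this sketch allocates an Object[] to keep the example short.
    Object[] row = new Object[] { col0.vector[nextRow], text };
    nextRow += 1;
    return row;
  }
}
```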

Contributor

rdblue commented Feb 28, 2018

Thanks! I'll have a look shortly.

break;
}
case LIST:
compareLists(prefix + "." + fieldName, childType.asListType(),
Contributor

@rdblue rdblue Mar 2, 2018

I think I prefer the assert naming convention to compare because it is clear that the result when something is different is an AssertionError. Same with compareMaps.

Contributor Author

ok

@SuppressWarnings("unchecked")
SparkOrcWriter writer = new SparkOrcWriter(ORC.write(file)
.schema(schema)
.partitionSpec(spec)
Contributor

I think spec can be removed because it isn't relevant at the level of a single file. It also isn't used in the OrcFileAppender.

Contributor Author

ok

List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> fieldTypes = schema.getChildren();
List<Types.NestedField> fields = new ArrayList<>(fieldNames.size());
for(int c=0; c < fieldNames.size(); ++c) {
Contributor

Nit: no space between for and (

Contributor Author

ok

for(int c=0; c < fieldNames.size(); ++c) {
String name = fieldNames.get(c);
TypeDescription type = fieldTypes.get(c);
fields.add(Types.NestedField.optional(columnIds[type.getId()], name,
Contributor

Does ORC not support a distinction between required and optional fields?

Because Iceberg assumes that the files in a given table are managed by Iceberg, I think we can probably work around this by adding non-null checks to the write path and assuming non-null on the read path. But I would still prefer to have a guarantee that the files won't contain null values.

Contributor Author

@omalley omalley Mar 2, 2018

No, ORC doesn't support required fields. The history is that Hive doesn't support required fields, so it wasn't that important. Once schema evolution was added, it really didn't make any sense to add required fields.

That said, the column statistics track whether there are any nulls in each column. So given a file footer you can tell whether there are any nulls in that column or not. You're right that it would be easy to check for null values on the Iceberg write path. On the read path, you always need to check for null because the column may not be present in the file.

Contributor

Schema evolution doesn't allow you to add a required column, so we shouldn't need to worry about missing required columns. I like that we can use the stats to make sure the file doesn't contain any nulls. Let's plan on doing that for the read path and throwing an exception.

Is it possible to add the metadata to ORC? We have a fairly compelling use case for it: when there are null values in a foreign key column, a SQL inner join will ignore the rows with null because joins use null-safe equality (null == null returns null). We want to ensure we don't accidentally lose rows this way because it is a subtle correctness issue and we can't expect users to know they should do an outer join everywhere. Instead, we want to ensure that the foreign key is never null.
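
A minimal sketch of that read-path check, assuming Iceberg field ids have already been mapped to ORC column ids (the orcIdFor helper below is hypothetical):

```java
import org.apache.orc.ColumnStatistics;
import org.apache.orc.Reader;

import com.netflix.iceberg.Schema;
import com.netflix.iceberg.types.Types;

// Reject a file up front if any column that Iceberg marks required
// recorded nulls in the ORC footer statistics.
static void checkNoNullsInRequiredColumns(Reader reader, Schema schema) {
  ColumnStatistics[] stats = reader.getStatistics();  // indexed by ORC column id
  for (Types.NestedField field : schema.columns()) {
    if (field.isRequired()) {
      int orcId = orcIdFor(field);  // hypothetical Iceberg-id -> ORC-id mapping
      if (stats[orcId].hasNull()) {
        throw new IllegalStateException(
            "Required column contains nulls: " + field.name());
      }
    }
  }
}
```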

return Types.BinaryType.get();
case DATE:
return Types.DateType.get();
case TIMESTAMP:
Contributor

Does ORC differentiate between with zone and without zone? I think we need support for both. We can work around it by keeping Iceberg metadata, but I'd rather have everything represented correctly in data files.

Contributor Author

Not yet. I have a JIRA open to add it.

Contributor

Great, what's the JIRA?


case STRING:
case CHAR:
case VARCHAR:
return Types.MapType.ofOptional(columnIds[key.getId()],
Contributor

Are keys required in ORC, or can they be null as well?

Contributor Author

Keys can be null.

return toOrc(schema.asStruct(), columnIds);
}

static TypeDescription toOrc(Type type, List<Integer> columnIds) {
Contributor

You might consider using a SchemaVisitor for conversion to separate the conversion logic from type traversal. Here's an example for Avro: https://github.com/Netflix/iceberg/blob/master/avro/src/main/java/com/netflix/iceberg/avro/TypeToSchema.java

Contributor Author

I looked at the SchemaVisitor. I found it far more difficult to read than having a single function that did the recursion.

Contributor

Sounds reasonable. I think it's important for more complicated transformations, but this does look straightforward.
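
For reference, the single-function recursion being discussed is roughly this shape. This is a simplified sketch against the Iceberg and ORC type APIs as I understand them; the column-id bookkeeping and several primitive types are omitted:

```java
import org.apache.orc.TypeDescription;

import com.netflix.iceberg.types.Type;
import com.netflix.iceberg.types.Types;

// One recursive function: switch on the Iceberg type and build the ORC type directly.
static TypeDescription convert(Type type) {
  switch (type.typeId()) {
    case BOOLEAN: return TypeDescription.createBoolean();
    case INTEGER: return TypeDescription.createInt();
    case LONG:    return TypeDescription.createLong();
    case STRING:  return TypeDescription.createString();
    case LIST:
      return TypeDescription.createList(convert(type.asListType().elementType()));
    case STRUCT: {
      TypeDescription struct = TypeDescription.createStruct();
      for (Types.NestedField field : type.asStructType().fields()) {
        struct.addField(field.name(), convert(field.type()));
      }
      return struct;
    }
    default:
      throw new UnsupportedOperationException("Unhandled type: " + type);
  }
}
```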

}
}
default:
// We don't have an answer for union types -> blech.
Contributor

I'm interested to hear how ORC handles unions when only some of the columns are projected. This issue is why we never quite standardized unions in Parquet and why I'm reluctant to add them. (That, and I don't see a very distinct use case for them.)

I'm not against it in principle, but I'm skeptical and want to keep everything as small as possible until we're sure they are really necessary. Plus, many engines have no support for unions (e.g., Spark) and might not intend to support them.

Contributor Author

In practice, I haven't seen that case come up.

If you dropped some of the union children, those values would become null.

Contributor

I agree that returning null is the reasonable option, but doesn't that defeat the purpose of the guarantee that one branch of the union is non-null?

Contributor

Here's the discussion about a UNION type in Parquet, for reference: apache/parquet-format#44

I think Alex makes some good points, like that projection should not affect the result, only the efficiency of the query.

Contributor Author

In ORC it is a bit easier. As I wrote elsewhere, the user can't have required fields, so if the user projected just one of the union children, they would get nulls wherever the value came from one of the other children. This is the same as what would happen if the file had an extra child in the union: those values would become null.

Contributor Author

One additional piece of the ORC format is that we don't push the metadata down into the leaf types. So the data for the union is a selector that says which child is selected for that value. Then that child's value is used. So if child 3 is picked, but child 3 was not in the projection, it would just have a null value. But the reader could tell that it was a child 3 value that was null.

https://orc.apache.org/api/hive-storage-api/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.html
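
A small sketch of what that looks like on the read side (hypothetical helper; real code also has to handle the other child vector types):

```java
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;

// Read one row of a union column: the tag records which child was written,
// even when that child was not included in the projection.
static Object readUnionValue(UnionColumnVector union, int row) {
  int r = union.isRepeating ? 0 : row;   // repeating vectors store one shared entry
  if (!union.noNulls && union.isNull[r]) {
    return null;                         // the union value itself is null
  }
  int tag = union.tags[r];               // selector: which child holds the value
  ColumnVector child = union.fields[tag];
  if (child instanceof LongColumnVector) {
    return ((LongColumnVector) child).vector[r];
  }
  return null;  // unprojected or unhandled child: value reads as null, tag still known
}
```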

return this;
}

public WriteBuilder tableProperties(Properties properties) {
Contributor

Is there a reason to use Properties to pass options? I'd normally use a config(String, String) method instead.

Contributor Author

Ok, what I was trying to do was:

  • Not create a new Configuration. They are expensive in both time and memory.
  • Not change the Configuration that was passed in, since I don't own it.

That said, I'm not against changing it so that I clone the passed-in Configuration. I've had bad experiences making changes to a Configuration that was passed in, so I'd rather avoid that.

Contributor

I'd prefer to hide the Properties so that the caller only has to work with the builder. Properties can be used to accumulate the configuration and passed in. If this is a good way to avoid needing to change the Configuration, then I'm all for it.
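
A sketch of the config(String, String) shape being agreed on here, with Properties kept as an internal accumulator (names are illustrative, not the PR's final API):

```java
import java.util.Properties;

public class WriteBuilder {
  private final Properties config = new Properties();  // internal accumulator only

  // Callers set options one pair at a time and never see Properties directly.
  public WriteBuilder config(String key, String value) {
    config.setProperty(key, value);
    return this;
  }

  // At build time the accumulated pairs are applied to the ORC writer options
  // (or a cloned Configuration), so the caller's Configuration is never mutated.
}
```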

@@ -68,6 +68,12 @@ public UpdateProperties remove(String key) {
return this;
}

@Override
public UpdateProperties format(FileFormat format) {
Contributor

I like adding the method, but it should be clear that this is the default file format for the table. Tables, by design, can contain multiple file formats so you can change from one format to another and so you can write from a streaming system to Avro and then compact to a long-term storage format later.

Maybe this should be preferredFormat or defaultFormat?

Contributor Author

Let's go with defaultFormat.
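
What defaultFormat boils down to is setting a table property. A sketch, assuming the property key Iceberg uses today ("write.format.default"), which may not match the name used in 2018:

```java
import java.util.Locale;

// Sketch of the body inside the UpdateProperties implementation class:
// the "default format" is just a table property that writers consult.
public UpdateProperties defaultFormat(FileFormat format) {
  set("write.format.default", format.name().toLowerCase(Locale.ROOT));  // key assumed
  return this;
}
```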

return this;
}

public WriteBuilder conf(Configuration conf) {
Contributor

I'm trying to avoid Hadoop classes in APIs like this (even if they aren't in the iceberg-api module). In Parquet, we added config that adds properties to the internal Configuration. That configuration is passed in as part of a HadoopOutputFile instance. If HadoopOutputFile isn't used, then the writer should fall back to new Configuration() if it is required.

Contributor Author

Creating a new Configuration object is very expensive. You really want to avoid doing that at all costs, and it limits the configuration to the defaults. ORC's API needs a Hadoop Configuration, so if I don't pass one down, I'll need to create it and deprive the user of the ability to pass down a context-specific Configuration.

Contributor

I think the right way to pass that configuration is through a HadoopOutputFile. If there is a Configuration to pass, then the file passed in will be a Hadoop one and you can use it. If it isn't, then you'd have to create a configuration anyway in order to use ORC because it requires one. I guess a better way to describe it is not avoiding Hadoop classes, but keeping them in Hadoop-specific areas (not in API) so we can possibly remove the classes later.

For this, I think just moving to a config(String, String) method is a good idea, and using those configs to update a Configuration from the HadoopOutputFile instance.

Contributor Author

I can do that, but I don't see why making a strong dependence on HadoopOutputFile and HadoopInputFile is ok, but Configuration is bad. In either case, you aren't going to be able to compile or run the Iceberg code without hadoop-common on the class path.

Contributor

It just partitions Hadoop classes in their own package so that we don't leak them in the main parts of the API. The eventual goal is to support tables without pulling in Hadoop.
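
A sketch of the fallback described above. The package and accessor names (getConf on HadoopOutputFile in particular) are assumed for illustration and may differ from the actual classes:

```java
import org.apache.hadoop.conf.Configuration;

import com.netflix.iceberg.hadoop.HadoopOutputFile;
import com.netflix.iceberg.io.OutputFile;

// Use the Configuration carried by a Hadoop-backed file when there is one;
// otherwise fall back to defaults, since ORC requires a Configuration.
static Configuration confFor(OutputFile file) {
  if (file instanceof HadoopOutputFile) {
    // copy rather than mutate the caller's Configuration
    return new Configuration(((HadoopOutputFile) file).getConf());  // accessor assumed
  }
  return new Configuration();
}
```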

return this;
}

public ReadBuilder conf(Configuration conf) {
Contributor

Same thing here. I'd like to avoid passing Configuration where possible.

* @return the ORC schema
*/
public static TypeDescription toOrc(Schema schema,
List<Integer> columnIds) {
Contributor

Is there a better way to pass column IDs back? Isn't this dependent on the order you use to traverse the Schema? Why not use a BiMap between column ID and full column names?

In other places, we use a BiMap. One key part of that is that you only ever convert in one direction: from components like ["a", "b"] to a full name, "a.b". That way, you never have to parse the names and avoid the problem of fields named "a.b".

Contributor Author

Ugh. I avoid Guava at all costs; they have a really bad track record of breaking API compatibility.

I haven't done the schema evolution stuff yet, so maybe. I mostly need these so that I can stringify the schema to store it in the file.

All ORC TypeDescriptions have automatically assigned ids that run from 0 (for the root) to N-1 (for the right-most leaf). So mapping from the ORC id to the Iceberg id is easy given the list.

Contributor

My main concern is that this depends on the order of traversal, which is easy to accidentally break. Since this is a critical piece of information, I'd like to see a different solution to track IDs. It doesn't have to be guava (though we do use guava elsewhere because it provides so much value).

Contributor Author

The relevant constraint is that columnIds.get(orcSchema.getId()) gives you the Iceberg id.

This code does depend on the fact that ORC's ids are assigned sequentially in a pre-order traversal. On the other hand, changing that would be a serious change to ORC's API.

But fine, I'll change this to a Map.
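
The Map-based bookkeeping agreed on here is small; a sketch of the shape (names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.orc.TypeDescription;

// Record the Iceberg field id keyed by ORC's automatically assigned type id,
// so lookups no longer depend on the traversal order of a parallel list.
class ColumnIdMap {
  private final Map<Integer, Integer> icebergIdByOrcId = new HashMap<>();

  void record(TypeDescription orcType, int icebergId) {
    icebergIdByOrcId.put(orcType.getId(), icebergId);
  }

  Integer icebergIdFor(TypeDescription orcType) {
    return icebergIdByOrcId.get(orcType.getId());
  }
}
```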

}
case LIST: {
TypeDescription child = schema.getChildren().get(0);
return Types.ListType.ofOptional(columnIds[child.getId()],
Contributor

ORC's column ID is the ordinal?

Contributor Author

orcType.getId() gives you the automatically assigned id number above.

@@ -103,7 +113,7 @@ public Object map(Types.MapType map, Supplier<Object> valueResult) {

Map<String, Object> result = Maps.newLinkedHashMap();
for (int i = 0; i < numEntries; i += 1) {
- String key = randomString(random) + i; // add i to ensure no collisions
+ String key = randomString(random).toString() + i; // add i to ensure no collisions
Contributor

Why would randomString return something that isn't a String?

Contributor Author

randomString returns a UTF8String.

Contributor

Ah, makes sense. Thanks!

Contributor

rdblue commented Mar 2, 2018

This looks promising. I think we should be able to get everything working.

I added ORC to TestDataFrameWrites and tried it out but hit a couple of test failures with it. You might want to test that next.

Would you like to open PRs for some of the independent changes in here? Moving SimpleRecord out and the changes to RandomData are good candidates to get some of the changes in right away.

Contributor Author

omalley commented Mar 6, 2018

Ok, I've updated the branch with the changes based on the comments.

Contributor Author

omalley commented Mar 6, 2018

Snyk isn't letting me see what the problem is. Should we add a Travis CI build?

Contributor

rdblue commented Mar 6, 2018

I'm all for Travis CI, but we need a Parquet release first. I'm working on that next.

Contributor Author

omalley commented Mar 6, 2018

Ok, I think I have all of the feedback resolved.

Contributor Author

omalley commented Mar 6, 2018

I assume you'll squash the commits before merging. I've just left them as separate commits to make review easier.

Contributor Author

omalley commented Mar 7, 2018

With the last commit, TestDataFrameWrites works with ORC as well.

Contributor

rdblue commented Mar 8, 2018

I'm on leave today and tomorrow; I'll review the changes on Monday. Thanks!

Contributor

rdblue commented Mar 13, 2018

Looks good to me. I'll do a little more digging into the random data tomorrow to make sure it covers everything, but otherwise I think this is ready to commit.

Contributor

rdblue commented Mar 13, 2018

@omalley, I have a few minor updates. Do you want me to include them when I squash, or do you want me to open a PR for your branch?

Contributor Author

omalley commented Mar 13, 2018

You can go ahead and include them in the squash. What were they?

Contributor

rdblue commented Mar 13, 2018

Fix spelling (primative -> primitive), pass floats through for validation, and use strict equality when checking floats and doubles because for a storage system, the bits really should be identical. I'm also adding generic type args for maps and lists in some places.

Contributor Author

omalley commented Mar 13, 2018

I've been burned so badly by using equality for floats and doubles over the years that I always avoid it. Even for a storage system, cases like non-normalized values or the various NaN encodings will cause problems. In this case, because we are generating the data, it is no doubt fine. I still wouldn't recommend it.

Contributor

rdblue commented Mar 13, 2018

I think the good outweighs the bad here. A storage system should guarantee that the bits you pass in are the bits you get out, and we should verify that's the case. For values like NaN where it could be reasonable to normalize, we should explicitly test for it and not rely on undefined behavior.
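
A sketch of what that strict check amounts to (hypothetical helper, not the test code in this PR):

```java
// Compare raw bit patterns so that even distinct NaN encodings and signed
// zeros must round-trip through the file unchanged.
static void assertSameBits(double expected, double actual) {
  if (Double.doubleToRawLongBits(expected) != Double.doubleToRawLongBits(actual)) {
    throw new AssertionError("Double bits changed: expected " + expected
        + " but was " + actual);
  }
}
```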

Contributor

rdblue commented Mar 13, 2018

Merged as c59138e. Thanks for contributing, @omalley!

Would you mind opening a few issues to cover the remaining features that we need for ORC before it is ready for a release?

@rdblue rdblue closed this Mar 13, 2018