overhaul 'druid-parquet-extensions' module, promoting from 'contrib' to 'core' #6360

Merged: 16 commits merged into apache:master from core-parquet on Nov 6, 2018

Conversation

@clintropolis (Member) commented Sep 20, 2018

This PR promotes the druid-parquet-extensions module from 'contrib' to 'core' and introduces a new hadoop parser that is not based on converting to avro first, instead using the SimpleGroup based reference implementation from the parquet-column package of parquet-mr. This is likely not the best or most efficient way to parse and convert parquet files... but its raw structure suited my needs of converting int96 timestamp columns into longs (for #5150) and additionally providing the ability to support a flattenSpec.

changes:

  • druid-parquet-extensions now provides 2 types of hadoop parsers, parquet and parquet-avro, which use org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat and org.apache.druid.data.input.parquet.avro.DruidParquetAvroInputFormat hadoop input formats respectively.
  • parquet and parquet-avro parsers now both support flattenSpec by specifying parquet and avro in the parseSpec respectively. parquet-avro re-uses the druid-avro-extensions spec and flattener. There may be minor behavior differences in how parquet logical types are handled.
  • extracted abstract type NestedDataParseSpec<TFlattenSpec> for ParseSpecs which support a flattenSpec property, used by JSONParseSpec, AvroParseSpec, and ParquetParseSpec (also introduced in this PR)
  • lightly modified behavior of avro flattener auto field discovery to be more discerning about arrays (only primitive arrays are now considered) and to allow nullable primitive fields to be picked up. The array change might need to be called out, since previously it would have included the toString of array contents of complex types, which I don't think is correct behavior, but it could trip up anyone relying on that to happen.
  • adds many tests and parquet test files ("donated" from spark-sql tests here) to ensure conversion correctness (though probably still not enough)

On top of all of the added tests, I've lightly tested both parsers on a local druid/hadoop cluster on my laptop.

Fixes #5150 with the parquet parser (parquet-avro still does not support INT96)
Fixes #5433 by defaulting "parquet.avro.add-list-element-records":"false" for parquet-avro

@clintropolis force-pushed the core-parquet branch 3 times, most recently from 11cac0d to d7bc3f0 on September 25, 2018 19:57
@amalakar (Contributor):

The INT96 bug has been one of the more annoying issues we have had; it caused us to convert to CSV first before loading into Druid. Looking forward to this fix.

@clintropolis (Member, Author):

FYI, I'm working on refactoring/parameterizing the tests to cut down on the amount of duplication and JSON, but haven't had the chance to finish yet.


public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Group>
{
  private static final MappingProvider DEFAULT_MAPPING_PROVIDER = new MappingProvider()
Contributor:

This looks the same as GenericAvroMappingProvider, can these be merged?

Member Author:

Oops, forgot about this. I was planning to make a default do-nothing implementation, since that is what these are both doing 👍
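
For reference, a minimal sketch of the kind of shared do-nothing MappingProvider being discussed here (the class name and exception messages are illustrative, not necessarily what was committed):

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.TypeRef;
import com.jayway.jsonpath.spi.mapper.MappingProvider;

// Illustrative shared provider: neither the Avro nor the Parquet flattener
// uses JsonPath's type-mapping facilities, so one implementation that simply
// refuses to map could be shared by both.
public class NoopMappingProvider implements MappingProvider
{
  @Override
  public <T> T map(Object source, Class<T> targetType, Configuration configuration)
  {
    throw new UnsupportedOperationException("Type mapping is not supported");
  }

  @Override
  public <T> T map(Object source, TypeRef<T> targetType, Configuration configuration)
  {
    throw new UnsupportedOperationException("Type mapping is not supported");
  }
}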

@Override
public Function<Group, Object> makeJsonQueryExtractor(String expr)
{
  return null;
Contributor:

I think it'd be better to throw an exception here
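
A minimal sketch of the suggested change, mirroring the fragment above (the exception type and message are just one possibility):

@Override
public Function<Group, Object> makeJsonQueryExtractor(String expr)
{
  // Fail loudly instead of silently returning null, since JQ-style
  // expressions are not supported for Parquet groups.
  throw new UnsupportedOperationException("Parquet flattener does not support JQ expressions");
}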

{
  return schema.getType().equals(Schema.Type.UNION) &&
         schema.getTypes().size() == 2 &&
         schema.getTypes().get(0).getType().equals(Schema.Type.NULL) &&
Contributor:

in the union types, is the NULL type guaranteed to appear before the actual type?

Member Author:

Hmm, I'm actually not certain it's guaranteed. From the spec:

Unions
Unions, as mentioned above, are represented using JSON arrays. For example, ["null", "string"] declares a schema which may be either a null or string.

(Note that when a default value is specified for a record field whose type is a union, the type of the default value must match the first element of the union. Thus, for unions containing "null", the "null" is usually listed first, since the default value of such unions is typically null.)

Unions may not contain more than one schema with the same type, except for the named types record, fixed and enum. For example, unions containing two array types or two map types are not permitted, but two types with different names are permitted. (Names permit efficient resolution when reading and writing unions.)

Unions may not immediately contain other unions.

I'm actually not certain what would be best to do in this case, but since we didn't support nullable fields at all previously afaict maybe it's ok.

Contributor:

Hm, from that it sounds like [primitive, null] is valid if the default is non-null, then. Can you make this check non-order-dependent?
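
One way to make the check order-independent, as a rough sketch (the isPrimitive helper and method name are assumed for illustration, not taken from this diff):

// Treat a 2-branch union as a nullable primitive regardless of whether the
// NULL branch is listed first or second.
private static boolean isNullablePrimitive(Schema schema)
{
  if (!schema.getType().equals(Schema.Type.UNION) || schema.getTypes().size() != 2) {
    return false;
  }
  Schema first = schema.getTypes().get(0);
  Schema second = schema.getTypes().get(1);
  if (first.getType().equals(Schema.Type.NULL)) {
    return isPrimitive(second);
  }
  if (second.getType().equals(Schema.Type.NULL)) {
    return isPrimitive(first);
  }
  return false;
}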

HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
ParseSpec parseSpec = config.getParser().getParseSpec();

// todo: this is kind of lame, maybe we can still trim what we read if we
Contributor:

Hm, rather than parsing the flattenSpec, maybe this could be supported with a "requiredFields" method on flatten specs, but I would remove the "todo" part for now.
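
A rough sketch of the kind of flattenSpec hook being suggested here (hypothetical, not part of this PR):

import java.util.Set;

// Hypothetical addition: let a flatten spec report the top-level input fields
// it needs, so a reader could trim what it reads instead of inspecting the
// flattenSpec expressions directly.
public interface RequiredFieldsAware
{
  Set<String> getRequiredFields();
}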

@jon-wei (Contributor) left a comment:

Finished the initial review; the tests are somewhat unwieldy right now, so I'll wait for your update there and do another review pass.

@Override
public boolean isArray(final Object o)
{
  if (o instanceof List) {
Contributor:

this can just be return (o instanceof List)

@Override
public boolean isMap(final Object o)
{
  if (o instanceof Map) {
Contributor:

could be (o instanceof Map) || (o instanceof Group)

  return converter.unwrapListPrimitive(o);
} else if (o instanceof List) {
  List<Object> asList = (List<Object>) o;
  if (asList.stream().allMatch(ParquetGroupConverter::isWrappedListPrimitive)) {
Contributor:

Are there cases where such lists have some items "wrapped" but some items "unwrapped"?

I'm wondering if the allMatch isWrappedListPrimitive check is necessary; is it possible and correct to remove that pass and just unwrap anything that's wrapped in the list?

Member Author:

I don't know what is allowed to happen, but I think if the list is not homogeneous it is safer not to do this conversion rather than do a partial conversion. Leaving this as-is for now.

@amalakar (Contributor) commented Nov 1, 2018

Is this PR going to be merged soon? Would love to try it out.

@jon-wei (Contributor) left a comment:

LGTM

@jon-wei merged commit 1224d8b into apache:master on Nov 6, 2018
@clintropolis deleted the core-parquet branch on November 6, 2018 21:35
@jon-wei removed their assignment on Nov 16, 2018
gianm pushed a commit to implydata/druid-public that referenced this pull request Nov 16, 2018
…to 'core' (apache#6360)

* move parquet-extensions from contrib to core, adds new hadoop parquet parser that does not convert to avro first and supports flattenSpec and int96 columns, add support for flattenSpec for parquet-avro conversion parser, much test with a bunch of files lifted from spark-sql

* fix avro flattener to support nullable primitives for auto discovery and now only supports primitive arrays instead of all arrays

* remove leftover print

* convert micro timestamp to millis

* checkstyle

* add ignore for .parquet and .parq to rat exclude

* fix legit test failure from avro flattern behavior change

* fix rebase

* add exclusions to pom to cut down on redundant jars

* refactor tests, add support for unwrapping lists for parquet-avro, review comments

* more comment

* fix oops

* tweak parquet-avro list handling

* more docs

* fix style

* grr styles
@dclim (Contributor) commented Nov 27, 2018

The change to the index spec inputFormat (org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat) should be called out in the release notes.
