
remove com.databricks:spark-avro to build spark avro schema by itself #770

Closed

Conversation

cdmikechen
Contributor

Provide a way to let hoodie support timestamp and decimal types.
Change the type of timestamp from long to int64 (logical_type=timestamp-millis).
Change the type of date from int to int32 (logical_type=date).
Change the type of decimal from string to fixed (logical_type=decimal).

In Spark, hoodie can correctly convert all of the primitive data types into the corresponding Parquet types (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L372).
In Hive, hoodie can correctly convert the decimal primitive type into the Parquet type, but timestamps are only read as long (ParquetHiveSerDe cannot read the logical_type).

Another thing to mention: we need to replace avro*-1.7.7.jar in SPARK_HOME/jars with avro*-1.8.2.jar so that Spark can use the logical type classes.
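
For illustration, here is a minimal sketch (assuming Avro 1.8.2's LogicalTypes API; the field name and fixed size are only examples matching the types described above) of how these logical types are attached to Avro schemas:

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class LogicalTypeSchemas {
  public static void main(String[] args) {
    // decimal(7,2) carried on a 4-byte Avro fixed (4 bytes hold up to 7 unscaled digits)
    Schema decimalSchema = LogicalTypes.decimal(7, 2)
        .addToSchema(Schema.createFixed("wholesale_cost", null, null, 4));
    // timestamp-millis carried on an Avro long (written to Parquet as int64)
    Schema timestampSchema = LogicalTypes.timestampMillis()
        .addToSchema(Schema.create(Schema.Type.LONG));
    // date carried on an Avro int (written to Parquet as int32)
    Schema dateSchema = LogicalTypes.date()
        .addToSchema(Schema.create(Schema.Type.INT));
    System.out.println(decimalSchema);
    System.out.println(timestampSchema);
    System.out.println(dateSchema);
  }
}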

@cdmikechen cdmikechen changed the title remove com.databricks:spark-avro to build spark avro schema to insert parquet. remove com.databricks:spark-avro to build spark avro schema by itself Jul 1, 2019
@vinothchandar
Member

I am all for removing the dependency. But since this touches a bunch of things, any idea how to test this more thoroughly? @n3nash @bvaradar as well

@n3nash
Contributor

n3nash commented Jul 12, 2019

@cdmikechen Have you tried running the demo steps to ensure these changes work fine?

@vinothchandar vinothchandar added the pr:wip Work in Progress/PRs label Jul 17, 2019
@@ -272,12 +112,23 @@ object AvroConversionUtils {
case ShortType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Short].intValue
case _: DecimalType => (item: Any) => if (item == null) null else item.toString
Contributor

This line needs to be removed, otherwise Decimal is still being converted to String.

@umehrot2
Contributor

I pulled in this PR and ran tests with Decimal types. It seems these changes are not sufficient to support Decimal types.

The tables in Hive end up being created with binary type for Decimal columns, making them unqueryable.

hive> describe my_table;
_hoodie_commit_time     string
_hoodie_commit_seqno    string
_hoodie_record_key      string
_hoodie_partition_path  string
_hoodie_file_name       string
....
wholesale_cost          binary  // Should have been Decimal(7,2)
list_price              binary  // Should have been Decimal(7,2)
sales_price             binary  // Should have been Decimal(7,2)
discount_amt            binary  // Should have been Decimal(7,2)

Upon diving further into this issue, I was able to narrow it down to this line, where the Parquet footer is read to get the schema that is written as parquet.avro.schema:

https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L437

What is happening here is that in this schema conversion from parquet.avro.schema to Parquet's schema, i.e. MessageType, it is losing the context of Avro's LogicalType Decimal.

The following blob for Decimal in parquet.avro.schema:

{
    "name" : "wholesale_cost",
    "type" : [ {
      "type" : "fixed",
      "name" : "wholesale_cost",
      "size" : 4,
      "logicalType" : "decimal",
      "precision" : 7,
      "scale" : 2
    }, "null" ]
  }

It ends up as the following upon conversion to MessageType:

{
    "name" : "wholesale_cost",
    "type" : [ "null", {
      "type" : "fixed",
      "name" : "wholesale_cost",
      "namespace" : "",
      "size" : 4
    } ],
    "default" : null
  }

Thus any context of this field being Decimal is lost. Now, when this Parquet schema is later converted to a Hive schema to generate the DDL for creating the table, it is treated as a fixed-length byte array.
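
For reference, a minimal sketch of the conversion step described above (using parquet-avro's AvroSchemaConverter; the class and variable names are only illustrative, and the schema string is the decimal blob shown earlier):

import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class DecimalConversionSketch {
  public static void main(String[] args) {
    // A record schema containing the decimal(7,2) fixed field from the blob above.
    String avroJson = "{\"type\":\"record\",\"name\":\"row\",\"fields\":["
        + "{\"name\":\"wholesale_cost\",\"type\":[{\"type\":\"fixed\","
        + "\"name\":\"wholesale_cost\",\"size\":4,\"logicalType\":\"decimal\","
        + "\"precision\":7,\"scale\":2},\"null\"]}]}";
    Schema avroSchema = new Schema.Parser().parse(avroJson);
    // Same kind of conversion that is applied to the footer schema during Hive sync.
    MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
    // With parquet-avro 1.8.1 the resulting FIXED field carries no DECIMAL
    // annotation, so downstream Hive DDL generation falls back to binary.
    System.out.println(parquetSchema);
  }
}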

The following line, which checks whether OriginalType is Decimal, has no effect, because OriginalType ends up as null for Decimal fields:
https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java#L179

Ultimately it converts it to binary by treating it as a fixed-length byte array:
https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java#L218

Create Table command generated by Hudi:

19/09/16 23:53:35 INFO HoodieHiveClient: Creating table with CREATE EXTERNAL TABLE  IF NOT EXISTS xxxx( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string,  ... `wholesale_cost` binary, `list_price` binary, `sales_price` binary, `discount_amt` binary...) PARTITIONED BY (sold_date string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'xxxxx'

@vinothchandar
Member

should we first rebase and resolve the conflicts?

@umehrot2
Contributor

umehrot2 commented Sep 17, 2019

should we first rebase and resolve the conflicts?

For my testing, I had rebased this patch on top of release-0.5.0. But yes, @cdmikechen should maybe rebase the PR. The issue will still exist, though.

@vinothchandar
Member

@umehrot2 Great analysis.. Would upgrading parquet-avro help?

@umehrot2
Contributor

umehrot2 commented Sep 17, 2019

@umehrot2 Great analysis.. Would upgrading parquet-avro help?

Good point @vinothchandar. Upon a quick look at the AvroSchemaConverter code in parquet-avro, it seems like handling of the LogicalType conversion was added in parquet 1.8.2:

https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java#L379

It is not there in parquet 1.8.1 which is what Hudi uses right now:
https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java

I will upgrade the parquet version and test. Will update here with what I find.
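
For what it's worth, a small self-contained sketch of how the difference can be checked after the version bump (same AvroSchemaConverter path discussed above; the class and field names are illustrative):

import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.OriginalType;

public class DecimalAnnotationCheck {
  public static void main(String[] args) {
    // A record with a single decimal(7,2) fixed field.
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"row\",\"fields\":[{\"name\":\"wholesale_cost\","
        + "\"type\":{\"type\":\"fixed\",\"name\":\"wholesale_cost\",\"size\":4,"
        + "\"logicalType\":\"decimal\",\"precision\":7,\"scale\":2}}]}");
    OriginalType annotation = new AvroSchemaConverter()
        .convert(avroSchema).getType("wholesale_cost").getOriginalType();
    // parquet-avro 1.8.1: annotation is null, so Hive sync sees a plain fixed/binary field.
    // parquet-avro 1.8.2+: annotation is OriginalType.DECIMAL, so DECIMAL(7,2) can be emitted.
    System.out.println(annotation);
  }
}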

@cdmikechen
Contributor Author

cdmikechen commented Sep 17, 2019

@umehrot2 It is based on version 0.4.8. You also need to upgrade avro to 1.8.2 or a higher version (which supports logical types), and parquet to 1.8.2 or higher.
My current project is using hoodie 0.4.8. There are still some problems to be adjusted in my project, so I haven't made any further improvements yet. By October, I will adjust the PR code based on version 0.5.0.
If you want to know the final result, you can test it on version 0.4.8 first.

@umehrot2
Contributor

@vinothchandar @cdmikechen

I was able to read and write the Decimal type correctly by upgrading the parquet version to 1.8.2. This PR needs to be updated accordingly.

Is there a way we can prioritize this work and get it merged? Is there any additional testing that I can help perform which would give us confidence that it can be merged? @cdmikechen you mentioned there are still some issues. If you could point them out here, I would be willing to help out with those as well.

@vinothchandar
Member

@umehrot2 FYI, #903 has been opened for the shading changes..

On bumping up versions, there are a few compatibility considerations.

  • Bumping parquet to 1.8.2 may be ok, since Spark 2.2+ has that
  • Avro however is still 1.7.7 till Spark 2.3. The spark bundle will only include parquet-avro and use the avro jars from the Spark installation. Thus simply bumping parquet to 1.8.2 inside Hudi and then using it against Spark 2.4 could work. Supporting Decimal on Spark 2.3 and earlier might be tricky.. thoughts?

Also feel free to open a new PR, since @cdmikechen will take a few weeks to circle back, as he mentioned.

@cdmikechen
Contributor Author

cdmikechen commented Sep 18, 2019

@umehrot2
In addition to the decimal problem, I also fixed a timestamp conversion problem.
On a Spark dataset, this PR gets the right result. But there are still some problems in Hive and Spark SQL. Hive 2.3 does not correctly identify the logical type in a parquet-avro file, so the timestamp type may be cast to long in Hive 2.3.
I modified some of Hive's source code in org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector to solve this problem.

package org.apache.hadoop.hive.serde2.objectinspector.primitive;

import java.sql.Timestamp;

import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;

public class WritableTimestampObjectInspector extends
    AbstractPrimitiveWritableObjectInspector implements
    SettableTimestampObjectInspector {

  public WritableTimestampObjectInspector() {
    super(TypeInfoFactory.timestampTypeInfo);
  }

  @Override
  public TimestampWritable getPrimitiveWritableObject(Object o) {
    if (o instanceof LongWritable) {
      return (TimestampWritable) PrimitiveObjectInspectorFactory.writableTimestampObjectInspector
              .create(new Timestamp(((LongWritable) o).get()));
    }
    return o == null ? null : (TimestampWritable) o;
  }

  public Timestamp getPrimitiveJavaObject(Object o) {
    if (o instanceof LongWritable) {
        return new Timestamp(((LongWritable) o).get());
    }
    return o == null ? null : ((TimestampWritable) o).getTimestamp();
  }

  public Object copyObject(Object o) {
    if (o instanceof LongWritable) {
        return new TimestampWritable(new Timestamp(((LongWritable) o).get()));
    }
    return o == null ? null : new TimestampWritable((TimestampWritable) o);
  }

  public Object set(Object o, byte[] bytes, int offset) {
    if (o instanceof LongWritable) {
      o = PrimitiveObjectInspectorFactory.writableTimestampObjectInspector
              .create(new Timestamp(((LongWritable) o).get()));
    } else {
      ((TimestampWritable) o).set(bytes, offset);
    }
    return o;
  }

  public Object set(Object o, Timestamp t) {
    if (t == null) {
      return null;
    }
    if (o instanceof LongWritable) {
      o = PrimitiveObjectInspectorFactory.writableTimestampObjectInspector.create(t);
    } else {
      ((TimestampWritable) o).set(t);
    }
    return o;
  }

  public Object set(Object o, TimestampWritable t) {
    if (t == null) {
      return null;
    }
    if (o instanceof LongWritable) {
      o = PrimitiveObjectInspectorFactory.writableTimestampObjectInspector
              .create(new Timestamp(((LongWritable) o).get()));
    } else {
      ((TimestampWritable) o).set(t);
    }
    return o;
  }

  public Object create(byte[] bytes, int offset) {
    return new TimestampWritable(bytes, offset);
  }

  public Object create(Timestamp t) {
    return new TimestampWritable(t);
  }
}

I'm looking for a solution that doesn't require modifying the Hive source code. Let me know if you can come up with any good ideas.

@cdmikechen
Contributor Author

cdmikechen commented Sep 18, 2019

@vinothchandar
The avro 1.7.7 jar under Spark can be directly replaced with 1.8.2. I have tested some code on Spark 2.2 and proved that directly replacing the jar is a feasible method. In most cases, the Avro 1.8.2 API is compatible with Avro 1.7.7.
This method is applicable to Spark 2.2 and 2.3. Spark 2.4 can be used directly because it already ships its own Avro 1.8.2.

@umehrot2
Contributor

@vinothchandar At the moment, I cannot think of a good way to upgrade the avro version while still continuing to support Spark 2.3 or earlier. What @cdmikechen mentioned, asking users to take the additional step of dropping avro 1.8.2 jars into Spark's classpath, could be one option.

If we agree that is fine, either @cdmikechen or I can create a new PR based off this one, with the following changes:

  • Upgrade the parquet version
  • Roll back the timestamp conversion to a logical type, and continue to support it as a string

It appears that with the above 2 changes, this PR can be in a state to be merged. We can continue on the timestamp issue in a separate Jira/PR.

@vinothchandar
Member

I have a slightly different strategy. We can move to Spark 2.4 and match its parquet (1.10.1) and avro (1.8.2) versions, and match parquet-avro to the parquet version. As long as Spark 2.3 can work with parquet-avro 1.10.1 (we bundle this), it should be fine?

Also @umehrot2, is supporting 2.3 a must, or can we drop Hudi support for versions lower than 2.4? The Hudi community is ok per se with supporting just 2.4. If so, then we can also drop com.databricks from the code and use org.apache.spark.avro (which is only in version 2.4).

cc @bvaradar @bhasudha who are looking into the spark 2.4 move

@umehrot2
Contributor

@vinothchandar At EMR we do not have a use-case to support Spark 2.3 or earlier. We would be offering Hudi starting with our latest release which has Spark 2.4.3. Anything earlier than this we would not be supporting.

So, it might be a good idea to just move to 2.4 and drop support for earlier versions.

@vinothchandar
Member

@umehrot2 I think balaji has his hands full with the release atm. Do you have bandwidth to try moving to spark 2.4 and do these changes on top?

@umehrot2
Contributor

@umehrot2 I think balaji has his hands full with the release atm. Do you have bandwidth to try moving to spark 2.4 and do these changes on top?

Sure. I will take this up then.

@vinothchandar
Member

Sounds good. Assigned you https://issues.apache.org/jira/browse/HUDI-91. Let's continue there.

@cdmikechen
Contributor Author

cdmikechen commented Sep 19, 2019

@vinothchandar @umehrot2
I've tried changing the avro version to 1.8.2 in the hudi pom.xml before. Spark 2.2 and 2.3 don't use the avro 1.8.2 jars inside the hoodie jar; they will use avro 1.7.7 first, and I still encounter the same error (missing logical type classes and so on).
Maybe you can try using the options below in the shell to test avro 1.8.2:

--conf spark.driver.userClassPathFirst=true 
--conf spark.executor.userClassPathFirst=true

Or make a change like the existing Hive dependency relocation in the spark-bundle:

<relocation>
    <pattern>org.apache.avro</pattern>
    <shadedPattern>com.apache.hudi.org.apache.avro</shadedPattern>
</relocation>

In this way, we can be compatible with spark 2.2, 2.3 and 2.4.

@cdmikechen
Contributor Author

I will continue to discuss this issue on JIRA later.

The version I'm running in the production environment now is Hudi 0.4.8 with this PR applied. If there are new changes, I can also do some experiments in my test environment.

@cdmikechen
Contributor Author

Closing this PR now. Some problems have been fixed in PR #1005. The remaining timestamp type problem will be further discussed in other JIRA issues.

@cdmikechen cdmikechen closed this Jan 13, 2020