[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support writing to Hive table which uses Avro schema url 'avro.schema.url' #19779
Conversation
Test build #83984 has finished for PR 19779 at commit
Force-pushed 034b246 to a59bd09
Test build #83985 has finished for PR 19779 at commit
@@ -89,6 +90,8 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
    val fileSinkConfSer = fileSinkConf
    new OutputWriterFactory {
      private val jobConf = new SerializableJobConf(new JobConf(conf))
      private val broadcastHadoopConf = sparkSession.sparkContext.broadcast(
Is it possible to use jobConf as the Hive serde initialize param directly?
Thanks for the comment, I'll change the code to use jobConf.
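For context, a minimal sketch of that direction (not the exact diff; it assumes a `tableDesc: TableDesc` in scope, as in HiveFileFormat):

```scala
// Sketch only: initialize the Hive serializer with the JobConf
// instead of a null configuration.
private val serializer = {
  val serializer = tableDesc.getDeserializerClass.newInstance()
    .asInstanceOf[Serializer]
  // With a real conf, AvroSerDe can call FileSystem.get(conf) while
  // resolving 'avro.schema.url' instead of hitting a NullPointerException.
  serializer.initialize(jobConf, tableDesc.getProperties)
  serializer
}
```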
Force-pushed a59bd09 to 9beb53f
Test build #84011 has finished for PR 19779 at commit
Hi, @vinodkc.
@@ -841,6 +841,76 @@ class VersionsSuite extends SparkFunSuite with Logging {
      }
    }

    test(s"$version: Insert into/overwrite external avro table") {
Is there a reason to have this in VersionsSuite?
Could you add SPARK-19878 in the test case name?
I am fine with keeping it in VersionsSuite, since it is related to Hive.
""".stripMargin | ||
) | ||
versionSpark.sql( | ||
s"""insert overwrite table $destTableName select * from $srcTableName""".stripMargin) |
nit: INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName
Thank you for the review comments, I'll fix all of them.
      assert(versionSpark.table(destTableName).count() ===
        versionSpark.table(srcTableName).count())
      versionSpark.sql(
        s"""insert into table $destTableName select * from $srcTableName""".stripMargin)
ditto.
      versionSpark.sql(
        s"""insert into table $destTableName select * from $srcTableName""".stripMargin)
      assert(versionSpark.table(destTableName).count() / 2 ===
        versionSpark.table(srcTableName).count())
If possible, can we check values instead of count?
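For instance, a value-based check could look like this sketch (the thread below settles on essentially this form):

```scala
// Compare the actual rows, not just the row counts.
val expected = versionSpark.table(srcTableName).collect()
assert(versionSpark.table(destTableName).collect() === expected)
```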
Ping @cloud-fan and @gatorsmile.
The fix looks good to me. You can address the comments left by @dongjoon-hyun.
Force-pushed 9beb53f to f92f44c
@@ -800,7 +800,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
      }
    }

-   test(s"$version: read avro file containing decimal") {
+   test(s"$version: SPARK-17920: read avro file containing decimal") {
do you mean SPARK-17920 is already fixed because this test passes?
@cloud-fan, sorry, I updated the wrong test case; I'll change it.
Test build #84065 has finished for PR 19779 at commit
Force-pushed d09211f to b7d7a3c
Test build #84069 has finished for PR 19779 at commit
Force-pushed b7d7a3c to 68ee79d
Test build #84071 has finished for PR 19779 at commit
test(s"$version: SPARK-17920: Insert into/overwrite external avro table") { | ||
withTempDir { dir => | ||
val path = dir.getAbsolutePath | ||
val schemaPath = s"""$path${File.separator}avroschemadir""" |
nit:

// assuming java.io.{File, PrintWriter} are imported
val schemaFile = new File(dir, "avroDecimal.avsc")
val writer = new PrintWriter(schemaFile)
writer.write(avroSchema)
writer.close()
...
        |STORED AS
        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
        |LOCATION '$destLocation'
Do we have to provide a location for an empty table?
I will change it to 'CREATE EXTERNAL TABLE'.
Is this bug for external tables only? How about managed tables?
@cloud-fan, this bug affects both external and managed tables.
I've added a new test case for managed tables too. However, to avoid code duplication, should I include both tests inside the same test method? Please suggest.
We can just test the managed table, to avoid creating a temp directory for the external table.
Thanks, I've updated the test case to test only managed tables and avoided creating a temp directory.
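For reference, the managed-table variant might look roughly like this sketch (column names are illustrative, and `$schemaFile` is assumed to point at the written avroDecimal.avsc, not the exact test):

```scala
// Managed table: no LOCATION clause, hence no temp directory is required.
versionSpark.sql(
  s"""
     |CREATE TABLE $destTableName (C1 STRING, C2 DECIMAL(9, 2))
     |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
     |STORED AS
     |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
     |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
     |TBLPROPERTIES ('avro.schema.url' = '$schemaFile')
   """.stripMargin)
```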
Force-pushed 68ee79d to a2560b3
Test build #84076 has finished for PR 19779 at commit
Test build #84081 has finished for PR 19779 at commit
Force-pushed a2560b3 to 083e1b3
Test build #84090 has finished for PR 19779 at commit
Force-pushed 083e1b3 to 51999d0
""".stripMargin | ||
) | ||
versionSpark.sql( | ||
s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin) |
stripMargin is useless here.
Sure, I'll update it.
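To spell out the nit: stripMargin only strips leading characters up to a '|' margin, so on a one-line interpolated string it is a no-op and can simply be dropped:

```scala
// Equivalent, without the no-op stripMargin call:
versionSpark.sql(s"INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName")
```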
      val result = versionSpark.table(srcTableName).collect()
      assert(versionSpark.table(destTableName).collect() === result)
      versionSpark.sql(
        s"""INSERT INTO TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin)
The same here.
Updated
LGTM
cc @felixcheung This sounds critical for Spark 2.2 too.
Force-pushed 51999d0 to e3651ef
Test build #84092 has finished for PR 19779 at commit
Thanks! Merged to master. @vinodkc Could you submit a separate PR for backporting it to 2.2?
@gatorsmile, @cloud-fan, and @dongjoon-hyun
Test build #84094 has finished for PR 19779 at commit
…nch-2.2 - Support writing to Hive table which uses Avro schema url 'avro.schema.url' (backport of #19779 to branch-2.2; see the full description below).
Author: vinodkc <vinod.kc.in@gmail.com>
Closes #19795 from vinodkc/br_Fix_SPARK-17920_branch-2.2.
What changes were proposed in this pull request?
SPARK-19580 Support for avro.schema.url while writing to hive table
SPARK-19878 Add hive configuration when initialize hive serde in InsertIntoHiveTable.scala
SPARK-17920 HiveWriterContainer passes null configuration to serde.initialize, causing NullPointerException in AvroSerde when using avro.schema.url
Support writing to Hive table which uses Avro schema url 'avro.schema.url'
For example:

create external table avro_in (a string) stored as avro location '/avro-in/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');
create external table avro_out (a string) stored as avro location '/avro-out/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');
insert overwrite table avro_out select * from avro_in; // fails with java.lang.NullPointerException

WARN AvroSerDe: Encountered exception determining schema. Returning signal schema to indicate problem
java.lang.NullPointerException
  at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
Changes proposed in this fix
Currently a null value is passed to the serializer, which causes an NPE during the insert operation; instead, this fix passes the Hadoop configuration object.
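In code terms, the change amounts to roughly the following (a simplified sketch, not the exact diff; `serializer`, `jobConf`, and `tableDesc` are as in HiveFileFormat):

```scala
// Before (simplified): a null configuration reaches the serde, so AvroSerDe's
// call to FileSystem.get(conf) throws a NullPointerException.
serializer.initialize(null, tableDesc.getProperties)

// After (simplified): pass the Hadoop/Job configuration through, so
// 'avro.schema.url' can be resolved against the filesystem.
serializer.initialize(jobConf, tableDesc.getProperties)
```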
How was this patch tested?
Added new test case in VersionsSuite