
"java.io.IOException: Not an Avro data file" when using AvroBigQueryInputFormat #14

Closed
nevillelyh opened this issue Oct 20, 2015 · 5 comments


@nevillelyh

Code looks like this:

val conf = self.hadoopConfiguration

conf.setClass(
  AbstractBigQueryInputFormat.INPUT_FORMAT_CLASS_KEY,
  classOf[AvroBigQueryInputFormat], classOf[InputFormat[_, _]])
BigQueryConfiguration.configureBigQueryInput(conf, table.getProjectId, table.getDatasetId, table.getTableId)

sc.newAPIHadoopFile(
  BigQueryStrings.toString(table).replace(':', '.'),
  classOf[AvroBigQueryInputFormat],
  classOf[LongWritable], classOf[GenericData.Record]).map(_._2)

Stacktrace:

Caused by: java.io.IOException: Not an Avro data file
    at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)
    at com.google.cloud.hadoop.io.bigquery.AvroRecordReader.initialize(AvroRecordReader.java:49)
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:176)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

I saw these in the log:
15/10/20 14:51:54 INFO bigquery.AbstractBigQueryInputFormat: Resolved GCS export path: 'gs://starship/hadoop/tmp/bigquery/job_201510201451_0002'
15/10/20 14:51:55 INFO bigquery.ShardedExportToCloudStorage: Computed '2' shards for sharded BigQuery export.
15/10/20 14:51:55 INFO bigquery.ShardedExportToCloudStorage: Table 'prefab-wave-844:bigquery_staging.spark_query_20151020144723_1201584542' to be exported has 4251779 rows and 1641186694 bytes
15/10/20 14:51:55 INFO bigquery.ShardedExportToCloudStorage: Computed '2' shards for sharded BigQuery export.
15/10/20 14:51:55 INFO bigquery.ShardedExportToCloudStorage: Table 'prefab-wave-844:bigquery_staging.spark_query_20151020144723_1201584542' to be exported has 4251779 rows and 1641186694 bytes
I verified that the export path indeed contains only Avro files and nothing else.
I also tried reading the export path with spark-avro and that works fine.
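(For reference, the spark-avro read that worked was roughly the following. This is only a sketch: it assumes the Databricks spark-avro package is on the classpath, and the glob over the sharded export directory is illustrative.)

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Read the Avro files that BigQuery exported to the temporary GCS path.
val exported = sqlContext.read
  .format("com.databricks.spark.avro")
  .load("gs://starship/hadoop/tmp/bigquery/job_201510201451_0002/shard-*/data-*.avro")
exported.show(5)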

@nevillelyh

More stacktrace from a failed executor:

15/10/20 15:26:01 INFO bigquery.DynamicFileListRecordReader: Initializing DynamicFileListRecordReader with split 'InputSplit:: length:2125889 locations: [] toString(): gs://starship/hadoop/tmp/bigquery/job_201510201525_0024/shard-1/data-*.avro[2125889 estimated records]', task context 'TaskAttemptContext:: TaskAttemptID:attempt_201510201525_0024_m_000001_0 Status:'
15/10/20 15:26:01 INFO bigquery.DynamicFileListRecordReader: Adding new file 'data-000000000000.avro' of size 0 to knownFileSet.
15/10/20 15:26:01 INFO bigquery.DynamicFileListRecordReader: Moving to next file 'gs://starship/hadoop/tmp/bigquery/job_201510201525_0024/shard-1/data-000000000000.avro' which has 0 bytes. Records read so far: 0
15/10/20 15:26:02 WARN bigquery.DynamicFileListRecordReader: Got non-null delegateReader during close(); possible premature close() call.
15/10/20 15:26:02 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
java.lang.IllegalStateException
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:158)
    at com.google.cloud.hadoop.io.bigquery.AvroRecordReader.close(AvroRecordReader.java:110)
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.close(DynamicFileListRecordReader.java:239)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.org$apache$spark$rdd$NewHadoopRDD$$anon$$close(NewHadoopRDD.scala:170)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1$$anonfun$3.apply(NewHadoopRDD.scala:136)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1$$anonfun$3.apply(NewHadoopRDD.scala:136)
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:60)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77)
    at org.apache.spark.scheduler.Task.run(Task.scala:90)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/10/20 15:26:02 ERROR executor.Executor: Exception in task 1.1 in stage 6.0 (TID 31)
java.io.IOException: Not an Avro data file
    at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)
    at com.google.cloud.hadoop.io.bigquery.AvroRecordReader.initialize(AvroRecordReader.java:49)
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:176)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/10/20 15:26:02 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 34
15/10/20 15:26:02 INFO executor.Executor: Running task 1.3 in stage 6.0 (TID 34)
15/10/20 15:26:02 INFO spark.CacheManager: Partition rdd_25_1 not found, computing it
15/10/20 15:26:02 INFO rdd.NewHadoopRDD: Input split: gs://starship/hadoop/tmp/bigquery/job_201510201525_0024/shard-1/data-*.avro[2125889 estimated records]
15/10/20 15:26:02 INFO bigquery.DynamicFileListRecordReader: Initializing DynamicFileListRecordReader with split 'InputSplit:: length:2125889 locations: [] toString(): gs://starship/hadoop/tmp/bigquery/job_201510201525_0024/shard-1/data-*.avro[2125889 estimated records]', task context 'TaskAttemptContext:: TaskAttemptID:attempt_201510201525_0024_m_000001_0 Status:'
15/10/20 15:26:02 INFO bigquery.DynamicFileListRecordReader: Adding new file 'data-000000000000.avro' of size 0 to knownFileSet.
15/10/20 15:26:02 INFO bigquery.DynamicFileListRecordReader: Moving to next file 'gs://starship/hadoop/tmp/bigquery/job_201510201525_0024/shard-1/data-000000000000.avro' which has 0 bytes. Records read so far: 0
15/10/20 15:26:02 WARN bigquery.DynamicFileListRecordReader: Got non-null delegateReader during close(); possible premature close() call.
15/10/20 15:26:02 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
java.lang.IllegalStateException
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:158)
    at com.google.cloud.hadoop.io.bigquery.AvroRecordReader.close(AvroRecordReader.java:110)
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.close(DynamicFileListRecordReader.java:239)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.org$apache$spark$rdd$NewHadoopRDD$$anon$$close(NewHadoopRDD.scala:170)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1$$anonfun$3.apply(NewHadoopRDD.scala:136)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1$$anonfun$3.apply(NewHadoopRDD.scala:136)
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:60)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77)
    at org.apache.spark.scheduler.Task.run(Task.scala:90)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/10/20 15:26:02 ERROR executor.Executor: Exception in task 1.3 in stage 6.0 (TID 34)
java.io.IOException: Not an Avro data file
    at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)
    at com.google.cloud.hadoop.io.bigquery.AvroRecordReader.initialize(AvroRecordReader.java:49)
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:176)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

@AngusDavis

Thanks for the report, @nevillelyh. I'm going to start looking at this. AFAIU, BigQuery shouldn't be producing 0-length files, but instead 0-record files.
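(To make the 0-length vs. 0-record distinction concrete: an Avro container file with zero records still begins with the Avro magic bytes and schema metadata, so it is never 0 bytes long and DataFileReader can open it, whereas a truly 0-byte file, like the data-000000000000.avro stub in the log above, fails with "Not an Avro data file". A rough illustration using the plain Avro Java API, not code from this issue:)

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}

val schema = new Schema.Parser().parse(
  """{"type":"record","name":"R","fields":[{"name":"x","type":"long"}]}""")

// A 0-record Avro file: the header and metadata are written, so the file is not empty.
val zeroRecords = new File("zero-records.avro")
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
writer.create(schema, zeroRecords)
writer.close()

val reader = DataFileReader.openReader(zeroRecords, new GenericDatumReader[GenericRecord]())
println(reader.hasNext())   // false -- no records, but the file opens fine

// A 0-byte file, like the stub written before the finalized object:
val zeroBytes = new File("zero-bytes.avro")
zeroBytes.createNewFile()
// DataFileReader.openReader(zeroBytes, new GenericDatumReader[GenericRecord]())
// throws java.io.IOException: Not an Avro data file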

As a workaround, consider setting mapred.bq.input.sharded.export.enable to false (or calling AbstractBigQueryInputFormat.setEnableShardedOutput(conf, false)). The overall job will run more slowly since BigQuery won't be writing records to GCS as frequently, but it may allow you to make progress.
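(A sketch of both forms of the workaround, applied to the conf from the original report; setEnableShardedOutput is the helper named above, and ENABLE_SHARDED_EXPORT_KEY is assumed to correspond to the mapred.bq.input.sharded.export.enable property.)

import com.google.cloud.hadoop.io.bigquery.{AbstractBigQueryInputFormat, BigQueryConfiguration}

// Option 1: set the property directly on the Hadoop configuration.
conf.set(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, "false")

// Option 2: use the helper on AbstractBigQueryInputFormat.
AbstractBigQueryInputFormat.setEnableShardedOutput(conf, false)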

@nevillelyh

I tried adding the following line:

conf.set(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, "false")

But I am getting this stacktrace now:

Caused by: java.lang.IllegalArgumentException: Split should be instance of UnshardedInputSplit.
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.createRecordReader(AbstractBigQueryInputFormat.java:158)
    at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.createRecordReader(AbstractBigQueryInputFormat.java:145)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    ... 3 more

@AngusDavis

@nevillelyh - apologies for the brokenness here. I've opened PR #17 to fix unsharded exports. As for the original issue of invalid Avro files with sharded exports: BigQuery is writing an initial 0-byte file to GCS before writing the finalized object, which does not seem like valid behavior (and differs from how JSON is exported); a bug is open with the BigQuery team on this. I have not yet seen the 0-byte file issue with unsharded exports.

@AngusDavis

I've done some testing and BigQuery is no longer writing 0-length files as part of sharded exports.
