Skip to content
This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Commit

Permalink
Use GenericRecordBuilder to handle missing default values, convert to…
Browse files Browse the repository at this point in the history
… java.util.List for array type
  • Loading branch information
lindblombr committed Mar 30, 2017
1 parent f3814be commit 71733da
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path

import scala.collection.immutable.Map
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyOutputFormat
Expand All @@ -40,6 +40,8 @@ import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[avro] class AvroOutputWriter(
Expand Down Expand Up @@ -174,12 +176,14 @@ private[avro] class AvroOutputWriter(
val sourceArray = item.asInstanceOf[Seq[Any]]
val sourceArraySize = sourceArray.size
val targetArray = new Array[Any](sourceArraySize)
// val targetArray = new util.ArrayList[Any]()
var idx = 0
while (idx < sourceArraySize) {
targetArray(idx) = elementConverter(sourceArray(idx))
// targetArray.add(elementConverter(sourceArray(idx)))
idx += 1
}
targetArray
targetArray.toSeq.asJava
}
}
case MapType(StringType, valueType, _) =>
Expand Down Expand Up @@ -260,7 +264,11 @@ private[avro] class AvroOutputWriter(
}
}
}
record
if(forceAvroSchema.isDefined) {
new GenericRecordBuilder(record).build()
} else {
record
}
}
}
}
Expand Down

0 comments on commit 71733da

Please sign in to comment.