This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Spark 2.2.0 Support (#242)
* simple support for spark 2.2+ as well by copying old conversion logic (see the sketch after this list)

* fix test dependencies

* update java version & compatibility in README

* go back to building against spark 2.1 for travis, since it's configured to use java 1.7

* update README

* update travis test matrix

* fix hadoop version for spark 2.2 tests

* cleanup inadvertent change
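
For context, the first bullet refers to the OutputWriter API change in Spark 2.2 (SPARK-19085): write() now receives a Catalyst InternalRow instead of an external Row, so the writer converts back to a Row before reusing the existing Avro conversion. A minimal sketch of that bridge, runnable against Spark 2.2 test dependencies; the object name and sample schema are illustrative, not part of this commit:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    object InternalRowBridgeSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(
          StructField("id", IntegerType),
          StructField("name", StringType)))

        // Same idea as the commit: turn the Catalyst representation handed to
        // write(InternalRow) back into the external Row the old logic expects.
        val toRow = CatalystTypeConverters.createToScalaConverter(schema)
          .asInstanceOf[InternalRow => Row]

        val internal = new GenericInternalRow(Array[Any](1, UTF8String.fromString("avro")))
        println(toRow(internal)) // prints [1,avro]
      }
    }
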
squito authored and rxin committed Aug 17, 2017
1 parent c19f01a commit 204864b
Showing 4 changed files with 29 additions and 7 deletions.
8 changes: 8 additions & 0 deletions .travis.yml
@@ -23,6 +23,14 @@ matrix:
     - jdk: openjdk7
       scala: 2.10.6
       env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
+    # Spark 2.2.0, Scala 2.11, and Avro 1.7.x
+    - jdk: openjdk8
+      scala: 2.11.8
+      env: TEST_HADOOP_VERSION="2.6.5" TEST_SPARK_VERSION="2.2.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
+    # Spark 2.2.0, Scala 2.10, and Avro 1.8.x
+    - jdk: openjdk8
+      scala: 2.10.6
+      env: TEST_HADOOP_VERSION="2.6.5" TEST_SPARK_VERSION="2.2.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
 script:
   - ./dev/run-tests-travis.sh
 after_success:
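
For orientation, the TEST_*_VERSION variables in this matrix are what the sbt build reads to pick its test dependency versions (see testSparkVersion.value and friends in the build.sbt diff below). The exact wiring lives in dev/run-tests-travis.sh and the full build.sbt, neither of which is shown here; a hedged, hypothetical sketch of how such env-driven settings are commonly defined:

    // Hypothetical build.sbt fragment, assuming the Travis script exposes the
    // matrix variables to the sbt process; not copied from this repository.
    val sparkVersion = settingKey[String]("Spark version to compile against")
    val testSparkVersion = settingKey[String]("Spark version to test against")

    sparkVersion := "2.1.0"
    testSparkVersion := sys.env.getOrElse("TEST_SPARK_VERSION", sparkVersion.value)
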
5 changes: 3 additions & 2 deletions README.md
@@ -10,14 +10,15 @@ A library for reading and writing Avro data from [Spark SQL](http://spark.apache
 This documentation is for version 3.1.0 of this library, which supports Spark 2.0+. For
 documentation on earlier versions of this library, see the links below.
 
-This library has different versions for Spark 1.2, 1.3, 1.4+, and 2.0:
+This library has different versions for Spark 1.2, 1.3, 1.4+, 2.0 - 2.1, and 2.2:
 
 | Spark Version | Compatible version of Avro Data Source for Spark |
 | ------------- | ------------------------------------------------ |
 | `1.2` | `0.2.0` |
 | `1.3` | [`1.0.0`](https://github.com/databricks/spark-avro/tree/v1.0.0) |
 | `1.4+` | [`2.0.1`](https://github.com/databricks/spark-avro/tree/v2.0.1) |
-| `2.0` | `3.1.0` (this version) |
+| `2.0 - 2.1` | `3.2.0` (latest released version) |
+| `2.2` | `3.3.0` (unreleased) |
 
 ## Linking
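
The Linking section itself is collapsed in this view. As a hedged illustration of what the updated table implies for an sbt build (the Scala-version suffix is handled by %%, and the choice of 3.2.0 assumes Spark 2.0 - 2.1; nothing here is quoted from the collapsed section):

    // Pick the artifact line that matches your Spark version per the table above.
    libraryDependencies += "com.databricks" %% "spark-avro" % "3.2.0"  // Spark 2.0 - 2.1
    // Spark 2.2 needs the 3.3.0 line, which was still unreleased when this commit was merged.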

9 changes: 6 additions & 3 deletions build.sbt
@@ -44,10 +44,13 @@ libraryDependencies ++= Seq(
   "commons-io" % "commons-io" % "2.4" % "test"
 )
 
+// curator leads to conflicting guava dependencies
+val curatorExclusion = ExclusionRule(organization = "org.apache.curator")
+
 libraryDependencies ++= Seq(
-  "org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "test",
-  "org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client"),
-  "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client"),
+  "org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "test" excludeAll(curatorExclusion),
+  "org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") excludeAll(curatorExclusion),
+  "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") excludeAll(curatorExclusion),
   "org.apache.avro" % "avro" % testAvroVersion.value % "test" exclude("org.mortbay.jetty", "servlet-api"),
   "org.apache.avro" % "avro-mapred" % testAvroMapredVersion.value % "test" classifier("hadoop2") exclude("org.mortbay.jetty", "servlet-api")
 )
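
The exclusion above keeps Curator, which hadoop-client and Spark pull in transitively, from dragging a conflicting Guava onto the test classpath. A hedged sketch of the same idea reduced to one dependency, plus an alternative some builds use instead; the versions here are illustrative assumptions, not taken from this repository:

    // Drop every org.apache.curator artifact from hadoop-client's transitive graph,
    // as the commit does for all three Hadoop/Spark test dependencies.
    val curatorExclusion = ExclusionRule(organization = "org.apache.curator")
    libraryDependencies +=
      "org.apache.hadoop" % "hadoop-client" % "2.6.5" % "test" excludeAll(curatorExclusion)

    // Alternative (not what this commit does): pin Guava explicitly so sbt's
    // eviction resolves to one version regardless of what Curator requests.
    dependencyOverrides += "com.google.guava" % "guava" % "14.0.1"
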
14 changes: 12 additions & 2 deletions src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
@@ -23,6 +23,7 @@ import java.sql.Date
 import java.util.HashMap
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import scala.collection.immutable.Map
 
 import org.apache.avro.generic.GenericData.Record
@@ -43,9 +44,12 @@ private[avro] class AvroOutputWriter(
     context: TaskAttemptContext,
     schema: StructType,
     recordName: String,
-    recordNamespace: String) extends OutputWriter {
+    recordNamespace: String) extends OutputWriter {
 
   private lazy val converter = createConverterToAvro(schema, recordName, recordNamespace)
+  // copy of the old conversion logic after api change in SPARK-19085
+  private lazy val internalRowConverter =
+    CatalystTypeConverters.createToScalaConverter(schema).asInstanceOf[InternalRow => Row]
 
   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
@@ -66,7 +70,13 @@
 
     }.getRecordWriter(context)
 
-  override def write(row: Row): Unit = {
+  // this is the new api in spark 2.2+
+  def write(row: InternalRow): Unit = {
+    write(internalRowConverter(row))
+  }
+
+  // api in spark 2.0 - 2.1
+  def write(row: Row): Unit = {
     val key = new AvroKey(converter(row).asInstanceOf[GenericRecord])
     recordWriter.write(key, NullWritable.get())
   }
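
AvroOutputWriter is library-internal, so callers are unaffected by which write() overload runs: Spark 2.2 goes through the new write(InternalRow), Spark 2.0 - 2.1 through write(Row), and the Avro output is the same. A short end-to-end usage sketch for completeness; the output path and sample data are placeholders:

    import org.apache.spark.sql.SparkSession

    object AvroWriteExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("spark-avro write example")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

        // The data source name is the documented entry point for this library;
        // the writer shown in this diff is created behind this call.
        df.write.format("com.databricks.spark.avro").save("/tmp/spark-avro-example")

        spark.stop()
      }
    }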

1 comment on commit 204864b

@VigneshMohan1

@JoshRosen when will the next spark-avro release be?
