diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index e6d29d04acbf3..54608d203133c 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -17,9 +17,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.12.0.jar
-arrow-memory-0.12.0.jar
-arrow-vector-0.12.0.jar
+arrow-format-0.15.1.jar
+arrow-memory-0.15.1.jar
+arrow-vector-0.15.1.jar
 audience-annotations-0.5.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
@@ -83,7 +83,6 @@ hadoop-yarn-server-web-proxy-2.7.4.jar
 hk2-api-2.5.0.jar
 hk2-locator-2.5.0.jar
 hk2-utils-2.5.0.jar
-hppc-0.7.2.jar
 htrace-core-3.1.0-incubating.jar
 httpclient-4.5.6.jar
 httpcore-4.4.10.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 8f1e7fe125b9f..917fde61fad1a 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -12,9 +12,9 @@ antlr4-runtime-4.7.1.jar
 aopalliance-1.0.jar
 aopalliance-repackaged-2.5.0.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.12.0.jar
-arrow-memory-0.12.0.jar
-arrow-vector-0.12.0.jar
+arrow-format-0.15.1.jar
+arrow-memory-0.15.1.jar
+arrow-vector-0.15.1.jar
 audience-annotations-0.5.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
@@ -96,7 +96,6 @@ hive-vector-code-gen-2.3.6.jar
 hk2-api-2.5.0.jar
 hk2-locator-2.5.0.jar
 hk2-utils-2.5.0.jar
-hppc-0.7.2.jar
 htrace-core4-4.1.0-incubating.jar
 httpclient-4.5.6.jar
 httpcore-4.4.10.jar
diff --git a/pom.xml b/pom.xml
index 5110285547ab3..a6a82b3339d08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,9 +200,9 @@
     <commons-crypto.version>1.0.0</commons-crypto.version>
-    <arrow.version>0.12.0</arrow.version>
+    <arrow.version>0.15.1</arrow.version>
     <test.java.home>${java.home}</test.java.home>
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 83afafdd8b138..4260c06f06060 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -160,9 +160,10 @@ def require_minimum_pyarrow_version():
     """ Raise ImportError if minimum version of pyarrow is not installed
     """
     # TODO(HyukjinKwon): Relocate and deduplicate the version specification.
-    minimum_pyarrow_version = "0.12.1"
+    minimum_pyarrow_version = "0.15.1"
 
     from distutils.version import LooseVersion
+    import os
     try:
         import pyarrow
         have_arrow = True
@@ -174,6 +175,9 @@ def require_minimum_pyarrow_version():
     if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
         raise ImportError("PyArrow >= %s must be installed; however, "
                           "your version was %s." % (minimum_pyarrow_version, pyarrow.__version__))
+    if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1":
+        raise RuntimeError("Arrow legacy IPC format is not supported in PySpark, "
+                           "please unset ARROW_PRE_0_15_IPC_FORMAT")
 
 
 def require_test_compiled():
diff --git a/python/setup.py b/python/setup.py
index 092bdd3f90117..138161ff13b41 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -105,7 +105,7 @@ def _supports_symlinks():
 # For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the
 # binary format protocol with the Java version, see ARROW_HOME/format/* for specifications.
_minimum_pandas_version = "0.23.2" -_minimum_pyarrow_version = "0.12.1" +_minimum_pyarrow_version = "0.15.1" try: # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index 1a6f4acb63521..d1076d9d0156c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -26,7 +26,7 @@ import org.apache.arrow.flatbuf.MessageHeader import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel} -import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer} +import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer} import org.apache.spark.TaskContext import org.apache.spark.api.java.JavaRDD @@ -64,7 +64,7 @@ private[sql] class ArrowBatchStreamWriter( * End the Arrow stream, does not close output stream. */ def end(): Unit = { - ArrowStreamWriter.writeEndOfStream(writeChannel) + ArrowStreamWriter.writeEndOfStream(writeChannel, new IpcOption) } } @@ -251,8 +251,8 @@ private[sql] object ArrowConverters { // Only care about RecordBatch messages, skip Schema and unsupported Dictionary messages if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) { - // Buffer backed output large enough to hold the complete serialized message - val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength) + // Buffer backed output large enough to hold 8-byte length + complete serialized message + val bbout = new ByteBufferOutputStream(8 + msgMetadata.getMessageLength + bodyLength) // Write message metadata to ByteBuffer output stream MessageSerializer.writeMessageBuffer(