[SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size.

UnsafeRow carries three pieces of information when pointing to data in memory: a base object, a base offset, and a length. When the row is serialized with Java or Kryo serialization, the object layout in memory can differ between two machines with different pointer widths (ordinary object pointers, or Oops, in the JVM), so a base offset recorded on one machine need not be valid on the other.
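Why the offset differs: the first data byte of a byte[] sits just past the object header, whose size depends on whether compressed oops are enabled. A minimal sketch to observe this, not part of the patch, assuming a HotSpot JVM of this era where sun.misc.Unsafe is reachable via reflection (the class name OopsOffsetDemo is hypothetical):

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    public class OopsOffsetDemo {
      public static void main(String[] args) throws Exception {
        // Unsafe has no public constructor; grab the singleton reflectively.
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);
        // Prints different values when run with and without -XX:-UseCompressedOops,
        // because the object header (and thus the array base offset) changes size.
        System.out.println("byte[] base offset: " + unsafe.arrayBaseOffset(byte[].class));
      }
    }

A base offset captured on one JVM and dereferenced on another therefore points at the wrong bytes whenever their oops sizes differ.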

To reproduce, launch Spark using:

MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"

Then run the following:

scala> sql("select 1 xx").collect()

Author: Reynold Xin <rxin@databricks.com>

Closes #9030 from rxin/SPARK-10914.
rxin committed Oct 9, 2015
1 parent 02149ff commit 84ea287
Showing 2 changed files with 72 additions and 4 deletions.
UnsafeRow.java
@@ -17,15 +17,19 @@

package org.apache.spark.sql.catalyst.expressions;

-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -35,6 +39,7 @@
import org.apache.spark.unsafe.types.UTF8String;

import static org.apache.spark.sql.types.DataTypes.*;
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;

/**
* An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
@@ -52,7 +57,7 @@
*
* Instances of `UnsafeRow` act as pointers to row data stored in this format.
*/
-public final class UnsafeRow extends MutableRow {
+public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {

//////////////////////////////////////////////////////////////////////////////
// Static methods
@@ -596,4 +601,40 @@ public boolean anyNull() {
  public void writeToMemory(Object target, long targetOffset) {
    Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.readFully((byte[]) baseObject);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output out) {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input in) {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.read((byte[]) baseObject);
+  }
}
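For illustration, a round trip through plain Java serialization using the new Externalizable hooks. A hedged sketch, not part of the commit: the class and file name are hypothetical, spark-catalyst is assumed on the classpath, and it mirrors what the Scala tests below verify:

    import java.io.*;
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow;

    public class UnsafeRowRoundTrip {
      public static void main(String[] args) throws Exception {
        // Point a row with one long field at a 16-byte slice of a larger buffer.
        byte[] data = new byte[1024];
        UnsafeRow row = new UnsafeRow();
        row.pointTo(data, 1, 16);
        row.setLong(0, 19285L);

        // writeObject dispatches to writeExternal: only the length, numFields,
        // and the row's 16 bytes are written, not the 1024-byte buffer.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
          oos.writeObject(row);
        }

        // readExternal rebuilds the row over a fresh byte[] and recomputes
        // baseOffset from the local BYTE_ARRAY_OFFSET, so the receiving
        // JVM's oops size no longer matters.
        try (ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
          UnsafeRow copy = (UnsafeRow) ois.readObject();
          System.out.println(copy.getLong(0));  // prints 19285
        }
      }
    }

Serializing getBytes() rather than the raw (baseObject, baseOffset) pair is what makes the wire format layout-independent.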
UnsafeRowSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql

import java.io.ByteArrayOutputStream

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.types._
@@ -29,6 +30,32 @@ import org.apache.spark.unsafe.types.UTF8String

class UnsafeRowSuite extends SparkFunSuite {

test("UnsafeRow Java serialization") {
// serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
val data = new Array[Byte](1024)
val row = new UnsafeRow
row.pointTo(data, 1, 16)
row.setLong(0, 19285)

val ser = new JavaSerializer(new SparkConf).newInstance()
val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
assert(row1.getLong(0) == 19285)
assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
}

test("UnsafeRow Kryo serialization") {
// serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
val data = new Array[Byte](1024)
val row = new UnsafeRow
row.pointTo(data, 1, 16)
row.setLong(0, 19285)

val ser = new KryoSerializer(new SparkConf).newInstance()
val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
assert(row1.getLong(0) == 19285)
assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
}

test("bitset width calculation") {
assert(UnsafeRow.calculateBitSetWidthInBytes(0) === 0)
assert(UnsafeRow.calculateBitSetWidthInBytes(1) === 8)
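The tests above instantiate serializers directly; in a running application the Kryo path is selected through the standard spark.serializer setting. A configuration sketch, not part of this patch (the class and app name are hypothetical):

    import org.apache.spark.SparkConf;

    public class KryoConfigExample {
      public static SparkConf build() {
        // With Kryo selected, Kryo uses UnsafeRow's KryoSerializable
        // write/read hooks whenever rows cross the wire.
        return new SparkConf()
            .setAppName("unsafe-row-kryo-demo")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
      }
    }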
