
[SPARK-22367][WIP][CORE] Separate the serialization of class and object for iterator #19586

Closed
wants to merge 4 commits

Conversation

ConeyLiu
Contributor

@ConeyLiu ConeyLiu commented Oct 27, 2017

What changes were proposed in this pull request?

Because all the records in an iterator have the same class, there is no need to write the class information for every record. We only need to write the class information once at the beginning of serialization, and likewise read it only once during deserialization.

In this patch, we separate the serialization of class and object for an iterator serialized by Kryo. This improves serialization and deserialization performance and saves space.
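A minimal sketch of the idea using Kryo's public API (an illustration only, with assumed helper signatures; not the actual patch): the class is written and read exactly once, and each record carries only its object bytes.

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}

    // Write the class once, then every record without per-record class info.
    def writeAll[T](kryo: Kryo, out: Output, iter: Iterator[T]): Unit = {
      if (iter.hasNext) {
        val first = iter.next()
        kryo.writeClass(out, first.getClass)          // class info, exactly once
        kryo.writeObject(out, first)
        iter.foreach(t => kryo.writeObject(out, t))   // object bytes only
      }
      out.flush()
    }

    // Read the class once, then deserialize every record against that class.
    def readAll(kryo: Kryo, in: Input): Iterator[AnyRef] = {
      val clazz = kryo.readClass(in).getType.asInstanceOf[Class[AnyRef]]
      new Iterator[AnyRef] {
        def hasNext: Boolean = !in.eof()
        def next(): AnyRef = kryo.readObject(in, clazz)
      }
    }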

Test case:

    import scala.util.Random

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Person is not shown in the original snippet; a definition like this is assumed:
    case class Person(id: String, value: Int)

    val conf = new SparkConf().setAppName("Test for serialization")
    val sc = new SparkContext(conf)

    val random = new Random(1)
    val data = sc.parallelize(1 to 1000000000).map { i =>
      Person("id-" + i, random.nextInt(Integer.MAX_VALUE))
    }.persist(StorageLevel.OFF_HEAP)

    var start = System.currentTimeMillis()
    data.count()
    println("First time: " + (System.currentTimeMillis() - start))

    start = System.currentTimeMillis()
    data.count()
    println("Second time: " + (System.currentTimeMillis() - start))

Test result:

Serialized size: before: 34.3 GB, after: 17.5 GB

Timings (ms):

| before (cal + serialization) | before (deserialization) | after (cal + serialization) | after (deserialization) |
| --- | --- | --- | --- |
| 63869 | 21882 | 45513 | 15158 |
| 59368 | 21507 | 51683 | 15524 |
| 66230 | 21481 | 62163 | 14903 |
| 62399 | 22529 | 52400 | 16255 |

How was this patch tested?

Existing UT.

@ConeyLiu
Contributor Author

Hi, @cloud-fan @jiangxb1987 @chenghao-intel. Would you mind taking a look? Thanks a lot.

pom.xml Outdated
@@ -133,7 +133,7 @@
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.3.20.v20170531</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
-<chill.version>0.8.4</chill.version>
+<chill.version>0.9.2</chill.version>
Contributor Author

@ConeyLiu ConeyLiu Oct 27, 2017


I am not sure whether it should be changed. If it is unreasonable, I can change it back.

Member

Why do you need to update it?

Contributor Author

Not necessary. Chill 0.9.2 uses kryo 4.0. I can change it back.

Contributor

please change it back.

Contributor

Library upgrades deserve a separate PR.

@ConeyLiu
Contributor Author

One executor, configured as follows. The script:

${SPARK_HOME}/bin/spark-submit \
        --class com.intel.KryoTest  \
        --master yarn                   \
        --deploy-mode  cluster           \
        --conf spark.memory.offHeap.enabled=true   \
        --conf spark.memory.offHeap.size=50g       \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer  \
        --driver-memory 5G         \
        --driver-cores  10        \
        --executor-memory  40G          \
        --executor-cores  20            \
        --num-executors 1               \

Member

@srowen srowen left a comment

I'm also slightly concerned about compatibility here. I'm trying to think whether there's any case where we intend to support Kryo/Java-serialized objects from 2.x in 2.y.

Also, doesn't Kryo registration solve the problem of writing the class name every time? That's why it compresses it to an identifier.

@@ -205,11 +205,45 @@ class KryoSerializationStream(

   private[this] var kryo: Kryo = serInstance.borrowKryo()

+  // This is only used when we write object and class separately.
+  var classWrote = false
Member

I don't see why you need this state and need to repeat the logic about writing / not writing classes everywhere. Surely this just goes in one writeAll / asIterator pair?

Contributor Author

Yeah, it is used for writeAll / asIterator. But MemoryStore.putIteratorAsBytes doesn't use writeAll, so we use this state to indicate whether we have already written the class.

Member

@srowen srowen Oct 27, 2017

Why not write that state as an iterator, if that's how it behaves, rather than duplicating code? 'values' is already an iterator there. Either way there's no need for state here; this state is local to the writing process. This seems like a recipe for a thread-safety bug later.

pom.xml Outdated
@@ -133,7 +133,7 @@
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.3.20.v20170531</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
-<chill.version>0.8.4</chill.version>
+<chill.version>0.9.2</chill.version>
Member

Why do you need to update it?

@ConeyLiu
Contributor Author

@srowen Thanks for the review.
What do you mean here?

> I'm trying to think if there's any case where we intend to support kryo/java serialized objects from 2.x in 2.y.

Even after you register a class, Kryo still writes the class (not the full class name, just a class ID) when you call writeClassAndObject. Computing the class ID also takes some work, and then the class ID and the object are both written.
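Roughly, with Kryo's API (Person here is the example class from the PR description, assumed for illustration):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Output

    case class Person(id: String, value: Int)

    val kryo = new Kryo()
    kryo.register(classOf[Person])   // registration maps the class to a small integer ID
    val out = new Output(1 << 16)
    // Even for a registered class, each writeClassAndObject call looks up and
    // writes a varint class ID before the object bytes; registration avoids
    // only the full class-name string, not the per-record ID.
    kryo.writeClassAndObject(out, Person("id-1", 42))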

    val value = values.next()
    if (kryoSerializationStream != null) {
      if (!kryoSerializationStream.classWrote) {
        kryoSerializationStream.writeClass(value.getClass)
Contributor Author

@srowen you can see it here. We don't use writeAll here because we need to acquire memory according to the written size.

@@ -205,11 +205,45 @@ class KryoSerializationStream(

   private[this] var kryo: Kryo = serInstance.borrowKryo()

+  // This is only used when we write object and class separately.
+  var classWrote = false

   override def writeObject[T: ClassTag](t: T): SerializationStream = {
     kryo.writeClassAndObject(output, t)
Contributor

I was expecting Kryo to buffer the distinct classes and only store an identifier/pointer for duplicated classes. Even if we write object and class every time, the overhead should be small. Is this not true?

Contributor Author

From the code, Kryo just writes a varint if the class has been registered, and there is also some computation to obtain that varint. But in the test, the overhead looks more serious than I expected.

@cloud-fan
Contributor

Looking at the SerializationStream interface, I think it's designed for reading/writing objects of different classes, so your optimization should not be applied there.

Instead, I think we should introduce SerializerInstance#serializeStreamForClass[T], which returns a ClassSpecificSerializationStream[T] designed for writing objects of the same class.
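A rough sketch of what that could look like (the names come from this comment; this is hypothetical, not existing Spark API). The ClassTag pins the stream to a single class, which is exactly the contract the optimization needs:

    import java.io.OutputStream
    import scala.reflect.ClassTag

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Output

    // Hypothetical stream bound to a single class T: the class is written once
    // up front, after which each record is only object bytes.
    class ClassSpecificSerializationStream[T: ClassTag](kryo: Kryo, os: OutputStream) {
      private val output = new Output(os)
      kryo.writeClass(output, implicitly[ClassTag[T]].runtimeClass)  // once per stream

      def writeObject(t: T): this.type = { kryo.writeObject(output, t); this }
      def flush(): Unit = output.flush()
      def close(): Unit = output.close()
    }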

@ConeyLiu ConeyLiu changed the title [SPARK-22367][CORE] Separate the serialization of class and object for iterator [SPARK-22367][WIP][CORE] Separate the serialization of class and object for iterator Oct 30, 2017
@cloud-fan
Contributor

OK to test

@ConeyLiu
Contributor Author

Hi @cloud-fan, thanks for reviewing. There are some errors around UnsafeShuffleWriter that need further fixes. I am not familiar with this code, so I need some time.

@jerryshao
Contributor

@ConeyLiu what about the example below; does your implementation support it?

    trait Base { val name: String }
    case class A(name: String) extends Base
    case class B(name: String) extends Base

    sc.parallelize(Seq(A("a"), B("b"))).map { i => (i, 1) }.reduceByKey(_ + _).collect()

Here not all the elements have the same class type; does your PR support such a scenario?

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 1, 2017

Hi @jerryshao, thanks for the reminder; it doesn't support that case. I'm sorry I did not take it into account. How about using a configuration to determine whether we should use SerializerInstance#serializeStreamForClass[T]? In most cases the data type should be the same.

Can you give some advice? Also cc @cloud-fan @srowen

@jerryshao
Contributor

Using a configuration seems not so elegant; also, configuration is application based, so how would you turn this feature on or off at runtime? Sorry I cannot give you good advice; maybe Kryo's solution is the best option for the general case.

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 1, 2017

Currently, I use it directly. Maybe this is suitable for some special cases where the data has a single type, such as ML.

@cloud-fan
Contributor

For these cases, they can write their own serializer and set it via spark.serializer. I don't think Spark should have built-in support for them because it's not general.

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 2, 2017

Hi @cloud-fan, in most cases the data type is the same, so I think this optimization is valuable: it can save considerable space and CPU. What about setting a flag on the RDD indicating whether it only contains a single type? If that isn't acceptable, could we put it in the ml package as a special serializer that users can configure? In that case, though, the exact ClassTag of the RDD must be provided for serialization because of the record relocation in UnsafeShuffleWriter.

@jerryshao
Contributor

jerryshao commented Nov 2, 2017

I tend to agree with @cloud-fan. I think you can implement your own serializer outside of Spark, specialized for your application; that will definitely be more efficient than the built-in one. But Spark's default solution should be general enough to cover all cases. Setting a flag or a configuration is not intuitive enough, from my understanding.

And for ML, can you please provide an example of how this could be improved with your approach? From my understanding, your approach is most useful with custom class definitions, like Person in your example. But for ML/SQL cases, all the types should be predefined or primitive; will it improve things a lot?

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 2, 2017

OK, I understand your concern. There is a huge GC problem in the K-means workload: it takes about 10-20% of the time. The source data is cached in memory in our test, and performance is even worse when the source data can't be cached in memory. So we tried moving the source data off-heap. However, the training time got even worse with off-heap memory: GC took about 10-20% with on-heap memory, while deserialization took about 30-40% with off-heap memory, even with the GC problem solved.
[profiling screenshots from the K-means run]

You can see in the screenshots that readClass takes about 13% of the time, which is why I opened this PR. With this patch, the total time (loading data + training the K-means model) is reduced by about 10%. The screenshots cover only the training phase, not the data-loading phase, so the overall improvement should be larger than shown. I plan to optimize readObjectOrNull after this.

Also, I found that Vector is not registered, so I will test the performance with the vector classes registered. This may reduce the CPU cost, but it can't save the serialized space.

@cloud-fan
Contributor

I think this problem will go away after mllib migrates to Spark SQL completely. For now, I think we can make the serializer config job-wise and set this special serializer for ML jobs.

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 3, 2017

Hi @cloud-fan, @jerryshao. The writeClass and readClass problem can be solved by registering the classes Vector, DenseVector, and SparseVector. The following are the test results:

val conf = new SparkConf().setAppName("Vector Register Test")
    conf.registerKryoClasses(Array(classOf[Vector], classOf[DenseVector], classOf[SparseVector]))
    val sc = new SparkContext(conf)

    val sourceData = sc.sequenceFile[LongWritable, VectorWritable](args(0))
      .map { case (k, v) =>
        val vector = v.get()
        val tmpVector = new Array[Double](v.get().size())
        for (i <- 0 until vector.size()) {
          tmpVector(i) = vector.get(i)
        }
        Vectors.dense(tmpVector)
      }

    sourceData.persist(StorageLevel.OFF_HEAP)
    var start = System.currentTimeMillis()
    sourceData.count()
    println("First: " + (System.currentTimeMillis() - start))
    start = System.currentTimeMillis()
    sourceData.count()
    println("Second: " + (System.currentTimeMillis() - start))

    sc.stop()

Results:
serialized size: before: 38.4 GB, after: 30.5 GB
First time: before: 93318 ms, after: 80708 ms
Second time: before: 5870 ms, after: 3382 ms

Those classes are very common in ML, and so are Matrix, DenseMatrix, and SparseMatrix. I'm not sure whether we should register these classes directly in core, because that could introduce an extra jar dependency. Could you give some advice? Or should we just mention it in the ML docs?

The cause should be a limitation of Kryo: it writes the full class name instead of the class ID if the class is not registered.

@cloud-fan
Contributor

You can call SparkConf#registerKryoClasses manually; maybe we can also register these ml classes automatically in KryoSerializer.newKryo via reflection.
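A hedged sketch of that idea (the class names are the mllib linalg classes discussed in this thread; the enclosing newKryo context and its kryo instance are assumed):

    // Sketch: register mllib classes in KryoSerializer.newKryo only when they
    // are on the classpath, so core gains no hard dependency on mllib.
    Seq(
      "org.apache.spark.mllib.linalg.Vector",
      "org.apache.spark.mllib.linalg.DenseVector",
      "org.apache.spark.mllib.linalg.SparseVector"
    ).foreach { name =>
      try kryo.register(Class.forName(name))
      catch { case _: ClassNotFoundException => } // mllib not present; skip
    }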

cc @yanboliang @srowen

@jiangxb1987
Contributor

also cc @WeichenXu123

@WeichenXu123
Contributor

We can configure the classes to register via spark.kryo.classesToRegister; does this need to be added to the Spark code?
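For example, the registration can be supplied at submit time without any code change (using the mllib class names from the results above):

    --conf spark.kryo.classesToRegister=org.apache.spark.mllib.linalg.Vector,org.apache.spark.mllib.linalg.DenseVector,org.apache.spark.mllib.linalg.SparseVector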

@WeichenXu123
Contributor

WeichenXu123 commented Nov 4, 2017

And in ML, if we want to register classes before running algorithms, some other classes like LabeledPoint and Instance also need to be registered.
And there are some classes defined temporarily inside some ML algorithms (when using RDDs).

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 6, 2017

Thanks for the suggestions. I have opened a new PR to solve this problem, so I'm closing this one now.

@ConeyLiu ConeyLiu closed this Nov 6, 2017
@ConeyLiu ConeyLiu deleted the kryo branch November 7, 2017 01:44
asfgit pushed a commit that referenced this pull request Nov 10, 2017
## What changes were proposed in this pull request?

There are still some algorithms based on mllib, such as KMeans. For now, many common mllib classes (such as Vector, DenseVector, SparseVector, Matrix, DenseMatrix, SparseMatrix) are not registered in Kryo, so there are performance issues when serializing or deserializing those objects.
Previously discussed: #19586

## How was this patch tested?

New test case.

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #19661 from ConeyLiu/register_vector.