
[SPARK-27216][CORE] Upgrade RoaringBitmap to 0.7.45 #24157

Closed
wants to merge 4 commits

Conversation

LantaoJin
Contributor

What changes were proposed in this pull request?

HighlyCompressedMapStatus uses a RoaringBitmap to record the empty blocks, but a RoaringBitmap cannot be serialized/deserialized correctly with the unsafe KryoSerializer.
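For context, a rough sketch (not the actual Spark implementation) of the idea: each reduce partition whose block size is zero is recorded in a RoaringBitmap, so a bitmap that does not survive the unsafe Kryo round-trip corrupts the record of which shuffle blocks are empty.

import org.roaringbitmap.RoaringBitmap

// Simplified sketch only: collect the indices of zero-size blocks in a bitmap,
// mirroring the empty-blocks bookkeeping described above.
def emptyBlocksOf(blockSizes: Array[Long]): RoaringBitmap = {
  val emptyBlocks = new RoaringBitmap()
  blockSizes.indices.foreach { i =>
    if (blockSizes(i) == 0L) emptyBlocks.add(i)
  }
  emptyBlocks
}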

How was this patch tested?

Added a UT:

  test("kryo serialization with RoaringBitmap") {
    val bitmap = new RoaringBitmap
    bitmap.add(1787)

    val safeSer = new KryoSerializer(conf).newInstance()
    val bitmap2 : RoaringBitmap = safeSer.deserialize(safeSer.serialize(bitmap))
    assert(bitmap2.equals(bitmap))

    conf.set("spark.kryo.unsafe", "true")
    val unsafeSer = new KryoSerializer(conf).newInstance()
    val bitmap3 : RoaringBitmap = unsafeSer.deserialize(unsafeSer.serialize(bitmap))
    assert(bitmap3.equals(bitmap)) // this will fail
  }

@LantaoJin
Contributor Author

This UT only demonstrates the problem after #24156 is fixed. Before that, it is easy to reproduce by replacing conf.set("spark.kryo.unsafe", "true") with conf.set("spark.kyro.unsafe", "true").

@LantaoJin
Contributor Author

Since the current RoaringBitmap cannot be serialized/deserialized correctly by the unsafe KryoSerializer, the first fix I can think of is replacing this data structure entirely, or at least when unsafe Kryo is used. What do you think?
cc @dongjoon-hyun @vanzin @squito @gatorsmile

@srowen
Member

srowen commented Mar 20, 2019

Does this need to be serialized? I wouldn't think so if it doesn't work!

val bitmap2 : RoaringBitmap = safeSer.deserialize(safeSer.serialize(bitmap))
assert(bitmap2.equals(bitmap))

conf.set("spark.kryo.unsafe", "true")
Contributor

@attilapiros attilapiros Mar 20, 2019

You are changing the conf which is also used by other tests within the suite, so now the execution order of these tests matters. If the execution starts with this test and the others run later, they might fail.
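A minimal sketch of an alternative (assuming the suite's shared conf: SparkConf field and the imports used by the test above): clone the conf so the unsafe setting does not leak into the other tests.

test("kryo serialization with RoaringBitmap (unsafe, isolated conf)") {
  val bitmap = new RoaringBitmap
  bitmap.add(1787)

  // Clone the shared conf so "spark.kryo.unsafe" does not affect other tests.
  val unsafeConf = conf.clone.set("spark.kryo.unsafe", "true")
  val unsafeSer = new KryoSerializer(unsafeConf).newInstance()
  val bitmap2: RoaringBitmap = unsafeSer.deserialize(unsafeSer.serialize(bitmap))
  assert(bitmap2.equals(bitmap))
}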

Contributor Author

It can be moved to a completely new suite. I will update it.

@AmplabJenkins

Can one of the admins verify this patch?

@squito
Contributor

squito commented Mar 20, 2019

@srowen I think this is only necessary with "spark.kryo.unsafe=true" -- it probably never worked with that configuration before, but did work with the default "spark.kryo.unsafe=false"

Err, scratch that, I was looking at entirely the wrong thing. I'm also confused here -- so far, this change is just the failing UT, right? You will add the actual fix to the behavior as part of this PR?

@LantaoJin
Contributor Author

LantaoJin commented Mar 21, 2019

@srowen @squito I've added another UT, which uses a minimized dataset from our production issue.
In this UT, I had to comment out one line in ShuffleBlockFetcherIterator to avoid the job failing:

if (buf.size == 0) {
  // throwFetchFailedException(blockId, address, new IOException(msg))
}

After that, the test "fail zero-size blocks" in ShuffleBlockFetcherIteratorSuite will fail. That check was introduced by #21219, so in Spark 2.3.x this UT does not need the hard-coded commenting.

@squito
Contributor

squito commented Mar 22, 2019

@LantaoJin I'm still confused by the status of this -- it seems it's just test changes, not behavior changes, but it sounds like you are saying some behavior is just broken. It's labeled as a WIP, but you've also pinged people for review. Are you looking for help in determining the right fix? If so, it would help us if you could give a more complete description of what goes wrong. I don't see anything obviously wrong with unsafe kryo and roaring bitmap -- you could try serializing a tiny bitmap and see if the bits make sense.

Or do you believe this by itself is actually the complete change?
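A minimal sketch of that byte-level check (assuming a standalone SparkConf; names and output handling are illustrative only, not part of this PR):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.roaringbitmap.RoaringBitmap

// Serialize a tiny bitmap with the safe and the unsafe Kryo configuration and
// dump the raw bytes so any point of divergence is visible.
val bitmap = new RoaringBitmap
bitmap.add(1787)

def kryoBytes(unsafe: Boolean): Array[Byte] = {
  val conf = new SparkConf().set("spark.kryo.unsafe", unsafe.toString)
  val buf = new KryoSerializer(conf).newInstance().serialize(bitmap)
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  bytes
}

println("safe  : " + kryoBytes(unsafe = false).map("%02x".format(_)).mkString(" "))
println("unsafe: " + kryoBytes(unsafe = true).map("%02x".format(_)).mkString(" "))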

@srowen srowen closed this Mar 29, 2019
@LantaoJin LantaoJin changed the title [WIP][SPARK-27216][CORE] Kryo serialization with RoaringBitmap [SPARK-27216][CORE] Upgrade RoaringBitmap to 0.7.45 Apr 1, 2019
@LantaoJin
Contributor Author

LantaoJin commented Apr 1, 2019

How can I reopen this closed ticket? @srowen I have now found the root cause: it is a bug in RoaringBitmap that is fixed in the latest version (the unit tests above illustrate it). I renamed this PR and am going to push a new commit.

@LantaoJin
Contributor Author

LantaoJin commented Apr 1, 2019

SQLQueryWithKryoSuite is overkill for this PR but is useful for illustrating the problem, so I will remove it from the code and keep it in this comment. After upgrading to the latest RoaringBitmap version, the UT below passes.

package org.apache.spark.sql

import org.apache.spark.internal.config
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.internal.config.SERIALIZER
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class SQLQueryWithKryoSuite extends QueryTest with SharedSQLContext {

  override protected def sparkConf = super.sparkConf
    .set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
    .set(KRYO_USE_UNSAFE, true)

  test("kryo unsafe data quality issue") {
    // This issue can be reproduced when
    // 1. Enable KryoSerializer
    // 2. Set spark.kryo.unsafe to true
    // 3. Use HighlyCompressedMapStatus since it uses RoaringBitmap
    // 4. Set spark.sql.shuffle.partitions to 6000; 6000 triggers the issue with the supplied data
    // 5. Comment out the zero-size block fetch-fail exception in ShuffleBlockFetcherIterator,
    //    or this job will fail with FetchFailedException.
    withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "6000",
      config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.key -> "-1") {
      withTempView("t") {
        val df = spark.read.parquet(testFile("test-data/dates.parquet")).toDF("date")
        df.createOrReplaceTempView("t")
        checkAnswer(
          sql("SELECT COUNT(*) FROM t"),
          sql(
            """
              |SELECT SUM(a) FROM
              |(
              |SELECT COUNT(*) a, date
              |FROM t
              |GROUP BY date
              |)
            """.stripMargin))
      }
    }
  }
}

@srowen
Member

srowen commented Apr 1, 2019

@LantaoJin you should be able to reopen this, or it will reopen if you push a new commit.

@LantaoJin
Contributor Author

Sorry, I cannot reopen it because of a force push. I opened #24264 as an update.

@srowen
Member

srowen commented Apr 1, 2019

That's fine, I can reopen them too, but you already have a new PR.
