
Conversation

ConeyLiu
Contributor

@ConeyLiu ConeyLiu commented May 10, 2017

What changes were proposed in this pull request?

This patch aims to address the poor performance of RDD.cartesian. In the original implementation of cartesian, the second partition's data has to be repeatedly fetched remotely or recomputed, so performance is poor. In this patch we cache the second partition locally with the BlockManager. There are two advantages (a rough sketch of the idea follows the list):

  • Because we cache the data with the BlockManager using the MEMORY_AND_DISK storage level, we don't need to worry about OOM caused by buffering it in memory.

  • Many tasks may depend on the same block (the second partition), so we don't remove the block while other tasks still need it. This reduces the number of fetches and recomputations.
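
The core of the change is roughly as follows (a minimal sketch, not the actual patch: it reuses the RDD block naming scheme and omits the read-lock handling and block cleanup discussed later in this thread):

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, SparkEnv, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.{RDDBlockId, StorageLevel}

    // Cache rdd2's partition in the local BlockManager so that the outer loop
    // over rdd1 can re-read it without another remote fetch or recomputation.
    def getOrElseCache[U: ClassTag](
        rdd: RDD[U],
        split: Partition,
        context: TaskContext): Iterator[U] = {
      val blockId = RDDBlockId(rdd.id, split.index)
      SparkEnv.get.blockManager.getOrElseUpdate(
        blockId,
        StorageLevel.MEMORY_AND_DISK,      // spill to disk instead of risking OOM
        implicitly[ClassTag[U]],
        () => rdd.iterator(split, context)) match {
        case Left(blockResult) =>
          // The block is (now) stored locally; read it back from the BlockManager.
          blockResult.data.asInstanceOf[Iterator[U]]
        case Right(iter) =>
          // Caching failed; fall back to the freshly computed iterator.
          iter
      }
    }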

How was this patch tested?

Test environment: 4 executors (10 cores, 30 GB memory) on one node with 4 SSDs + 7 HDDs.
Test case:

    import scala.util.Random
    import org.apache.commons.lang3.StringUtils

    def randomValue(): String = {
      val random = Random.alphanumeric
      random.take(100).mkString
    }

    val keys = sc.parallelize(1L to 10000L)
    // repartition returns a new RDD, so keep the result instead of discarding it
    val data1 = keys.map(id => (id, randomValue())).repartition(40)
    val data2 = keys.map(id => (id, randomValue())).repartition(40)

    val pairs = data1.cartesian(data2).filter {
      case (x, y) => StringUtils.getLevenshteinDistance(x._2, y._2) < 5
    }

    val start = System.nanoTime()
    pairs.count()
    println((System.nanoTime() - start) / 1e6)  // elapsed time in ms

Before: 353491.027379 ms
After: 94516.680067 ms

@ConeyLiu
Contributor Author

ConeyLiu commented May 10, 2017

Hi @viirya, can you help review this? I think you are familiar with this area, because you tried to solve it before.

Also pinging @srowen, @mridulm, @jerryshao.

@ConeyLiu
Contributor Author

I will post a cluster version of the comparison results later.

@jerryshao
Contributor

Looks like there's a similar PR #17898 trying to address this issue. Can you please elaborate on how your approach differs from that one?

@ConeyLiu
Contributor Author

Hi @jerryshao, thanks for your review. In #17898 there is an in-memory buffer to cache the data, so the group size has to be controlled very carefully: if it is too small, we need more fetches; if it is too large, there is a potential OOM. In this PR we use the BlockManager to cache the data instead, and the cached block can be reused by multiple tasks in the same executor. But that patch changes very little.
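
For context, the group-size trade-off described above looks roughly like this (a generic sketch, not #17898's actual code; which side gets buffered and which gets re-iterated is an illustrative assumption):

    // The right side is consumed in chunks of `groupSize`; the left partition is
    // re-iterated once per chunk. A small groupSize means more passes/fetches,
    // a large one buffers more rows in memory and risks OOM.
    def chunkedCartesian[A, B](
        makeLeft: () => Iterator[A],
        right: Iterator[B],
        groupSize: Int): Iterator[(A, B)] = {
      right.grouped(groupSize).flatMap { chunk =>
        makeLeft().flatMap(a => chunk.iterator.map(b => (a, b)))
      }
    }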

@jerryshao
Contributor

jerryshao commented May 10, 2017

From my first glance, I have several questions:

  1. If the parent's partition has already been cached in the local block manager, do we need to cache it again?
  2. There will be situations where several tasks are waiting for one task to materialize the iterator into the block manager; can we improve this?
  3. If memory is not enough, is it always faster to read from disk than to recompute from the parent partition? In your case the parent RDD is a shuffled RDD, so you need to fetch partitions remotely, but if the parent partition can be computed locally, does your improvement still stand?

@ConeyLiu
Contributor Author

ConeyLiu commented May 10, 2017

Cool. The iterator operation can be divided into two cases:

  1. Get the block locally; this case is cheap.
  2. Get the block remotely.
    • The block is cached on a remote node, so we have to fetch it over the network. (Network I/O)
    • The block needs to be recalculated. In this case we recompute repeatedly: first read the data from disk, then transfer it over the network, then compute. (Disk I/O, network I/O, and wasted computing resources)

Answers to your questions:

  1. If the block was cached before, we don't cache it again.
  2. If several tasks need the same block, they have to wait, because there can be only one write lock for a given block.
  3. In the shuffle case, during the reduce phase fetching one block involves reading data from disk, network transfer, and computation, and how many times this happens is determined by the outer loop. The intermediate map output is spread across all nodes, so this overhead is high.

This patch also has some limitations. Ideally the cached block would only be deleted after the TaskSet finishes, because the block may be used by the next task. However, there is no API to access the DAGScheduler or other related components (maybe I missed something). So in this patch we remove the cached block only when the block is not locked.

@SparkQA

SparkQA commented May 10, 2017

Test build #3708 has finished for PR 17936 at commit 08c1849.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ConeyLiu
Contributor Author

ConeyLiu commented May 12, 2017

Here are the cluster test results. RDD.cartesian is used in the Spark MLlib ALS algorithm, so I compared this patch with the latest Spark master branch using ALS.

Environment: Spark on YARN with 9 executors (10 cores & 30 GB memory) on three nodes.
Test data: 480,000 users and 17,000 items.

Test Case:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object TestNetflixlib {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test Netflix mlib")
    val sc = new SparkContext(conf)

    val data = sc.textFile("hdfs://10.1.2.173:9000/nf_training_set.txt")

    val ratings = data.map(_.split("::") match {
      case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
    })

    val rank = 0
    val numIterations = 10
    val train_start = System.nanoTime()
    val model = ALS.train(ratings, rank, numIterations, 0.01)
    val training_time = (System.nanoTime() - train_start)/ 1e9
    println(s"Training time(s): $training_time")

    val rec_start = System.nanoTime()
    val userRec = model.recommendProductsForUsers(20)
    println(userRec.count())
    val rec_time = (System.nanoTime() - rec_start) / 1e9
    println(s"Recommend time(s): $rec_time")
  }
}

Test Results:

Improved Branch   Master Branch   Improvement
139.934s          162.597s        16%
148.138s          157.597s        6%
157.899s          189.580s        20%
135.520s          152.486s        13%
166.101s          184.485s        11%

@ConeyLiu ConeyLiu changed the title [SPARK-20638][Core][WIP]Optimize the CartesianRDD to reduce repeatedly data fetching [SPARK-20638][Core]Optimize the CartesianRDD to reduce repeatedly data fetching May 12, 2017
@jtengyp

jtengyp commented May 15, 2017

I think @ConeyLiu should directly test the cartesian phase with the following code:

val user = model.userFeatures
val item = model.productFeatures
val start = System.nanoTime()
val rate = user.cartesian(item)
println(rate.count())
val time = (System.nanoTime() - start) / 1e9

A new PR (#17742) has been merged for recommendForAll in MLlib ALS. Your PR may not fit this case.

@ConeyLiu
Contributor Author

Yeah, I can test it. ALS is a practical use case, so choosing it as the test case is more convincing. I also want to see the improvement from this PR even after #17742 was merged.

@ConeyLiu
Contributor Author

Hi @jtengyp, the test results are as follows:

Improved Branch   Master Branch   Speedup
15.877s           2827.373s       178x
16.781s           2809.502s       167x
16.320s           2845.699s       174x
19.437s           2860.387s       147x
16.793s           2931.667s       174x

Member

@srowen srowen left a comment


This is a complex change to a critical component, so I'm not sure I can review this. Is there no easier way to achieve this? I'm wondering whether it's too good to be true and whether there are downsides or cases where this doesn't work.

val iterator = rdd.iterator(partition, context)
// Keep the read lock, because we need to read the block next. And don't tell the master.
blockManager.putIterator[U](blockId2, iterator, level, false, true) match {
case true =>
Member


BTW I think match is confusing overkill for booleans. Just use if-else
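
For illustration, the if-else form being suggested for the quoted snippet would look roughly like this (a sketch; the two boolean arguments mirror the quoted call, and the branch bodies are placeholders):

    if (blockManager.putIterator[U](blockId2, iterator, level, false, true)) {
      // Stored successfully: read the block back from the block manager,
      // keeping the read lock as the original comment describes.
    } else {
      // Storage failed: fall back to recomputing or refetching the data.
    }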


val tmpRdd = sc.textFile(tmpFilePath, numPartitions)
tmpRdd.cache()
tmpRdd.count()
Contributor Author


Because we cache the RDD in the CartesianRDD compute method, here we should count the bytes read from memory.

sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
val aRdd = sc.textFile(cartFilePath, numPartitions)
aRdd.cache()
aRdd.count()
Contributor Author


There is a very strange issue here. If we cache both aRdd and tmpRdd, both this PR and the master branch pass the test. But if we only cache tmpRdd, both branches fail. So caching is temporarily enabled here. I will look into the details of the problem; it may be a bug. If I misunderstand something, please point it out.

@ConeyLiu
Contributor Author

ConeyLiu commented May 19, 2017

@srowen Sorry for the late reply. I updated the code. Because we want to reduce the number of remote fetches, the second partition should be cached locally. There are two ways to do this: first, cache it with a task memory consumer, which is controlled by execution memory (this is roughly the approach of #9969); second, cache it with the BlockManager, which is controlled by storage memory. Experiments showed that the first approach has a serious GC problem.

CartesianRDD is only used in ALS and UnsafeCartesianRDD. However, the latter implements its own cartesian computation, as you can see below:

class UnsafeCartesianRDD(
    left : RDD[UnsafeRow],
    right : RDD[UnsafeRow],
    numFieldsOfRight: Int,
    spillThreshold: Int)
  extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

  override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
    val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)

    val partition = split.asInstanceOf[CartesianPartition]
    rdd2.iterator(partition.s2, context).foreach(rowArray.add)

    // Create an iterator from rowArray
    def createIter(): Iterator[UnsafeRow] = rowArray.generateIterator()

    val resultIter =
      for (x <- rdd1.iterator(partition.s1, context);
           y <- createIter()) yield (x, y)
    CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
      resultIter, rowArray.clear())
  }
}

So I think there should be no other impact.

@ConeyLiu
Contributor Author

Hi, @squito, @cloud-fan. Can you help review this code? Thanks a lot.


val iterator = rdd.iterator(partition, context)
if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
// If the block is already cached locally, we shouldn't cache it again.
Member


Even if getStorageLevel is not StorageLevel.NONE, we still can't guarantee the block was successfully cached.

Contributor Author


@viirya Thanks for the review. If the storage level of rdd2 is not StorageLevel.NONE, it will be cached in RDD.getOrCompute. So I think we shouldn't cache it again, because blockManager.getOrElseUpdate calls the same method as blockManager.putIterator.

Member

@viirya viirya May 19, 2017


getOrElseUpdate doesn't guarantee the block can be successfully cached. Caching can fail, and in that case it simply returns the iterator.

Contributor Author

@ConeyLiu ConeyLiu May 19, 2017


Yeah, but if it isn't cached locally, the next loop iteration will call the iterator again, and then we call getOrElseUpdate. Do you mean we should check whether it is cached and, if not, try to cache it again?

Member


No. I mean that here, if getStorageLevel != StorageLevel.NONE, you assume the block is cached and return the iterator. However, the caching can fail, and then you just return the computed iterator.

Contributor Author


Ok, I'll change it, thanks very much.

@jerryshao
Contributor

@ConeyLiu, I would suggest adding a flag to CartesianRDD to specify whether the local cache should be enabled, so users can choose to enable it or not. Besides, if caching into the BlockManager fails, can we fall back to the original cartesian computation so that the task will not fail?
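
One possible shape for such a flag (sketch only; the configuration key name is invented for illustration and is not an existing Spark setting):

    // Read a feature flag from the Spark configuration and pick the code path.
    val cacheLocally = SparkEnv.get.conf
      .getBoolean("spark.cartesian.localCache.enabled", false)

    val rightIter =
      if (cacheLocally) {
        getOrElseCache(rdd2, partition.s2, context)   // cached path (see the earlier sketch)
      } else {
        rdd2.iterator(partition.s2, context)          // original behaviour
      }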

@ConeyLiu
Contributor Author

Hi @jerryshao, good advice. Because we choose MEMORY_AND_DISK here, a failure should only come from the logic of blockManager.putIterator itself, in which case the error would be irreversible anyway. Maybe I misunderstand this; please point it out.

@jerryshao
Contributor

I see. I think at least we should make this cache mechanism controllable by a flag. I'm guessing that in some HPC clusters or single-node clusters this problem is not so severe.

@ConeyLiu
Contributor Author

ConeyLiu commented May 19, 2017

OK, I'll add it. From the test results, the performance improvement is still very obvious, mainly from saving network and disk overhead.

@viirya
Member

viirya commented May 19, 2017

How much of a difference does this make compared with caching the two RDDs before doing the cartesian with the current codebase?

var holdReadLock = false

// Try to get the data locally; otherwise it will be cached locally.
def getOrElseCache(
Member


Btw, can we move those functions out of compute? There are too many nested functions here, making compute too big.

Contributor Author


Ok, I will change it too.

@viirya
Member

viirya commented May 19, 2017

I agree with @srowen. This adds quite a bit of complexity. If there is not much difference compared with caching the RDDs before doing the cartesian (or other approaches), it may not be worth such complexity.

@ConeyLiu
Contributor Author

ConeyLiu commented May 19, 2017

I did not directly test that situation. But I have tested this PR against the latest ALS (after #17742 was merged). In ALS both RDDs are cached, and the iterator is also grouped (iterator.grouped). You can see the test results above; I will provide the direct test next week due to server maintenance.

@jerryshao
Contributor

@viirya, this is slightly different from caching the RDD. It is more like broadcasting: the final state is that each executor holds the whole data of rdd2; the difference is that this is executor-to-executor sync, not driver-to-executor sync.

I also have a similar concern. The performance can vary by workload; we'd better try some different workloads to see the general improvement.

@ConeyLiu
Contributor Author

ConeyLiu commented May 19, 2017

Sorry for the mistake; these test results are for the cached situation:

Improved Branch   Master Branch   Speedup
15.877s           2827.373s       178x
16.781s           2809.502s       167x
16.320s           2845.699s       174x
19.437s           2860.387s       147x
16.793s           2931.667s       174x

Test case:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object TestNetflixlib {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test Netflix mlib")
    val sc = new SparkContext(conf)

    val data = sc.textFile("hdfs://10.1.2.173:9000/nf_training_set.txt")

    val ratings = data.map(_.split("::") match {
      case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
    })

    val rank = 0
    val numIterations = 10
    val train_start = System.nanoTime()
    val model = ALS.train(ratings, rank, numIterations, 0.01)
    val user = model.userFeatures
    val item = model.productFeatures
    val start = System.nanoTime()
    val rate = user.cartesian(item)
    println(rate.count())
    val time = (System.nanoTime() - start) / 1e9
    println(time)
  }
}

The RDDs (user and item) here are cached.

@viirya
Member

viirya commented May 19, 2017

@jerryshao Yeah, the reason I mentioned caching is to find out how much recomputing the RDD costs in terms of performance. It seems to me that if recomputing is much more costly than transferring the data, caching alone can be helpful.

@viirya
Member

viirya commented May 19, 2017

@jerryshao Since you mentioned broadcasting, another question might be: can we just use broadcasting to achieve similar performance without such changes?

@ConeyLiu
Contributor Author

Broadcasting would first have to fetch all the blocks to the driver and cache them locally, and then the executors would fetch the data from the driver. I think it's really time-consuming.

@viirya
Member

viirya commented May 19, 2017

It seems it should still be better than the original cartesian, since it saves recomputing the RDD and re-transferring the data?
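
For reference, the broadcast-based alternative under discussion would look roughly like this (a sketch using generic rdd1/rdd2/sc names; it is only viable when one side is small enough to collect to the driver and hold in executor memory):

    // Collect the smaller RDD to the driver, broadcast it once per executor,
    // and pair it with every element of the larger RDD.
    val smallSide = sc.broadcast(rdd2.collect())
    val pairs = rdd1.mapPartitions { leftIter =>
      leftIter.flatMap(x => smallSide.value.iterator.map(y => (x, y)))
    }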

@ConeyLiu
Contributor Author

Yeah, I think I can do the performance comparison.

@ConeyLiu
Contributor Author

Hi @rxin, would you mind taking a look?

@cloud-fan
Contributor

In Spark SQL we have UnsafeCartesianRDD, which already has this optimization, so this patch won't benefit Spark SQL.

As we are encouraging users to use Spark SQL as the main programming interface instead of RDDs, it seems to me that this patch is not very useful for Spark.

BTW I think it's hard to optimize CartesianRDD without regressions; IIRC there were many PRs that tried to optimize it but they didn't reach a consensus.

@chenghao-intel
Contributor

I can understand that any code change in Spark core is hard to review due to regression concerns; I think we can leave the PR open for discussion.

  1. Actually, UnsafeCartesianRDD isn't aware of block locality and will re-fetch the data from remote nodes even if the data has already been fetched by another task on the same node; that's why we have to change some code in BlockManager.
  2. Some existing applications based on RDDs, like MLlib, still use CartesianRDD, and we observe a 50x performance boost in ALS prediction. Previously we couldn't even finish the ALS prediction without this optimization until we carefully tuned lots of things.
  3. Repeatable data block iteration is probably very useful for new API implementations, such as a cartesian product for machine learning, because of performance concerns; unfortunately the BlockManager doesn't provide this feature. We may add other operations based on this improvement in the future; that's why we think it's important.

@cloud-fan
Contributor

... will re-fetch the data from remote nodes even if the data has already been fetched by another task on the same node ...

This is a good point, and we should improve it, but I don't think relying on the block manager is a good idea:

  1. the memory used here is actually execution memory, not storage memory (different spilling priority).
  2. the block manager flushes data to disk at partition granularity, i.e. it will flush the whole partition to disk when memory runs out, while record granularity would be better.

Maybe we can use a hash map to reuse already fetched partitions in UnsafeCartesianRDD and see how it goes, and then apply a similar optimization to CartesianRDD.
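
A minimal sketch of that suggestion (the PartitionCache name is hypothetical, and the eviction/spilling a real version would need is omitted):

    import scala.collection.mutable

    // Executor-local cache keyed by (rddId, partitionIndex); tasks on the same
    // executor reuse an already materialized partition instead of re-fetching it.
    object PartitionCache {
      private val cache = mutable.Map.empty[(Int, Int), Vector[Any]]

      def getOrCompute[T](rddId: Int, split: Int)(compute: => Iterator[T]): Iterator[T] =
        synchronized {
          // The first task materializes the partition; later tasks just read it.
          cache.getOrElseUpdate((rddId, split), compute.toVector)
        }.iterator.asInstanceOf[Iterator[T]]
    }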

@suyanNone
Contributor

suyanNone commented Sep 15, 2017

Maybe create a MemoryAndDiskArray like ExternalAppendOnlyMap? A MemoryAndDiskArray could be used not only here but also in groupByKey, and its memory could be controlled by the MemoryManager.

@ConeyLiu
Contributor Author

Are you suggesting using a MemoryAndDiskArray to cache the data? UnsafeCartesianRDD also uses ExternalAppendOnlyUnsafeRowArray to cache data. But in that implementation we need to fetch the data for each task, whereas when we cache the data in the BlockManager we only fetch it once per executor.

@suyanNone
Contributor

suyanNone commented Sep 15, 2017

I was careless not to notice UnsafeCartesianRDD's ExternalAppendOnlyUnsafeRowArray, that's nice; I haven't read all the discussion here... A solution unified with UnsafeCartesianRDD would already be a big improvement for CartesianRDD, and it seems simpler and easier to understand... (In our internal change, we adopted a memory-and-disk array to store GraphX Array[EdgeAttr].) I'm not sure there is a strong optimization requirement to avoid the per-task fetch.

@ConeyLiu
Contributor Author

ConeyLiu commented Sep 20, 2017

Consider rdd1.cartesian(rdd2). For each task we need to pull all the data of rdd1 (or rdd2) from the cluster. If we have n tasks running in parallel in the same executor, that means we pull the same data to the same executor n times. This can cause serious GC problems and network I/O (and maybe disk I/O, if the memory-and-disk array can't fit the data entirely in memory).

@jiangxb1987
Contributor

I'm going to close this PR because it has gone stale; please feel free to reopen it or open another PR if anyone has more thoughts on this issue.

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 7, 2017

OK, thanks a lot.

@asfgit asfgit closed this in ed1478c Nov 7, 2017
@ConeyLiu ConeyLiu deleted the cartesian branch November 7, 2017 10:04