
[WIP][SPARK-3517]mapPartitions is not correct clearing up the closure #2376

Closed
wants to merge 1 commit into from

Conversation

witgo
Contributor

@witgo witgo commented Sep 13, 2014

Commit 9d841bb is just a temporary solution; the underlying problem is likely in ClosureCleaner.

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have started for PR 2376 at commit 9d841bb.

  • This patch merges cleanly.

@witgo witgo changed the title [SPARK-3517]mapPartitions is not correct clearing up the closure [WIP][SPARK-3517]mapPartitions is not correct clearing up the closure Sep 13, 2014
@SparkQA

SparkQA commented Sep 13, 2014

QA tests have finished for PR 2376 at commit 9d841bb.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Dummy(object):

@rxin
Contributor

rxin commented Sep 13, 2014

What is the problem here? Can you give an example or test case?

@witgo
Contributor Author

witgo commented Sep 13, 2014

@rxin Code like this:

    val topicModel = "Big object"
    val broadcastModel = data.context.broadcast(topicModel)
    corpus = corpus.mapPartitions { docs =>
      val topicModel = broadcastModel.value
      ...
    }

The serialized task for the corpus RDD is almost as large as the serialized topicModel broadcast.
cat spark.log | grep 'stored as values in memory' =>

14/09/13 00:49:21 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 197.5 MB, free 2.6 GB)
14/09/13 00:49:24 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 197.7 MB, free 2.3 GB)
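For context (not part of the original thread), the usual way a mapPartitions closure ends up this large is that the function literal references a field or method of an enclosing object, so the whole enclosing instance, big model and all, is captured alongside the broadcast handle. A minimal, Spark-free sketch of that mechanism (all names here are hypothetical):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// A closure defined inside a class that reads an instance field captures
// `this`, so serializing the closure also serializes bigModel.
class Driver extends Serializable {
  val bigModel: Array[Byte] = new Array[Byte](1 << 20) // stands in for topicModel
  val small: Int = 42

  def makeClosure(): Int => Int =
    (x: Int) => x + small // field access => captures the whole Driver instance
}

object CaptureDemo {
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    // The closure only needs `small` (an Int), yet its serialized form
    // carries the 1 MB bigModel because `this` came along for the ride.
    println(s"closure size: ${serializedSize(new Driver().makeClosure())} bytes")
  }
}
```

Copying the needed field into a local val before the closure (as the snippet above does with broadcastModel.value) is the usual workaround when ClosureCleaner does not null out the reference.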

@rxin
Contributor

rxin commented Sep 13, 2014

Can you add a unit test? You can just create a RDD and serialize it to test the size.

@JoshRosen
Contributor

It seems like the issue here is that unnecessary objects are being included in the closure; presumably this bug would also manifest as serialization errors when the extra objects aren't serializable.

Maybe we can test this directly with a mock object and a check that its writeObject method isn't called.
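A plain-JVM sketch of that idea, using a hand-rolled sentinel rather than a mocking framework (all names here are hypothetical): the sentinel's writeObject throws, so the test fails loudly if the object is ever pulled into a serialized closure.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Sentinel standing in for a mock: any attempt to serialize it trips
// its writeObject hook, which Java serialization invokes reflectively.
class Sentinel extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit =
    throw new NotSerializableException("sentinel was pulled into the closure")
}

object SentinelTest {
  // Returns true if obj serializes cleanly, false if the sentinel fired.
  def trySerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val sentinel = new Sentinel
    val clean: Int => Int = x => x + 1            // captures nothing extra
    val dirty: Int => Int = x => { sentinel; x }  // captures the sentinel
    assert(trySerialize(clean))
    assert(!trySerialize(dirty))
  }
}
```

In an actual Spark test the closure passed to mapPartitions would play the role of `dirty`, and a cleaned closure should behave like `clean`.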

@witgo
Contributor Author

witgo commented Sep 17, 2014

I tried to reproduce it in a test case but was unsuccessful. However, code like the one below does exhibit the problem; I don't know why.

def runGibbsSampling(
    data: RDD[Document], initModel: TopicModel,
    totalIter: Int, burnInIter: Int
  ): (TopicModel, RDD[Document]) = {
    require(totalIter > burnInIter, "totalIter must be greater than burnInIter")
    require(totalIter > 0, "totalIter must be greater than 0")
    require(burnInIter > 0, "burnInIter must be greater than 0")

    val (numTopics, numTerms, alpha, beta) = (initModel.topicCounts_.size,
      initModel.topicTermCounts_.head.size,
      initModel.alpha, initModel.beta)
    val probModel = TopicModel(numTopics, numTerms, alpha, beta)

    logInfo("Start initialization")
    var (topicModel, corpus) = sampleTermAssignment(data, initModel)

    for (iter <- 1 to totalIter) {
      logInfo("Start Gibbs sampling (Iteration %d/%d)".format(iter, totalIter))
      val broadcastModel = data.context.broadcast(topicModel)
      val previousCorpus = corpus
      corpus = corpus.mapPartitions { docs =>
        val rand = new Random
        val topicModel = broadcastModel.value
        val topicThisTerm = BDV.zeros[Double](numTopics)
        docs.map { doc =>
          val content = doc.content
          val topics = doc.topics
          val topicsDist = doc.topicsDist
          for (i <- 0 until content.length) {
            val term = content(i)
            val topic = topics(i)
            val chosenTopic = topicModel.dropOneDistSampler(topicsDist, topicThisTerm,
              rand, term, topic)
            if (topic != chosenTopic) {
              topics(i) = chosenTopic
              topicsDist(topic) += -1
              topicsDist(chosenTopic) += 1
              topicModel.update(term, topic, -1)
              topicModel.update(term, chosenTopic, 1)
            }
          }
          doc
        }
      }.setName(s"LDA-$iter").persist(StorageLevel.MEMORY_AND_DISK)

      if (iter % 5 == 0 && data.context.getCheckpointDir.isDefined) {
        corpus.checkpoint()
      }
      topicModel = collectTopicCounters(corpus, numTerms, numTopics)
      if (iter > burnInIter) {
        probModel.merge(topicModel)
      }
      previousCorpus.unpersist()
      broadcastModel.unpersist()
    }
    val burnIn = (totalIter - burnInIter).toDouble
    probModel.topicCounts_ :/= burnIn
    probModel.topicTermCounts_.foreach(_ :/= burnIn)
    (probModel, corpus)
  }

@witgo
Contributor Author

witgo commented Sep 28, 2014

I can't reproduce it for now, so I'm closing this PR.

@witgo witgo closed this Sep 28, 2014