
Some Extractors fail with ConcurrentModificationException #9

Closed
nilesh-c opened this issue May 22, 2014 · 14 comments

@nilesh-c
Member

I have tested the following extractors so far (whether or not they run together in a pipeline, the results are the same):

  • RedirectExtractor
  • PageIdExtractor
  • PageLinksExtractor
  • LabelExtractor
  • ArticlePageExtractor

Of the above, only RedirectExtractor and PageIdExtractor work properly, and their outputs match the original extraction-framework. Adding any of the other three to the pipeline causes this:

...
...

May 22, 2014 9:17:17 PM org.dbpedia.extraction.dump.extract.ExtractionProgress start
INFO: li: 1 extractors (LabelExtractor), 1 datasets (labels) started
14/05/22 21:17:17 ERROR actor.OneForOneStrategy: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.spark.storage.BlockManager)
blockManager (org.apache.spark.storage.BlockManagerSource)
blockManagerSource (org.apache.spark.SparkContext)
_sparkContext (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anon$1)
context (org.dbpedia.extraction.mappings.LabelExtractor)
extractors (org.dbpedia.extraction.mappings.CompositeParseExtractor)
extractor (org.dbpedia.extraction.mappings.RootExtractor)
extractor (org.dbpedia.extraction.dump.extract.ExtractorMapper)
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.spark.storage.BlockManager)
blockManager (org.apache.spark.storage.BlockManagerSource)
blockManagerSource (org.apache.spark.SparkContext)
_sparkContext (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anon$1)
context (org.dbpedia.extraction.mappings.LabelExtractor)
extractors (org.dbpedia.extraction.mappings.CompositeParseExtractor)
extractor (org.dbpedia.extraction.mappings.RootExtractor)
extractor (org.dbpedia.extraction.dump.extract.ExtractorMapper)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:30)
    at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:28)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.twitter.chill.TraversableSerializer.write(Traversable.scala:28)
    at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
    at org.dbpedia.extraction.spark.serialize.KryoSerializer$.serialize(KryoSerializer.scala:25)
    at org.dbpedia.extraction.spark.serialize.KryoSerializationWrapper.getValueSerialized(KryoSerializationWrapper.scala:19)
    at org.dbpedia.extraction.spark.serialize.KryoSerializationWrapper.writeObject(KryoSerializationWrapper.scala:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28)
    at org.apache.spark.scheduler.ResultTask$.serializeInfo(ResultTask.scala:48)
    at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:123)
    at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1456)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:778)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
    at java.util.Vector$Itr.next(Vector.java:1133)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    ... 81 more
@nilesh-c
Member Author

This is a common problem with Spark, because closures (the anonymous functions passed into RDD.map(...)) are serialized by Spark before being shipped to the workers. Hadoop does not have this problem because it binary-copies the whole .jar over the network. Spark uses Java serialization by default, but that is very slow compared to, say, Kryo, so we route closure serialization through Kryo with a wrapper (Spark doesn't support Kryo serialization for closures yet).

And up until now the org.dbpedia.extraction.spark.serialize.KryoSerializationWrapper class has been working perfectly. A few freak extractors seem to fail, though. :(
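For context, the wrapper is used along these lines (a rough sketch: KryoSerializationWrapper is the real class from this repo, but the apply-style construction and surrounding names are illustrative):

// Rough usage sketch (names other than KryoSerializationWrapper are
// illustrative): wrap the mapper so Spark's default Java serialization only
// sees the wrapper, which Kryo-serializes its payload in writeObject.
val extractorFn: PageNode => Seq[Quad] = buildExtractor() // hypothetical factory
val wrapped = KryoSerializationWrapper(extractorFn)

val quads = pages.flatMap(page => wrapped.value(page)) // .value deserializes on the worker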

@nilesh-c
Member Author

Phew. I just found the bug.

If you look carefully, this problem occurs in every class that references the context object inside the def extract() { ... } body. Example from ArticlePageExtractor:

override def extract(page : PageNode, subjectUri : String, pageContext : PageContext): Seq[Quad] =
{
  if(page.title.namespace != Namespace.Main) return Seq.empty

  val quads = new ArrayBuffer[Quad]()

  quads += new Quad(context.language, DBpediaDatasets.LinksToWikipediaArticle, subjectUri, isPrimaryTopicOf,  page.title.pageIri, page.sourceUri)
  quads += new Quad(context.language, DBpediaDatasets.LinksToWikipediaArticle, page.title.pageIri, primaryTopic, subjectUri, page.sourceUri)
  quads += new Quad(context.language, DBpediaDatasets.LinksToWikipediaArticle, page.title.pageIri, dcLanguage, context.language.wikiCode, page.sourceUri)
  quads += new Quad(context.language, DBpediaDatasets.LinksToWikipediaArticle, page.title.pageIri, typeOntProperty, foafDocument.uri, page.sourceUri)

  quads
}

context.language is called inside extract. The extract method is what acts as the mapper for Spark, so it must be serialized along with everything it references. That means Spark attempts to serialize the whole DistributedExtractionContext object, and it fails because the context contains the complex SparkContext instance along with Java ClassLoaders and what not; most importantly, the ClassLoader holds a mutable java.util.Vector of loaded classes that changes from time to time, and iterating over it during serialization is what throws our ConcurrentModificationException.
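A minimal standalone illustration of the same failure mode (a hypothetical class, not framework code): touching a field inside the lambda captures the enclosing instance, so Spark tries to serialize all of it.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class ExtractionJob(sc: SparkContext) {
  val language = "en"

  def run(pages: RDD[String]): RDD[String] =
    // `language` is really `this.language`, so the closure drags in
    // `this` -- and with it `sc`, which cannot be serialized
    pages.map(page => s"$page@$language")

  def runFixed(pages: RDD[String]): RDD[String] = {
    val lang = language // copy into a local val; only the String is captured
    pages.map(page => s"$page@$lang")
  }
}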

@nilesh-c
Member Author

I propose the following options to solve this:

(1) Instead of:

class BlahExtractor(   
  context: {
    def ontology: Ontology
    def language: Language
  }
)

we could use:

class BlahExtractor(   
  ontology: Ontology,
  language: Language
)

But I think instantiating the classes could become a huge problem? It won't be as streamlined as just passing the context in as a parameter.

(2) Simply add private val contextOntology = context.ontology, private val contextLanguage = context.language, private val contextRedirects = context.redirects, etc. to the classes individually, as needed. We could try enforcing that with traits that have abstract vars (see the sketch below).
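A sketch of what option (2) could look like with that enforcement (all names are illustrative; I am using abstract vals here rather than vars, since the cached fields never change after construction):

import org.dbpedia.extraction.ontology.Ontology
import org.dbpedia.extraction.util.Language

// Trait that forces extractors to cache what they need out of the context.
trait CachedContextFields {
  val contextOntology: Ontology
  val contextLanguage: Language
}

class BlahExtractor(
  context: {
    def ontology: Ontology
    def language: Language
  }
) extends CachedContextFields {
  // copied out once, so serializing this extractor captures two small
  // values instead of `context` and the SparkContext reachable from it
  val contextOntology = context.ontology
  val contextLanguage = context.language
}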

@jimkont @sangv Let me know what you think and I'll make a pull request to extraction-framework master.

@nilesh-c nilesh-c added the bug label May 23, 2014
@ninniuz

ninniuz commented May 23, 2014

I am more comfortable with solution (2) at the moment, but I understand your doubts. I think you can add some comments to the source code explaining why those fields are added, for future reference.

Let's wait for the other mentors' opinions on this, though. Well done.

@jimkont
Member

jimkont commented May 23, 2014

(1) will cause big issues in the whole framework, so I vote for (2) too.

But let's create vals only where needed for now, e.g. in LabelExtractor we can change it to

val datatype = context.ontology.datatypes("rdf:langString")

and use datatype in extract().

Not all extractors need the whole context, so I wouldn't force that in the Extractor trait.
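To make that concrete, here is roughly what LabelExtractor would look like after the change (a sketch: only the datatype line is taken from the comment above; the class shape, labelProperty and the Quad call are approximations):

import org.dbpedia.extraction.ontology.Ontology
import org.dbpedia.extraction.util.Language

class LabelExtractor(
  context: {
    def ontology: Ontology
    def language: Language
  }
) {
  // resolved once at construction time, so serializing the extractor
  // captures only these small values, never `context` itself
  private val language = context.language
  private val labelProperty = context.ontology.properties("rdfs:label")
  private val datatype = context.ontology.datatypes("rdf:langString")

  def extract(page: PageNode, subjectUri: String): Seq[Quad] =
    Seq(new Quad(language, DBpediaDatasets.Labels, subjectUri,
      labelProperty, page.title.decoded, page.sourceUri, datatype))
}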


@nilesh-c
Member Author

Thanks Andrea.

Yes, going with (1) will give ugly problems.

> But let's create vals only when needed at the moment. e.g. in LabelExtractor we can change it to

OK, I'll send a pull request for this after testing all the extractors.

@sangv
Contributor

sangv commented Jun 20, 2014

@nilesh-c

I ran into the same issue when running extractors on English-language dumps using the nildev2 branch (probably more serializers need to be registered).

@nilesh-c
Member Author

@sangv can you post the entire error message (it'll probably be big)? That's weird. I just tested nildev2 with the liwiki dump and it worked fine. I can't test the English dump right now because I haven't downloaded the huge thing yet.

Also, where are you testing this? On your PC or on a server?

@nilesh-c
Member Author

BTW, you did not include InfoboxExtractor, right? That one doesn't work yet (the only bad one) because it maintains state in a HashSet. I will need to write a separate distributed version of it to make it work (a sketch of the state problem follows below).

Apart from the state, it references org.dbpedia.extraction.dataparser.ParserUtils, which needs a new serializer, just like you said. I have not added the serializer to the repo yet because we had decided to worry about InfoboxExtractor later. I can add the serializer and extraction will proceed just fine, but the infobox extraction outputs will not match until I write a distributed version of it.
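Why shared mutable state breaks under Spark, in sketch form (the class and field names are illustrative, not InfoboxExtractor's actual code): each executor JVM gets its own copy of the HashSet, so deduplication that was global in the single-machine run becomes per-partition, and the outputs diverge.

import scala.collection.mutable

class StatefulExtractor {
  private val seenProperties = mutable.HashSet[String]()

  def extract(property: String): Option[String] =
    // first-occurrence logic: correct in one JVM, wrong across many,
    // because every worker starts with an empty set
    if (seenProperties.add(property)) Some(property) else None
}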

@sangv
Contributor

sangv commented Jun 25, 2014

en: extracted 9620000 pages in 76:51.307s (per page: 0.479345841995842 ms; failed pages: 0).

14/06/20 16:05:08 WARN storage.BlockManager: Putting block rdd_5_3611 failed
14/06/20 16:05:08 ERROR executor.Executor: Exception in task ID 13245
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
hadoopConf (org.dbpedia.extraction.dump.extract.DistConfig)
distConfig (org.dbpedia.extraction.dump.extract.DistConfigLoader)
$outer (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anonfun$1)
org$dbpedia$extraction$util$Finder$$wrap (org.dbpedia.extraction.util.Finder)
finder$1 (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anon$1)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:88)
    at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:83)
    at org.apache.spark.serializer.KryoSerializationStream.writeAll(KryoSerializer.scala:84)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:821)
    at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:69)
    at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:699)
    at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$5.apply(MemoryStore.scala:240)
    at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$5.apply(MemoryStore.scala:229)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:229)
    at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:157)
    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:75)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:557)
    at org.apache.spark.storage.BlockManager.put(BlockManager.scala:482)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:76)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
    at java.util.Vector$Itr.next(Vector.java:1133)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    ... 48 more

14/06/20 16:05:08 WARN scheduler.TaskSetManager: Lost TID 13245 (task 3613.0:0)
14/06/20 16:05:08 WARN scheduler.TaskSetManager: Loss was due to com.esotericsoftware.kryo.KryoException
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
hadoopConf (org.dbpedia.extraction.dump.extract.DistConfig)
distConfig (org.dbpedia.extraction.dump.extract.DistConfigLoader)
$outer (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anonfun$1)
org$dbpedia$extraction$util$Finder$$wrap (org.dbpedia.extraction.util.Finder)
finder$1 (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anon$1)
    [stack trace identical to the ERROR above]

14/06/20 16:05:08 ERROR scheduler.TaskSetManager: Task 3613.0:0 failed 1 times; aborting job
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at scala_maven_executions.MainHelper.runMain(MainHelper.java:164)
    at scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)
Caused by: org.apache.spark.SparkException: Job aborted: Task 3613.0:0 failed 1 times (most recent failure: Exception failure: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
hadoopConf (org.dbpedia.extraction.dump.extract.DistConfig)
distConfig (org.dbpedia.extraction.dump.extract.DistConfigLoader)
$outer (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anonfun$1)
org$dbpedia$extraction$util$Finder$$wrap (org.dbpedia.extraction.util.Finder)
finder$1 (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anon$1))
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


@nilesh-c
Member Author

@sangv I just made a new pull request. Can you run the same test on the milestone3 branch and confirm that you can reproduce this error?

Thanks.

@sangv
Contributor

sangv commented Jun 25, 2014

I will try it later this week, but you should just try pulling down the English dumps and see if it works correctly.


@nilesh-c
Member Author

OK, I'll try that; it will probably take a couple of days to download the whole 10.8 GB dump. But this kind of error should be reproducible with any language dump - that's the puzzle.

And here's something else:

Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
hadoopConf (org.dbpedia.extraction.dump.extract.DistConfig)
distConfig (org.dbpedia.extraction.dump.extract.DistConfigLoader)
$outer (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anonfun$1)
org$dbpedia$extraction$util$Finder$$wrap (org.dbpedia.extraction.util.Finder)
finder$1 (org.dbpedia.extraction.dump.extract.DistConfigLoader$$anon$1)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)

This excerpt from your error shows what's happening: Spark tries to serialize a Finder instance inside DistConfigLoader and, recursively, the DistConfig instance it references. With the code as it stands, this should not be happening, at least not in milestone3. I could easily add a serializer for Hadoop's Configuration, but two problems would remain:

  • It may cause other serialization errors too, e.g. while trying to serialize SparkContext, which should not be getting serialized at all, just like above.
  • A serializer only makes the object serializable; we would still pay the extra serialization overhead. I want to check whether we can avoid capturing these objects in the first place, and resort to writing another serializer only if all else fails (a sketch of what that could look like follows).
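If we do end up writing it, one possible shape for the Configuration serializer (a sketch, untested): it leans on Hadoop's own Writable methods instead of Kryo's field-by-field reflection, which is what follows the classLoader field into the mutable Vector of loaded classes.

import java.io.{DataInputStream, DataOutputStream}

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.conf.Configuration

// Possible Kryo serializer for Hadoop's Configuration (sketch, untested).
// Configuration implements Writable, so we can delegate to its own
// write/readFields instead of walking its fields reflectively.
class HadoopConfigurationSerializer extends Serializer[Configuration] {

  override def write(kryo: Kryo, output: Output, conf: Configuration): Unit =
    conf.write(new DataOutputStream(output)) // Kryo's Output is an OutputStream

  override def read(kryo: Kryo, input: Input, cls: Class[Configuration]): Configuration = {
    val conf = new Configuration(false) // start empty; readFields fills it
    conf.readFields(new DataInputStream(input)) // Kryo's Input is an InputStream
    conf
  }
}

// Registration would go wherever we set up Kryo, roughly:
// kryo.register(classOf[Configuration], new HadoopConfigurationSerializer)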

@nilesh-c
Member Author

@jimkont @sangv is it possible to get temporary remote access to a machine with high bandwidth? Something with 2-4 cores and 6-8 GB RAM? We can think about clusters later, but having a single test server would be nice: downloading dumps and ad-hoc testing would be much less cumbersome. :-/ At home I have download caps on high-speed internet, and it's costly.
