NPE in HeartBeat.java in development snapshot #148

Closed
rbraley opened this Issue Feb 24, 2014 · 11 comments

rbraley commented Feb 24, 2014

I'm not sure whether I am missing something, but I thought this error might be upstream. Here is how to reproduce it.

build.sbt

name := "elasticsearch-spark-example"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core" % "0.9.0-incubating",
  "org.elasticsearch"  % "elasticsearch-hadoop" % "1.3.0.BUILD-SNAPSHOT"
)

resolvers ++= Seq(
  "Typesafe Sonatype Snapshots" at "http://typesafe.artifactoryonline.com/typesafe/sonatype-snapshots/",
  "Sonatype OSS" at "http://oss.sonatype.org/content/repositories/snapshots/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Conjars" at "http://conjars.org/repo"
)

SimpleApp.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsInputFormat


object SimpleApp {
  def main(args: Array[String]) {
    val conf = new JobConf()
    conf.set("es.resource", "demo/demo")

    val sc = new SparkContext("local", "Simple App")

    val data = sc.hadoopRDD(conf, classOf[EsInputFormat[String,String]], classOf[String], classOf[String], 6)

    data foreach println
  }
}

Here's the stacktrace

$ sbt run
[info] Loading project definition from /Users/rbraley/IdeaProjects/elasticsearch-spark-example/project
[info] Set current project to elasticsearch-spark-example (in build file:/Users/rbraley/IdeaProjects/elasticsearch-spark-example/)
[info] Updating {file:/Users/rbraley/IdeaProjects/elasticsearch-spark-example/}elasticsearch-spark-example...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Running io.traintracks.elasticsearch.spark.example.SimpleApp
14/02/24 17:28:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/02/24 17:28:00 INFO Remoting: Starting remoting
14/02/24 17:28:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.2.88:61423]
14/02/24 17:28:00 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.2.88:61423]
14/02/24 17:28:00 INFO spark.SparkEnv: Registering BlockManagerMaster
14/02/24 17:28:00 INFO storage.DiskBlockManager: Created local directory at /var/folders/rn/p2d7mh016b34qvm47jybmg380000gn/T/spark-local-20140224172800-6e2d
14/02/24 17:28:00 INFO storage.MemoryStore: MemoryStore started with capacity 890.9 MB.
14/02/24 17:28:00 INFO network.ConnectionManager: Bound socket to port 61424 with id = ConnectionManagerId(192.168.2.88,61424)
14/02/24 17:28:00 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/02/24 17:28:00 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 192.168.2.88:61424 with 890.9 MB RAM
14/02/24 17:28:00 INFO storage.BlockManagerMaster: Registered BlockManager
14/02/24 17:28:00 INFO spark.HttpServer: Starting HTTP Server
14/02/24 17:28:00 INFO server.Server: jetty-7.6.8.v20121106
14/02/24 17:28:00 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61425
14/02/24 17:28:00 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.2.88:61425
14/02/24 17:28:00 INFO spark.SparkEnv: Registering MapOutputTracker
14/02/24 17:28:00 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/rn/p2d7mh016b34qvm47jybmg380000gn/T/spark-bec0c9ef-0032-4894-a4cb-25da4a33e0b0
14/02/24 17:28:00 INFO spark.HttpServer: Starting HTTP Server
14/02/24 17:28:00 INFO server.Server: jetty-7.6.8.v20121106
14/02/24 17:28:00 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61426
14/02/24 17:28:01 INFO server.Server: jetty-7.6.8.v20121106
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/02/24 17:28:01 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/02/24 17:28:01 INFO ui.SparkUI: Started Spark Web UI at http://192.168.2.88:4040
2014-02-24 17:28:01.361 java[80822:6313] Unable to load realm info from SCDynamicStore
14/02/24 17:28:01 INFO storage.MemoryStore: ensureFreeSpace(32969) called with curMem=0, maxMem=934163251
14/02/24 17:28:01 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.2 KB, free 890.9 MB)
14/02/24 17:28:01 INFO mr.EsInputFormat: Discovered mapping {demo=[test=STRING]} for [demo/demo]
14/02/24 17:28:01 INFO mr.EsInputFormat: Created [5] shard-splits
14/02/24 17:28:01 INFO spark.SparkContext: Starting job: foreach at SimpleApp.scala:18
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Got job 0 (foreach at SimpleApp.scala:18) with 5 output partitions (allowLocal=false)
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Final stage: Stage 0 (foreach at SimpleApp.scala:18)
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Missing parents: List()
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Submitting Stage 0 (HadoopRDD[0] at hadoopRDD at SimpleApp.scala:16), which has no missing parents
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Submitting 5 missing tasks from Stage 0 (HadoopRDD[0] at hadoopRDD at SimpleApp.scala:16)
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 5 tasks
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2705 bytes in 5 ms
14/02/24 17:28:01 INFO executor.Executor: Running task ID 0
14/02/24 17:28:01 INFO storage.BlockManager: Found block broadcast_0 locally
14/02/24 17:28:01 INFO rdd.HadoopRDD: Input split: ShardInputSplit [node=[tXu2eOa5SsS3xwBId-m81A/Magnum I|127.0.0.1:9200],shard=0]
14/02/24 17:28:01 ERROR executor.Executor: Exception in task ID 0
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.HeartBeat.<init>(HeartBeat.java:51)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.init(EsInputFormat.java:204)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.<init>(EsInputFormat.java:167)
    at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.<init>(EsInputFormat.java:328)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:449)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:66)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 2705 bytes in 0 ms
14/02/24 17:28:01 INFO executor.Executor: Running task ID 1
14/02/24 17:28:01 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/02/24 17:28:01 INFO storage.BlockManager: Found block broadcast_0 locally
14/02/24 17:28:01 INFO rdd.HadoopRDD: Input split: ShardInputSplit [node=[tXu2eOa5SsS3xwBId-m81A/Magnum I|127.0.0.1:9200],shard=1]
14/02/24 17:28:01 ERROR executor.Executor: Exception in task ID 1
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.HeartBeat.<init>(HeartBeat.java:51)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.init(EsInputFormat.java:204)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.<init>(EsInputFormat.java:167)
    at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.<init>(EsInputFormat.java:328)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:449)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:66)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/24 17:28:01 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.HeartBeat.<init>(HeartBeat.java:51)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.init(EsInputFormat.java:204)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.<init>(EsInputFormat.java:167)
    at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.<init>(EsInputFormat.java:328)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:449)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:66)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/24 17:28:01 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Ignoring update with state RUNNING from TID 1 because its task set is gone
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Ignoring update with state FAILED from TID 1 because its task set is gone
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Failed to run foreach at SimpleApp.scala:18
[error] (run-main) org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/02/24 17:28:02 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 10 s, completed Feb 24, 2014 5:28:02 PM

costin added a commit that referenced this issue Feb 24, 2014

fallback from task attempt to task id
in case the task id information is not present, log existing props for better debugging...
relates #148
Member

costin commented Feb 24, 2014

Hi,

The error stems from the lack of a Hadoop property that we use to identify the running task. I'm not sure whether Spark sets this or not: it looks like it sets it internally when writing to Hadoop, but I'm not clear whether it is set when reading from Hadoop.

Master now tries a different property and, if that is missing too, logs a long error message listing all the properties. Can you please post/upload the properties somewhere so I can take a better look at them?

Thanks.
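
For readers following along, here is a minimal sketch of the fallback described above: look up the task attempt id first, fall back to the task id, and dump the available properties if neither is present. The class name `TaskIdLookup` is hypothetical, a plain `Map` stands in for the Hadoop configuration, and the property names `mapred.task.id` and `mapred.tip.id` are the Hadoop 1.x conventions, assumed here rather than taken from the connector's source.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not the connector's actual code.
final class TaskIdLookup {
    // Assumed property names (Hadoop 1.x conventions); the connector may use others.
    static final String TASK_ATTEMPT_ID = "mapred.task.id";
    static final String TASK_ID = "mapred.tip.id";

    /** Returns the attempt id, else the task id, else null when neither is set. */
    static String resolve(Map<String, String> props) {
        String id = props.get(TASK_ATTEMPT_ID);
        if (id == null || id.isEmpty()) {
            // Fall back from the task attempt to the task id.
            id = props.get(TASK_ID);
        }
        if (id == null || id.isEmpty()) {
            // Log what is available so the missing property can be diagnosed.
            System.err.println("Cannot determine task id - current properties are " + props);
            return null;
        }
        return id;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("es.resource", "demo/demo");
        System.out.println(resolve(props)); // neither id property is set here

        props.put(TASK_ID, "task_201402241728_0001_m_000000");
        System.out.println(resolve(props)); // the task id fallback is used
    }
}
```

Returning `null` instead of throwing leaves it to the caller to decide whether a missing id is fatal.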

fedesilva commented Feb 24, 2014

Here is a properties dump from my environment as described in #144

14/02/24 16:55:58 ERROR EsInputFormat: Cannot determine task id - current properties are {mapred.task.cache.levels=2, ha.failover-controller.cli-check.rpc-timeout.ms=20000, mapred.job.restart.recover=true, ipc.client.connect.max.retries.on.timeouts=45, map.sort.class=org.apache.hadoop.util.QuickSort, hadoop.tmp.dir=/tmp/hadoop-${user.name}, es.internal.mr.target.resource=logstash-batanga-radio-2014.02.18/logs, ha.health-monitor.check-interval.ms=1000, ipc.client.idlethreshold=4000, mapred.system.dir=${hadoop.tmp.dir}/mapred/system, kfs.blocksize=67108864, fs.trash.checkpoint.interval=0, mapred.job.tracker.persist.jobstatus.hours=0, io.skip.checksum.errors=false, mapred.cluster.reduce.memory.mb=-1, mapred.child.tmp=./tmp, es.internal.es.version=0.90.5, mapred.skip.reduce.max.skip.groups=0, mapred.heartbeats.in.second=100, mapred.jobtracker.instrumentation=org.apache.hadoop.mapred.JobTrackerMetricsInst, mapred.tasktracker.dns.nameserver=default, fs.defaultFS=file:///, io.sort.factor=10, mapred.task.timeout=600000, mapred.max.tracker.failures=4, hadoop.rpc.socket.factory.class.default=org.apache.hadoop.net.StandardSocketFactory, mapred.job.tracker.jobhistory.lru.cache.size=5, kfs.replication=3, mapred.skip.map.auto.incr.proc.count=true, mapreduce.job.complete.cancel.delegation.tokens=true, io.mapfile.bloom.size=1048576, hadoop.rpc.protection=authentication, es.query=

{      
  "query": {
     "match_all": {}
  },
  "fields": [
    "btng.listenerdjid"
  ]
}
, mapreduce.reduce.shuffle.connect.timeout=180000, hadoop.ssl.require.client.cert=false, hadoop.skip.worker.version.check=false, tasktracker.http.threads=40, mapred.job.shuffle.merge.percent=0.66, io.bytes.per.checksum=512, mapred.output.compress=false, mapred.healthChecker.script.timeout=600000, file.stream-buffer-size=4096, ha.failover-controller.new-active.rpc-timeout.ms=60000, mapred.reduce.slowstart.completed.maps=0.05, mapred.reduce.max.attempts=4, es.ser.reader.value.class=org.elasticsearch.hadoop.mr.WritableValueReader, ha.zookeeper.acl=world:anyone:rwcda, mapreduce.ifile.readahead.bytes=4194304, fs.ftp.host.port=21, mapred.skip.map.max.skip.records=0, kfs.client-write-packet-size=65536, kfs.bytes-per-checksum=512, mapred.cluster.map.memory.mb=-1, hadoop.security.group.mapping=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback, hadoop.ssl.keystores.factory.class=org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory, s3.replication=3, net.topology.node.switch.mapping.impl=org.apache.hadoop.net.ScriptBasedMapping, mapred.job.tracker.persist.jobstatus.dir=/jobtracker/jobsInfo, fs.s3.buffer.dir=${hadoop.tmp.dir}/s3, job.end.retry.attempts=0, s3native.bytes-per-checksum=512, mapred.local.dir.minspacestart=0, mapred.output.compression.type=RECORD, s3.client-write-packet-size=65536, io.mapfile.bloom.error.rate=0.005, ftp.bytes-per-checksum=512, mapred.cluster.max.reduce.memory.mb=-1, mapred.max.tracker.blacklists=4, mapred.task.profile.maps=0-2, hadoop.security.group.mapping.ldap.search.attr.group.name=cn, mapred.userlog.retain.hours=24, ha.health-monitor.rpc-timeout.ms=45000, mapred.job.tracker.persist.jobstatus.active=false, hadoop.security.authorization=false, local.cache.size=10737418240, s3.bytes-per-checksum=512, mapreduce.shuffle.ssl.enabled=${hadoop.ssl.enabled}, mapred.min.split.size=0, mapred.map.tasks=2, mapred.child.java.opts=-Xmx200m, mapred.map.child.log.level=INFO, mapred.job.queue.name=default, 
mapred.job.tracker.retiredjobs.cache.size=1000, ipc.server.listen.queue.size=128, mapred.inmem.merge.threshold=1000, job.end.retry.interval=30000, mapred.skip.attempts.to.start.skipping=2, s3native.blocksize=67108864, mapred.reduce.tasks=1, mapred.merge.recordsBeforeProgress=10000, mapred.userlog.limit.kb=0, file.replication=1, mapred.job.reduce.memory.mb=-1, ftp.client-write-packet-size=65536, hadoop.work.around.non.threadsafe.getpwuid=false, mapred.job.shuffle.input.buffer.percent=0.70, io.sort.spill.percent=0.80, mapreduce.shuffle.ssl.port=50443, hadoop.http.staticuser.user=dr.who, mapred.map.tasks.speculative.execution=true, hadoop.http.authentication.type=simple, hadoop.util.hash.type=murmur, hadoop.security.instrumentation.requires.admin=false, mapred.map.max.attempts=4, mapreduce.job.acl-view-job= , mapreduce.ifile.readahead=true, io.map.index.interval=128, mapred.job.tracker.handler.count=10, mapreduce.reduce.shuffle.read.timeout=180000, mapred.tasktracker.expiry.interval=600000, hadoop.ssl.client.conf=ssl-client.xml, mapred.reduce.child.log.level=INFO, mapred.jobtracker.maxtasks.per.job=-1, mapred.jobtracker.job.history.block.size=3145728, keep.failed.task.files=false, hadoop.kerberos.kinit.command=kinit, ipc.client.tcpnodelay=false, mapred.task.profile.reduces=0-2, fs.AbstractFileSystem.hdfs.impl=org.apache.hadoop.fs.Hdfs, mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec, io.map.index.skip=0, hadoop.http.authentication.token.validity=36000, ipc.server.tcpnodelay=false, hadoop.jetty.logs.serve.aliases=true, ftp.replication=3, ha.failover-controller.graceful-fence.connection.retries=1, jobclient.progress.monitor.poll.interval=1000, ha.health-monitor.sleep-after-disconnect.ms=1000, es.resource=logstash-batanga-radio-2014.02.18/logs, mapred.job.map.memory.mb=-1, file.client-write-packet-size=65536, mapred.reduce.tasks.speculative.execution=true, fs.AbstractFileSystem.viewfs.impl=org.apache.hadoop.fs.viewfs.ViewFs, 
hadoop.security.group.mapping.ldap.search.filter.group=(objectClass=group), mapreduce.tasktracker.outofband.heartbeat=false, mapreduce.reduce.input.limit=-1, fs.s3n.block.size=67108864, net.topology.script.number.args=100, dfs.ha.fencing.ssh.connect-timeout=30000, hadoop.security.authentication=simple, tfile.fs.output.buffer.size=262144, mapred.job.reuse.jvm.num.tasks=1, mapred.jobtracker.completeuserjobs.maximum=100, hadoop.security.groups.cache.secs=300, ha.failover-controller.graceful-fence.rpc-timeout.ms=5000, fs.AbstractFileSystem.file.impl=org.apache.hadoop.fs.local.LocalFs, mapred.task.tracker.task-controller=org.apache.hadoop.mapred.DefaultTaskController, ha.health-monitor.connect-retry-interval.ms=1000, kfs.stream-buffer-size=4096, fs.s3.maxRetries=4, mapred.cluster.max.map.memory.mb=-1, file.blocksize=67108864, mapreduce.reduce.shuffle.maxfetchfailures=10, fs.ftp.host=0.0.0.0, file.bytes-per-checksum=512, ha.zookeeper.parent-znode=/hadoop-ha, mapreduce.job.acl-modify-job= , mapred.local.dir=${hadoop.tmp.dir}/mapred/local, fs.s3.sleepTimeSeconds=10, fs.trash.interval=0, mapred.submit.replication=10, hadoop.relaxed.worker.version.check=true, mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec, mapred.tasktracker.dns.interface=default, ftp.stream-buffer-size=4096, mapred.job.tracker=local, hadoop.http.authentication.signature.secret.file=${user.home}/hadoop-http-auth-signature-secret, io.seqfile.sorter.recordlimit=1000000, s3.blocksize=67108864, mapreduce.tasktracker.cache.local.numberdirectories=10000, mapred.jobtracker.taskScheduler=org.apache.hadoop.mapred.JobQueueTaskScheduler, mapred.line.input.format.linespermap=1, fs.permissions.umask-mode=022, mapred.tasktracker.instrumentation=org.apache.hadoop.mapred.TaskTrackerMetricsInst, hadoop.ssl.server.conf=ssl-server.xml, mapreduce.jobtracker.split.metainfo.maxsize=10000000, jobclient.completion.poll.interval=5000, mapred.local.dir.minspacekill=0, s3native.stream-buffer-size=4096, 
io.sort.record.percent=0.05, hadoop.http.authentication.kerberos.principal=HTTP/_HOST@LOCALHOST, mapred.temp.dir=${hadoop.tmp.dir}/mapred/temp, mapred.tasktracker.reduce.tasks.maximum=2, mapred.tasktracker.tasks.sleeptime-before-sigkill=5000, mapred.job.reduce.input.buffer.percent=0.0, mapred.tasktracker.indexcache.mb=10, es.internal.hosts=, hadoop.security.group.mapping.ldap.search.filter.user=(&(objectClass=user)(sAMAccountName={0})), fs.automatic.close=true, mapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.MapTask$MapOutputBuffer, mapred.skip.reduce.auto.incr.proc.count=true, s3.stream-buffer-size=4096, ha.zookeeper.session-timeout.ms=5000, io.seqfile.compress.blocksize=1000000, hadoop.http.filter.initializers=org.apache.hadoop.http.lib.StaticUserWebFilter, fs.s3.block.size=67108864, mapred.tasktracker.taskmemorymanager.monitoring-interval=5000, hadoop.http.authentication.simple.anonymous.allowed=true, mapred.acls.enabled=false, mapred.queue.default.state=RUNNING, mapreduce.jobtracker.staging.root.dir=${hadoop.tmp.dir}/mapred/staging, ftp.blocksize=67108864, mapreduce.shuffle.ssl.address=0.0.0.0, mapred.queue.names=default, mapred.task.tracker.http.address=0.0.0.0:50060, mapred.disk.healthChecker.interval=60000, mapred.reduce.parallel.copies=5, io.seqfile.lazydecompress=true, hadoop.common.configuration.version=0.23.0, hadoop.ssl.enabled=false, hadoop.security.group.mapping.ldap.search.attr.member=member, io.sort.mb=100, ipc.client.connection.maxidletime=10000, mapred.task.tracker.report.address=127.0.0.1:0, mapred.compress.map.output=false, hadoop.security.uid.cache.secs=14400, mapred.healthChecker.interval=60000, ipc.client.kill.max=10, ipc.client.connect.max.retries=10, io.seqfile.local.dir=${hadoop.tmp.dir}/io/local, mapred.user.jobconf.limit=5242880, mapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapred.ReduceTask$ReduceCopier, io.native.lib.available=true, mapred.job.tracker.http.address=0.0.0.0:50030, 
io.file.buffer.size=4096, mapred.jobtracker.restart.recover=false, io.serializations=org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization, tfile.fs.input.buffer.size=262144, mapred.task.profile=false, hadoop.security.group.mapping.ldap.ssl=false, jobclient.output.filter=FAILED, fs.df.interval=60000, s3native.client-write-packet-size=65536, hadoop.http.authentication.kerberos.keytab=${user.home}/hadoop.keytab, s3native.replication=3, mapred.tasktracker.map.tasks.maximum=2, tfile.io.chunk.size=1048576, hadoop.ssl.hostname.verifier=DEFAULT}

costin added a commit that referenced this issue Feb 24, 2014

Member

costin commented Feb 24, 2014

Thanks.
I've pushed a fix to master which ignores the task id in the heartbeat (since it's optional). However, the missing properties look like a bug to me: several Hadoop properties should be present but are not.
I'll probably raise this on the Spark mailing list at some point (if you guys do that in the meantime, even better).

Either way, please try the latest master and let me know how it goes. Thanks!
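
To illustrate what "ignoring the task id in the heartbeat" can look like, here is a rough sketch of a heartbeat that treats the id as optional and uses it only as a label. `OptionalIdHeartbeat` and everything inside it are hypothetical names built on `java.util.concurrent`; this is not the actual HeartBeat.java.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the task id is optional and only used for naming/logging.
final class OptionalIdHeartbeat {
    private final Runnable progress;
    private final String label;
    private final long periodMillis;
    private ScheduledExecutorService scheduler;

    OptionalIdHeartbeat(Runnable progress, String taskId, long periodMillis) {
        if (progress == null) {
            throw new IllegalArgumentException("progress callback is required");
        }
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        this.progress = progress;
        // A missing id no longer fails construction; a placeholder is used instead.
        this.label = (taskId != null) ? taskId : "<unknown task>";
        this.periodMillis = periodMillis;
    }

    String label() {
        return label;
    }

    synchronized void start() {
        if (scheduler != null) {
            return; // already running
        }
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-" + label);
            t.setDaemon(true); // never keep the JVM alive just for the heartbeat
            return t;
        });
        scheduler.scheduleAtFixedRate(progress, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger beats = new AtomicInteger();
        OptionalIdHeartbeat hb = new OptionalIdHeartbeat(beats::incrementAndGet, null, 10);
        hb.start();
        Thread.sleep(100);
        hb.stop();
        System.out.println(hb.label() + " sent " + beats.get() + " heartbeats");
    }
}
```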

Member

costin commented Feb 24, 2014

@fedesilva @rbraley guys, if you are available in 2h or so maybe we can connect on IRC to speed things up?
Just in case there are still bugs, we can chat directly and I'll do my best to fix the issues as they appear.

Let me know if that works for you guys.

fedesilva commented Feb 24, 2014

The latest master fixes the NPE. I have successfully iterated over an RDD.

I am now getting the following warning, which does not seem to abort the tasks.

14/02/24 18:47:56 WARN HadoopRDD: Exception in RecordReader.close()
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.ReportingUtils.report(ReportingUtils.java:38)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.close(EsInputFormat.java:274)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:174)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
    at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

fedesilva commented Feb 24, 2014

Regarding IRC: Ok, I'm online in elasticsearch IRC channel right now as fedesilva and will be available until 23 GMT.
I am GMT -3 (Montevideo).

costin added a commit that referenced this issue Feb 24, 2014

Member

costin commented Feb 24, 2014

Can you please try the latest master? It should address this NPE, which seems to occur because the reporting facility is disabled.
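
As an illustration of the kind of guard involved, here is a minimal sketch of null-safe reporting: when no reporter was supplied (reporting disabled), the call becomes a no-op instead of dereferencing `null`. The `Reporter` interface below is a hypothetical stand-in declared locally so the example compiles on its own, and the `es-hadoop` counter group name is made up; neither is taken from ReportingUtils.java.

```java
// Hypothetical sketch, not the connector's ReportingUtils.
final class SafeReporting {
    /** Minimal local stand-in for a Hadoop-style counter reporter. */
    interface Reporter {
        void incrCounter(String group, String name, long amount);
    }

    /**
     * Reports a counter if a reporter is available.
     * Returns false when reporting is disabled (reporter is null).
     */
    static boolean report(Reporter reporter, String name, long amount) {
        if (reporter == null) {
            return false; // nothing to report to; skip quietly
        }
        reporter.incrCounter("es-hadoop", name, amount); // group name is illustrative
        return true;
    }

    public static void main(String[] args) {
        // With no reporter, close()-time reporting is skipped instead of throwing.
        System.out.println(report(null, "docs.read", 42));

        Reporter console = (group, name, amount) ->
                System.out.println(group + "/" + name + " += " + amount);
        System.out.println(report(console, "docs.read", 42));
    }
}
```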

Member

costin commented Feb 24, 2014

Cool - see you in a bit (my nick is typically costin) :)

fedesilva commented Feb 24, 2014

Ok, see you soon.

Member

costin commented Feb 24, 2014

As this is fixed, I'm closing the issue. Guys, please continue creating issues if you encounter any problems, but do mention the new issue under #151.

Author

rbraley commented Feb 25, 2014

Hi guys, #152 is also related to this. I will hang out in IRC as rbraley for a while to try to resolve issues together.


Ryan Braley | Founder
http://traintracks.io/ http://www.traintracks.io/

US: +1 (206) 866 5661
CN: +86 156 1153 7598
Coding the future. Decoding the game.

costin added a commit that referenced this issue Apr 8, 2014

fallback from task attempt to task id
in case the task id information is not present, log existing props for better debugging...
relates #148

costin added a commit that referenced this issue Apr 8, 2014

costin added a commit that referenced this issue Apr 8, 2014
