Problem in output.format #132

Closed
SudipSinha opened this Issue Sep 21, 2012 · 2 comments

2 participants

@SudipSinha

I tried running this simple code. [Here "iris.csv" can be replaced by any CSV file in HDFS.]

Code

mapreduce(
  input = "iris.csv", output = NULL,
  input.format  = make.input.format("csv", sep=","),
  output.format = make.output.format("csv", sep=","),
  vectorized = TRUE, structured = TRUE
)

The code exited with error code 1; I've put the logs below.
If I remove the output.format argument, the code works.
I'm confused that such simple code isn't working for me.

My system configuration is as follows:
CentOS; 8GB RAM; CDH3u4; rmr v1.3.1

Runtime Output

packageJobJar: [/tmp/Rtmpsjr7nh/rmr-local-env, /tmp/Rtmpsjr7nh/rmr-global-env, /tmp/Rtmpsjr7nh/rhstr.mapf673b9d9ebd, /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/hadoop-unjar7199379222409918582/] [] /tmp/streamjob7688681986774163414.jar tmpDir=null
12/09/21 11:33:45 WARN snappy.LoadSnappy: Snappy native library is available
12/09/21 11:33:45 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/21 11:33:45 INFO snappy.LoadSnappy: Snappy native library loaded
12/09/21 11:33:45 INFO mapred.FileInputFormat: Total input paths to process : 1
12/09/21 11:33:45 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local]
12/09/21 11:33:45 INFO streaming.StreamJob: Running job: job_201209211122_0003
12/09/21 11:33:45 INFO streaming.StreamJob: To kill this job, run:
12/09/21 11:33:45 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201209211122_0003
12/09/21 11:33:45 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201209211122_0003
12/09/21 11:33:46 INFO streaming.StreamJob: map 0% reduce 0%
12/09/21 11:34:09 INFO streaming.StreamJob: map 100% reduce 100%
12/09/21 11:34:09 INFO streaming.StreamJob: To kill this job, run:
12/09/21 11:34:09 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201209211122_0003
12/09/21 11:34:09 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201209211122_0003
12/09/21 11:34:09 ERROR streaming.StreamJob: Job not successful. Error: NA
12/09/21 11:34:09 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
Error in mr(map = map, reduce = reduce, combine = combine, in.folder = if (is.list(input)) { :
hadoop streaming failed with error code 1

Task logs from Job tracker

stdout logs

stderr logs

Loading required package: Rcpp
Loading required package: RJSONIO
Loading required package: methods
Loading required package: itertools
Loading required package: iterators
Loading required package: digest
Error in data.frame(..., check.names = FALSE) :
arguments imply differing number of rows: 0, 75
Calls: ... write.table -> is.data.frame -> cbind -> cbind -> data.frame
Execution halted
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
at org.apache.hadoop.mapred.Child.main(Child.java:264)

syslog logs

2012-09-21 11:37:50,319 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2012-09-21 11:37:50,438 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/rmr-local-env <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/rmr-local-env
2012-09-21 11:37:50,447 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/.job.jar.crc <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/.job.jar.crc
2012-09-21 11:37:50,448 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/job.jar <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/job.jar
2012-09-21 11:37:50,449 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/rmr-global-env <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/rmr-global-env
2012-09-21 11:37:50,451 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/rhstr.mapf67208bd2e5 <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/rhstr.mapf67208bd2e5
2012-09-21 11:37:50,516 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
2012-09-21 11:37:50,648 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2012-09-21 11:37:50,651 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@6244b0f
2012-09-21 11:37:50,773 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library is available
2012-09-21 11:37:50,773 INFO org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library loaded
2012-09-21 11:37:50,780 INFO org.apache.hadoop.mapred.MapTask: numReduceTasks: 1
2012-09-21 11:37:50,786 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2012-09-21 11:37:50,811 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2012-09-21 11:37:50,811 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
2012-09-21 11:37:50,838 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2012-09-21 11:37:50,839 INFO org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/Rscript, rhstr.mapf67208bd2e5]
2012-09-21 11:37:50,859 INFO org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2012-09-21 11:37:50,860 INFO org.apache.hadoop.streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2012-09-21 11:37:51,566 INFO org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2012-09-21 11:37:51,567 INFO org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed!
2012-09-21 11:37:51,592 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-09-21 11:37:51,595 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
2012-09-21 11:37:51,598 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task

@piccolbo

Thanks for the thorough report; it gave me a very good starting point. This is a bug in rmr. When the structured option is on and the key is NULL, the conversion-to-data-frame routine returns a 0x0 data frame instead of just NULL. The output format tries to convert k and v to data frames (they already are in this case) and then does a cbind before writing out. That cbind fails because the two data frames have different numbers of rows (recycling is not applied uniformly in R, and it would fail in this case anyway). So the workarounds are:
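The failing cbind can be reproduced outside Hadoop. A minimal sketch, assuming a 75-row value data frame as in the log above:

```r
# A 0x0 data frame standing in for the NULL key, which is what the
# buggy conversion routine produces:
k <- data.frame()
# The values, already a data frame, here with 75 rows as in the log:
v <- data.frame(x = seq_len(75))

# cbind dispatches to cbind.data.frame, which calls
# data.frame(..., check.names = FALSE) and fails because the
# row counts differ:
cbind(k, v)
# Error in data.frame(..., check.names = FALSE) :
#   arguments imply differing number of rows: 0, 75
```

This is exactly the traceback in the stderr logs (write.table -> cbind -> data.frame).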

  1. Upgrade to rmr2. Unless you are in production, it seems like a good idea. It's available for download (albeit marked experimental) or from git as the rmr-2.0 branch. It's a lot better for structured data, no question, but the API is not backwards compatible, so some porting is required. I ported all my examples and have no scars to show for it, but if you have lots of code it may take some time. By the way, all my examples came out shorter, faster, and more readable. I verified that rmr2 doesn't have this problem.
  2. Make sure the key being written out is not NULL, by writing your own mapper or reducer (whichever runs last).
  3. Write your own output format that can handle a NULL key. This is what we have now:

    csv.output.format = function(...) function(k, v, con, vectorized)
      write.table(file = con,
                  x = if(is.null(k)) v else cbind(k, v),
                  ...,
                  row.names = FALSE,
                  col.names = FALSE)
    

    Just replace that if test with a more detailed check.

  4. Wait for me to issue a hotfix. With the next release imminent and some workarounds available, I'm not sure we'll do one. I'll get back to you on this.
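For workaround 2, a minimal sketch of the original job with a mapper that forces a non-NULL key, so the output format's cbind of key and value sees matching row counts. The constant key 1 is an arbitrary placeholder, and the exact keyval behavior under vectorized = TRUE may differ in rmr 1.3, so treat this as an illustration rather than tested code:

```r
library(rmr)

mapreduce(
  input = "iris.csv", output = NULL,
  input.format  = make.input.format("csv", sep = ","),
  output.format = make.output.format("csv", sep = ","),
  # Emit a constant non-NULL key so the conversion routine never
  # produces the 0x0 data frame that breaks the cbind:
  map = function(k, v) keyval(1, v),
  vectorized = TRUE, structured = TRUE
)
```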

I hope this helps

Antonio

@piccolbo

rmr-2.0.2 is now the stable release

@piccolbo piccolbo closed this Dec 6, 2012