|
18 | 18 | */
|
19 | 19 | package org.elasticsearch.hadoop.mr;
|
20 | 20 |
|
| 21 | +import org.apache.hadoop.mapred.Counters.Counter; |
21 | 22 | import org.apache.hadoop.mapred.Reporter;
|
22 | 23 | import org.apache.hadoop.mapreduce.TaskInputOutputContext;
|
23 | 24 | import org.apache.hadoop.util.Progressable;
|
|
26 | 27 | class ReportingUtils {
|
27 | 28 |
|
28 | 29 | // handles Hadoop 'old' and 'new' API reporting classes, namely {@link Reporter} and {@link TaskInputOutputContext}
|
29 |
| - @SuppressWarnings({ "rawtypes", "unchecked" }) |
| 30 | + @SuppressWarnings({ "rawtypes" }) |
30 | 31 | static void report(Progressable progressable, Stats stats) {
|
31 |
| - if (progressable == null) { |
| 32 | + if (progressable == null || progressable == Reporter.NULL) { |
32 | 33 | return;
|
33 | 34 | }
|
34 | 35 |
|
35 | 36 | if (progressable instanceof Reporter) {
|
36 | 37 | Reporter reporter = (Reporter) progressable;
|
37 | 38 |
|
38 |
| - reporter.getCounter(Counters.BYTES_WRITTEN).increment(stats.bytesWritten); |
39 |
| - reporter.getCounter(Counters.BYTES_READ).increment(stats.bytesRead); |
40 |
| - reporter.getCounter(Counters.DOCS_WRITTEN).increment(stats.docsWritten); |
41 |
| - reporter.getCounter(Counters.BULK_WRITES).increment(stats.bulkWrites); |
42 |
| - reporter.getCounter(Counters.DOCS_RETRIED).increment(stats.docsRetried); |
43 |
| - reporter.getCounter(Counters.BULK_RETRIES).increment(stats.bulkRetries); |
44 |
| - reporter.getCounter(Counters.DOCS_READ).increment(stats.docsRead); |
45 |
| - reporter.getCounter(Counters.NODE_RETRIES).increment(stats.nodeRetries); |
46 |
| - reporter.getCounter(Counters.NET_RETRIES).increment(stats.netRetries); |
| 39 | + oldApiCounter(reporter, Counters.BYTES_WRITTEN, stats.bytesWritten); |
| 40 | + oldApiCounter(reporter, Counters.BYTES_READ, stats.bytesRead); |
| 41 | + oldApiCounter(reporter, Counters.DOCS_WRITTEN, stats.docsWritten); |
| 42 | + oldApiCounter(reporter, Counters.BULK_WRITES, stats.bulkWrites); |
| 43 | + oldApiCounter(reporter, Counters.DOCS_RETRIED, stats.docsRetried); |
| 44 | + oldApiCounter(reporter, Counters.BULK_RETRIES, stats.bulkRetries); |
| 45 | + oldApiCounter(reporter, Counters.DOCS_READ, stats.docsRead); |
| 46 | + oldApiCounter(reporter, Counters.NODE_RETRIES, stats.nodeRetries); |
| 47 | + oldApiCounter(reporter, Counters.NET_RETRIES, stats.netRetries); |
47 | 48 | }
|
48 | 49 |
|
49 | 50 | if (progressable instanceof TaskInputOutputContext) {
|
50 | 51 | TaskInputOutputContext tioc = (TaskInputOutputContext) progressable;
|
51 | 52 |
|
52 |
| - tioc.getCounter(Counters.BYTES_WRITTEN).increment(stats.bytesWritten); |
53 |
| - tioc.getCounter(Counters.BYTES_READ).increment(stats.bytesRead); |
54 |
| - tioc.getCounter(Counters.DOCS_WRITTEN).increment(stats.docsWritten); |
55 |
| - tioc.getCounter(Counters.BULK_WRITES).increment(stats.bulkWrites); |
56 |
| - tioc.getCounter(Counters.DOCS_RETRIED).increment(stats.docsRetried); |
57 |
| - tioc.getCounter(Counters.BULK_RETRIES).increment(stats.bulkRetries); |
58 |
| - tioc.getCounter(Counters.DOCS_READ).increment(stats.docsRead); |
59 |
| - tioc.getCounter(Counters.NODE_RETRIES).increment(stats.nodeRetries); |
60 |
| - tioc.getCounter(Counters.NET_RETRIES).increment(stats.netRetries); |
| 53 | + newApiCounter(tioc, Counters.BYTES_WRITTEN, stats.bytesWritten); |
| 54 | + newApiCounter(tioc, Counters.BYTES_READ, stats.bytesRead); |
| 55 | + newApiCounter(tioc, Counters.DOCS_WRITTEN, stats.docsWritten); |
| 56 | + newApiCounter(tioc, Counters.BULK_WRITES, stats.bulkWrites); |
| 57 | + newApiCounter(tioc, Counters.DOCS_RETRIED, stats.docsRetried); |
| 58 | + newApiCounter(tioc, Counters.BULK_RETRIES, stats.bulkRetries); |
| 59 | + newApiCounter(tioc, Counters.DOCS_READ, stats.docsRead); |
| 60 | + newApiCounter(tioc, Counters.NODE_RETRIES, stats.nodeRetries); |
| 61 | + newApiCounter(tioc, Counters.NET_RETRIES, stats.netRetries); |
| 62 | + } |
| 63 | + } |
| 64 | + |
| 65 | + private static void oldApiCounter(Reporter reporter, Enum<?> counter, long value) { |
| 66 | + Counter c = reporter.getCounter(counter); |
| 67 | + if (c != null) { |
| 68 | + c.increment(value); |
| 69 | + } |
| 70 | + } |
| 71 | + |
| 72 | + @SuppressWarnings("unchecked") |
| 73 | + private static void newApiCounter(TaskInputOutputContext tioc, Enum<?> counter, long value) { |
| 74 | + org.apache.hadoop.mapreduce.Counter c = tioc.getCounter(counter); |
| 75 | + if (c != null) { |
| 76 | + c.increment(value); |
61 | 77 | }
|
62 | 78 | }
|
63 | 79 | }
|
0 commit comments