From 986262d218726aa25854df5bb9ab5f7737b067d4 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 26 May 2017 21:11:18 +0800 Subject: [PATCH 1/3] enrich MAP_REDUCE_JOB_STREAM --- .../jpm/analyzer/mr/rpc/JobRpcEvaluator.java | 38 ++++++++++--- .../JobRpcAnalysisAPIEntity.java | 55 +++++++++++++++++++ .../JobRpcAnalysisStreamPublisher.java | 5 ++ ...istory.MRHistoryJobApplicationProvider.xml | 20 +++++++ 4 files changed, 109 insertions(+), 9 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java index 457f0c5d9e..ed69ce99e8 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java @@ -54,8 +54,12 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { long reduceStartTime = Long.MAX_VALUE; long reduceEndTime = 0; + double totalMapTime = 0; + double totalReduceTime = 0; + for (TaskExecutionAPIEntity task : entity.getTasksMap().values()) { if (task.getTags().get(TASK_TYPE.toString()).equalsIgnoreCase(Constants.TaskType.MAP.toString())) { + totalMapTime += task.getDuration(); if (mapStartTime > task.getStartTime()) { mapStartTime = task.getStartTime(); } @@ -63,6 +67,7 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { mapEndTime = task.getEndTime(); } } else { + totalReduceTime += task.getDuration(); if (reduceStartTime > task.getStartTime()) { reduceStartTime = task.getStartTime(); } @@ -83,19 +88,32 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { analysisAPIEntity.setTags(tags); analysisAPIEntity.setTimestamp(entity.getStartTime()); analysisAPIEntity.setTrackingUrl(entity.getTrackingUrl()); + analysisAPIEntity.setDuration(entity.getDurationTime()); + analysisAPIEntity.setNumTotalMaps(entity.getTotalMaps()); + analysisAPIEntity.setNumTotalReduces(entity.getTotalReduces()); double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 : (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getDurationTime() / 1000); - double mapOpsPerSecond = (entity.getTotalMaps() == 0) ? 0 : - totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000); - double reduceOpsPerSecond = (entity.getTotalReduces() == 0) ? 0 : - totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000); - + double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces()); - double avgOpsPerMap = (entity.getTotalMaps() == 0) ? 0 : - totalMapHdfsOps / entity.getTotalMaps(); - double avgOpsPerReduce = (entity.getTotalReduces() == 0) ? 0 : - totalReduceHdfsOps / entity.getTotalReduces(); + + double avgOpsPerMap = 0; + double avgMapTime = 0; + double avgOpsPerReduce = 0; + double avgReduceTime = 0; + double mapOpsPerSecond = 0; + double reduceOpsPerSecond = 0; + + if (entity.getTotalMaps() > 0) { + avgMapTime = totalMapTime / entity.getTotalMaps(); + avgOpsPerMap = totalMapHdfsOps / entity.getTotalMaps(); + mapOpsPerSecond = totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000); + } + if (entity.getTotalReduces() > 0) { + avgReduceTime = totalReduceTime / entity.getTotalReduces(); + avgOpsPerReduce = totalReduceHdfsOps / entity.getTotalReduces(); + reduceOpsPerSecond = totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000); + } analysisAPIEntity.setTotalOpsPerSecond(totalOpsPerSecond); analysisAPIEntity.setMapOpsPerSecond(mapOpsPerSecond); @@ -103,6 +121,8 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { analysisAPIEntity.setAvgOpsPerTask(avgOpsPerTask); analysisAPIEntity.setAvgOpsPerMap(avgOpsPerMap); analysisAPIEntity.setAvgOpsPerReduce(avgOpsPerReduce); + analysisAPIEntity.setAvgMapTime(avgMapTime); + analysisAPIEntity.setAvgReduceTime(avgReduceTime); Result.EvaluatorResult result = new Result.EvaluatorResult(); result.addProcessorEntity(JobRpcEvaluator.class, analysisAPIEntity); diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java index 6c0e539215..ec04286c1f 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java @@ -51,6 +51,16 @@ public class JobRpcAnalysisAPIEntity extends TaggedLogAPIEntity { private double avgOpsPerMap; @Column("h") private double avgOpsPerReduce; + @Column("i") + private double avgMapTime; + @Column("j") + private double avgReduceTime; + @Column("k") + private int numTotalMaps; + @Column("l") + private int numTotalReduces; + @Column("m") + private long duration; public String getTrackingUrl() { return trackingUrl; @@ -124,5 +134,50 @@ public void setAvgOpsPerReduce(double avgOpsPerReduce) { valueChanged("avgOpsPerReduce"); } + public double getAvgMapTime() { + return avgMapTime; + } + + public void setAvgMapTime(double avgMapTime) { + this.avgMapTime = avgMapTime; + valueChanged("avgMapTime"); + } + + public double getAvgReduceTime() { + return avgReduceTime; + } + + public void setAvgReduceTime(double avgReduceTime) { + this.avgReduceTime = avgReduceTime; + valueChanged("avgReduceTime"); + } + + public int getNumTotalMaps() { + return numTotalMaps; + } + + public void setNumTotalMaps(int numTotalMaps) { + this.numTotalMaps = numTotalMaps; + valueChanged("numTotalMaps"); + } + + public int getNumTotalReduces() { + return numTotalReduces; + } + + public void setNumTotalReduces(int numTotalReduces) { + this.numTotalReduces = numTotalReduces; + valueChanged("numTotalReduces"); + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + valueChanged("duration"); + } + } diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java index 3b91fbfea7..5549b56aeb 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java @@ -47,6 +47,11 @@ public void flush(JobRpcAnalysisAPIEntity entity) { fields.put("avgOpsPerMap", entity.getAvgOpsPerMap()); fields.put("avgOpsPerReduce", entity.getAvgOpsPerReduce()); fields.put("currentState", entity.getCurrentState()); + fields.put("numTotalMaps", entity.getNumTotalMaps()); + fields.put("numTotalReduces", entity.getNumTotalReduces()); + fields.put("duration", entity.getDuration()); + fields.put("avgMapTime", entity.getAvgMapTime()); + fields.put("avgReduceTime", entity.getAvgReduceTime()); collector.collect(stormStreamId, new ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields)); } diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml index 01c5e59278..90c002a3e7 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml @@ -225,6 +225,26 @@ double 0.0 + + avgMapTime + double + + + avgReduceTime + double + + + numTotalMaps + int + + + numTotalReduces + int + + + duration + long + currentState string From 023fc8738980db35c73c3dda19387507372b0043 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 26 May 2017 21:14:03 +0800 Subject: [PATCH 2/3] update --- .../org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java index ed69ce99e8..a8e02b1325 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java @@ -91,6 +91,7 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { analysisAPIEntity.setDuration(entity.getDurationTime()); analysisAPIEntity.setNumTotalMaps(entity.getTotalMaps()); analysisAPIEntity.setNumTotalReduces(entity.getTotalReduces()); + analysisAPIEntity.setCurrentState(entity.getCurrentState()); double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 : (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getDurationTime() / 1000); From 8a68c2ed46d35abffbd1e2c1266e99fff28ba8e3 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 26 May 2017 21:26:11 +0800 Subject: [PATCH 3/3] fix checkStyle bug --- .../eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java index a8e02b1325..86ad2c1400 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java @@ -93,11 +93,6 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { analysisAPIEntity.setNumTotalReduces(entity.getTotalReduces()); analysisAPIEntity.setCurrentState(entity.getCurrentState()); - double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 : - (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getDurationTime() / 1000); - - double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces()); - double avgOpsPerMap = 0; double avgMapTime = 0; double avgOpsPerReduce = 0; @@ -116,6 +111,11 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity entity) { reduceOpsPerSecond = totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000); } + double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 : + (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getDurationTime() / 1000); + + double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces()); + analysisAPIEntity.setTotalOpsPerSecond(totalOpsPerSecond); analysisAPIEntity.setMapOpsPerSecond(mapOpsPerSecond); analysisAPIEntity.setReduceOpsPerSecond(reduceOpsPerSecond);