diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java index 3eb68a8ff..5d9b0688b 100644 --- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java +++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java @@ -308,12 +308,14 @@ public int compare(FileStatus f1, FileStatus f2) { String countersPathString = props.getProperty(CAMUS_COUNTERS_PATH); if (countersPathString != null) { - Path countersPath = new Path(countersPathString); + Path countersDir = new Path(countersPathString); + if (!fs.exists(countersDir)) + fs.mkdirs(countersDir); + Path countersPath = new Path(countersPathString, "counters.json"); fs.delete(countersPath, true); - fs.mkdirs(countersPath); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter( - fs.create(new Path(countersPath, "counters.json")))); + fs.create(countersPath))); writer.write(jsonData.toJSONString()); writer.close(); } diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java index 0d53eac6e..d3be650b1 100644 --- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java +++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java @@ -4,6 +4,7 @@ import com.linkedin.camus.coders.MessageDecoder; import com.linkedin.camus.etl.kafka.CamusJob; import com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory; +import com.linkedin.camus.etl.kafka.common.DateUtils; import com.linkedin.camus.etl.kafka.common.EtlKey; import com.linkedin.camus.etl.kafka.common.EtlRequest; import com.linkedin.camus.etl.kafka.common.ExceptionWritable; @@ -25,6 +26,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; public class EtlRecordReader extends RecordReader { private static final String PRINT_MAX_DECODER_EXCEPTIONS = "max.decoder.exceptions.to.print"; @@ -308,6 +310,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException { mapperContext.getCounter("total", "decode-time(ms)").increment(decodeTime); + // timestamp of the nearest hour in our configuration + long datePartition = DateUtils.getPartition(EtlMultiOutputFormat.getEtlOutputFileTimePartitionMins(context) * 60000L, timeStamp); + // more readable form + String datePartitionString = new DateTime(datePartition, DateTimeZone.UTC).toString("YYYY/MM/dd/HH"); + + mapperContext.getCounter("total", datePartition + "_" + datePartitionString).increment(1); + if (reader != null) { mapperContext.getCounter("total", "request-time(ms)").increment( reader.getFetchTime());