Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Camus] date partition the counters for number of records read; #3

Merged
merged 1 commit into from
Aug 25, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,14 @@ public int compare(FileStatus f1, FileStatus f2) {

String countersPathString = props.getProperty(CAMUS_COUNTERS_PATH);
if (countersPathString != null) {
Path countersPath = new Path(countersPathString);
Path countersDir = new Path(countersPathString);
if (!fs.exists(countersDir))
fs.mkdirs(countersDir);
Path countersPath = new Path(countersPathString, "counters.json");
fs.delete(countersPath, true);
fs.mkdirs(countersPath);

BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
fs.create(new Path(countersPath, "counters.json"))));
fs.create(countersPath)));
writer.write(jsonData.toJSONString());
writer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.camus.coders.MessageDecoder;
import com.linkedin.camus.etl.kafka.CamusJob;
import com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory;
import com.linkedin.camus.etl.kafka.common.DateUtils;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.EtlRequest;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
Expand All @@ -25,6 +26,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class EtlRecordReader extends RecordReader<EtlKey, CamusWrapper> {
private static final String PRINT_MAX_DECODER_EXCEPTIONS = "max.decoder.exceptions.to.print";
Expand Down Expand Up @@ -308,6 +310,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException {

mapperContext.getCounter("total", "decode-time(ms)").increment(decodeTime);

// timestamp of the nearest hour in our configuration
long datePartition = DateUtils.getPartition(EtlMultiOutputFormat.getEtlOutputFileTimePartitionMins(context) * 60000L, timeStamp);
// more readable form
String datePartitionString = new DateTime(datePartition, DateTimeZone.UTC).toString("YYYY/MM/dd/HH");

mapperContext.getCounter("total", datePartition + "_" + datePartitionString).increment(1);

if (reader != null) {
mapperContext.getCounter("total", "request-time(ms)").increment(
reader.getFetchTime());
Expand Down