From 4732f8d2fbc2f5b644d49ce6ac7027effcc5531c Mon Sep 17 00:00:00 2001 From: Abhinay Nagpal Date: Fri, 28 Sep 2012 11:10:16 -0700 Subject: [PATCH] Added retry logic to fetcher --- .../store/readonly/fetcher/HdfsFetcher.java | 92 +++++++++++-------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java index 74fcc8edfc..2e5f8ea193 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java @@ -68,6 +68,7 @@ public class HdfsFetcher implements FileFetcher { private EventThrottler throttler = null; private long minBytesPerSecond = 0; private DynamicThrottleLimit globalThrottleLimit = null; + private static final int NUM_RETRIES = 3; public HdfsFetcher(VoldemortConfig config) { this(config.getMaxBytesPerSecond(), @@ -282,46 +283,63 @@ private void copyFileWithCheckSum(FileSystem fs, logger.info("Starting copy of " + source + " to " + dest); FSDataInputStream input = null; OutputStream output = null; - try { - input = fs.open(source); - output = new BufferedOutputStream(new FileOutputStream(dest)); - byte[] buffer = new byte[bufferSize]; - while(true) { - int read = input.read(buffer); - if(read < 0) { - break; - } else { - output.write(buffer, 0, read); - } - - if(fileCheckSumGenerator != null) - fileCheckSumGenerator.update(buffer, 0, read); - if(throttler != null) - throttler.maybeThrottle(read); - stats.recordBytes(read); - if(stats.getBytesSinceLastReport() > reportingIntervalBytes) { - NumberFormat format = NumberFormat.getNumberInstance(); - format.setMaximumFractionDigits(2); - logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at " - + format.format(stats.getBytesPerSecond() / (1024 * 1024)) - + " MB/sec - " + format.format(stats.getPercentCopied()) - + " % complete"); - if(this.status != null) { - this.status.setStatus(stats.getTotalBytesCopied() - / (1024 * 1024) - + " MB copied at " - + format.format(stats.getBytesPerSecond() - / (1024 * 1024)) + " MB/sec - " - + format.format(stats.getPercentCopied()) - + " % complete"); + for(int attempt = 0; attempt < NUM_RETRIES; attempt++) { + boolean success = true; + try { + + input = fs.open(source); + output = new BufferedOutputStream(new FileOutputStream(dest)); + byte[] buffer = new byte[bufferSize]; + while(true) { + int read = input.read(buffer); + if(read < 0) { + break; + } else { + output.write(buffer, 0, read); + } + + if(fileCheckSumGenerator != null) + fileCheckSumGenerator.update(buffer, 0, read); + if(throttler != null) + throttler.maybeThrottle(read); + stats.recordBytes(read); + if(stats.getBytesSinceLastReport() > reportingIntervalBytes) { + NumberFormat format = NumberFormat.getNumberInstance(); + format.setMaximumFractionDigits(2); + logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at " + + format.format(stats.getBytesPerSecond() / (1024 * 1024)) + + " MB/sec - " + format.format(stats.getPercentCopied()) + + " % complete"); + if(this.status != null) { + this.status.setStatus(stats.getTotalBytesCopied() + / (1024 * 1024) + + " MB copied at " + + format.format(stats.getBytesPerSecond() + / (1024 * 1024)) + " MB/sec - " + + format.format(stats.getPercentCopied()) + + " % complete"); + } + stats.reset(); } - stats.reset(); } + logger.info("Completed copy of " + source + " to " + dest); + + } catch(IOException ioe) { + success = false; + logger.error("Error during copying file ", ioe); + if(attempt < NUM_RETRIES - 1) + logger.info("retrying copying"); + else + throw ioe; + + } finally { + IOUtils.closeQuietly(output); + IOUtils.closeQuietly(input); + if(success) + break; + } - logger.info("Completed copy of " + source + " to " + dest); - } finally { - IOUtils.closeQuietly(output); - IOUtils.closeQuietly(input); + } }