From df5eed2e1293f8529057a114edafe4c60f45201a Mon Sep 17 00:00:00 2001 From: Lewis John McGibbney Date: Tue, 17 May 2016 09:38:01 -0700 Subject: [PATCH] NUTCH-2262 Utilize parameterized logging notation across Fetcher --- .../org/apache/nutch/fetcher/FetchItem.java | 2 +- .../org/apache/nutch/fetcher/Fetcher.java | 82 ++++++++----------- .../nutch/fetcher/FetcherOutputFormat.java | 4 +- 3 files changed, 39 insertions(+), 49 deletions(-) diff --git a/src/java/org/apache/nutch/fetcher/FetchItem.java b/src/java/org/apache/nutch/fetcher/FetchItem.java index 833fa38ade..3ad497090b 100644 --- a/src/java/org/apache/nutch/fetcher/FetchItem.java +++ b/src/java/org/apache/nutch/fetcher/FetchItem.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; /** - * This class described the item to be fetched. + * This class describes the item to be fetched. */ public class FetchItem { diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java index 2030778a45..aad9ee9871 100644 --- a/src/java/org/apache/nutch/fetcher/Fetcher.java +++ b/src/java/org/apache/nutch/fetcher/Fetcher.java @@ -78,7 +78,7 @@ * @author Andrzej Bialecki */ public class Fetcher extends NutchTool implements Tool, - MapRunnable { +MapRunnable { public static final int PERM_REFRESH_TIME = 5; @@ -89,7 +89,7 @@ public class Fetcher extends NutchTool implements Tool, public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class); public static class InputFormat extends - SequenceFileInputFormat { + SequenceFileInputFormat { /** Don't split inputs, to keep things polite. */ public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException { FileStatus[] files = listStatus(job); @@ -103,6 +103,7 @@ public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException { } } + @SuppressWarnings("unused") private OutputCollector output; private Reporter reporter; @@ -125,7 +126,7 @@ public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException { LinkedList fetcherThreads = new LinkedList(); public Fetcher() { - super(null); + super(null); } public Fetcher(Configuration conf) { @@ -141,14 +142,14 @@ private void reportStatus(int pagesLastSec, int bytesLastSec) long avgBytesSec = (bytes.get() / 128l) / elapsed.longValue(); status.append(activeThreads).append(" threads (").append(spinWaiting.get()) - .append(" waiting), "); + .append(" waiting), "); status.append(fetchQueues.getQueueCount()).append(" queues, "); status.append(fetchQueues.getTotalSize()).append(" URLs queued, "); status.append(pages).append(" pages, ").append(errors).append(" errors, "); status.append(String.format("%.2f", avgPagesSec)).append(" pages/s ("); status.append(pagesLastSec).append(" last sec), "); status.append(avgBytesSec).append(" kbits/s (") - .append((bytesLastSec / 128)).append(" last sec)"); + .append((bytesLastSec / 128)).append(" last sec)"); reporter.setStatus(status.toString()); } @@ -178,7 +179,7 @@ public static boolean isStoringContent(Configuration conf) { public void run(RecordReader input, OutputCollector output, Reporter reporter) - throws IOException { + throws IOException { this.output = output; this.reporter = reporter; @@ -186,12 +187,12 @@ public void run(RecordReader input, int threadCount = getConf().getInt("fetcher.threads.fetch", 10); if (LOG.isInfoEnabled()) { - LOG.info("Fetcher: threads: " + threadCount); + LOG.info("Fetcher: threads: {}", threadCount); } int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2); if (LOG.isInfoEnabled()) { - LOG.info("Fetcher: time-out divisor: " + timeoutDivisor); + LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor); } int queueDepthMuliplier = getConf().getInt( @@ -229,20 +230,18 @@ public void run(RecordReader input, int pagesLastSec; int bytesLastSec; - // Set to true whenever the threshold has been exceeded for the first time - boolean throughputThresholdExceeded = false; int throughputThresholdNumRetries = 0; int throughputThresholdPages = getConf().getInt( "fetcher.throughput.threshold.pages", -1); if (LOG.isInfoEnabled()) { - LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); + LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages); } int throughputThresholdMaxRetries = getConf().getInt( "fetcher.throughput.threshold.retries", 5); if (LOG.isInfoEnabled()) { - LOG.info("Fetcher: throughput threshold retries: " - + throughputThresholdMaxRetries); + LOG.info("Fetcher: throughput threshold retries: {}", + throughputThresholdMaxRetries); } long throughputThresholdTimeLimit = getConf().getLong( "fetcher.throughput.threshold.check.after", -1); @@ -250,8 +249,8 @@ public void run(RecordReader input, int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000; int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount); if (maxNumThreads < threadCount) { - LOG.info("fetcher.maxNum.threads can't be < than " + threadCount - + " : using " + threadCount + " instead"); + LOG.info("fetcher.maxNum.threads can't be < than {} : using {} instead", + threadCount, threadCount); maxNumThreads = threadCount; } int bandwidthTargetCheckEveryNSecs = getConf().getInt( @@ -297,10 +296,8 @@ public void run(RecordReader input, // Check if we're dropping below the threshold if (pagesLastSec < throughputThresholdPages) { throughputThresholdNumRetries++; - LOG.warn(Integer.toString(throughputThresholdNumRetries) - + ": dropping below configured threshold of " - + Integer.toString(throughputThresholdPages) - + " pages per second"); + LOG.warn("{}: dropping below configured threshold of {} pages per second", + Integer.toString(throughputThresholdNumRetries), Integer.toString(throughputThresholdPages)); // Quit if we dropped below threshold too many times if (throughputThresholdNumRetries == throughputThresholdMaxRetries) { @@ -336,8 +333,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { averageBdwPerThread = Math.round(bpsSinceLastCheck / activeThreads.get()); - LOG.info("averageBdwPerThread : " + (averageBdwPerThread / 1000) - + " kbps"); + LOG.info("averageBdwPerThread : {} kbps", (averageBdwPerThread / 1000)); if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) { // check whether it is worth doing e.g. more queues than threads @@ -354,10 +350,8 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { // availableThreads and additionalThreads) additionalThreads = (availableThreads < additionalThreads ? availableThreads : additionalThreads); - LOG.info("Has space for more threads (" - + (bpsSinceLastCheck / 1000) + " vs " - + (targetBandwidth / 1000) + " kbps) \t=> adding " - + additionalThreads + " new threads"); + LOG.info("Has space for more threads ({} vs {} kbps) \t=> adding {} new threads", + (bpsSinceLastCheck / 1000), (targetBandwidth / 1000), additionalThreads); // activate new threads for (int i = 0; i < additionalThreads; i++) { FetcherThread thread = new FetcherThread(getConf(), getActiveThreads(), fetchQueues, @@ -373,9 +367,8 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { // bandwidth, we have to stop some threads long excessBdw = bpsSinceLastCheck - targetBandwidth; int excessThreads = Math.round(excessBdw / averageBdwPerThread); - LOG.info("Exceeding target bandwidth (" + bpsSinceLastCheck / 1000 - + " vs " + (targetBandwidth / 1000) - + " kbps). \t=> excessThreads = " + excessThreads); + LOG.info("Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}", + bpsSinceLastCheck / 1000, (targetBandwidth / 1000), excessThreads); // keep at least one if (excessThreads >= fetcherThreads.size()) excessThreads = 0; @@ -399,12 +392,11 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { // some requests seem to hang, despite all intentions if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) { if (LOG.isWarnEnabled()) { - LOG.warn("Aborting with " + activeThreads + " hung threads."); + LOG.warn("Aborting with {} hung threads.", activeThreads); for (int i = 0; i < fetcherThreads.size(); i++) { FetcherThread thread = fetcherThreads.get(i); if (thread.isAlive()) { - LOG.warn("Thread #" + i + " hung while processing " - + thread.getReprUrl()); + LOG.warn("Thread #{} hung while processing {}", i, thread.getReprUrl()); if (LOG.isDebugEnabled()) { StackTraceElement[] stack = thread.getStackTrace(); StringBuilder sb = new StringBuilder(); @@ -421,7 +413,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { } } while (activeThreads.get() > 0); - LOG.info("-activeThreads=" + activeThreads); + LOG.info("-activeThreads={}", activeThreads); } @@ -432,8 +424,8 @@ public void fetch(Path segment, int threads) throws IOException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); if (LOG.isInfoEnabled()) { - LOG.info("Fetcher: starting at " + sdf.format(start)); - LOG.info("Fetcher: segment: " + segment); + LOG.info("Fetcher: starting at {}", sdf.format(start)); + LOG.info("Fetcher: segment: {}", segment); } // set the actual time for the timelimit relative @@ -442,7 +434,7 @@ public void fetch(Path segment, int threads) throws IOException { long timelimit = getConf().getLong("fetcher.timelimit.mins", -1); if (timelimit != -1) { timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); - LOG.info("Fetcher Timelimit set for : " + timelimit); + LOG.info("Fetcher Timelimit set for : {}", timelimit); getConf().setLong("fetcher.timelimit", timelimit); } @@ -455,8 +447,8 @@ public void fetch(Path segment, int threads) throws IOException { int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1); if (maxOutlinkDepth > 0) { - LOG.info("Fetcher: following outlinks up to depth: " - + Integer.toString(maxOutlinkDepth)); + LOG.info("Fetcher: following outlinks up to depth: {}", + Integer.toString(maxOutlinkDepth)); int maxOutlinkDepthNumLinks = getConf().getInt( "fetcher.follow.outlinks.num.links", 4); @@ -469,8 +461,8 @@ public void fetch(Path segment, int threads) throws IOException { / (i + 1) * maxOutlinkDepthNumLinks); } - LOG.info("Fetcher: maximum outlinks to follow: " - + Integer.toString(totalOutlinksToFollow)); + LOG.info("Fetcher: maximum outlinks to follow: {}", + Integer.toString(totalOutlinksToFollow)); } JobConf job = new NutchJob(getConf()); @@ -496,8 +488,8 @@ public void fetch(Path segment, int threads) throws IOException { JobClient.runJob(job); long end = System.currentTimeMillis(); - LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " - + TimingUtil.elapsedTime(start, end)); + LOG.info("Fetcher: finished at {}, elapsed: {}", sdf.format(end), + TimingUtil.elapsedTime(start, end)); } /** Run the fetcher. */ @@ -518,7 +510,6 @@ public int run(String[] args) throws Exception { Path segment = new Path(args[0]); int threads = getConf().getInt("fetcher.threads.fetch", 10); - boolean parsing = false; for (int i = 1; i < args.length; i++) { // parse command line if (args[i].equals("-threads")) { // found -threads option @@ -532,7 +523,7 @@ public int run(String[] args) throws Exception { fetch(segment, threads); return 0; } catch (Exception e) { - LOG.error("Fetcher: " + StringUtils.stringifyException(e)); + LOG.error("Fetcher: {}", StringUtils.stringifyException(e)); return -1; } @@ -554,7 +545,7 @@ private void checkConfiguration() { private AtomicInteger getActiveThreads() { return activeThreads; } - + @Override public Map run(Map args, String crawlId) throws Exception { @@ -588,7 +579,6 @@ public int compare(File f1, File f2) { int threads = getConf().getInt("fetcher.threads.fetch", 10); - boolean parsing = false; // parse command line if (args.containsKey("threads")) { // found -threads option @@ -601,7 +591,7 @@ public int compare(File f1, File f2) { results.put(Nutch.VAL_RESULT, Integer.toString(0)); return results; } catch (Exception e) { - LOG.error("Fetcher: " + StringUtils.stringifyException(e)); + LOG.error("Fetcher: {}", StringUtils.stringifyException(e)); results.put(Nutch.VAL_RESULT, Integer.toString(-1)); return results; } diff --git a/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java b/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java index 89969eef09..d526a07d33 100644 --- a/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java +++ b/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java @@ -58,7 +58,7 @@ public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) - throws IOException { + throws IOException { Path out = FileOutputFormat.getOutputPath(job); final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name); @@ -71,7 +71,7 @@ public RecordWriter getRecordWriter(final FileSystem fs, org.apache.hadoop.io.SequenceFile.Writer.Option fValClassOpt = SequenceFile.Writer.valueClass(CrawlDatum.class); org.apache.hadoop.io.SequenceFile.Writer.Option fProgressOpt = SequenceFile.Writer.progressable(progress); org.apache.hadoop.io.SequenceFile.Writer.Option fCompOpt = SequenceFile.Writer.compression(compType); - + final MapFile.Writer fetchOut = new MapFile.Writer(job, fetch, fKeyClassOpt, fValClassOpt, fCompOpt, fProgressOpt);