WebGraph.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.scoring.webgraph;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.parse.Outlink;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.util.FSUtils;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.URLUtil;
/**
 * Creates three databases, one for inlinks, one for outlinks, and a node
 * database that holds the number of in- and outlinks for a url and the
 * current score for the url.
*
* The score is set by an analysis program such as LinkRank. The WebGraph is an
* update-able database. Outlinks are stored by their fetch time or by the
* current system time if no fetch time is available. Only the most recent
* version of outlinks for a given url is stored. As more crawls are executed
* and the WebGraph updated, newer Outlinks will replace older Outlinks. This
* allows the WebGraph to adapt to changes in the link structure of the web.
*
* The Inlink database is created from the Outlink database and is regenerated
* when the WebGraph is updated. The Node database is created from both the
 * Inlink and Outlink databases. Because the Node database is overwritten when
 * the WebGraph is updated, and because it holds the current scores for urls,
 * it is recommended that a crawl cycle (one or more full crawls) fully
 * complete before the WebGraph is updated, and that an analysis program such
 * as LinkRank then be run to update the scores in the Node database in a
 * stable fashion.
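 *
 * <p>
 * A typical command line invocation looks like the sketch below (running the
 * class through the {@code bin/nutch} wrapper script is an assumption; the
 * options themselves are defined in {@link #run(String[])}):
 *
 * <pre>
 * bin/nutch org.apache.nutch.scoring.webgraph.WebGraph \
 *     -webgraphdb crawl/webgraphdb \
 *     -segmentDir crawl/segments \
 *     -normalize -filter
 * </pre>
 *
 * The resulting layout under the WebGraph database directory is
 * {@code inlinks}, {@code outlinks/current}, {@code outlinks/old} and
 * {@code nodes}, matching the {@code *_DIR} constants declared below.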
*/
public class WebGraph extends Configured implements Tool {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
public static final String LOCK_NAME = ".locked";
public static final String INLINK_DIR = "inlinks";
public static final String OUTLINK_DIR = "outlinks/current";
public static final String OLD_OUTLINK_DIR = "outlinks/old";
public static final String NODE_DIR = "nodes";
/**
   * The OutlinkDb creates a database of all outlinks. Outlinks to internal
   * urls (by domain or host) can be ignored. The number of outlinks to a
   * given page or domain can also be limited.
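   *
   * <p>
   * Both behaviors are controlled by configuration properties read in the
   * {@code OutlinkDbReducer} setup (all default to true; setting them in
   * {@code nutch-site.xml} is the usual, assumed deployment):
   *
   * <pre>
   * link.ignore.internal.host   = true   // drop links within the same host
   * link.ignore.internal.domain = true   // drop links within the same domain
   * link.ignore.limit.page      = true   // at most one link to a given page
   * link.ignore.limit.domain    = true   // at most one link to a given domain
   * </pre>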
*/
public static class OutlinkDb extends Configured {
public static final String URL_NORMALIZING = "webgraph.url.normalizers";
public static final String URL_FILTERING = "webgraph.url.filters";
/**
* Returns the fetch time from the parse data or the current system time if
* the fetch time doesn't exist.
*
* @param data
* The parse data.
*
* @return The fetch time as a long.
*/
    private static long getFetchTime(ParseData data) {
      String fetchTimeStr = data.getContentMeta().get(Nutch.FETCH_TIME_KEY);
      try {
        // get the fetch time from the parse data
        return Long.parseLong(fetchTimeStr);
      } catch (NumberFormatException e) {
        // default to the current system time; this also covers a missing
        // value, since Long.parseLong(null) throws NumberFormatException
        return System.currentTimeMillis();
      }
    }
/**
* Default constructor.
*/
public OutlinkDb() {
}
/**
* Configurable constructor.
*/
public OutlinkDb(Configuration conf) {
setConf(conf);
}
/**
     * Passes through existing LinkDatum objects from an existing OutlinkDb
     * and maps out new LinkDatum objects from the ParseData of new crawls.
*/
public static class OutlinkDbMapper extends
Mapper<Text, Writable, Text, NutchWritable> {
// using normalizers and/or filters
private boolean normalize = false;
private boolean filter = false;
// url normalizers, filters and job configuration
private URLNormalizers urlNormalizers;
private URLFilters filters;
private Configuration conf;
/**
* Normalizes and trims extra whitespace from the given url.
*
* @param url
* The url to normalize.
*
* @return The normalized url.
*/
private String normalizeUrl(String url) {
if (!normalize) {
return url;
}
String normalized = null;
if (urlNormalizers != null) {
try {
// normalize and trim the url
normalized = urlNormalizers.normalize(url,
URLNormalizers.SCOPE_DEFAULT);
normalized = normalized.trim();
} catch (Exception e) {
LOG.warn("Skipping " + url + ":" + e);
normalized = null;
}
}
return normalized;
}
/**
* Filters the given url.
*
* @param url
* The url to filter.
*
* @return The filtered url or null.
*/
private String filterUrl(String url) {
if (!filter) {
return url;
}
try {
url = filters.filter(url);
} catch (Exception e) {
url = null;
}
return url;
}
      /**
       * Configures the OutlinkDb job mapper, setting up the URL normalizers
       * and filters if enabled.
       */
@Override
public void setup(Mapper<Text, Writable, Text, NutchWritable>.Context context) {
Configuration config = context.getConfiguration();
conf = config;
normalize = conf.getBoolean(URL_NORMALIZING, false);
filter = conf.getBoolean(URL_FILTERING, false);
if (normalize) {
urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
}
if (filter) {
filters = new URLFilters(conf);
}
}
@Override
public void map(Text key, Writable value,
Context context)
throws IOException, InterruptedException {
// normalize url, stop processing if null
String url = normalizeUrl(key.toString());
if (url == null) {
return;
}
// filter url
if (filterUrl(url) == null) {
return;
}
// Overwrite the key with the normalized URL
key.set(url);
if (value instanceof CrawlDatum) {
CrawlDatum datum = (CrawlDatum) value;
if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
|| datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
|| datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
// Tell the reducer to get rid of all instances of this key
context.write(key, new NutchWritable(new BooleanWritable(true)));
}
} else if (value instanceof ParseData) {
// get the parse data and the outlinks from the parse data, along with
// the fetch time for those links
ParseData data = (ParseData) value;
long fetchTime = getFetchTime(data);
Outlink[] outlinkAr = data.getOutlinks();
Map<String, String> outlinkMap = new LinkedHashMap<>();
// normalize urls and put into map
if (outlinkAr != null && outlinkAr.length > 0) {
for (int i = 0; i < outlinkAr.length; i++) {
Outlink outlink = outlinkAr[i];
String toUrl = normalizeUrl(outlink.getToUrl());
if (filterUrl(toUrl) == null) {
continue;
}
              // only put the url into the map if it isn't there yet, or
              // replace the existing entry if its anchor is null
              boolean existingUrl = outlinkMap.containsKey(toUrl);
              if (toUrl != null
                  && (!existingUrl || outlinkMap.get(toUrl) == null)) {
                outlinkMap.put(toUrl, outlink.getAnchor());
              }
}
}
// collect the outlinks under the fetch time
for (String outlinkUrl : outlinkMap.keySet()) {
String anchor = outlinkMap.get(outlinkUrl);
LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
context.write(key, new NutchWritable(datum));
}
} else if (value instanceof LinkDatum) {
LinkDatum datum = (LinkDatum) value;
String linkDatumUrl = normalizeUrl(datum.getUrl());
if (filterUrl(linkDatumUrl) != null) {
datum.setUrl(linkDatumUrl);
// collect existing outlinks from existing OutlinkDb
context.write(key, new NutchWritable(datum));
}
}
}
}
public static class OutlinkDbReducer extends
Reducer<Text, NutchWritable, Text, LinkDatum> {
// ignoring internal domains, internal hosts
private boolean ignoreDomain = true;
private boolean ignoreHost = true;
// limiting urls out to a page or to a domain
private boolean limitPages = true;
private boolean limitDomains = true;
// url normalizers, filters and job configuration
private Configuration conf;
/**
* Configures the OutlinkDb job reducer. Sets up internal links and link limiting.
*/
public void setup(Reducer<Text, NutchWritable, Text, LinkDatum>.Context context) {
Configuration config = context.getConfiguration();
conf = config;
ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
limitPages = conf.getBoolean("link.ignore.limit.page", true);
limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
}
public void reduce(Text key, Iterable<NutchWritable> values,
Context context)
throws IOException, InterruptedException {
// aggregate all outlinks, get the most recent timestamp for a fetch
// which should be the timestamp for all of the most recent outlinks
long mostRecent = 0L;
List<LinkDatum> outlinkList = new ArrayList<>();
for (NutchWritable val : values) {
final Writable value = val.get();
if (value instanceof LinkDatum) {
// loop through, change out most recent timestamp if needed
LinkDatum next = (LinkDatum) value;
long timestamp = next.getTimestamp();
if (mostRecent == 0L || mostRecent < timestamp) {
mostRecent = timestamp;
}
outlinkList.add(WritableUtils.clone(next, conf));
context.getCounter("WebGraph.outlinks", "added links").increment(1);
} else if (value instanceof BooleanWritable) {
BooleanWritable delete = (BooleanWritable) value;
            // delete is always true here, otherwise the mapper would not
            // have emitted it in the first place
            if (delete.get()) {
              // this page is gone, do not emit its outlinks
context.getCounter("WebGraph.outlinks", "removed links").increment(1);
return;
}
}
}
// get the url, domain, and host for the url
String url = key.toString();
String domain = URLUtil.getDomainName(url);
String host = URLUtil.getHost(url);
// setup checking sets for domains and pages
Set<String> domains = new HashSet<>();
Set<String> pages = new HashSet<>();
// loop through the link datums
for (LinkDatum datum : outlinkList) {
// get the url, host, domain, and page for each outlink
String toUrl = datum.getUrl();
String toDomain = URLUtil.getDomainName(toUrl);
String toHost = URLUtil.getHost(toUrl);
String toPage = URLUtil.getPage(toUrl);
datum.setLinkType(LinkDatum.OUTLINK);
          // collect the outlink only if it is the most recent and it
          // conforms to the internal url and limiting rules
          if (datum.getTimestamp() == mostRecent
              && (!limitPages || !pages.contains(toPage))
              && (!limitDomains || !domains.contains(toDomain))
              && (!ignoreHost || !toHost.equalsIgnoreCase(host))
              && (!ignoreDomain || !toDomain.equalsIgnoreCase(domain))) {
context.write(key, datum);
pages.add(toPage);
domains.add(toDomain);
}
}
}
}
public void close() {
}
}
/**
* The InlinkDb creates a database of Inlinks. Inlinks are inverted from the
* OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is
* updated.
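   *
   * <p>
   * For example, an outlink record keyed by {@code http://a.example/} that
   * points to {@code http://b.example/} becomes an inlink record keyed by
   * {@code http://b.example/} pointing back to {@code http://a.example/},
   * with the anchor text preserved. (The urls are illustrative only.)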
*/
  private static class InlinkDb extends Configured {
private static long timestamp;
/**
* Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
* new system timestamp, type and to and from url switched.
*/
public static class InlinkDbMapper extends
Mapper<Text, LinkDatum, Text, LinkDatum> {
/**
* Configures job mapper. Sets timestamp for all Inlink LinkDatum objects to the
* current system time.
*/
public void setup(Mapper<Text, LinkDatum, Text, LinkDatum>.Context context) {
timestamp = System.currentTimeMillis();
}
public void map(Text key, LinkDatum datum,
Context context)
throws IOException, InterruptedException {
// get the to and from url and the anchor
String fromUrl = key.toString();
String toUrl = datum.getUrl();
String anchor = datum.getAnchor();
// flip the from and to url and set the new link type
LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp);
inlink.setLinkType(LinkDatum.INLINK);
context.write(new Text(toUrl), inlink);
}
}
}
/**
* Creates the Node database which consists of the number of in and outlinks
* for each url and a score slot for analysis programs such as LinkRank.
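   *
   * <p>
   * For example, a url that appears in three INLINK records and two OUTLINK
   * records yields a Node with {@code numInlinks=3}, {@code numOutlinks=2}
   * and an initial {@code inlinkScore} of 0.0 (the counts are illustrative
   * only).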
*/
private static class NodeDb extends Configured {
/**
* Counts the number of inlinks and outlinks for each url and sets a default
* score of 0.0 for each url (node) in the webgraph.
*/
public static class NodeDbReducer extends
Reducer<Text, LinkDatum, Text, Node> {
/**
* Configures job reducer.
*/
public void setup(Reducer<Text, LinkDatum, Text, Node>.Context context) {
}
public void reduce(Text key, Iterable<LinkDatum> values,
Context context) throws IOException, InterruptedException {
Node node = new Node();
int numInlinks = 0;
int numOutlinks = 0;
// loop through counting number of in and out links
for (LinkDatum next : values) {
if (next.getLinkType() == LinkDatum.INLINK) {
numInlinks++;
} else if (next.getLinkType() == LinkDatum.OUTLINK) {
numOutlinks++;
}
}
// set the in and outlinks and a default score of 0
node.setNumInlinks(numInlinks);
node.setNumOutlinks(numOutlinks);
node.setInlinkScore(0.0f);
context.write(key, node);
}
}
}
/**
* Creates the three different WebGraph databases, Outlinks, Inlinks, and
   * Node. If a current WebGraph exists it is updated; if it doesn't exist, a
   * new WebGraph database is created.
*
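   * A minimal programmatic sketch (the paths are hypothetical):
   *
   * <pre>
   * WebGraph webGraph = new WebGraph();
   * webGraph.setConf(NutchConfiguration.create());
   * webGraph.createWebGraph(new Path("crawl/webgraphdb"),
   *     new Path[] { new Path("crawl/segments/20240101000000") }, true, true);
   * </pre>
   *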
* @param webGraphDb
* The WebGraph to create or update.
* @param segments
* The array of segments used to update the WebGraph. Newer segments
* and fetch times will overwrite older segments.
* @param normalize
* whether to use URLNormalizers on URL's in the segment
* @param filter
* whether to use URLFilters on URL's in the segment
*
* @throws IOException
* If an error occurs while processing the WebGraph.
*/
public void createWebGraph(Path webGraphDb, Path[] segments,
boolean normalize, boolean filter) throws IOException,
InterruptedException, ClassNotFoundException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
if (LOG.isInfoEnabled()) {
LOG.info("WebGraphDb: starting at " + sdf.format(start));
LOG.info("WebGraphDb: webgraphdb: " + webGraphDb);
LOG.info("WebGraphDb: URL normalize: " + normalize);
LOG.info("WebGraphDb: URL filter: " + filter);
}
FileSystem fs = webGraphDb.getFileSystem(getConf());
// lock an existing webgraphdb to prevent multiple simultaneous updates
Path lock = new Path(webGraphDb, LOCK_NAME);
if (!fs.exists(webGraphDb)) {
fs.mkdirs(webGraphDb);
}
LockUtil.createLockFile(fs, lock, false);
// outlink and temp outlink database paths
Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR);
Path oldOutlinkDb = new Path(webGraphDb, OLD_OUTLINK_DIR);
if (!fs.exists(outlinkDb)) {
fs.mkdirs(outlinkDb);
}
Path tempOutlinkDb = new Path(outlinkDb + "-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
Job outlinkJob = NutchJob.getInstance(getConf());
Configuration outlinkJobConf = outlinkJob.getConfiguration();
outlinkJob.setJobName("Outlinkdb: " + outlinkDb);
boolean deleteGone = outlinkJobConf.getBoolean("link.delete.gone", false);
boolean preserveBackup = outlinkJobConf.getBoolean("db.preserve.backup", true);
if (deleteGone) {
LOG.info("OutlinkDb: deleting gone links");
}
// get the parse data and crawl fetch data for all segments
if (segments != null) {
for (int i = 0; i < segments.length; i++) {
FileSystem sfs = segments[i].getFileSystem(outlinkJobConf);
Path parseData = new Path(segments[i], ParseData.DIR_NAME);
if (sfs.exists(parseData)) {
LOG.info("OutlinkDb: adding input: " + parseData);
FileInputFormat.addInputPath(outlinkJob, parseData);
}
if (deleteGone) {
Path crawlFetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
if (sfs.exists(crawlFetch)) {
LOG.info("OutlinkDb: adding input: " + crawlFetch);
FileInputFormat.addInputPath(outlinkJob, crawlFetch);
}
}
}
}
// add the existing webgraph
LOG.info("OutlinkDb: adding input: " + outlinkDb);
FileInputFormat.addInputPath(outlinkJob, outlinkDb);
outlinkJobConf.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
outlinkJobConf.setBoolean(OutlinkDb.URL_FILTERING, filter);
outlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
outlinkJob.setJarByClass(OutlinkDb.class);
outlinkJob.setMapperClass(OutlinkDb.OutlinkDbMapper.class);
outlinkJob.setReducerClass(OutlinkDb.OutlinkDbReducer.class);
outlinkJob.setMapOutputKeyClass(Text.class);
outlinkJob.setMapOutputValueClass(NutchWritable.class);
outlinkJob.setOutputKeyClass(Text.class);
outlinkJob.setOutputValueClass(LinkDatum.class);
FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
outlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
outlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
// run the outlinkdb job and replace any old outlinkdb with the new one
try {
LOG.info("OutlinkDb: running");
boolean success = outlinkJob.waitForCompletion(true);
if (!success) {
String message = "OutlinkDb job did not succeed, job status:"
+ outlinkJob.getStatus().getState() + ", reason: "
+ outlinkJob.getStatus().getFailureInfo();
LOG.error(message);
NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
throw new RuntimeException(message);
}
LOG.info("OutlinkDb: installing " + outlinkDb);
FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
if (!preserveBackup && fs.exists(oldOutlinkDb))
fs.delete(oldOutlinkDb, true);
LOG.info("OutlinkDb: finished");
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("OutlinkDb failed:", e);
      // remove lock file and temporary directory if an error occurs
NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
throw e;
}
// inlink and temp link database paths
Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
Path tempInlinkDb = new Path(inlinkDb + "-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
Job inlinkJob = NutchJob.getInstance(getConf());
Configuration inlinkJobConf = inlinkJob.getConfiguration();
inlinkJob.setJobName("Inlinkdb " + inlinkDb);
LOG.info("InlinkDb: adding input: " + outlinkDb);
FileInputFormat.addInputPath(inlinkJob, outlinkDb);
inlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
inlinkJob.setJarByClass(InlinkDb.class);
inlinkJob.setMapperClass(InlinkDb.InlinkDbMapper.class);
inlinkJob.setMapOutputKeyClass(Text.class);
inlinkJob.setMapOutputValueClass(LinkDatum.class);
inlinkJob.setOutputKeyClass(Text.class);
inlinkJob.setOutputValueClass(LinkDatum.class);
FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
inlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
inlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
try {
// run the inlink and replace any old with new
LOG.info("InlinkDb: running");
boolean success = inlinkJob.waitForCompletion(true);
if (!success) {
String message = "InlinkDb job did not succeed, job status:"
+ inlinkJob.getStatus().getState() + ", reason: "
+ inlinkJob.getStatus().getFailureInfo();
LOG.error(message);
NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
throw new RuntimeException(message);
}
LOG.info("InlinkDb: installing " + inlinkDb);
FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
LOG.info("InlinkDb: finished");
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("InlinkDb failed:", e);
      // remove lock file and temporary directory if an error occurs
NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
throw e;
}
// node and temp node database paths
Path nodeDb = new Path(webGraphDb, NODE_DIR);
Path tempNodeDb = new Path(nodeDb + "-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
Job nodeJob = NutchJob.getInstance(getConf());
Configuration nodeJobConf = nodeJob.getConfiguration();
nodeJob.setJobName("NodeDb " + nodeDb);
LOG.info("NodeDb: adding input: " + outlinkDb);
LOG.info("NodeDb: adding input: " + inlinkDb);
FileInputFormat.addInputPath(nodeJob, outlinkDb);
FileInputFormat.addInputPath(nodeJob, inlinkDb);
nodeJob.setInputFormatClass(SequenceFileInputFormat.class);
nodeJob.setJarByClass(NodeDb.class);
nodeJob.setReducerClass(NodeDb.NodeDbReducer.class);
nodeJob.setMapOutputKeyClass(Text.class);
nodeJob.setMapOutputValueClass(LinkDatum.class);
nodeJob.setOutputKeyClass(Text.class);
nodeJob.setOutputValueClass(Node.class);
FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
nodeJob.setOutputFormatClass(MapFileOutputFormat.class);
nodeJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
try {
// run the node job and replace old nodedb with new
LOG.info("NodeDb: running");
boolean success = nodeJob.waitForCompletion(true);
if (!success) {
String message = "NodeDb job did not succeed, job status:"
+ nodeJob.getStatus().getState() + ", reason: "
+ nodeJob.getStatus().getFailureInfo();
LOG.error(message);
        // remove lock file and temporary directory if an error occurs
NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
throw new RuntimeException(message);
}
LOG.info("NodeDb: installing " + nodeDb);
FSUtils.replace(fs, nodeDb, tempNodeDb, true);
LOG.info("NodeDb: finished");
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("NodeDb failed:", e);
      // remove lock file and temporary directory if an error occurs
NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
throw e;
}
// remove the lock file for the webgraph
LockUtil.removeLockFile(fs, lock);
long end = System.currentTimeMillis();
LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: "
+ TimingUtil.elapsedTime(start, end));
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(), args);
System.exit(res);
}
/**
   * Parses command line arguments and runs the WebGraph jobs.
*/
public int run(String[] args) throws Exception {
// boolean options
Option helpOpt = new Option("h", "help", false, "show this help message");
Option normOpt = new Option("n", "normalize", false,
"whether to use URLNormalizers on the URL's in the segment");
Option filtOpt = new Option("f", "filter", false,
"whether to use URLFilters on the URL's in the segment");
// argument options
@SuppressWarnings("static-access")
Option graphOpt = OptionBuilder
.withArgName("webgraphdb")
.hasArg()
.withDescription(
"the web graph database to create (if none exists) or use if one does")
.create("webgraphdb");
@SuppressWarnings("static-access")
Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
.withDescription("the segment(s) to use").create("segment");
@SuppressWarnings("static-access")
Option segDirOpt = OptionBuilder.withArgName("segmentDir").hasArgs()
.withDescription("the segment directory to use").create("segmentDir");
// create the options
Options options = new Options();
options.addOption(helpOpt);
options.addOption(normOpt);
options.addOption(filtOpt);
options.addOption(graphOpt);
options.addOption(segOpt);
options.addOption(segDirOpt);
CommandLineParser parser = new GnuParser();
try {
CommandLine line = parser.parse(options, args);
if (line.hasOption("help") || !line.hasOption("webgraphdb")
|| (!line.hasOption("segment") && !line.hasOption("segmentDir"))) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("WebGraph", options, true);
return -1;
}
String webGraphDb = line.getOptionValue("webgraphdb");
Path[] segPaths = null;
// Handle segment option
if (line.hasOption("segment")) {
String[] segments = line.getOptionValues("segment");
segPaths = new Path[segments.length];
for (int i = 0; i < segments.length; i++) {
segPaths[i] = new Path(segments[i]);
}
}
// Handle segmentDir option
if (line.hasOption("segmentDir")) {
Path dir = new Path(line.getOptionValue("segmentDir"));
FileSystem fs = dir.getFileSystem(getConf());
FileStatus[] fstats = fs.listStatus(dir,
HadoopFSUtil.getPassDirectoriesFilter(fs));
segPaths = HadoopFSUtil.getPaths(fstats);
}
boolean normalize = false;
if (line.hasOption("normalize")) {
normalize = true;
}
boolean filter = false;
if (line.hasOption("filter")) {
filter = true;
}
createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
return 0;
} catch (Exception e) {
LOG.error("WebGraph: " + StringUtils.stringifyException(e));
return -2;
}
}
}