# pylint:disable-msg=E0611,I1101
"""
Module bundling all functions needed to extract the text in a webpage.
"""

import logging
import re  # import regex as re
import warnings

from copy import deepcopy

from lxml.etree import Element, SubElement, XPath, strip_elements, strip_tags, tostring
# own
from .external import (SANITIZED_XPATH, justext_rescue, sanitize_tree,
                       try_readability)
from .filters import (LANGID_FLAG, check_html_lang, duplicate_test,
                      language_filter, text_chars_test)
from .hashing import content_fingerprint
from .htmlprocessing import (convert_tags, delete_by_link_density,
                             handle_textnode, link_density_test_tables,
                             process_node, prune_unwanted_nodes, tree_cleaning)
from .metadata import Document, extract_metadata
from .settings import BASIC_CLEAN_XPATH, DEFAULT_CONFIG, TAG_CATALOG, use_config
from .utils import (is_image_file, load_html, normalize_unicode, trim,
                    FORMATTING_PROTECTED)
from .xml import (build_json_output, build_tei_output, build_xml_output, control_xml_output,
                  remove_empty_elements, strip_double_tags, xmltotxt, xmltocsv)
from .xpaths import (BODY_XPATH, COMMENTS_DISCARD_XPATH, COMMENTS_XPATH,
                     DISCARD_IMAGE_ELEMENTS, OVERALL_DISCARD_XPATH,
                     PAYWALL_DISCARD_XPATH, PRECISION_DISCARD_XPATH,
                     REMOVE_COMMENTS_XPATH, TEASER_DISCARD_XPATH)

LOGGER = logging.getLogger(__name__)

P_FORMATTING = {'hi', 'ref'}
TABLE_ELEMS = {'td', 'th'}
TABLE_ALL = {'td', 'th', 'hi'}
FORMATTING = {'hi', 'ref', 'span'}
CODES_QUOTES = {'code', 'quote'}
NOT_AT_THE_END = {'head', 'ref'}

JSON_SEARCH = re.compile(r'"articlebody": *"(.+?)(?<!\\)"', re.I)
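
# Illustrative example (hypothetical input): JSON_SEARCH captures the value of an
# "articleBody" key in ld+json metadata, case-insensitively, stopping at the first
# unescaped double quote.
#   >>> JSON_SEARCH.search('{"articleBody": "Some text"}')[1]
#   'Some text'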


class Extractor:
    "Defines a class to store all extraction options."
    __slots__ = [
        'config', 'fast', 'precision', 'recall', 'comments',
        'formatting', 'links', 'images', 'tables', 'dedup', 'lang',
    ]
    # consider dataclasses for Python 3.7+
    def __init__(self, config, fast, precision, recall, comments,
                 formatting, links, images, tables, deduplicate,
                 target_language):
        self.config = config
        self.fast = fast
        self.precision = precision
        self.recall = recall
        self.comments = comments
        self.formatting = formatting
        self.links = links
        self.images = images
        self.tables = tables
        self.dedup = deduplicate
        self.lang = target_language
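
# Illustrative example (hypothetical values): the positional arguments mirror the
# signature above, the way bare_extraction() builds its options further down.
#   >>> opts = Extractor(DEFAULT_CONFIG, False, False, False, True,
#   ...                  False, False, False, True, False, None)
#   >>> (opts.comments, opts.tables, opts.lang)
#   (True, True, None)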


def handle_titles(element, options):
    '''Process head elements (titles)'''
    if len(element) == 0:
        # maybe needs attention?
        # if element.tail and re.search(r'\w', element.tail):
        #     LOGGER.debug('tail in title, stripping: %s', element.tail)
        #     element.tail = None
        title = process_node(element, options)
    # children
    else:
        title = deepcopy(element)
        # list instead of element.iter('*')
        # TODO: write tests for it and check
        for child in list(element):
            # if child.tag not in potential_tags:
            #     LOGGER.debug('unexpected in title: %s %s %s', child.tag, child.text, child.tail)
            #     continue
            processed_child = handle_textnode(child, options, comments_fix=False)
            if processed_child is not None:
                title.append(processed_child)
            child.tag = 'done'
    if title is not None and text_chars_test(''.join(title.itertext())) is True:
        return title
    return None


def handle_formatting(element, options):
    '''Process formatting elements (b, i, etc. converted to hi) found
       outside of paragraphs'''
    formatting = process_node(element, options)
    if len(element) == 0 and formatting is None:
        return None
    # repair orphan elements
    # if formatting is None:
    #     formatting = Element(element.tag)
    #     # return None
    # if len(element) > 0:
    #     for child in element.iter('*'):
    #         if child.tag not in potential_tags:
    #             LOGGER.debug('unexpected in title: %s %s %s', child.tag, child.text, child.tail)
    #             continue
    #         processed_child = handle_textnode(child, options, comments_fix=False)
    #         if processed_child is not None:
    #             formatting.append(processed_child)
    #         child.tag = 'done'
    # if text_chars_test(element.text) is True:
    #     processed_child.text = trim(element.text)
    # if text_chars_test(element.tail) is True:
    #     processed_child.tail = trim(element.tail)
    # if len(element) == 0:
    #     processed_element = process_node(element, options)
    # # children
    # else:
    #     processed_element = Element(element.tag)
    #     processed_element.text, processed_element.tail = element.text, element.tail
    #     for child in element.iter('*'):
    #         processed_child = handle_textnode(child, options, comments_fix=False)
    #         if processed_child is not None:
    #             processed_element.append(processed_child)
    #         child.tag = 'done'
    # repair orphan elements
    # shorter code but triggers warning:
    # parent = element.getparent() or element.getprevious()
    parent = element.getparent()
    if parent is None:
        parent = element.getprevious()
    if parent is None or parent.tag not in FORMATTING_PROTECTED:
        processed_element = Element('p')
        processed_element.insert(0, formatting)
    else:
        processed_element = formatting
    return processed_element


def handle_lists(element, options):
    '''Process lists elements'''
    processed_element = Element(element.tag)
    if element.text is not None and element.text.strip():
        newchildelem = SubElement(processed_element, "item")
        newchildelem.text = element.text
    # if element.tail is not None:
    #     processed_element.tail = element.text
    for child in element.iter('item'):
        newchildelem = Element('item')
        if len(child) == 0:
            processed_child = process_node(child, options)
            if processed_child is not None:
                newchildelem.text = processed_child.text
                if processed_child.tail is not None and processed_child.tail.strip():
                    newchildelem.text += " " + processed_child.tail
                processed_element.append(newchildelem)
        else:
            newchildelem.text = child.text
            # proceed with iteration, fix for nested elements
            for subelem in child.iterdescendants('*'):
                # beware of nested lists
                if subelem.tag == 'list':
                    processed_subchild = handle_lists(subelem, options)
                    if processed_subchild is not None:
                        newchildelem.append(processed_subchild)
                else:
                    processed_subchild = handle_textnode(subelem, options, comments_fix=False)
                    # add child element to processed_element
                    if processed_subchild is not None:
                        subchildelem = SubElement(newchildelem, processed_subchild.tag)
                        subchildelem.text, subchildelem.tail = processed_subchild.text, processed_subchild.tail
                        # set attributes
                        for attr in subelem.attrib:
                            subchildelem.set(attr, subelem.get(attr))
                # strip_tags(newchildelem, 'item')
                subelem.tag = 'done'
            if child.tail is not None and child.tail.strip():
                newchildelem_children = [el for el in newchildelem.getchildren() if el.tag != 'done']
                if newchildelem_children:
                    last_subchild = newchildelem_children[-1]
                    if last_subchild.tail is None or not last_subchild.tail.strip():
                        last_subchild.tail = child.tail
                    else:
                        last_subchild.tail += ' ' + child.tail
        if newchildelem.text or len(newchildelem) > 0:
            # set attribute
            if child.get('rend') is not None:
                newchildelem.set('rend', child.get('rend'))
            processed_element.append(newchildelem)
        child.tag = 'done'
    element.tag = 'done'
    # test if it has children and text. Avoid double tags??
    if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True:
        # set attribute
        if element.get('rend') is not None:
            processed_element.set('rend', element.get('rend'))
        return processed_element
    return None


def is_code_block_element(element):
    '''Check if the element is a code block, based on a few common markers'''
    # pip
    if element.get('lang') is not None or element.tag == 'code':
        return True
    # GitHub
    parent = element.getparent()
    if parent is not None and 'highlight' in parent.get('class', default=''):
        return True
    # highlightjs
    code = element.find('code')
    if code is not None and len(element.getchildren()) == 1:
        return True
    return False
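
# Illustrative checks for the heuristics above (hypothetical fragments):
#   >>> from lxml.etree import fromstring
#   >>> is_code_block_element(fromstring('<pre lang="en">x = 1</pre>'))
#   True
#   >>> is_code_block_element(fromstring('<pre><code>x = 1</code></pre>'))
#   True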


def handle_code_blocks(element):
    '''Turn the element into a code block, marking its children as processed'''
    processed_element = deepcopy(element)
    for child in element.iter('*'):
        child.tag = 'done'
    processed_element.tag = 'code'
    return processed_element


def handle_quotes(element, options):
    '''Process quotes elements'''
    if is_code_block_element(element):
        return handle_code_blocks(element)
    processed_element = Element(element.tag)
    for child in element.iter('*'):
        processed_child = process_node(child, options)  # handle_textnode(child, comments_fix=True)
        if processed_child is not None:
            newsub = SubElement(processed_element, child.tag)
            newsub.text, newsub.tail = processed_child.text, processed_child.tail
        child.tag = 'done'
    if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True:
        # avoid double/nested tags
        strip_tags(processed_element, 'quote')
        return processed_element
    return None


def handle_other_elements(element, potential_tags, options):
    '''Handle diverse or unknown elements in the scope of relevant tags'''
    # handle w3schools code
    if element.tag == 'div' and 'w3-code' in element.get('class', default=''):
        return handle_code_blocks(element)
    # delete unwanted
    if element.tag not in potential_tags:
        if element.tag != 'done':
            LOGGER.debug('discarding element: %s %s', element.tag, element.text)
        return None
    if element.tag == 'div':
        # make a copy and prune it in case it contains sub-elements handled on their own?
        # divcopy = deepcopy(element)
        processed_element = handle_textnode(element, options, comments_fix=False, preserve_spaces=True)
        if processed_element is not None and text_chars_test(processed_element.text) is True:
            processed_element.attrib.clear()
            # small div-correction # could be moved elsewhere
            if processed_element.tag == 'div':
                processed_element.tag = 'p'
            # insert
            return processed_element
    else:
        LOGGER.debug('unexpected element seen: %s %s', element.tag, element.text)
    return None


def handle_paragraphs(element, potential_tags, options):
    '''Process paragraphs (p) elements along with their children,
       trim and clean the content'''
    element.attrib.clear()
    # strip_tags(element, 'p')  # change in precision due to spaces?
    # no children
    if len(element) == 0:
        processed_element = process_node(element, options)
        if processed_element is not None:
            return processed_element
        return None
    # children
    processed_element = Element(element.tag)
    for child in element.iter('*'):
        if child.tag not in potential_tags and child.tag != 'done':
            LOGGER.debug('unexpected in p: %s %s %s', child.tag, child.text, child.tail)
            continue
        # spacing = child.tag in SPACING_PROTECTED  # todo: outputformat.startswith('xml')?
        # todo: act on spacing here?
        processed_child = handle_textnode(child, options, comments_fix=False, preserve_spaces=True)
        if processed_child is not None:
            # todo: needing attention!
            if processed_child.tag == 'p':
                LOGGER.debug('extra p within p: %s %s %s', processed_child.tag, processed_child.text,
                             processed_child.tail)
                if processed_element.text:
                    processed_element.text += ' ' + processed_child.text
                else:
                    processed_element.text = processed_child.text
                continue
            # handle formatting
            newsub = Element(child.tag)
            if processed_child.tag in P_FORMATTING:
                # check depth and clean
                if len(processed_child) > 0:
                    for item in processed_child:  # children are lists
                        if text_chars_test(item.text) is True:
                            item.text = ' ' + item.text
                        strip_tags(processed_child, item.tag)
                # correct attributes
                if child.tag == 'hi':
                    newsub.set('rend', child.get('rend'))
                elif child.tag == 'ref':
                    if child.get('target') is not None:
                        newsub.set('target', child.get('target'))
            # handle line breaks
            # elif processed_child.tag == 'lb':
            #     try:
            #         processed_child.tail = process_node(child, options).tail
            #     except AttributeError:  # no text
            #         pass
            # prepare text
            # todo: to be moved to handle_textnode()
            # if text_chars_test(processed_child.text) is False:
            #     processed_child.text = ''
            # if text_chars_test(processed_child.tail) is False:
            #     processed_child.tail = ''
            # if there are already children
            # if len(processed_element) > 0:
            #     if text_chars_test(processed_child.tail) is True:
            #         newsub.tail = processed_child.text + processed_child.tail
            #     else:
            #         newsub.tail = processed_child.text
            newsub.text, newsub.tail = processed_child.text, processed_child.tail
            processed_element.append(newsub)
        child.tag = 'done'
    # finish
    if len(processed_element) > 0:
        # clean trailing lb-elements
        if (
            processed_element[-1].tag == 'lb'
            and processed_element[-1].tail is None
        ):
            processed_element[-1].getparent().remove(processed_element[-1])
        return processed_element
    if processed_element.text:
        return processed_element
    LOGGER.debug('discarding p-child: %s', tostring(processed_element))
    return None


def define_cell_type(element):
    '''Determine cell element type and mint new element'''
    # define tag
    cell_element = Element('cell')
    if element.tag == 'th':
        cell_element.set('role', 'head')
    return cell_element
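
# Illustrative example (hypothetical cell):
#   >>> from lxml.etree import fromstring
#   >>> define_cell_type(fromstring('<th>Name</th>')).get('role')
#   'head'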


def handle_table(table_elem, potential_tags, options):
    '''Process single table element'''
    newtable = Element('table')
    newrow = Element('row')
    # strip these structural elements
    strip_tags(table_elem, 'thead', 'tbody', 'tfoot')
    # explore sub-elements
    for subelement in table_elem.iterdescendants():
        if subelement.tag == 'tr':
            # process existing row
            if len(newrow) > 0:
                newtable.append(newrow)
                newrow = Element('row')
        elif subelement.tag in TABLE_ELEMS:
            newchildelem = define_cell_type(subelement)
            # process
            if len(subelement) == 0:
                processed_cell = process_node(subelement, options)
                if processed_cell is not None:
                    newchildelem.text, newchildelem.tail = processed_cell.text, processed_cell.tail
            else:
                # proceed with iteration, fix for nested elements
                newchildelem.text, newchildelem.tail = subelement.text, subelement.tail
                subelement.tag = "done"
                for child in subelement.iterdescendants():
                    if child.tag in TABLE_ALL:
                        # todo: define attributes properly
                        if child.tag in TABLE_ELEMS:
                            # subcell_elem = define_cell_type(subelement)
                            child.tag = 'cell'
                        processed_subchild = handle_textnode(child, options, preserve_spaces=True, comments_fix=True)
                    # todo: lists in table cells
                    else:
                        # subcell_elem = Element(child.tag)
                        processed_subchild = handle_textelem(child, potential_tags.union(['div']), options)
                    # add child element to processed_element
                    if processed_subchild is not None:
                        subchildelem = SubElement(newchildelem, processed_subchild.tag)
                        subchildelem.text, subchildelem.tail = processed_subchild.text, processed_subchild.tail
                    child.tag = 'done'
            # add to tree
            if newchildelem.text or len(newchildelem) > 0:
                newrow.append(newchildelem)
        # beware of nested tables
        elif subelement.tag == 'table':
            break
        # cleanup
        subelement.tag = 'done'
    # end of processing
    if len(newrow) > 0:
        newtable.append(newrow)
    if len(newtable) > 0:
        return newtable
    return None


def handle_image(element):
    '''Process image element'''
    # image source
    processed_element = Element(element.tag)
    if is_image_file(element.get('data-src')):
        processed_element.set('src', element.get('data-src'))
    elif is_image_file(element.get('src')):
        processed_element.set('src', element.get('src'))
    else:
        # take the first corresponding attribute
        for attr in element.attrib:
            if attr.startswith('data-src') and is_image_file(element.get(attr)):
                processed_element.set('src', element.get(attr))
                break
    # additional data
    if element.get('alt') is not None:
        processed_element.set('alt', element.get('alt'))
    if element.get('title') is not None:
        processed_element.set('title', element.get('title'))
    # don't return empty elements or elements without source, just None
    if len(processed_element.attrib) == 0 or not processed_element.get('src'):
        return None
    # post-processing: URLs
    url = processed_element.get('src')
    processed_element.set('src', re.sub(r'^//', 'http://', url))
    return processed_element
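
# Illustrative sketch (hypothetical element, assuming is_image_file() accepts the
# .jpg source): protocol-relative URLs are rewritten by the re.sub() call above.
#   >>> from lxml.etree import fromstring
#   >>> img = fromstring('<graphic src="//example.org/pic.jpg" alt="a pic"/>')
#   >>> handle_image(img).get('src')
#   'http://example.org/pic.jpg'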


def handle_textelem(element, potential_tags, options):
    '''Process text element and determine how to deal with its content'''
    new_element = None
    # bypass: nested elements
    if element.tag == 'list':
        new_element = handle_lists(element, options)
    elif element.tag in CODES_QUOTES:
        new_element = handle_quotes(element, options)
    elif element.tag == 'head':
        new_element = handle_titles(element, options)
    elif element.tag == 'p':
        new_element = handle_paragraphs(element, potential_tags, options)
    elif element.tag == 'lb':
        if text_chars_test(element.tail) is True:
            element = process_node(element, options)
            if element is not None:
                new_element = Element('p')
                new_element.text = element.tail
    elif element.tag in FORMATTING:
        new_element = handle_formatting(element, options)  # process_node(element, options)
    elif element.tag == 'table' and 'table' in potential_tags:
        new_element = handle_table(element, potential_tags, options)
    elif element.tag == 'graphic' and 'graphic' in potential_tags:
        new_element = handle_image(element)
    else:
        # other elements (div, ??, ??)
        new_element = handle_other_elements(element, potential_tags, options)
    return new_element


def recover_wild_text(tree, result_body, options, potential_tags=TAG_CATALOG):
    '''Look for all previously unconsidered wild elements, including outside of the determined
       frame and throughout the document to recover potentially missing text parts'''
    LOGGER.debug('Recovering wild text elements')
    search_expr = './/blockquote|.//code|.//p|.//pre|.//q|.//quote|.//table|.//div[contains(@class, \'w3-code\')]'
    if options.recall is True:
        potential_tags.update(['div', 'lb'])
        search_expr += '|.//div|.//lb|.//list'
    # prune
    search_tree = prune_unwanted_sections(tree, potential_tags, options)
    # decide if links are preserved
    if 'ref' not in potential_tags:
        strip_tags(search_tree, 'a', 'ref', 'span')
    else:
        strip_tags(search_tree, 'span')
    subelems = search_tree.xpath(search_expr)
    result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options)
                                                        for e in subelems)))
    return result_body


def prune_unwanted_sections(tree, potential_tags, options):
    'Rule-based deletion of targeted document sections'
    # prune the rest
    tree = prune_unwanted_nodes(tree, OVERALL_DISCARD_XPATH, with_backup=True)
    tree = prune_unwanted_nodes(tree, PAYWALL_DISCARD_XPATH)
    # decide if images are preserved
    if 'graphic' not in potential_tags:
        tree = prune_unwanted_nodes(tree, DISCARD_IMAGE_ELEMENTS)
    # balance precision/recall
    if options.recall is False:
        tree = prune_unwanted_nodes(tree, TEASER_DISCARD_XPATH)
        if options.precision is True:
            tree = prune_unwanted_nodes(tree, PRECISION_DISCARD_XPATH)
    # remove elements by link density
    tree = delete_by_link_density(tree, 'div', backtracking=True, favor_precision=options.precision)
    tree = delete_by_link_density(tree, 'list', backtracking=False, favor_precision=options.precision)
    tree = delete_by_link_density(tree, 'p', backtracking=False, favor_precision=options.precision)
    # also filter fw/head, table and quote elements?
    if options.precision is True:
        # delete trailing titles
        while len(tree) > 0 and (tree[-1].tag == 'head'):
            tree[-1].getparent().remove(tree[-1])
        tree = delete_by_link_density(tree, 'head', backtracking=False)  # favor_precision=options.precision
        tree = delete_by_link_density(tree, 'quote', backtracking=False)  # favor_precision=options.precision
    return tree


def extract_content(tree, options):
    '''Find the main content of a page using a set of XPath expressions,
       then extract relevant elements, strip them of unwanted subparts and
       convert them'''
    # backup
    backup_tree = deepcopy(tree)
    # init
    result_body = Element('body')
    potential_tags = set(TAG_CATALOG)
    if options.tables is True:
        potential_tags.update(['table', 'td', 'th', 'tr'])
    if options.images is True:
        potential_tags.add('graphic')
    if options.links is True:
        potential_tags.add('ref')
    # iterate
    for expr in BODY_XPATH:
        # select tree if the expression has been found
        try:
            subtree = expr(tree)[0]
        except IndexError:
            continue
        # prune the subtree
        subtree = prune_unwanted_sections(subtree, potential_tags, options)
        # second pass?
        # subtree = delete_by_link_density(subtree, 'list', backtracking=False, favor_precision=options.precision)
        if 'table' in potential_tags or options.precision is True:
            for elem in subtree.iter('table'):
                if link_density_test_tables(elem) is True:
                    elem.getparent().remove(elem)
        # skip if empty tree
        if len(subtree) == 0:
            continue
        # no paragraphs containing text, or not enough
        ptest = subtree.xpath('//p//text()')
        if options.recall is True:
            factor = 5
        elif options.precision is True:
            factor = 1
        else:
            factor = 3
        if not ptest or len(''.join(ptest)) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE') * factor:
            potential_tags.add('div')
        # polish list of potential tags
        if 'ref' not in potential_tags:
            strip_tags(subtree, 'ref')
        if 'span' not in potential_tags:
            strip_tags(subtree, 'span')
        LOGGER.debug(sorted(potential_tags))
        # proper extraction
        subelems = subtree.xpath('.//*')
        # e.g. only lb-elems in a div
        if {e.tag for e in subelems} == {'lb'}:
            subelems = [subtree]
        # extract content
        result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options) for e in subelems)))
        # remove trailing titles
        while len(result_body) > 0 and (result_body[-1].tag in NOT_AT_THE_END):
            result_body[-1].getparent().remove(result_body[-1])
        # exit the loop if the result has children
        if len(result_body) > 1:
            LOGGER.debug(expr)
            break
    temp_text = ' '.join(result_body.itertext()).strip()
    # try parsing wild <p> elements if nothing found or text too short
    # todo: test precision and recall settings here
    if len(result_body) == 0 or len(temp_text) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
        result_body = recover_wild_text(backup_tree, result_body, options, potential_tags)
        temp_text = ' '.join(result_body.itertext()).strip()
    # filter output
    strip_elements(result_body, 'done')
    strip_tags(result_body, 'div')
    # return
    return result_body, temp_text, len(temp_text)


def process_comments_node(elem, potential_tags, options):
    '''Process comment node and determine how to deal with its content'''
    if elem.tag in potential_tags:
        # print(elem.tag, elem.text_content())
        processed_element = handle_textnode(elem, options, comments_fix=True)
        # test length and remove
        if processed_element is not None:  # and processed_element.text not in COMMENTS_BLACKLIST:
            processed_element.attrib.clear()
            # if textfilter(elem) is True:  # ^Pingback
            #     return None
            return processed_element
    return None


def extract_comments(tree, options):
    '''Try and extract comments out of potential sections in the HTML'''
    comments_body = Element('body')
    # define iteration strategy
    potential_tags = set(TAG_CATALOG)  # 'span'
    # potential_tags.add('div') trouble with <div class="comment-author meta">
    for expr in COMMENTS_XPATH:
        # select tree if the expression has been found
        subtree = expr(tree)
        if not subtree:
            continue
        subtree = subtree[0]
        # prune
        subtree = prune_unwanted_nodes(subtree, COMMENTS_DISCARD_XPATH)
        # todo: unified stripping function, taking include_links into account
        strip_tags(subtree, 'a', 'ref', 'span')
        # extract content
        # for elem in subtree.xpath('.//*'):
        #     processed_elem = process_comments_node(elem, potential_tags)
        #     if processed_elem is not None:
        #         comments_body.append(processed_elem)
        # processed_elems = (process_comments_node(elem, potential_tags, options) for elem in
        #                    subtree.xpath('.//*'))
        comments_body.extend(filter(lambda x: x is not None, (process_comments_node(e, potential_tags, options) for e in subtree.xpath('.//*'))))
        # control
        if len(comments_body) > 0:  # if it has children
            LOGGER.debug(expr)
            # remove corresponding subtree
            subtree.getparent().remove(subtree)
            break
    # lengths
    temp_comments = ' '.join(comments_body.itertext()).strip()
    return comments_body, temp_comments, len(temp_comments), tree


def compare_extraction(tree, backup_tree, url, body, text, len_text, options):
    '''Decide whether to choose own or external extraction
       based on a series of heuristics'''
    min_target_length = options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE')
    # bypass for recall
    if options.recall is True and len_text > min_target_length * 10:
        return body, text, len_text
    algo_flag, jt_result = False, False
    # prior cleaning
    backup_tree = prune_unwanted_nodes(backup_tree, PAYWALL_DISCARD_XPATH)
    if options.precision is True:
        backup_tree = prune_unwanted_nodes(backup_tree, OVERALL_DISCARD_XPATH)
    # try with readability
    temppost_algo = try_readability(backup_tree)
    # unicode fix necessary on certain systems (#331)
    algo_text = trim(tostring(temppost_algo, method='text', encoding='utf-8').decode('utf-8'))
    len_algo = len(algo_text)
    # compare
    LOGGER.debug('extracted length: %s (algorithm) %s (extraction)', len_algo, len_text)
    # conditions to use alternative algorithms
    if len_algo in (0, len_text):
        algo_flag = False
    elif len_text == 0 and len_algo > 0:
        algo_flag = True
    elif len_text > 2 * len_algo:
        algo_flag = False
    elif len_algo > 2 * len_text:
        algo_flag = True
    # borderline cases
    elif not body.xpath('.//p//text()') and len_algo > min_target_length * 2:
        algo_flag = True
    elif len(body.findall('.//table')) > len(body.findall('.//p')) and len_algo > min_target_length * 2:
        algo_flag = True
    # https://github.com/adbar/trafilatura/issues/354
    elif options.recall is True and not body.xpath('.//head') and temppost_algo.xpath('.//h2|.//h3|.//h4') and len_algo > len_text:
        algo_flag = True
    else:
        LOGGER.debug('extraction values: %s %s for %s', len_text, len_algo, url)
        algo_flag = False
    # apply decision
    if algo_flag:
        body, text, len_text = temppost_algo, algo_text, len_algo
        LOGGER.debug('using generic algorithm: %s', url)
    else:
        LOGGER.debug('using custom extraction: %s', url)
    # override faulty extraction: try with justext
    if body.xpath(SANITIZED_XPATH) or len_text < min_target_length:  # body.find(...)
        # or options.recall is True ?
        LOGGER.debug('unclean document triggering justext examination: %s', url)
        # tree = prune_unwanted_sections(tree, {}, options)
        body2, text2, len_text2, jt_result = justext_rescue(tree, url, options.lang, body, 0, '')
        # prevent too short documents from replacing the main text
        if jt_result is True and not len_text > 4*len_text2:  # threshold could be adjusted
            LOGGER.debug('using justext, length: %s', len_text2)
            body, text, len_text = body2, text2, len_text2
    # post-processing: remove unwanted sections
    if algo_flag is True and jt_result is False:
        body, text, len_text = sanitize_tree(body, options)
    return body, text, len_text


def basic_cleaning(tree):
    "Remove a few section types from the document."
    for elem in BASIC_CLEAN_XPATH(tree):
        elem.getparent().remove(elem)
    return tree


def baseline(filecontent):
    """Use baseline extraction function targeting text paragraphs and/or JSON metadata.

    Args:
        filecontent: HTML code as binary string or string.

    Returns:
        An lxml <body> element containing the extracted paragraphs,
        the main text as string, and its length as integer.

    """
    tree = load_html(filecontent)
    postbody = Element('body')
    if tree is None:
        return postbody, '', 0
    # scrape from json text
    for elem in tree.iterfind('.//script[@type="application/ld+json"]'):
        if elem.text and '"article' in elem.text:
            mymatch = JSON_SEARCH.search(elem.text)
            if mymatch:
                elem = SubElement(postbody, 'p')
                elem.text = trim(mymatch[1].replace('\\"', '"'))
                return postbody, elem.text, len(elem.text)
    tree = basic_cleaning(tree)
    # scrape from article tag
    article_elem = tree.find('.//article')
    if article_elem is not None:
        temp_text = trim(article_elem.text_content())
        if len(temp_text) > 100:
            elem = SubElement(postbody, 'p')
            elem.text = temp_text
            return postbody, temp_text, len(temp_text)
    # scrape from text paragraphs
    results = set()
    for element in tree.iter('blockquote', 'code', 'p', 'pre', 'q', 'quote'):
        entry = element.text_content()
        if entry not in results:
            elem = SubElement(postbody, 'p')
            elem.text = entry
            results.add(entry)
    temp_text = trim('\n'.join(postbody.itertext()))
    if len(temp_text) > 100:
        return postbody, temp_text, len(temp_text)
    # default strategy: clean the tree and take everything
    postbody = Element('body')
    body_elem = tree.find('.//body')
    if body_elem is not None:
        # elem.text = trim(body_elem.text_content())
        text = '\n'.join([trim(e) for e in body_elem.itertext()])
        if len(text) > 100:
            elem = SubElement(postbody, 'p')
            elem.text = text
            return postbody, text, len(text)
    # new fallback
    text = html2txt(tree)
    elem = SubElement(postbody, 'p')
    elem.text = text
    return postbody, text, len(text)
    # old: return postbody, '', 0
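
# Illustrative example (hypothetical document): the ld+json branch returns early.
#   >>> postbody, text, length = baseline(
#   ...     '<html><body><script type="application/ld+json">'
#   ...     '{"articleBody": "Shortened article"}</script></body></html>')
#   >>> text
#   'Shortened article'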


def html2txt(content):
    """Run basic html2txt on a document.

    Args:
        content: HTML document as string or LXML element.

    Returns:
        The extracted text in the form of a string or an empty string.

    """
    tree = load_html(content)
    if tree is None:
        return ""
    body = tree.find(".//body")
    if body is None:
        return ""
    tree = basic_cleaning(tree)
    return " ".join(body.text_content().split()).strip()


def determine_returnstring(document, output_format, include_formatting, tei_validation):
    '''Convert XML tree to chosen format, clean the result and output it as a string'''
    # XML (TEI) steps
    if 'xml' in output_format:
        # last cleaning
        for element in document.body.iter('*'):
            if element.tag != 'graphic' and len(element) == 0 and not element.text and not element.tail:
                parent = element.getparent()
                # do not remove elements inside <code> to preserve formatting
                if parent is not None and parent.tag != 'code':
                    parent.remove(element)
        # build output trees
        strip_double_tags(document.body)
        remove_empty_elements(document.body)
        if output_format == 'xml':
            output = build_xml_output(document)
        elif output_format == 'xmltei':
            output = build_tei_output(document)
        # can be improved
        returnstring = control_xml_output(output, output_format, tei_validation, document)
    # CSV
    elif output_format == 'csv':
        returnstring = xmltocsv(document, include_formatting)
    # JSON
    elif output_format == 'json':
        returnstring = build_json_output(document)
    # TXT
    else:
        returnstring = xmltotxt(document.body, include_formatting)
        if document.commentsbody is not None:
            comments_text = xmltotxt(document.commentsbody, include_formatting)
            returnstring = f"{returnstring}\n{comments_text}".strip()
    # normalize Unicode format (defaults to NFC)
    return normalize_unicode(returnstring)


def bare_extraction(filecontent, url=None, no_fallback=False,  # fast=False,
                    favor_precision=False, favor_recall=False,
                    include_comments=True, output_format='python', target_language=None,
                    include_tables=True, include_images=False, include_formatting=False,
                    include_links=False, deduplicate=False,
                    date_extraction_params=None,
                    only_with_metadata=False, with_metadata=False,
                    max_tree_size=None, url_blacklist=None, author_blacklist=None,
                    as_dict=True, prune_xpath=None,
                    config=DEFAULT_CONFIG):
    """Internal function for text extraction returning bare Python variables.

    Args:
        filecontent: HTML code as string.
        url: URL of the webpage.
        no_fallback: Use faster heuristics and skip backup extraction.
        favor_precision: prefer less text but correct extraction.
        favor_recall: prefer more text even when unsure.
        include_comments: Extract comments along with the main text.
        output_format: Define an output format, Python being the default
            and the interest of this internal function.
            Other values: "txt", "csv", "json", "xml", or "xmltei".
        target_language: Define a language to discard invalid documents (ISO 639-1 format).
        include_tables: Take into account information within the HTML <table> element.
        include_images: Take images into account (experimental).
        include_formatting: Keep structural elements related to formatting
            (present in XML format, converted to markdown otherwise).
        include_links: Keep links along with their targets (experimental).
        deduplicate: Remove duplicate segments and documents.
        date_extraction_params: Provide extraction parameters to htmldate as dict().
        only_with_metadata: Only keep documents featuring all essential metadata
            (date, title, url).
        max_tree_size: Discard documents with too many elements.
        url_blacklist: Provide a blacklist of URLs as set() to filter out documents.
        author_blacklist: Provide a blacklist of author names as set() to filter out authors.
        as_dict: Legacy option, return a dictionary instead of a class with attributes.
        prune_xpath: Provide an XPath expression to prune the tree before extraction.
            Can be str or list of str.
        config: Directly provide a configparser configuration.

    Returns:
        A Python dict() containing all the extracted information or None.

    Raises:
        ValueError: Extraction problem.
    """
    # init
    if url_blacklist is None:
        url_blacklist = set()
    # deprecation warnings
    if with_metadata is True:
        only_with_metadata = with_metadata
        warnings.warn(
            '"with_metadata" will be deprecated in a future version, use "only_with_metadata" instead',
            PendingDeprecationWarning
        )
    # if no_fallback is True:
    #     fast = no_fallback
    #     warnings.warn(
    #         '"no_fallback" will be deprecated in a future version, use "fast" instead',
    #         PendingDeprecationWarning
    #     )
    # load data
    try:
        tree = load_html(filecontent)
        if tree is None:
            LOGGER.error('empty HTML tree for URL %s', url)
            raise ValueError
        # quick and dirty HTML lang check
        if target_language is not None and (no_fallback is True or LANGID_FLAG is False):
            if check_html_lang(tree, target_language) is False:
                LOGGER.error('wrong HTML meta language for URL %s', url)
                raise ValueError
        # extract metadata if necessary
        if output_format != 'txt':
            if not date_extraction_params:
                date_extraction_params = {
                    "extensive_search": config.getboolean('DEFAULT', 'EXTENSIVE_DATE_SEARCH'),
                }
            document = extract_metadata(tree, url, date_extraction_params, no_fallback, author_blacklist)
            # cut short if extracted URL in blacklist
            if document.url in url_blacklist:
                LOGGER.warning('blacklisted URL: %s', url)
                raise ValueError
            # cut short if core elements are missing
            if only_with_metadata is True and any(
                    x is None for x in
                    [document.date, document.title, document.url]
            ):
                LOGGER.error('no metadata for URL %s', url)
                raise ValueError
        else:
            document = Document()
        # regroup extraction options
        options = Extractor(config, no_fallback, favor_precision, favor_recall,
                            include_comments, include_formatting, include_links,
                            include_images, include_tables, deduplicate,
                            target_language)
        # prune all xpath expressions that the user specified
        # no backup as this is under the full control of the user
        if prune_xpath is not None:
            if isinstance(prune_xpath, str):
                prune_xpath = [prune_xpath]
            tree = prune_unwanted_nodes(tree, [XPath(x) for x in prune_xpath])
        # backup (or not) for further processing
        tree_backup_1 = deepcopy(tree) if no_fallback is False else None
        tree_backup_2 = deepcopy(tree)
        # clean + use LXML cleaner
        cleaned_tree = tree_cleaning(tree, options)
        cleaned_tree_backup = deepcopy(cleaned_tree)
        # convert tags, the rest does not work without conversion
        cleaned_tree = convert_tags(cleaned_tree, options, url or document.url)
        # comments first, then remove
        if include_comments is True:
            commentsbody, temp_comments, len_comments, cleaned_tree = extract_comments(cleaned_tree, options)
        else:
            commentsbody, temp_comments, len_comments = None, '', 0
        if favor_precision is True:
            cleaned_tree = prune_unwanted_nodes(cleaned_tree, REMOVE_COMMENTS_XPATH)
        # extract content
        postbody, temp_text, len_text = extract_content(cleaned_tree, options)
        # compare if necessary
        if no_fallback is False:
            postbody, temp_text, len_text = compare_extraction(cleaned_tree_backup, tree_backup_1, url, postbody, temp_text, len_text, options)
        # add baseline as additional fallback
        # rescue: try to use original/dirty tree # and favor_precision is False=?
        if len_text < config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
            postbody, temp_text, len_text = baseline(tree_backup_2)
            LOGGER.debug('non-clean extracted length: %s (extraction)', len_text)
        # tree size sanity check
        if max_tree_size is not None:
            # strip tags
            if len(postbody) > max_tree_size:
                LOGGER.debug('output tree too long: %s', len(postbody))
                strip_tags(postbody, 'hi')
                # still too long, raise an error
                if len(postbody) > max_tree_size:
                    LOGGER.debug('output tree too long: %s, discarding file', len(postbody))
                    raise ValueError
        # size checks
        if len_comments < config.getint('DEFAULT', 'MIN_EXTRACTED_COMM_SIZE'):
            LOGGER.debug('not enough comments %s', url)
        if len_text < config.getint('DEFAULT', 'MIN_OUTPUT_SIZE') and len_comments < config.getint('DEFAULT',
                                                                                                   'MIN_OUTPUT_COMM_SIZE'):
            LOGGER.debug('text and comments not long enough: %s %s', len_text, len_comments)
            raise ValueError