Common Crawl crawler: adapt to new data access scheme, fixes #223 #226

Merged
README.md (2 changes: 1 addition & 1 deletion)
@@ -47,7 +47,7 @@ news-please supports three main use cases, which are explained in more detail in
* commoncrawl.org provides an extensive, free-to-use archive of news articles from small and major publishers world wide
* news-please enables users to conveniently download and extract articles from commoncrawl.org
* you can optionally define filter criteria, such as news publisher(s) or the date period, within which articles need to be published
* clone the news-please repository, [install the awscli tool](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html), adapt the config section in [newsplease/examples/commoncrawl.py](/newsplease/examples/commoncrawl.py), and execute `python3 -m newsplease.examples.commoncrawl`
* clone the news-please repository, adapt the config section in [newsplease/examples/commoncrawl.py](/newsplease/examples/commoncrawl.py), and execute `python3 -m newsplease.examples.commoncrawl`

## Getting started
It's super easy, we promise!
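
The README step above ("adapt the config section … and execute `python3 -m newsplease.examples.commoncrawl`") drives the module changed below. As a minimal programmatic sketch of the same flow against the `crawl_from_commoncrawl` entry point — the callback, download directory, and dry-run settings here are illustrative stand-ins, not the values from `newsplease/examples/commoncrawl.py`:

```python
import logging

from newsplease.crawler import commoncrawl_crawler


def on_article(article):
    # Invoked once per successfully extracted article; just print the title here.
    print(article.title)


if __name__ == '__main__':
    commoncrawl_crawler.crawl_from_commoncrawl(
        on_article,
        local_download_dir_warc='./cc_download_warc/',  # assumed: any writable directory
        number_of_extraction_processes=1,
        log_level=logging.INFO,
        dry_run=True,  # only list the CC-NEWS WARC files that would be processed
    )
```
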
newsplease/crawler/commoncrawl_crawler.py (145 changes: 91 additions & 54 deletions)
@@ -6,14 +6,17 @@
"""
import logging
import os
import subprocess
import tempfile
import time
from functools import partial
from multiprocessing import Pool
import datetime
import gzip
from urllib.parse import urlparse

import boto3
import botocore
from dateutil import parser
import requests
from scrapy.utils.log import configure_logging

from ..crawler.commoncrawl_extractor import CommonCrawlExtractor
@@ -23,7 +26,8 @@
__credits__ = ["Sebastian Nagel"]

# commoncrawl.org
__cc_base_url = 'https://commoncrawl.s3.amazonaws.com/'
__cc_base_url = 'https://data.commoncrawl.org/'
__cc_bucket = 'commoncrawl'

# log file of fully extracted WARC files
__log_pathname_fully_extracted_warcs = None
@@ -56,7 +60,7 @@ def __setup(local_download_dir_warc, log_level):
global __log_pathname_fully_extracted_warcs
__log_pathname_fully_extracted_warcs = os.path.join(local_download_dir_warc, 'fullyextractedwarcs.list')

# make loggers quite
# make loggers quiet
configure_logging({"LOG_LEVEL": "ERROR"})
logging.getLogger('requests').setLevel(logging.CRITICAL)
logging.getLogger('readability').setLevel(logging.CRITICAL)
@@ -66,6 +70,9 @@ def __setup(local_download_dir_warc, log_level):
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('jieba').setLevel(logging.CRITICAL)

boto3.set_stream_logger('botocore', log_level)
boto3.set_stream_logger('boto3', log_level)

# set own logger
logging.basicConfig(level=log_level)
__logger = logging.getLogger(__name__)
@@ -101,12 +108,15 @@ def __iterate_by_month(start_date=None, end_date=None, month_step=1):
# Until now.
end_date = datetime.datetime.today()
current_date = start_date
while current_date < end_date:
yield current_date
yield current_date
while True:
carry, new_month = divmod(current_date.month - 1 + month_step, 12)
new_month += 1
current_date = current_date.replace(year=current_date.year + carry,
month=new_month)
yield current_date
if current_date > end_date:
break
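
The rewritten loop above yields the start month itself and keeps producing months until it has passed `end_date`; a quick sketch of one step of the same carry arithmetic, with illustrative values:

```python
import datetime

current_date = datetime.datetime(2021, 12, 15)
month_step = 1
# divmod carries a December + 1 step over into January of the following year
carry, new_month = divmod(current_date.month - 1 + month_step, 12)
new_month += 1
print(current_date.replace(year=current_date.year + carry, month=new_month))  # 2022-01-15 00:00:00
```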


def __extract_date_from_warc_filename(path):
@@ -115,7 +125,11 @@ def __extract_date_from_warc_filename(path):
fn = fn.replace('CC-NEWS-', '')
dt = fn.split('-')[0]

return datetime.datetime.strptime(dt, '%Y%m%d%H%M%S')
try:
return datetime.datetime.strptime(dt, '%Y%m%d%H%M%S')
except:
# return date clearly outside the range
return datetime.datetime(1900, 1, 1)
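
For illustration, the parsing above applied to a representative CC-NEWS key — the timestamp and serial number are made up, and the basename step is assumed from the unchanged part of the function:

```python
import datetime
import os

path = 'crawl-data/CC-NEWS/2021/09/CC-NEWS-20210915183145-01017.warc.gz'
fn = os.path.basename(path).replace('CC-NEWS-', '')    # '20210915183145-01017.warc.gz'
dt = fn.split('-')[0]                                   # '20210915183145'
print(datetime.datetime.strptime(dt, '%Y%m%d%H%M%S'))   # 2021-09-15 18:31:45
```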


def __date_within_period(date, start_date=None, end_date=None):
@@ -128,7 +142,7 @@ def __date_within_period(date, start_date=None, end_date=None):
return start_date <= date < end_date


def __get_remote_index(warc_files_start_date, warc_files_end_date):
def __get_remote_index(warc_files_start_date=None, warc_files_end_date=None):
"""
Gets the index of news crawl files from commoncrawl.org and returns an array of names
:param warc_files_start_date: only list .warc files with greater or equal date in
@@ -137,58 +151,73 @@ def __get_remote_index(warc_files_start_date, warc_files_end_date):
:return:
"""

with tempfile.NamedTemporaryFile() as temp:
temp_filename = temp.name
s3_client = boto3.client('s3')
# Verify access to commoncrawl bucket
try:
s3_client.head_bucket(Bucket=__cc_bucket)
except (botocore.exceptions.ClientError, botocore.exceptions.NoCredentialsError):
__logger.info('Failed to read %s bucket, using monthly WARC file listings', __cc_bucket)
s3_client = None

if os.name == 'nt':
awk_parameter = '"{ print $4 }"'
else:
awk_parameter = "'{ print $4 }'"
objects = []

# get the remote info
if s3_client:
def s3_list_objects(bucket, prefix):
response = s3_client.list_objects(Bucket=bucket, Prefix=prefix)
if 'Contents' not in response:
return []
return [x['Key'] for x in response['Contents']]

cmd = ''
if warc_files_start_date or warc_files_end_date:
# cleanup
try:
os.remove(temp_filename)
except OSError:
pass

# The news files are grouped per year and month in separate folders
warc_dates = __iterate_by_month(start_date=warc_files_start_date, end_date=warc_files_end_date)
for date in warc_dates:
year = date.strftime('%Y')
month = date.strftime('%m')
cmd += "aws s3 ls --recursive s3://commoncrawl/crawl-data/CC-NEWS/%s/%s/ --no-sign-request >> %s && " % (year, month, temp_filename)

prefix = 'crawl-data/CC-NEWS/%s/%s/' % (year, month)
__logger.debug('Listing objects on S3 bucket %s and prefix %s', __cc_bucket, prefix)
objects += s3_list_objects(__cc_bucket, prefix)
else:
cmd = "aws s3 ls --recursive s3://commoncrawl/crawl-data/CC-NEWS/ --no-sign-request > %s && " % temp_filename

cmd += "awk %s %s " % (awk_parameter, temp_filename)
objects = s3_list_objects(__cc_bucket, 'crawl-data/CC-NEWS/')

__logger.info('executing: %s', cmd)
exitcode, stdout_data = subprocess.getstatusoutput(cmd)

if exitcode > 0:
raise Exception(stdout_data)

lines = stdout_data.splitlines()
else:
# The news files are grouped per year and month in separate folders
warc_dates = __iterate_by_month(start_date=warc_files_start_date, end_date=warc_files_end_date)
for date in warc_dates:
year = date.strftime('%Y')
month = date.strftime('%m')
url = '%scrawl-data/CC-NEWS/%s/%s/warc.paths.gz' % (__cc_base_url, year, month)
__logger.debug('Fetching WARC paths listing %s', url)
response = requests.get(url)
if response:
objects += gzip.decompress(response.content).decode('ascii').strip().split('\n')
else:
__logger.info('Failed to fetch WARC file list %s: %s', url, response)

if warc_files_start_date or warc_files_end_date:
# Now filter further on day of month, hour, minute
lines = [
p for p in lines if __date_within_period(
objects = [
p for p in objects if __date_within_period(
__extract_date_from_warc_filename(p),
start_date=warc_files_start_date,
end_date=warc_files_end_date,
)
]

return lines
__logger.info('Found %i WARC files', len(objects))

return objects
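
The credential-free HTTPS fallback in `__get_remote_index` can also be exercised on its own; a minimal sketch that pulls one month's `warc.paths.gz` listing from data.commoncrawl.org in the same way (the month is only an example, and error handling is reduced to returning an empty list):

```python
import gzip

import requests

CC_BASE_URL = 'https://data.commoncrawl.org/'


def list_cc_news_warcs(year, month):
    """Return the CC-NEWS WARC paths for one month, or an empty list on failure."""
    url = '%scrawl-data/CC-NEWS/%s/%02d/warc.paths.gz' % (CC_BASE_URL, year, month)
    response = requests.get(url)
    if not response.ok:
        return []
    # warc.paths.gz is a gzip-compressed, newline-separated list of object keys
    return gzip.decompress(response.content).decode('ascii').strip().split('\n')


print(len(list_cc_news_warcs(2021, 9)), 'WARC files listed')
```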

def __get_list_of_fully_extracted_warc_urls():
def __get_url_path(url_or_path):
if url_or_path.startswith('http:') or url_or_path.startswith('https:'):
try:
url = urlparse(url_or_path)
return url.path.lstrip('/') # trim leading slash
except:
pass
return url_or_path

def __get_list_of_fully_extracted_warc_paths():
"""
Reads in the log file that contains a list of all previously, fully extracted WARC urls
:return:
@@ -201,6 +230,9 @@ def __get_list_of_fully_extracted_warc_urls():
# remove break lines
list_warcs = [x.strip() for x in list_warcs]

# (back-ward compatibility) if it's a URL keep only the path
list_warcs = [__get_url_path(x) for x in list_warcs]

return list_warcs


@@ -247,7 +279,7 @@ def __callback_on_warc_completed(warc_path, counter_article_passed, counter_arti
__counter_article_error, __counter_article_total, __counter_warc_processed)


def __start_commoncrawl_extractor(warc_download_url, callback_on_article_extracted=None,
def __start_commoncrawl_extractor(warc_path, callback_on_article_extracted=None,
callback_on_warc_completed=None, valid_hosts=None,
start_date=None, end_date=None,
strict_date=True, reuse_previously_downloaded_files=True,
@@ -261,7 +293,7 @@
fetch_images=False):
"""
Starts a single CommonCrawlExtractor
:param warc_download_url:
:param warc_path: path to the WARC file on s3://commoncrawl/ resp. https://data.commoncrawl.org/
:param callback_on_article_extracted:
:param callback_on_warc_completed:
:param valid_hosts:
@@ -278,7 +310,7 @@ def __start_commoncrawl_extractor(warc_download_url, callback_on_article_extract
:return:
"""
commoncrawl_extractor = extractor_cls()
commoncrawl_extractor.extract_from_commoncrawl(warc_download_url, callback_on_article_extracted,
commoncrawl_extractor.extract_from_commoncrawl(warc_path, callback_on_article_extracted,
callback_on_warc_completed=callback_on_warc_completed,
valid_hosts=valid_hosts,
start_date=start_date, end_date=end_date,
Expand All @@ -299,7 +331,8 @@ def crawl_from_commoncrawl(callback_on_article_extracted, callback_on_warc_compl
continue_after_error=True, show_download_progress=False,
number_of_extraction_processes=4, log_level=logging.ERROR,
delete_warc_after_extraction=True, continue_process=True,
extractor_cls=CommonCrawlExtractor, fetch_images=False):
extractor_cls=CommonCrawlExtractor, fetch_images=False,
dry_run=False):
"""
Crawl and extract articles form the news crawl provided by commoncrawl.org. For each article that was extracted
successfully the callback function callback_on_article_extracted is invoked where the first parameter is the
@@ -320,6 +353,7 @@ def crawl_from_commoncrawl(callback_on_article_extracted, callback_on_warc_compl
:param show_download_progress:
:param log_level:
:param extractor_cls:
:param dry_run: if True just list the WARC files to be processed but do not actually process them
:return:
"""
__setup(local_download_dir_warc, log_level)
@@ -334,27 +368,30 @@ def crawl_from_commoncrawl(callback_on_article_extracted, callback_on_warc_compl

# multiprocessing (iterate the list of crawl_names, and for each: download and process it)
__logger.info('creating extraction process pool with %i processes', number_of_extraction_processes)
warc_download_urls = []
fully_extracted_warc_urls = __get_list_of_fully_extracted_warc_urls()
for name in cc_news_crawl_names:
warc_download_url = __get_download_url(name)
warc_paths = []
fully_extracted_warc_paths = __get_list_of_fully_extracted_warc_paths()
for warc_path in cc_news_crawl_names:
if continue_process:
# check if the current WARC has already been fully extracted (assuming that the filter criteria have not
# been changed!)
if warc_download_url in fully_extracted_warc_urls:
__logger.info('skipping WARC because fully extracted: %s' % warc_download_url)
if warc_path in fully_extracted_warc_paths:
__logger.info('skipping WARC because fully extracted: %s', warc_path)
global __counter_warc_skipped
__counter_warc_skipped += 1
pass
else:
warc_download_urls.append(warc_download_url)
warc_paths.append(warc_path)

else:
# if not continue process, then always add
warc_download_urls.append(warc_download_url)
warc_paths.append(warc_path)

if dry_run:
for warc_path in warc_paths:
__logger.info('(Dry run) Selected WARC file for processing: %s', warc_path)

# run the crawler in the current, single process if number of extraction processes is set to 1
if number_of_extraction_processes > 1:
elif number_of_extraction_processes > 1:
with Pool(number_of_extraction_processes) as extraction_process_pool:
extraction_process_pool.map(partial(__start_commoncrawl_extractor,
callback_on_article_extracted=callback_on_article_extracted,
@@ -371,10 +408,10 @@ def crawl_from_commoncrawl(callback_on_article_extracted, callback_on_warc_compl
log_pathname_fully_extracted_warcs=__log_pathname_fully_extracted_warcs,
extractor_cls=extractor_cls,
fetch_images=fetch_images),
warc_download_urls)
warc_paths)
else:
for warc_download_url in warc_download_urls:
__start_commoncrawl_extractor(warc_download_url,
for warc_path in warc_paths:
__start_commoncrawl_extractor(warc_path,
callback_on_article_extracted=callback_on_article_extracted,
callback_on_warc_completed=__callback_on_warc_completed,
valid_hosts=valid_hosts,
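
Tying the renaming together: the crawler now passes bucket-relative WARC paths around, and the `__start_commoncrawl_extractor` docstring above notes that the same path is valid under both s3://commoncrawl/ and https://data.commoncrawl.org/. A small sketch of that mapping, mirroring the `__get_url_path` backward-compatibility helper (the example key is made up):

```python
from urllib.parse import urlparse

CC_BASE_URL = 'https://data.commoncrawl.org/'


def to_path(url_or_path):
    """Reduce a legacy full URL (old log entries) to a bucket-relative path."""
    if url_or_path.startswith(('http:', 'https:')):
        return urlparse(url_or_path).path.lstrip('/')
    return url_or_path


path = to_path('https://commoncrawl.s3.amazonaws.com/crawl-data/CC-NEWS/2021/09/CC-NEWS-20210915183145-01017.warc.gz')
print(path)                         # crawl-data/CC-NEWS/2021/09/CC-NEWS-20210915183145-01017.warc.gz
print(CC_BASE_URL + path)           # HTTPS download URL under the new scheme
print('s3://commoncrawl/' + path)   # the same object on S3
```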