In [14]:
import datasets

from datasets import load_dataset

from pathlib import Path

In [15]:
import multiprocessing
import os

# Value passed as `num_proc` to the `Dataset.map` calls in this notebook:
# the CPU count reported by `multiprocessing`.
NUM_PROCS = multiprocessing.cpu_count()
print(f"num_procs: {NUM_PROCS}")

# Folder holding the locally synced Common Crawl index files.
# Export CC_INDEX_FOLDER (the same variable the `aws s3 sync` command uses) to
# point the notebook at another location; when it is unset or empty, the
# original hard-coded location is used.
_DEFAULT_CC_INDEX_FOLDER = Path("/Users/thomas/code/bigscience/pseudo_crawl/") / "cc"
CC_INDEX_FOLDER = Path(os.environ.get("CC_INDEX_FOLDER") or _DEFAULT_CC_INDEX_FOLDER)

num_procs: 8


## To download index

This notebook assumes you have already run:
```bash
aws s3 sync s3://commoncrawl-dev/big-science-workshop/data-sourcing-sheet/cc/ $CC_INDEX_FOLDER/
```

In [16]:
def get_all_parquet_files(path):
    """List the files of every crawl's ``subset=warc`` directory under ``path``.

    The expected layout is ``<path>/<crawl dir>/subset=warc/<files>``; any other
    sub-directory of a crawl is ignored, as are files sitting directly in
    ``path`` or in a crawl directory.

    Args:
        path: Root folder of the index (``str`` or ``Path``).

    Returns:
        A sorted list of absolute, symlink-resolved file paths as strings.
        ``Path.iterdir`` yields entries in arbitrary order, so the result is
        sorted to keep the row order of the loaded dataset (and therefore
        slices such as ``train[:10]``) reproducible across runs and machines.
        Hidden files (names starting with ``.``, e.g. ``.DS_Store``) are
        skipped because they are not data files.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    root = Path(path)
    files = [
        # `resolve()` already returns an absolute path.
        str(file.resolve())
        for crawl in root.iterdir()
        if crawl.is_dir()
        for subset in crawl.iterdir()
        if subset.is_dir() and subset.name == "subset=warc"
        for file in subset.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]
    return sorted(files)

# Build the dataset from the listed index files, keeping only the first 10 rows
# of the "train" split so the rest of the notebook runs on a small sample.
ds = load_dataset("parquet", data_files=get_all_parquet_files(CC_INDEX_FOLDER), split="train[:10]")


Resolving data files:   0%|          | 0/540 [00:00<?, ?it/s]

Using custom data configuration default-58ef27b3324726df
Reusing dataset parquet (/Users/thomas/.cache/huggingface/datasets/parquet/default-58ef27b3324726df/0.0.0/1638526fd0e8d960534e2155dc54fdff8dce73851f21f031d2fb9c2cf757c121)


In [17]:
# One column name per line.
print(*ds.column_names, sep="\n")

id
title
link
language
url_surtkey_prefix
url_surtkey
url_host_tld
url_host_registered_domain
url_host_name
url
fetch_status
fetch_time
warc_filename
warc_record_offset
warc_record_length
fetch_redirect
content_mime_detected
content_languages


In [18]:
# Sanity check: number of rows in the loaded sample.
print(len(ds))

10


In [19]:
# Peek at one complete record to see what each column contains.
print(ds[0])

{'id': 335, 'title': 'diario el pueblo', 'link': 'http://www.diarioelpueblo.com.uy/', 'language': 'es', 'url_surtkey_prefix': 'uy,com,diarioelpueblo)/', 'url_surtkey': 'uy,com,diarioelpueblo)/', 'url_host_tld': 'uy', 'url_host_registered_domain': 'diarioelpueblo.com.uy', 'url_host_name': 'diarioelpueblo.com.uy', 'url': 'https://diarioelpueblo.com.uy/', 'fetch_status': 200, 'fetch_time': Timestamp('2021-06-16 01:11:54'), 'warc_filename': 'crawl-data/CC-MAIN-2021-25/segments/1623487621699.22/warc/CC-MAIN-20210616001810-20210616031810-00104.warc.gz', 'warc_record_offset': 205442192, 'warc_record_length': 26704, 'fetch_redirect': None, 'content_mime_detected': 'text/html', 'content_languages': 'spa'}


In [7]:
# Distinct (content_languages, language) pairs present in the sample.
print(set(zip(ds["content_languages"], ds["language"])))

{('spa', 'es')}


In [20]:
# Show the fetch_time of the first two rows, the Python type it is returned as,
# and whether the two values can be compared with `<`.
print(ds[0]["fetch_time"], ds[1]["fetch_time"], type(ds[0]["fetch_time"]), ds[0]["fetch_time"]< ds[1]["fetch_time"])

2021-06-16 01:11:54 2021-06-14 17:39:58 <class 'pandas._libs.tslibs.timestamps.Timestamp'> False


## Getting pdf urls

In [8]:
def get_pdf_urls(batch):
    """Add a ``pdf_url`` column to ``batch`` and return the same ``batch``.

    For each row the new column holds the row's ``url`` when its
    ``content_mime_detected`` is exactly ``"application/pdf"``, and an empty
    string otherwise.

    Args:
        batch: Mapping of column name to list of values; must contain
            ``content_mime_detected`` and ``url``.

    Returns:
        ``batch`` itself, mutated in place.

    Raises:
        AssertionError: If the two input columns differ in length.
    """
    mimes = batch["content_mime_detected"]
    candidate_urls = batch["url"]
    assert len(mimes) == len(candidate_urls)

    pdf_urls = []
    for mime, candidate in zip(mimes, candidate_urls):
        if mime == "application/pdf":
            pdf_urls.append(candidate)
        else:
            # Empty string is the "not a PDF" placeholder; the original author
            # notes that None is not usable with Arrow here.
            pdf_urls.append("")

    batch["pdf_url"] = pdf_urls
    return batch
    
    
# Add the `pdf_url` column, applying `get_pdf_urls` batch by batch with
# num_proc=NUM_PROCS.
ds = ds.map(get_pdf_urls, batched=True, num_proc=NUM_PROCS)

# Check whether any row produced a PDF URL: a result of {''} means that no row
# in the sample was detected as a PDF.
set(ds["pdf_url"])

{''}

## Get HTML and outgoing links

Problems:
 - Fetching the records one request at a time over HTTP is too slow -> we need to implement an asynchronous pipeline

In [9]:
# Distinct WARC files referenced by the rows of the sample.
set(ds["warc_filename"])

{'crawl-data/CC-MAIN-2021-25/segments/1623487613380.12/warc/CC-MAIN-20210614170602-20210614200602-00187.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487613380.12/warc/CC-MAIN-20210614170602-20210614200602-00253.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487613380.12/warc/CC-MAIN-20210614170602-20210614200602-00350.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487613380.12/warc/CC-MAIN-20210614170602-20210614200602-00373.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487613380.12/warc/CC-MAIN-20210614170602-20210614200602-00409.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487613380.12/warc/CC-MAIN-20210614170602-20210614200602-00606.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487621699.22/warc/CC-MAIN-20210616001810-20210616031810-00104.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487621699.22/warc/CC-MAIN-20210616001810-20210616031810-00127.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487621699.22/warc/CC-MAIN-20210616001810-20210

In [10]:
import requests
from warcio.archiveiterator import ArchiveIterator
from warcio.recordloader import ArchiveLoadFailed

'''
Download all warc files and extract html
'''

# MIME types treated as HTML: rows with any other detected MIME type get no
# outgoing links extracted.
HTML_TYPES=['text/html', 'application/xhtml+xml']
def get_warc(batch, timeout=60):
    """Download the WARC record of every row in ``batch``.

    Each record is fetched from ``https://commoncrawl.s3.amazonaws.com/`` with
    an HTTP ``Range`` request covering ``warc_record_length`` bytes starting at
    ``warc_record_offset`` inside ``warc_filename``. The response body is
    stored as-is (no decompression or parsing happens here).

    Records are downloaded for every row, whatever its detected MIME type; no
    filtering on HTML is done in this function.

    Args:
        batch: Mapping of column name to list of values; must contain
            ``content_mime_detected``, ``warc_filename``,
            ``warc_record_length`` and ``warc_record_offset``.
        timeout: Timeout in seconds handed to ``requests`` for each download,
            so that a stalled connection cannot hang the whole ``map`` call.

    Returns:
        ``batch`` itself, with a new ``compressed_warc`` column holding one
        ``bytes`` object per row.

    Raises:
        AssertionError: If the input columns differ in length.
        requests.HTTPError: If the server answers with an error status.
            Without this check the error page would be stored as if it were
            WARC data and only fail later, far from the cause.
    """
    content_mime_detected = batch["content_mime_detected"]
    warc_filenames = batch["warc_filename"]
    warc_record_length = batch["warc_record_length"]
    warc_record_offset = batch["warc_record_offset"]
    assert len(content_mime_detected) == len(warc_filenames)
    assert len(content_mime_detected) == len(warc_record_length)
    assert len(content_mime_detected) == len(warc_record_offset)

    compressed_warcs = []
    # One session for the whole batch so connections to the bucket are reused.
    with requests.Session() as session:
        for filename, length, offset in zip(warc_filenames, warc_record_length, warc_record_offset):
            headers = {
                # HTTP byte ranges are inclusive on both ends, hence the -1.
                "Range": f"bytes={offset}-{offset + length - 1}"
            }

            response = session.get(
                f'https://commoncrawl.s3.amazonaws.com/{filename}',
                headers=headers,
                timeout=timeout,
            )
            response.raise_for_status()
            compressed_warcs.append(response.content)

    batch["compressed_warc"] = compressed_warcs
    return batch
    
# Download the WARC record of every row, in batches of up to 100 rows.
ds = ds.map(get_warc, batched=True, batch_size=100, num_proc=NUM_PROCS)

In [11]:
from bs4 import BeautifulSoup
import re
from warcio.archiveiterator import ArchiveIterator
import io

def get_external_links(soup, exclude_url):
    """Return the external links found on a parsed page.

    An ``<a>`` tag is kept when its ``href`` starts with ``http://``,
    ``https://`` or ``www`` and ``exclude_url`` does not occur in the rest of
    the value after that prefix.

    Args:
        soup: Parsed document exposing ``find_all`` (a ``BeautifulSoup``
            object in this notebook).
        exclude_url: Text identifying the current site (its registered domain
            in this notebook). It is matched literally.

    Returns:
        The unique ``href`` values as a sorted list, so the output does not
        depend on set iteration order.
    """
    # `exclude_url` is escaped before being spliced into the pattern: left
    # as-is, the dots of a domain would match any character (so unrelated
    # hosts could be excluded) and characters such as "(" or "+" could make
    # the pattern invalid.
    href_pattern = re.compile(
        '^(((http|https)://)|www){1,2}((?!' + re.escape(exclude_url) + ').)*$'
    )

    external_links = set()
    for link in soup.find_all('a', {'href': href_pattern}):
        href = link.attrs['href']
        if href is not None:
            external_links.add(href)
    return sorted(external_links)
    
def _read_first_response_payload(compressed_warc):
    """Return what ``content_stream().read()`` yields for the first record
    whose ``rec_type`` is ``'response'``, or ``None`` if there is no such record."""
    with io.BytesIO(compressed_warc) as stream:
        return next(
            (
                record.content_stream().read()
                for record in ArchiveIterator(stream)
                if record.rec_type == 'response'
            ),
            None,
        )


def get_outgoing_links(batch):
    """Add an ``external_urls`` column to ``batch`` and return the same ``batch``.

    Rows whose ``content_mime_detected`` is not listed in ``HTML_TYPES`` get an
    empty list. For the other rows, the downloaded record in
    ``compressed_warc`` is parsed, its first response payload is fed to
    ``BeautifulSoup`` (``html.parser``), and the links returned by
    ``get_external_links`` for the row's registered domain are stored.

    Args:
        batch: Mapping of column name to list of values; must contain
            ``content_mime_detected``, ``url_host_registered_domain`` and
            ``compressed_warc``.

    Returns:
        ``batch`` itself, mutated in place.

    Raises:
        AssertionError: If the input columns differ in length, or if an HTML
            row's record contains no response record.
    """
    mimes = batch["content_mime_detected"]
    domains = batch["url_host_registered_domain"]
    warc_blobs = batch["compressed_warc"]
    assert len(mimes) == len(warc_blobs)
    assert len(mimes) == len(domains)

    links_per_row = []
    for mime, blob, domain in zip(mimes, warc_blobs, domains):
        if mime in HTML_TYPES:
            payload = _read_first_response_payload(blob)
            assert payload is not None
            page = BeautifulSoup(payload, 'html.parser')
            links_per_row.append(get_external_links(page, domain))
        else:
            # Non-HTML content: nothing to extract.
            links_per_row.append([])

    batch["external_urls"] = links_per_row
    return batch
    
import time

# Time the link-extraction pass over the dataset. Note that the printed label
# says "get_html" although the function being timed is `get_outgoing_links`.
t0=time.time()
ds = ds.map(get_outgoing_links, batched=True, batch_size=100, num_proc=NUM_PROCS)
t1=time.time()
print(f"get_html: {t1-t0}")

get_html: 0.15583300590515137


In [12]:
# Inspect the raw bytes downloaded for the first row (this displays the whole
# record, which can be long).
ds[0]["compressed_warc"]

b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x00\xec\xbd\xdbN#\xc9\xb6(\xfa\x0cR\xffC\xb4{w\x03\xb3\x9dvf\xfan\nzQ@W\xd1\x8b*\x98@\xd5\\=\xabKV83l\'\x95\xcet\xe7\x05p\xd1Hk?\xee\xc7\xfdx\xb6\xce\x91\xce|9\xd2\xd4R\xebhi\xbe\xed\xf5\xb0\xa4\xe6O\xd6\x07\x9co8cD\xe4\xd5N\x831\x97\x82*\xd7\x05\xec\xb8\x8c\x88\x181b\\"F\x8c\xf8\xcb\xc6\xc1fQ)\xc8_-\xfe\x05>IG\xc3\x01k\x12\x87\xb9\x03\xdbrY\x90\xb8E=HTeU\x91\xe4\xaa\xa4T\x8fd\xa5\xa9(\xcdJ\xf9\xafA\x81\x03\xa6\xd9\x8e.\xedl5\xc93\xdf\xb1\x9a\xbeo\xe8\xcdZ\xa5]j\xabu]\xea\xb02\x95\xcaJ\xb9$\xb5\xab\x1dU\xaaj\x8a\\\xa1UE\x96\xa9\xbe\xfe\xd5\xe2\xa6my\xcc\xf2\xa4]fu\xbd^\x93(\xb5jC\x91\xe3t\xd1#:\x18\x98\x86F=\xc3\xb6\x8a=\xcf\x1b\xac\x92\xbe\xdb\xf5 km\xa4\xaf\x7f\xa1\x8efX\x1d{\xa43\x15\xbdQnw\x98"iU\x15:\xa3j\xb2TW\x95\xbaT\xaa\x94\xcb\xba\xdc\x90\xcbz\xbb\xba\x1e\x80\x80\x965\xdfqx\xe3v\x12HGi\xcbU\xb5S\x82\xfa\x8a.\x95\xcb\x0cFT.U\xa4*c\x8a\xd2\xa8W5\xb9Q\x0e\x81\xec\xecK\x1b\xba\x0e\x9dsqH\x8d\x82Z+(\x95j\xa1\xac\x86\x88\xa6N\x97y\xd2\x9b\x83\

In [13]:
# Outgoing links extracted from the first row's page.
ds[0]["external_urls"]

['https://www.dgi.gub.uy/wdgi/page?2,impuesto_primaria,dgi--impuesto-de-primaria,O,es,0,',
 'https://twitter.com/elpueblosalto',
 'https://issuu.com/johngn360-1/docs/gnclasificados_13-6-2021',
 'https://www.facebook.com/diarioelpueblodesalto',
 'https://gn.uy/gn-agencia-digital/',
 'https://www.instagram.com/diarioelpueblodesalto2019/']

## Cleaning up dataset

In [None]:
# Columns that survive the clean-up; every other column is dropped.
# The dataset column is named "language" (see `ds.column_names`). The previous
# spelling "languages" matched no column, so "language" was being removed
# along with the rest.
# NOTE(review): "html" is not created by any cell visible in this notebook;
# listing it is harmless because only existing columns are considered below.
columns_to_keep = ["id", "title", "link", "language", "pdf_url", "html", "compressed_warc", "external_urls"]
columns_to_remove = [column for column in ds.column_names if column not in columns_to_keep]
print(columns_to_remove)
cleaned_ds = ds.remove_columns(columns_to_remove)

In [None]:
# Check the first record of the cleaned dataset to confirm which columns remain.
print(cleaned_ds[0])