In [1]:
import datasets
# Switch the `datasets` caching flag off for this session (exact scope of the
# flag is library-defined — NOTE(review): confirm against the installed version).
datasets.set_caching_enabled(False)

from datasets import load_dataset

from pathlib import Path

In [15]:
import multiprocessing
import os

# Number of worker processes handed to every `ds.map(..., num_proc=NUM_PROCS)` call.
NUM_PROCS=multiprocessing.cpu_count()
print(f"num_procs: {NUM_PROCS}")

# Folder holding the local copy of the index. Set the CC_INDEX_FOLDER
# environment variable to point somewhere else; when it is unset (or empty)
# the original hard-coded location is used, so existing setups keep working.
CC_INDEX_FOLDER=Path(
    os.environ.get("CC_INDEX_FOLDER")
    or Path("/Users/thomas/code/bigscience/pseudo_crawl/") / "cc"
)

num_procs: 8


## To download index

You should already have run:
```bash
aws s3 sync s3://commoncrawl-dev/big-science-workshop/data-sourcing-sheet/cc/ $CC_INDEX_FOLDER/
```

In [2]:
def get_all_parquet_files(path):
    """List every file stored under ``<path>/<crawl>/subset=warc/``.

    Args:
        path: Root folder (str or Path). Each sub-directory is treated as one
            crawl; inside it only the directory named ``subset=warc`` is read.

    Returns:
        Sorted list of absolute, symlink-resolved file paths as strings.
    """
    root = Path(path)
    crawl_dirs = [crawl for crawl in root.iterdir() if crawl.is_dir()]
    warc_dirs = [
        subset
        for crawl in crawl_dirs
        for subset in crawl.iterdir()
        if subset.is_dir() and subset.name == "subset=warc"
    ]
    # Path.iterdir() yields entries in arbitrary order, so sort the result:
    # callers that slice the loaded data (e.g. "train[:10000]") then see the
    # same rows on every run and every machine.
    return sorted(
        str(file.resolve())
        for subset in warc_dirs
        for file in subset.iterdir()
        if file.is_file()
    )

# Build the dataset from the parquet files found under CC_INDEX_FOLDER and
# keep only the first 10,000 rows of the "train" split.
ds = load_dataset(
    "parquet",
    data_files=get_all_parquet_files(CC_INDEX_FOLDER),
    split="train[:10000]",
)


Resolving data files:   0%|          | 0/540 [00:00<?, ?it/s]

Using custom data configuration default-0f88c9d5930c8314
Reusing dataset parquet (/Users/thomas/.cache/huggingface/datasets/parquet/default-0f88c9d5930c8314/0.0.0/1638526fd0e8d960534e2155dc54fdff8dce73851f21f031d2fb9c2cf757c121)


In [3]:
# Show the dataset's column names, one per line.
print("\n".join(ds.column_names))

id
title
link
language
url_surtkey_prefix
url_surtkey
url_host_tld
url_host_registered_domain
url_host_name
url
fetch_status
fetch_time
warc_filename
warc_record_offset
warc_record_length
fetch_redirect
content_mime_detected
content_languages


In [4]:
# Row count; should equal the size of the "train[:10000]" slice requested at load time.
print(len(ds))

10000


In [5]:
# Inspect the first row to see what each column looks like.
print(ds[0])

{'id': 335, 'title': 'diario el pueblo', 'link': 'http://www.diarioelpueblo.com.uy/', 'language': 'es', 'url_surtkey_prefix': 'uy,com,diarioelpueblo)/', 'url_surtkey': 'uy,com,diarioelpueblo)/', 'url_host_tld': 'uy', 'url_host_registered_domain': 'diarioelpueblo.com.uy', 'url_host_name': 'diarioelpueblo.com.uy', 'url': 'https://diarioelpueblo.com.uy/', 'fetch_status': 200, 'fetch_time': Timestamp('2021-06-16 01:11:54'), 'warc_filename': 'crawl-data/CC-MAIN-2021-25/segments/1623487621699.22/warc/CC-MAIN-20210616001810-20210616031810-00104.warc.gz', 'warc_record_offset': 205442192, 'warc_record_length': 26704, 'fetch_redirect': None, 'content_mime_detected': 'text/html', 'content_languages': 'spa'}


In [6]:
# Distinct (content_languages, language) pairs found in the loaded rows.
print(set(zip(ds["content_languages"], ds["language"])))

{('spa,deu', 'es'), ('eng,spa', 'es'), ('jpn,spa,eng', 'es'), ('eus', 'es'), ('spa,bre', 'es'), ('spa,grn', 'es'), ('spa,cat', 'es'), ('eng', 'es'), ('spa,eng,cat', 'es'), ('spa,cat,ita', 'es'), ('spa,ron', 'es'), ('spa,wol', 'es'), ('spa,lat', 'es'), ('spa,ind', 'es'), ('spa,war', 'es'), (None, 'es'), ('spa,nld', 'es'), ('spa,fra', 'es'), ('spa,cos', 'es'), ('spa,ces', 'es'), ('spa,eng', 'es'), ('spa,que', 'es'), ('spa,ara', 'es'), ('spa,dan', 'es'), ('spa,hrv', 'es'), ('spa,nno', 'es'), ('spa', 'es'), ('spa,wol,cat', 'es'), ('spa,eus', 'es'), ('spa,cat,dan', 'es'), ('spa,ita', 'es')}


## Getting pdf urls

In [9]:
def get_pdf_urls(batch):
    """Add a ``pdf_url`` column to a batch.

    Args:
        batch: Mapping of column name to list of values; must provide
            ``content_mime_detected`` and ``url`` with equal lengths.

    Returns:
        The same ``batch`` object, with ``pdf_url`` set to the row's URL when
        its MIME type is exactly ``application/pdf`` and to ``""`` otherwise.
    """
    mimes = batch["content_mime_detected"]
    page_urls = batch["url"]
    assert len(mimes) == len(page_urls)

    pdf_urls = []
    for mime, page_url in zip(mimes, page_urls):
        if mime == "application/pdf":
            pdf_urls.append(page_url)
        else:
            # Placeholder instead of None (original author's note: Arrow
            # doesn't support None here, empty string used for now).
            pdf_urls.append("")

    batch["pdf_url"] = pdf_urls
    return batch
    
    
# Add the `pdf_url` column, processing batches across NUM_PROCS workers.
ds = ds.map(get_pdf_urls, batched=True, num_proc=NUM_PROCS)

# Sanity check: the distinct values should include real PDF URLs, not only the "" placeholder.
set(ds["pdf_url"])

{'',
 'https://www.cibercuba.com/sites/default/files/pdf/2018_07_25-21_10-tabloide-constitucion-sin-precio-bn.pdf',
 'https://www.expreso.ec/uploads/files/2021/01/26/INSTITUCIONES-PLAN-VACUNARSE.pdf'}

## Get HTML and outgoing links

Problems:
 - fetching data is too slow using http -> need to implement an asynchronous pipeline

In [10]:
# Distinct values of the `warc_filename` column across the loaded rows.
set(ds["warc_filename"])

{'crawl-data/CC-MAIN-2021-25/segments/1623487629632.54/warc/CC-MAIN-20210617072023-20210617102023-00252.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487613453.9/warc/CC-MAIN-20210614201339-20210614231339-00011.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623488559139.95/warc/CC-MAIN-20210624202437-20210624232437-00256.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623488273983.63/warc/CC-MAIN-20210621120456-20210621150456-00361.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487640324.35/warc/CC-MAIN-20210618165643-20210618195643-00224.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487582767.0/warc/CC-MAIN-20210612103920-20210612133920-00079.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623488539480.67/warc/CC-MAIN-20210623134306-20210623164306-00286.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623488286726.71/warc/CC-MAIN-20210621151134-20210621181134-00472.warc.gz',
 'crawl-data/CC-MAIN-2021-25/segments/1623487617599.15/warc/CC-MAIN-20210615053457-2021061

In [None]:
import requests
from warcio.archiveiterator import ArchiveIterator

'''
Download all warc files and extract html
'''

# MIME types treated as HTML pages; every other row is skipped.
HTML_TYPES=['text/html', 'application/xhtml+xml']
def get_html(batch):
    """Download the archived HTTP response body for every HTML row of a batch.

    For each row whose ``content_mime_detected`` is in ``HTML_TYPES`` a ranged
    HTTP request fetches exactly the bytes ``[offset, offset + length - 1]`` of
    the row's WARC file, and the payload of the first ``response`` record in
    that slice is kept.

    Args:
        batch: Mapping of column name to list of values; must provide
            ``content_mime_detected``, ``warc_filename``, ``warc_record_length``
            and ``warc_record_offset``, all of equal length.

    Returns:
        The same ``batch`` object with an ``html`` column added. Rows that are
        not HTML, or whose slice holds no ``response`` record, get ``""``;
        other rows get whatever ``record.content_stream().read()`` returns
        (presumably raw bytes — NOTE(review): confirm, since the column then
        mixes two value types).

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    content_mime_detected = batch["content_mime_detected"] # select only text/html
    warc_filenames = batch["warc_filename"]
    warc_record_length = batch["warc_record_length"]
    warc_record_offset = batch["warc_record_offset"]
    assert len(content_mime_detected) == len(warc_filenames)
    assert len(content_mime_detected) == len(warc_record_length)
    assert len(content_mime_detected) == len(warc_record_offset)

    htmls = []
    for mime, filename, length, offset in zip(content_mime_detected, warc_filenames, warc_record_length, warc_record_offset):
        if mime not in HTML_TYPES:
            htmls.append("")
            continue

        # Inclusive byte range covering exactly this record.
        headers = {
            "Range": f"bytes={offset}-{offset + length - 1}"
        }

        # Reset per row: without this, a slice with no 'response' record would
        # reuse the previous row's payload (or hit an unbound name on the
        # first row).
        html = ""
        with requests.get(f'https://commoncrawl.s3.amazonaws.com/{filename}', headers=headers, stream=True) as response:
            # Fail loudly on HTTP errors instead of parsing an error page as WARC data.
            response.raise_for_status()

            for record in ArchiveIterator(response.raw):
                if record.rec_type == 'response':
                    html = record.content_stream().read()
                    break

        htmls.append(html)

    batch["html"] = htmls
    return batch
    
# Add the `html` column; batches of 100 rows are spread over NUM_PROCS workers.
ds = ds.map(get_html, batched=True, batch_size=100, num_proc=NUM_PROCS)

In [None]:
from bs4 import BeautifulSoup
import re

def get_external_links(soup, exclude_url):
    """Return the distinct external links found on a page.

    A link qualifies when its ``href`` starts with ``http://``, ``https://``
    or ``www`` and does not contain ``exclude_url`` anywhere after that prefix.

    Args:
        soup: Parsed page exposing ``find_all`` (a BeautifulSoup object in
            this notebook).
        exclude_url: Literal text (here the page's own registered domain) that
            a link must not contain. ``None`` or ``""`` disables the exclusion.

    Returns:
        List of ``href`` values in first-seen order, without duplicates.
    """
    prefix = '^(((http|https)://)|www){1,2}'
    if exclude_url:
        # re.escape: the domain is literal text. Left unescaped, each "." would
        # match any character and unrelated hosts would be filtered out too.
        pattern = re.compile(prefix + '((?!' + re.escape(exclude_url) + ').)*$')
    else:
        # No domain to exclude (e.g. a missing value): keep every absolute link.
        pattern = re.compile(prefix + '.*$')

    # A dict keeps first-seen order and gives constant-time duplicate checks.
    unique_links = {}
    for link in soup.find_all('a', {'href' : pattern}):
        href = link.attrs['href']
        if href is not None:
            unique_links.setdefault(href, None)
    return list(unique_links)


def preprocess_html(soup):
    """Extract a page's text and squeeze its whitespace.

    Args:
        soup: Parsed page exposing ``get_text()``.

    Returns:
        The page text after two substitutions applied in order: runs of two
        or more tabs become one tab, then whitespace runs surrounding a
        newline become a single newline.
    """
    substitutions = (
        (r"\t{2,}", "\t"),
        (r"((\s+)\n(\s+))+", "\n"),
    )
    cleaned = soup.get_text()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned
    
def get_text_and_outgoing_lings(batch):
    """Add ``text`` and ``external_urls`` columns derived from each row's HTML.

    Args:
        batch: Mapping of column name to list of values; must provide
            ``content_mime_detected``, ``url_host_registered_domain`` and
            ``html``, all of equal length.

    Returns:
        The same ``batch`` object. For rows whose MIME type is in
        ``HTML_TYPES``, ``text`` holds ``preprocess_html`` of the parsed page
        and ``external_urls`` holds ``get_external_links`` computed against
        the row's registered domain; all other rows get ``""`` and ``[]``.
    """
    mimes = batch["content_mime_detected"]
    domains = batch["url_host_registered_domain"]
    pages = batch["html"]
    assert len(mimes) == len(pages)
    assert len(mimes) == len(domains)

    extracted_texts = []
    outgoing_links = []
    for mime, page, domain in zip(mimes, pages, domains):
        if mime in HTML_TYPES:
            soup = BeautifulSoup(page, 'html.parser')
            extracted_texts.append(preprocess_html(soup))
            outgoing_links.append(get_external_links(soup, domain))
        else:
            # Non-HTML rows carry empty placeholders so every column stays aligned.
            extracted_texts.append("")
            outgoing_links.append([])

    batch["text"] = extracted_texts
    batch["external_urls"] = outgoing_links
    return batch
    
import time

# Wall-clock timing of the text / link extraction pass.
t0=time.time()
ds = ds.map(get_text_and_outgoing_lings, batched=True, batch_size=100, num_proc=NUM_PROCS)
t1=time.time()
# Label names the function actually timed here (it previously said "get_html").
print(f"get_text_and_outgoing_lings: {t1-t0}")

In [None]:
# Raw payload fetched for the first row. The column is named "html"; no
# "warc_record" column is ever created in this notebook, so that key would
# raise a KeyError.
ds[0]["html"]

In [None]:
# Extracted text of the row at index 4.
ds[4]["text"]

In [None]:
# External links collected for the first row.
ds[0]["external_urls"]

## Cleaning up dataset

In [None]:
# Columns that survive the cleanup. The language column is named "language"
# (singular); the earlier spelling "languages" matched nothing, so that column
# was dropped by mistake.
columns_to_keep = ["id", "title", "link", "language", "pdf_url", "html", "text", "external_urls"]
columns_to_remove = [column for column in ds.column_names if column not in columns_to_keep]
print(columns_to_remove)
cleaned_ds = ds.remove_columns(columns_to_remove)

In [None]:
# First row of the cleaned dataset, to confirm which columns remain.
print(cleaned_ds[0])