In [2]:
# Create the Spark session through wmfdata with its "yarn-large" session type;
# `spark` is the entry point the later query/read cells use.
import wmfdata

spark = wmfdata.spark.get_session(type="yarn-large")

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [4]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
from urllib.parse import urlparse

In [5]:
# Bare http(s) URL. Stops at whitespace and at characters that wikitext uses to
# close links, tags and templates (and that RFC 3986 never allows unescaped),
# so "[https://a.org Title]", "[https://a.org]", "<ref>https://a.org</ref>" and
# "{{cite web|url=https://a.org}}" all yield "https://a.org".
# Compiled once at module level (raw string, so "\s" is not a str escape).
_URL_RE = re.compile(r'https?://[^\s\[\]<>"{}|]+')
# Sentence punctuation that trails a free link in prose ("see https://a.org.").
_URL_TRAILING_PUNCT = '.,;:!?'


@F.udf(returnType=T.ArrayType(T.StringType()))
def extract_urls(wikitext):
    """Return every http(s) URL found in a wikitext string.

    Parameters
    ----------
    wikitext : str or None
        Raw revision text. ``None``/empty (e.g. a NULL ``revision_text``)
        yields ``[]`` instead of raising inside the Spark task.

    Returns
    -------
    list of str
        URLs in order of appearance (duplicates kept), each with trailing
        sentence punctuation removed.
    """
    if not wikitext:
        return []
    return [url.rstrip(_URL_TRAILING_PUNCT) for url in _URL_RE.findall(wikitext)]
    
@F.udf(returnType=T.StringType())
def get_domain(url):
    """Return the host name of ``url`` with one leading ``www.`` removed.

    Wayback Machine links (``https://web.archive.org/web/<ts>/<target>``)
    are attributed to the host of the archived ``<target>`` URL instead.

    Parameters
    ----------
    url : str or None
        An absolute http(s) URL.

    Returns
    -------
    str or None
        Lower-cased host (no ``user:pass@``, no ``:port``) minus one leading
        ``www.``; ``''`` when the URL has no host part; ``None`` when ``url``
        is None, cannot be parsed, or is an archive link with no embedded
        ``http`` target.
    """
    def remove_prefix(text, prefix):
        # Local stand-in for str.removeprefix (Python 3.9+ only).
        return text[len(prefix):] if text.startswith(prefix) else text

    def host_of(u):
        # .hostname (unlike .netloc) lower-cases and drops credentials and the
        # port, so "WWW.Example.com:443" and "example.com" map to one domain.
        return remove_prefix(urlparse(u).hostname or '', 'www.')

    if url is None:
        return None
    try:
        domain = host_of(url)
        if domain == 'web.archive.org':
            # Target = everything after the second "http". maxsplit=2 keeps the
            # whole tail, so hosts that themselves contain "http" (e.g.
            # "httpbin.org") are not cut at their own "http".
            parts = url.split('http', 2)
            if len(parts) < 3:
                return None
            return host_of('http' + parts[2])
        return domain
    except Exception:
        # Best effort: a malformed URL (urlparse raises ValueError on e.g. an
        # unbalanced "[" in the host) becomes NULL instead of failing the task.
        # Unlike a bare except, this does not trap KeyboardInterrupt/SystemExit.
        return None


In [8]:
%%time
    
# Every revision of main-namespace (page_namespace=0) pages across all wikis
# in the chosen snapshot of wmf.mediawiki_wikitext_history. spark.sql is lazy,
# so the timing above covers query planning only.
snapshot = "2022-10"

wikihistory_revs = spark.sql(f'''
    SELECT 
        wiki_db, page_id, revision_id, revision_text
    FROM 
        wmf.mediawiki_wikitext_history
    WHERE 
        page_namespace=0
        AND snapshot="{snapshot}" 
''')

CPU times: user 0 ns, sys: 3.4 ms, total: 3.4 ms
Wall time: 200 ms


In [9]:
# One row per (wiki_db, page_id, url) carrying the smallest revision_id whose
# text contains that URL -- presumably the revision that first added it
# (assumes revision ids increase over time; TODO confirm).
# NOTE: this rebinds `wikihistory_revs`; re-running this cell on its own fails
# because `revision_text` has been dropped -- re-run the query cell first.
wikihistory_revs = (
    wikihistory_revs
    .withColumn('url', F.explode(extract_urls(F.col('revision_text'))))
    .drop('revision_text')
    .groupBy('wiki_db', 'page_id', 'url')
    # Explicit alias instead of renaming Spark's generated 'min(revision_id)'.
    .agg(F.min('revision_id').alias('revision_id'))
)
wikihistory_revs

DataFrame[wiki_db: string, page_id: bigint, url: string, revision_id: bigint]

In [10]:
# Attach the domain of each URL (web.archive.org links resolve to their target).
wiki_domains = wikihistory_revs.withColumn(
    colName="domain",
    col=get_domain(F.col("url")),
)
wiki_domains

DataFrame[wiki_db: string, page_id: bigint, url: string, revision_id: bigint, domain: string]

In [17]:
# -p: succeed when the HDFS directory already exists, so the cell can be re-run.
!hadoop fs -mkdir -p urls

In [None]:
# Persist the annotated links to HDFS, replacing the output of any earlier run.
wiki_domains.write.mode("overwrite").parquet("urls/all_links_domain.parquet")

In [None]:
# Build the list of distinct, non-empty domains

In [None]:
# `data` is not defined by any earlier cell here (it leaked from kernel state),
# so load it from the parquet written above; this also avoids re-running the
# full-history scan behind `wiki_domains`.
data = spark.read.parquet('urls/all_links_domain.parquet')

# `!= ''` is NULL for NULL domains, so both empty and NULL rows are dropped.
(data
 .where(F.col('domain') != '')
 .select('domain')
 .distinct()
 .write.csv('urls/url_list.csv', mode='overwrite'))

In [None]:
# '>' (not '>>'): appending would add another gzip member with the same domains
# on every re-run, duplicating rows. The HDFS glob is quoted so the local shell
# leaves it for hadoop to expand; mkdir -p makes sure data/ exists.
!mkdir -p data && hadoop fs -text 'urls/url_list.csv/*' | gzip > data/url_list.csv.gz

In [None]:
import pandas as pd

# NOTE(review): warn_bad_lines/error_bad_lines are deprecated since pandas 1.3
# and removed in 2.0 (replacement: on_bad_lines='warn'); kept for the older
# pandas this environment may ship -- TODO confirm the installed version.
pdf = pd.read_csv('data/url_list.csv.gz', warn_bad_lines=True, error_bad_lines=False, names=['domain'])

# Syntactic host-name check: one or more dot-terminated labels of 1-63
# alphanumerics/hyphens (no leading/trailing hyphen), then an alphabetic TLD of
# 2-63 letters, at most 253 characters overall. Adapted from
# https://stackoverflow.com/questions/26093545/how-to-validate-domain-name-using-regex
# whose pattern required a first label of >= 3 characters and so rejected
# t.co, x.com and en.wikipedia.org.
VALID_DOMAIN_RE = r'^(?=.{1,253}$)(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,63}$'

(pdf[pdf.domain.str.match(VALID_DOMAIN_RE, na=False)]  # na=False: NaN rows are dropped, not a mask error
 # domain_length did not exist in the frame; derive it before sorting.
 .assign(domain_length=lambda d: d.domain.str.len())
 # Tie-break on domain so the output order is reproducible.
 .sort_values(['domain_length', 'domain'])
 .reset_index(drop=True)
 .to_csv('data/domain_list_filtered.csv.gz'))