# Extract urls from wikitext
by Diego Sáez Trumper (WMF)

This code make usage of the Wikimedia Analytics cluster. 
You could emulate this by uploading the wikimedia XML dumps to your spark cluster


In [None]:
spark = wmfdata.spark.get_session(type='yarn-large')

In [1]:
spark.sql('show partitions wmf.mediawiki_wikitext_history').show()

+--------------------+
|           partition|
+--------------------+
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
|snapshot=2021-07/...|
+--------------------+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType,StringType
import re

links_RE = re.compile('https?://[^\s]+')


def extract_sections(text):
    return links_RE.findall(text)

reudf  = udf(extract_sections , ArrayType(StringType()))


In [5]:
df = spark.sql('''SELECT wiki_db,page_id,revision_text,revision_timestamp 
                    FROM wmf.mediawiki_wikitext_history WHERE page_namespace=0  
                    AND snapshot="2021-07"''')
df  = df.withColumn('links',reudf(df.revision_text))

In [6]:
from pyspark.sql.functions import explode
df = df.withColumn("link", explode(df.links)).drop("links","revision_text")

In [7]:
df2 = df.groupBy('wiki_db','page_id','link').agg({"revision_timestamp": "min"})

In [26]:
df2.show()

+-------+-------+--------------------+-----------------------+
|wiki_db|page_id|                link|min(revision_timestamp)|
+-------+-------+--------------------+-----------------------+
| enwiki|     12| http://anarclan.net|   2004-01-06T06:10:00Z|
| enwiki|     12|http://cnt-ait.in...|   2004-07-14T13:17:36Z|
| enwiki|     12|http://dwardmac.p...|   2019-03-07T11:52:25Z|
| enwiki|     12|http://dwardmac.p...|   2006-03-11T16:08:36Z|
| enwiki|     12|http://newleftrev...|   2010-03-17T06:50:15Z|
| enwiki|     12|http://web.archiv...|   2016-09-23T01:49:36Z|
| enwiki|     12|http://www.aynran...|   2006-10-25T00:58:30Z|
| enwiki|     12|http://www.bigate...|   2005-07-11T06:12:33Z|
| enwiki|     12|http://www.bigate...|   2005-07-11T06:12:33Z|
| enwiki|     12|http://www.britan...|   2014-04-24T19:03:11Z|
| enwiki|     12|http://www.britan...|   2006-08-30T02:29:32Z|
| enwiki|     12|http://www.britan...|   2018-06-02T04:26:02Z|
| enwiki|     12|http://www.crimet...|   2002-12-16T20:

In [2]:
spark.sql('show partitions wwikidata_item_page_link')

AnalysisException: "Table or view 'wikidata_item_page_link' not found in database 'default';"

In [12]:
wikidata = spark.sql('''SELECT wiki_db,item_id,page_id  FROM wmf.wikidata_item_page_link WHERE 
                        snapshot="2020-09-28"
                         ''')

df3 = df2.join(wikidata,["wiki_db","page_id"]).withColumnRenamed('min(revision_timestamp)','timestamp_moin')

In [11]:
df3.show()

+-------+-------+--------------------+-----------------------+---------+
|wiki_db|page_id|                link|min(revision_timestamp)|  item_id|
+-------+-------+--------------------+-----------------------+---------+
| abwiki|   2479|http://www.ethno-...|   2008-09-04T14:48:55Z|  Q620884|
| abwiki|   2529|http://www.ethno-...|   2008-09-07T14:42:07Z|  Q621904|
| abwiki|   4036|http://www.amazon...|   2010-09-21T11:18:17Z|    Q5208|
| abwiki|   4036|http://www.amazon...|   2010-09-21T11:18:17Z|    Q5208|
| abwiki|   4066|http://www.capita...|   2010-09-27T16:42:05Z|     Q174|
| abwiki|   4792|http://www.beneti...|   2011-06-18T15:34:37Z|   Q25232|
| abwiki|   4861|http://www.adana....|   2011-08-04T23:51:04Z|   Q38545|
| abwiki|   5726|http://kaspi.org.ge/|   2013-08-16T20:30:13Z|  Q956954|
| abwiki|   5726| http://kaspi.org.ge|   2013-08-16T20:30:13Z|  Q956954|
| abwiki|   5726|http://www.geosta...|   2019-07-05T19:05:17Z|  Q956954|
|acewiki|    539|http://en.preside...|   2012-04-10

In [10]:
df3.schema

StructType(List(StructField(wiki_db,StringType,true),StructField(page_id,LongType,true),StructField(link,StringType,true),StructField(min(revision_timestamp),StringType,true),StructField(item_id,StringType,true)))

In [None]:
df3.write.parquet('references.parquet',mode='overwrite')


In [None]:
df3.count()