## 1. Parsing of the Wikipedia

As a first step, the information needed to build a graph of article relationships is extracted from Wikipedia.

### Preparation of the Spark session

Imports and configuration specific to the use of the almond kernel.

In [4]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`org.apache.spark::spark-graphx:2.4.5`
import $ivy.`sh.almond::almond-spark:0.6.0`
import $ivy.`com.databricks::spark-xml:0.9.0`
import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

val spark = {
  NotebookSparkSession.builder()
    .progress(false)
    .master("local[*]")
    .config("spark.executor.memory", "2g")
    .config("spark.local.dir", "/data/flachsenberg/tmp/")
    .getOrCreate()
}

Creating SparkSession

spark: SparkSession = org.apache.spark.sql.SparkSession@4fa2a80

Other imports, especially the spark-xml parser.

In [5]:
import org.apache.spark.sql.types._
import com.databricks.spark.xml._
import scala.util.matching.Regex
import spark.implicits._

def sc = spark.sparkContext

defined function sc

### XML parsing

The XML dump of the German Wikipedia can be obtained from https://dumps.wikimedia.org/dewiki/

The Wikipedia XML format is described in detail here: https://meta.wikimedia.org/wiki/Data_dumps/Dump_format

The relevant fields of each <code>page</code> element are the article identifier <code>id</code>, the namespace <code>ns</code>, the article title <code>title</code>, the <code>redirect</code> element (present only if the page is a redirect rather than an actual article) and the <code>revision</code> element. Note that the redirect target is stored as an XML attribute of <code>redirect</code>, while the actual text of the article is stored in the <code>text</code> element inside <code>revision</code>.

The relevant information is extracted using the spark-xml reader with the schema given below, which reads the German Wikipedia dump into a DataFrame. By default, spark-xml exposes XML attributes with a leading underscore, hence the field name <code>_title</code>.

In [9]:
val schema = new StructType()
      .add("id", LongType)
      .add("ns", LongType)
      .add("title", StringType)
      .add("redirect", new StructType().add("_title", StringType))
      .add("revision", new StructType().add("text", StringType))
val df = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "page")
      .schema(schema)
      .xml("dewiki-20200601-pages-articles.xml")

schema: StructType = List(
  StructField("id", LongType, true, {}),
  StructField("ns", LongType, true, {}),
  StructField("title", StringType, true, {}),
  StructField(
    "redirect",
    List(StructField("_title", StringType, true, {})),
    true,
    {}
  ),
  StructField(
    "revision",
    List(StructField("text", StringType, true, {})),
    true,
    {}
  )
)
df: DataFrame = [id: bigint, ns: bigint ... 3 more fields]

This DataFrame is then filtered and flattened: only pages in the main namespace 0 are kept (after which the namespace column is no longer needed), and the nested <code>redirect</code> and <code>revision</code> fields are lifted to top-level columns.
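To make the shape of the data before and after this step concrete, here is a minimal sketch in plain Scala (no Spark required). The case classes `Page`, `Redirect`, `Revision` and `FlatPage`, the helper `filterAndFlatten` and the sample rows are hypothetical stand-ins invented for illustration; they only mirror the schema above and are not part of the notebook.

```scala
// Hypothetical in-memory stand-ins for the rows described by the schema.
final case class Redirect(title: String)
final case class Revision(text: String)
final case class Page(id: Long, ns: Long, title: String,
                      redirect: Option[Redirect], revision: Option[Revision])

// Flattened shape: nested fields become top-level fields, ns is dropped.
final case class FlatPage(id: Long, title: String,
                          redirect: Option[String], text: Option[String])

def filterAndFlatten(pages: Seq[Page]): Seq[FlatPage] =
  pages
    .filter(_.ns == 0L) // keep the main namespace only
    .map(p => FlatPage(p.id, p.title, p.redirect.map(_.title), p.revision.map(_.text)))

// Made-up sample rows: an article, a redirect, and a page outside namespace 0.
val samplePages = Seq(
  Page(1L, 0L, "Alan Smithee", None, Some(Revision("Ein [[Pseudonym]] ..."))),
  Page(2L, 0L, "Smithee", Some(Redirect("Alan Smithee")), Some(Revision("..."))),
  Page(3L, 4L, "Wikipedia:Hilfe", None, Some(Revision("...")))
)

val flat = filterAndFlatten(samplePages)
// Two rows remain; only the second one has a redirect target.
```

An absent `redirect` (here `None`, a null column in the DataFrame) is what later distinguishes actual articles from redirect pages.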

In [10]:
val filtered = df.filter($"ns" === 0)
                 .select("id", "title", "redirect.*", "revision.*")
                 .withColumnRenamed("_title", "redirect")
filtered.cache
filtered.count()

filtered: DataFrame = [id: bigint, title: string ... 2 more fields]
res9_1: DataFrame = [id: bigint, title: string ... 2 more fields]
res9_2: Long = 4017802L

This results in a DataFrame with more than 4 million rows, each corresponding to one page in the main namespace (actual articles as well as redirects). A DataFrame that contains only the <code>id</code> and <code>title</code> columns is derived and stored for later analysis.

In [11]:
val titles = filtered.drop("redirect").drop("text")
titles.cache
titles.show()
titles.write.save("titles.parquet")

+---+--------------------+
| id|               title|
+---+--------------------+
|  1|        Alan Smithee|
|  3|            Actinium|
|  5|             Ang Lee|
|  7|Anschluss (Soziol...|
|  8|  Anschlussfähigkeit|
| 10|       Aussagenlogik|
| 11|          Autopoiese|
| 12|                A.A.|
| 13| Liste von Autoren/A|
| 14| Liste von Autoren/H|
| 15| Liste von Autoren/C|
| 16| Liste von Autoren/I|
| 17| Liste von Autoren/K|
| 18| Liste von Autoren/J|
| 19| Liste von Autoren/V|
| 20| Liste von Autoren/G|
| 21| Liste von Autoren/W|
| 22| Liste von Autoren/B|
| 23| Liste von Autoren/D|
| 24| Liste von Autoren/S|
+---+--------------------+
only showing top 20 rows

titles: DataFrame = [id: bigint, title: string]
res10_1: DataFrame = [id: bigint, title: string]

Next, the edges in the Wikipedia graph will be extracted, i.e. the links between the pages.

The link format is described here: https://www.mediawiki.org/wiki/Help:Links

Generally, a link has the format <code>\[\[link#subsection|display\]\]</code>.

Here, a very simple parser is used. It has a few known problems but works in most cases. Known limitations: <code>&lt;nowiki&gt;</code> tags are ignored, so links inside sections explicitly marked as not to be interpreted are parsed as well. Furthermore, links whose target itself contains the <code>|</code> or <code>#</code> symbol may be handled incorrectly.

The DataFrame is converted to an RDD, and the <code>flatMap</code> function is used to generate a tuple <code>(srcId, targetText, weight)</code> for each unique link; each article may give rise to any number of link entries. The weight is 1 if the link was extracted from the article text and 0 if the page is a redirect.

In [12]:
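// Matches [[...]] links except file embeds (File:/Datei:); group 1 is the link body
val pattern = new Regex("\\[\\[(?!File:)(?!Datei:)(.+?)\\]\\]")
// Columns after dropping "title": 0 = id, 1 = redirect, 2 = text
val extractedLinks = filtered.drop("title").rdd.flatMap(row => {
    if (row(1) == null) {
        // Actual article: one entry per distinct link target found in the text
        pattern.findAllMatchIn(row.getString(2))
               .map(_.group(1).split("\\|")).filterNot(_.isEmpty)   // part before '|'
               .map(_(0).split("#")).filterNot(_.isEmpty).map(_(0)) // part before '#'
               .map((row.getLong(0), _, 1)).toList.distinct
    }
    else {
        // Redirect page: a single entry pointing to the redirect target
        Seq((row.getLong(0), row.getString(1), 0))
    }
})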
extractedLinks.take(10)

pattern: Regex = \[\[(?!File:)(?!Datei:)(.+?)\]\]
extractedLinks: org.apache.spark.rdd.RDD[(Long, String, Int)] = MapPartitionsRDD[56] at flatMap at cmd11.sc:2
res11_2: Array[(Long, String, Int)] = Array(
  (1L, "Pseudonym", 1),
  (1L, "Regisseur", 1),
  (1L, "Directors Guild of America", 1),
  (1L, "Los Angeles Times", 1),
  (1L, "Internet Movie Database", 1),
  (1L, "Frank Patch \u2013 Deine Stunden sind gez\u00e4hlt", 1),
  (1L, "Robert Totten", 1),
  (1L, "Richard Widmark", 1),
  (1L, "Don Siegel", 1),
  (1L, "Manier (Stil)", …
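To see how the pattern behaves on individual inputs, including the limitations mentioned earlier, the per-article logic can be tried on a single string without Spark. The following is a minimal sketch: the helper name `linkTargets` and the sample wikitext are made up for illustration, while the regular expression and the splitting steps are the ones used in the cell above.

```scala
import scala.util.matching.Regex

// Same pattern as in the notebook cell.
val linkPattern: Regex = new Regex("\\[\\[(?!File:)(?!Datei:)(.+?)\\]\\]")

// Hypothetical helper mirroring the per-article steps:
// keep the part before '|', then the part before '#', then deduplicate.
def linkTargets(wikitext: String): List[String] =
  linkPattern.findAllMatchIn(wikitext)
    .map(_.group(1).split("\\|")).filterNot(_.isEmpty)
    .map(_(0).split("#")).filterNot(_.isEmpty).map(_(0))
    .toList.distinct

// Made-up sample text covering the common cases.
val sample =
  "Ein [[Pseudonym]] von [[Regisseur|Regisseuren]], siehe [[Manier (Stil)#Film]]. " +
  "[[Datei:Bild.jpg|thumb]] <nowiki>[[Kein Link]]</nowiki> [[Pseudonym]]"

val targets = linkTargets(sample)
// List(Pseudonym, Regisseur, Manier (Stil), Kein Link)
```

The file embed is skipped and the duplicate `Pseudonym` is removed, but the link inside `<nowiki>` is still extracted. A same-page link such as `[[#Abschnitt]]` yields an empty target string; entries like these find no matching title when the targets are resolved to identifiers.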

This RDD already contains the edge information of the Wikipedia graph. However, while the source node is already represented by a numerical identifier, the target is still the title of the linked article. The latter must also be converted to an identifier, and along the way links to non-existing articles ("red links" in Wikipedia) must be filtered out.
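On a handful of rows, the idea can be sketched with plain Scala collections. This is only an illustration of inner-join semantics under simplifying assumptions: the names `titleToId`, `linksByName` and `resolved` are hypothetical, the link rows are made up, and a `Map` lookup assumes that every title is unique, which a DataFrame join does not require.

```scala
// Miniature title table (ids and titles as in the titles DataFrame).
val titleToId: Map[String, Long] = Map(
  "Alan Smithee" -> 1L,
  "Actinium"     -> 3L,
  "Ang Lee"      -> 5L
)

// Made-up link rows of the form (from, targetTitle, weight).
val linksByName: Seq[(Long, String, Int)] = Seq(
  (3L, "Alan Smithee", 1),
  (5L, "Alan Smithee", 1),
  (5L, "Nicht vorhandener Artikel", 1), // red link: no such title
  (99L, "Ang Lee", 0)                   // redirect page (hypothetical id)
)

// Inner-join semantics: rows whose target title is unknown are dropped,
// all others get the title replaced by the corresponding identifier.
val resolved: Seq[(Long, Long, Int)] =
  linksByName.flatMap { case (from, toTitle, weight) =>
    titleToId.get(toTitle).map(toId => (from, toId, weight))
  }
// resolved == Seq((3L, 1L, 1), (5L, 1L, 1), (99L, 5L, 0))
```

The red link disappears without any explicit filter, which is the same effect the join on the full data has.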

In [20]:
// first, create again a DataFrame from the RDD
val linksWithName = spark.createDataFrame(extractedLinks)
                         .toDF("from", "to", "weight")
linksWithName.cache()
val countBeforeJoin = linksWithName.count()
println(s"Number links before: $countBeforeJoin")
// Now perform a join of the title DataFrame with the edges DataFrame
// on the titles. That way, if a link target (to) exists as an article
// it can be replaced by the corresponding identifier.
val links = linksWithName.join(titles, $"to" === $"title")
                          .drop("to").drop("title")
                          .withColumnRenamed("id", "to")
                          .select("from", "to", "weight") // reorder columns
links.cache
val countAfterJoin = links.count()
println(s"Number links after: $countAfterJoin")
links.show()

Number links before: 86619618
Number links after: 68319408
+-------+--------+------+
|   from|      to|weight|
+-------+--------+------+
|1379866|10729099|     1|
|3896518|10729099|     1|
|6773765|10729099|     1|
|8474492|10729099|     1|
|1040895| 5303821|     1|
|1379755| 5303821|     1|
|5255560| 5303821|     1|
| 871998| 2087688|     1|
|1244806| 2087688|     1|
|1379826| 2087688|     1|
| 703022|10370012|     1|
|1379866|10370012|     1|
|2052128|10370012|     1|
|5556361|10370012|     1|
| 372995|  284968|     1|
|1379840|  284968|     1|
| 422289| 7893632|     1|
|1379851| 7893632|     1|
|7893701| 7893632|     1|
|1379775| 1706420|     1|
+-------+--------+------+
only showing top 20 rows

linksWithName: DataFrame = [from: bigint, to: string ... 1 more field]
res19_1: DataFrame = [from: bigint, to: string ... 1 more field]
countBeforeJoin: Long = 86619618L
links: DataFrame = [from: bigint, to: bigint ... 1 more field]
res19_5: DataFrame = [from: bigint, to: bigint ... 1 more field]
countAfterJoin: Long = 68319408L

Almost 87 million links were parsed from Wikipedia, of which about 68 million point to an existing page in namespace 0. These serve as the edges of the Wikipedia graph and are also stored for further use.

In [21]:
links.write.save("links.parquet")