# Mining words from Wikipedia

In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import array_contains
import unicodedata
import pycountry
import string
from operator import add
import spacy
import os
import time

spark = SparkSession \
    .builder \
    .appName("Analysing Wikipedia") \
    .getOrCreate()

In [2]:
df = spark.read.json("./nowiki-20210111-cirrussearch-content.json")

## Exploring the dataset

Looking at the schema just to explore the dataset. Found [a description of the JSON dump format on Wikipedia](https://meta.wikimedia.org/wiki/Data_dumps/Misc_dumps_format)

In [3]:
df.printSchema()

root
 |-- auxiliary_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- content_model: string (nullable = true)
 |-- coordinates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- coord: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- lon: double (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- dim: long (nullable = true)
 |    |    |-- globe: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- primary: boolean (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- create_timestamp: string (nullable = true)
 |-- defaultsort: string (nullable = true)
 |-- display_title: string (nullable = true)
 |-- external_link: array (nullable = true)
 |    |-- element: strin

In [4]:
df.show()

+--------------------+--------------------+-------------+--------------------+--------------------+----------------+-------------+--------------------+--------------------+--------------+--------------+--------+---------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------+------------+------+-------------+
|      auxiliary_text|            category|content_model|         coordinates|    create_timestamp|     defaultsort|display_title|       external_link|             heading|incoming_links|         index|language|namespace|namespace_text|        opening_text|   ores_articletopic|  ores_articletopics|       outgoing_link|    popularity_score|            redirect|             score|         source_text|            template|                text|text_bytes|    

### Find columns to filter on

In [5]:
# Should one filter out null?
df.select("content_model").distinct().show()

+-------------+
|content_model|
+-------------+
|         null|
|     wikitext|
+-------------+



In [6]:
%%time
df.filter(df["content_model"].isNotNull()).count()

Wall time: 25.4 s


548219

In [7]:
%%time
df.filter(df["content_model"].isNull()).count()

Wall time: 25.2 s


548219

In [8]:
# It seems like every other row in this dataset consists of just nulls,
# so I remove those rows
df_not_null = df.filter(df["content_model"].isNotNull())

In [9]:
%%time
# After removing the nulls, there is only one language (where previously null also showed up)
df_not_null.select("language").distinct().show()

+--------+
|language|
+--------+
|      nb|
+--------+

Wall time: 26.3 s


In [10]:
%%time
# About namespaces https://en.wikipedia.org/wiki/Wikipedia:Namespace
# Articles have no namespace (no prefix), we are interested in namespace 0.
# It seems this dataset only contains the namespace we are interested in!
df_not_null.select("namespace").distinct().show()

+---------+
|namespace|
+---------+
|        0|
+---------+

Wall time: 25.9 s


## Cleaning dataset

In [11]:
# Filtering out columns
filtered_df = df_not_null.drop("content_model", "language", "category", "coordinates", "defaultsort", \
        "external_link", "heading", "incoming_links", "namespace", "namespace_text", \
        "outgoing_link", "redirect", "text_bytes", "template", "wiki", \
        "wikibase_item", "version_type", "file_bits", "file_height", "file_media_type", \
        "file_resolution", "file_size", "file_text", "file_width", "index", \
        "file_mime", "ores_articletopic", "ores_articletopics", "score", "popularity_score", \
        "display_title", "auxiliary_text", "create_timestamp", "opening_text", "source_text", \
        "timestamp", "version")

In [12]:
%%time
# Seems like only one entry per article. That's good! I feared it was one per revision.
filtered_df.filter(filtered_df["title"] == "Kill Buljo").show()

+--------------------+----------+
|                text|     title|
+--------------------+----------+
|Kill Buljo er en ...|Kill Buljo|
+--------------------+----------+

Wall time: 25.5 s


In [13]:
%%time
# Comparing the content in the text column to the content on the web page
# https://no.wikipedia.org/wiki/Kill_Buljo
filtered_df.filter(filtered_df["title"] == "Kill Buljo").select("text").collect()[0][0]

Wall time: 25 s


'Kill Buljo er en norsk film fra 2008. Den er en komisk parodi på det amerikanske actioneeventyret Kill Bill, og handlingen er lagt til Finnmark. Regi er ved Tommy Wirkola og manus Stig Frode Henriksen og Tommy Wirkola. Filmen hadde premiere i mars 2007, og ble sett av 87 000 på kino i Norge. Det ble solgt over 95 000 DVD-er. Filmmusikken ble bl.a. laget av Alta-bandet Cyaneed. Kill Buljo 2 kom ut i 2013. Jompa Tormann er rollefiguren som spilles av Stig Frode Henriksen i filmen Kill Buljo. Rollefiguren dukker også opp i flere kortfilmer på DVD-en Kill Buljo: The Beginning. Jompa Tormann: Stig Frode Henriksen Pappa Buljo: Frank Arne Olsen Tampa Buljo: Martin Hykkerud Sid Wisløff: Tommy Wirkola Unni Formen: Natasha Angel Dahle Peggy Mathilassi: Linda Øverlie Nilsen Crazy Beibifeit: Ørjan Gamst Kjell Driver: John Even Pedersen Bud Light: Christian Reiertsen Lara Kofta: Merete Nordahl Mr. Handjagi: Ørjan Gamst Kato: Jørn Tore Nilsen Blow Job: Heidi Monsen Troll Tove: Eirik Junge Eliassen 

It seems the text column contains everything in the article, so I went back and removed all columns except `text` and `title`.

The text content itself could need some cleaning. There are several instances of `(en)` which looking at the web page seems to be the language of the external reference, so at least remove `(en)` and `(no)`. At one point we see `[død lenke]` which is an inline footnote, possibly a generic one. Going to the web page we find that it was created by a bot. On Wikipedia we find a web page listing [all referance template tags that can appear in running text](https://no.wikipedia.org/wiki/Mal:Trenger_referanse).

Also found a [list over sentences called "stub"](https://no.wikipedia.org/wiki/Kategori:Stubbmaler) that can appear in the running text. Unfortunately there were 361 of them, and much more template [on the list of all tempaltes](https://no.wikipedia.org/wiki/Kategori:Maler). I tried to use the other dataset (`...general.json`) to find the text the templates produces so I could remove that, but gave up.

We see that \xa0 appears.

And lastly we want to remove punctuation marks, digits and other special characters. Basically be left with only words consisting of letters.

### Cleaning the text content

In [14]:
possible_template_reference_tags = ["[trenger referanse]", "[klargjør]", "[hvor?]", "[trenger oppdatering]", "[død lenke]", "[trenger sitat]", "[trenger bedre kilde]", "[bør utdypes]", "[hvem?]", "[omstridt – diskuter]", "[ufullstendig referanse]", "[ikke i angitt kilde]", "[tredjepartskilde trengs]", "[når?]", "[av hvem?]", "[sic]", "[trenger sidetall]"]

# First time I understand this is a generator and not just a for loop
# Weirdly enough, the ISO 639-3 alpha_2 codes doesn't contain "en"
ext_ref_language_tags = [f"({country.alpha_2.lower()})" for country in pycountry.countries] + ["(en)"]

In [15]:
all_possible_template_text = []
# It is empty because I don't know! I tried to find the templates in 
# "Mining meta information about Wikipedia.ipynb", but failed

In [16]:
# Use RDD features to map and reduce
# Transform from Row with title and text to just text strings, rigth away
rdd = filtered_df.rdd.map(lambda row: row.text)

In [17]:
def remove_strings(src_string: str, strings_to_remove: list):
    new_string= src_string
    for s in strings_to_remove:
        new_string = new_string.replace(s, "")
    return new_string

In [18]:
# Remove possible_template_reference_tags
removed_templates_rdd = rdd.map(lambda s: remove_strings(s, possible_template_reference_tags))
removed_templates_rdd.take(2)

['Hundene i Riga er en svensk film fra 1995 av Per Berglund med Rolf Lassgård, Björn Kjellman, Charlotte Sieling og Paul Butkevich i noen av rollene. Filmen basert seg på romanen Hundene i Riga av Henning Mankell som er den andre i serien om Kurt Wallander. En livbåt flyter i land ved den skånske kysten. I båten befinner det seg to menn som har blitt myrdet. Etterforsker Kurt Wallander fra politiet i Ystad tilkalles til plassen. Ved hjelp av politiet i Latvia blir begge mennene identifisert. For å lette utredningen ble en politimann tilkalt fra Latvia for å hjelpe til å løse saken. Men når politimannen vender tilbake til Latvia blir han myrdet. Kurt Wallander flyr til Latvia for å forsøke å finne ut hvorfor politimannen ble myrdet. (en) Hundene i Riga på Internet Movie Database (sv) Hundene i Riga i Svensk Filmdatabas (en) Hundene i Riga på Rotten Tomatoes Portal: Film',
 'Kill Buljo er en norsk film fra 2008. Den er en komisk parodi på det amerikanske actioneeventyret Kill Bill, og ha

In [19]:
# Remove language tags
removed_lang_tags_rdd = removed_templates_rdd.map(lambda s: remove_strings(s, ext_ref_language_tags))
removed_lang_tags_rdd.take(2)

['Hundene i Riga er en svensk film fra 1995 av Per Berglund med Rolf Lassgård, Björn Kjellman, Charlotte Sieling og Paul Butkevich i noen av rollene. Filmen basert seg på romanen Hundene i Riga av Henning Mankell som er den andre i serien om Kurt Wallander. En livbåt flyter i land ved den skånske kysten. I båten befinner det seg to menn som har blitt myrdet. Etterforsker Kurt Wallander fra politiet i Ystad tilkalles til plassen. Ved hjelp av politiet i Latvia blir begge mennene identifisert. For å lette utredningen ble en politimann tilkalt fra Latvia for å hjelpe til å løse saken. Men når politimannen vender tilbake til Latvia blir han myrdet. Kurt Wallander flyr til Latvia for å forsøke å finne ut hvorfor politimannen ble myrdet.  Hundene i Riga på Internet Movie Database  Hundene i Riga i Svensk Filmdatabas  Hundene i Riga på Rotten Tomatoes Portal: Film',
 'Kill Buljo er en norsk film fra 2008. Den er en komisk parodi på det amerikanske actioneeventyret Kill Bill, og handlingen er 

#### Lemmatisation

I purposefully didn't [stem](https://en.wikipedia.org/wiki/Stemming) the words, since that could lead to non-existing words. E.g. `opparbeide` -> the stem `opparbeid`, `adresse` -> `adress`. The first time I got to the bottom of this notebook I found that the result contained different [inflection](https://en.wikipedia.org/wiki/Inflection) of the word "artikkel", "artikkelene", "artikler". Although I didn't want to stem the words, I still wanted to avoid having different inflections of the same root word.

After some Googling I learned I was looking for the [lemma](https://en.wikipedia.org/wiki/Lemma_(morphology)), and finding it is called lemmatisation.

I just decided to go with the package spaCy since had a simple API and does [POS tagging](https://en.wikipedia.org/wiki/Part-of-speech_tagging) automatically.

In [20]:
# Uncomment and run the following line once. The leading exclamation mark means it is a terminal command
# !python -m spacy download nb_core_news_lg

I got problems with memory tryng to call the above function with PySpark (using map). You can read about some problems [in this blog post](https://haridas.in/run-spacy-jobs-on-apache-spark.html). After some trying and failing I decided to simply run spaCy with plain Python. 

Since I don't have enough RAM to load all data and don't think it is possibly to lazily iterate over an RDD, I must dump data to storage at this point. Note, if you get memory error while running `saveAsTextFile` method on the RDD on Windows, [see this Stack Overflow answer](https://stackoverflow.com/questions/40764807/null-entry-in-command-string-exception-in-saveastextfile-on-pyspark/40958969). 

In [21]:
# Uncomment to dump
# %%time
# removed_lang_tags_rdd.saveAsTextFile("temp1")

Py4JJavaError: An error occurred while calling o101.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/E:/workspace3/online-alias/words/temp1 already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:289)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3167/000000000000000000.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3165/000000000000000000.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3164/000000000000000000.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3163/000000000000000000.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1552)
	at org.apache.spark.rdd.RDD$$Lambda$3161/000000000000000000.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1552)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1538)
	at org.apache.spark.rdd.RDD$$Lambda$3160/000000000000000000.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1538)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:549)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:836)


##### Run lemmatisation in batches and write to file

If you already have run the cell above once, you can start from this cell later!

> Remember to run the first cell with the imports, though

In [2]:
import nb_core_news_lg
nlp = nb_core_news_lg.load()

def lemmatize_documents(src_documents: list):
    docs = nlp.pipe(texts=src_documents)
    new_documents = []
    for doc in docs:
        new_string = ""
        for token in doc:
            if (token.lemma_ != "-PRON-"):
                new_string += f" {token.lemma_}"
            else:
                # spacy returns the lemma "-PRON-" for pronouns, and we don't want that,
                # in that case we return the original word istead
                new_string += f" {token}"
        new_documents.append(new_string)
    return new_documents

In [None]:
%%time
with open("./out/lemmatised_wikipedia.txt", "a", encoding="utf-8") as f_out:
    temp_folder = "./temp1"
    directory = os.fsencode(temp_folder)
    
    for file in os.listdir(directory):
        starttime = time.time()
        filename = os.fsdecode(file)
        if (filename.endswith(".crc") or filename == "_SUCCESS"):
            continue
        f_in = open(f"{temp_folder}/{filename}", encoding="utf-8")
        print(f"START processing {filename}")
        # Read file in parts to avoid yet another memory problem:
        # MemoryError: Unable to allocate 2.08 GiB for an array with shape (546345, 1024) and data type float32
        # If there are too many lines in combinations with large articles, I don't have enough RAM (8GB)
        docs = []
        num_lines = 0
        print("0 lines processed", end='\r')
        batch_size = 1000
        if ("part-00032" in filename):
            batch_size = 250
        for line in f_in:
            docs.append(line)
            if (len(docs) == batch_size):
                try:
                    lemmatised_docs = lemmatize_documents(docs)
                    f_out.writelines(lemmatised_docs)
                    num_lines += batch_size
                    docs = []
                except MemoryError:
                    print(f"Error happened between lines {num_lines} and {num_lines + batch_size}")
                    raise
                print(f"{num_lines} lines processed", end='\r')
        if (len(docs) > 0):
            lemmatised_docs = lemmatize_documents(docs)
            f_out.writelines(lemmatised_docs)
            print(f"{num_lines} lines processed", end='\r')
            num_lines += len(docs)
        f_in.close()
        print(f"END processing {filename}. It had {num_lines} lines. It took {round(time.time() - starttime)} seconds")

START processing part-00000
END processing part-00000. It had 13109 lines. It took 668 seconds
START processing part-00001
END processing part-00001. It had 12927 lines. It took 582 seconds
START processing part-00002
END processing part-00002. It had 13149 lines. It took 649 seconds
START processing part-00003
END processing part-00003. It had 13598 lines. It took 554 seconds
START processing part-00004
END processing part-00004. It had 13428 lines. It took 554 seconds
START processing part-00005
END processing part-00005. It had 13619 lines. It took 571 seconds
START processing part-00006
END processing part-00006. It had 13054 lines. It took 602 seconds
START processing part-00007
END processing part-00007. It had 13396 lines. It took 581 seconds
START processing part-00008
END processing part-00008. It had 13326 lines. It took 594 seconds
START processing part-00009
END processing part-00009. It had 13163 lines. It took 586 seconds
START processing part-00010
END processing part-00

In [24]:
def remove_non_letter_words_and_chars(src_string: str):
    whitelist = set('abcdefghijklmnopqrstuvwxyzæøå ABCDEFGHIJKLMNOPQRSTUVWXYZÆØÅ')
    new_string = ""
    for char in src_string:
        if char in whitelist:
            new_string = new_string + char
        else:
            new_string = new_string + " "
    return new_string

In [24]:
only_letters_rdd = lemmatized_rdd.map(lambda s: remove_non_letter_words_and_chars(s))
only_letters_rdd.take(2)

NameError: name 'lemmatized_rdd' is not defined

### Pre-processing words

In [41]:
def convert_to_lowercase_if_not_acronym(src_string: str):
    for letter in src_string:
        if letter.islower():
            return src_string.lower()
    return src_string

In [42]:
%%time
# Convert string to list of lowercase words larger with two or more characters
words_rdd = only_letters_rdd \
    .flatMap(lambda s: s.split(" ")) \
    .filter(lambda word: len(word) > 1) \
    .map(lambda word: convert_to_lowercase_if_not_acronym(word))
words_rdd.take(20)

Wall time: 4.42 s


['hund',
 'riga',
 'er',
 'en',
 'svensk',
 'film',
 'fra',
 'av',
 'per',
 'berglund',
 'med',
 'rolf',
 'lassgård',
 'bj',
 'rn',
 'kjellman',
 'charlotte',
 'sieling',
 'og',
 'paul']

In [43]:
# List of stopwords taken from http://snowball.tartarus.org/algorithms/norwegian/stop.txt
stopwords = []
with open("./norwegian_stopwords.txt", "r", encoding="utf-8") as f:
    for line in f:
        strings = line.split("|")
        potential_stopword = strings[0].strip()
        if (not potential_stopword == ""):
            stopwords.append(potential_stopword)

# Remove English stopwords as well since I noticed "the", "in", and "of" in the top 20
# Inspired by the Google History stopword list found at https://www.ranks.nl/stopwords
english_stopwords = ["I", "a", "an", "are", "as", "at", "by", "com", "for", "from", "how", "in", "it", "of", "on", "that", "the", "this", "was", "what", "when", "where", "who", "will", "with", "www"]
stopwords = stopwords + english_stopwords
stopwords[:10]

['og', 'i', 'jeg', 'det', 'at', 'en', 'et', 'den', 'til', 'er']

In [44]:
def remove_word_in_list(src_word: str, list_of_words: list):
    if (src_word in list_of_words):
        return ""
    else:
        return src_word

In [46]:
%%time
# Remove all stop-words
no_stopwords_rdd = words_rdd \
    .map(lambda word: remove_word_in_list(word, stopwords)) \
    .filter(lambda word: word != "")
no_stopwords_rdd.take(20)

Wall time: 4.43 s


['hund',
 'riga',
 'svensk',
 'film',
 'per',
 'berglund',
 'rolf',
 'lassgård',
 'bj',
 'rn',
 'kjellman',
 'charlotte',
 'sieling',
 'paul',
 'butkevich',
 'rolle',
 'film',
 'basere',
 'roman',
 'hund']

## Mining!

In [47]:
%%time
# Finding the most popular words

ranked_words_rdd = no_stopwords_rdd.map(lambda word: (word, 1)) \
    .reduceByKey(add) \
    .sortBy(lambda x: x[1], ascending=False)

endTime = time.time()
ranked_words_rdd.take(20)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 57.0 failed 1 times, most recent failure: Lost task 3.0 in stage 57.0 (TID 982, Stian-PC.mshome.net, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 595, in process
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 425, in func
    return f(iterator)
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 1946, in combineLocally
    merger.mergeValues(iterator)
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-38-783a6dc03d6f>", line 1, in <lambda>
  File "<ipython-input-36-61959dd0e36b>", line 4, in lemmatize_string
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\nb_core_news_lg\__init__.py", line 12, in load
    return load_model_from_init_py(__file__, **overrides)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 239, in load_model_from_init_py
    return load_model_from_path(data_path, meta, **overrides)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 222, in load_model_from_path
    return nlp.from_disk(model_path, exclude=disable)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\language.py", line 974, in from_disk
    util.from_disk(path, deserializers, exclude)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 690, in from_disk
    reader(path / key)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\language.py", line 950, in deserialize_vocab
    self.vocab.from_disk(path)
  File "vocab.pyx", line 475, in spacy.vocab.Vocab.from_disk
  File "vectors.pyx", line 432, in spacy.vectors.Vectors.from_disk
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 690, in from_disk
    reader(path / key)
  File "vectors.pyx", line 425, in spacy.vectors.Vectors.from_disk.load_vectors
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\numpy\lib\npyio.py", line 439, in load
    return format.read_array(fid, allow_pickle=allow_pickle,
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\numpy\lib\format.py", line 741, in read_array
    array = numpy.fromfile(fp, dtype=dtype, count=count)
numpy.core._exceptions._ArrayMemoryError: Unable to allocate 572. MiB for an array with shape (150000000,) and data type float32

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2119/000000000000000000.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:836)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$3173/000000000000000000.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$3171/000000000000000000.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDD$$Lambda$1884/000000000000000000.apply(Unknown Source)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:836)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 595, in process
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 425, in func
    return f(iterator)
  File "e:\workspace3\online-alias\words\.venv\lib\site-packages\pyspark\rdd.py", line 1946, in combineLocally
    merger.mergeValues(iterator)
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "E:\workspace3\online-alias\words\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-38-783a6dc03d6f>", line 1, in <lambda>
  File "<ipython-input-36-61959dd0e36b>", line 4, in lemmatize_string
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\nb_core_news_lg\__init__.py", line 12, in load
    return load_model_from_init_py(__file__, **overrides)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 239, in load_model_from_init_py
    return load_model_from_path(data_path, meta, **overrides)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 222, in load_model_from_path
    return nlp.from_disk(model_path, exclude=disable)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\language.py", line 974, in from_disk
    util.from_disk(path, deserializers, exclude)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 690, in from_disk
    reader(path / key)
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\language.py", line 950, in deserialize_vocab
    self.vocab.from_disk(path)
  File "vocab.pyx", line 475, in spacy.vocab.Vocab.from_disk
  File "vectors.pyx", line 432, in spacy.vectors.Vectors.from_disk
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\spacy\util.py", line 690, in from_disk
    reader(path / key)
  File "vectors.pyx", line 425, in spacy.vectors.Vectors.from_disk.load_vectors
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\numpy\lib\npyio.py", line 439, in load
    return format.read_array(fid, allow_pickle=allow_pickle,
  File "E:\workspace3\online-alias\words\.venv\lib\site-packages\numpy\lib\format.py", line 741, in read_array
    array = numpy.fromfile(fp, dtype=dtype, count=count)
numpy.core._exceptions._ArrayMemoryError: Unable to allocate 572. MiB for an array with shape (150000000,) and data type float32

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2119/000000000000000000.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [28]:
# final_words_list = ranked_words_rdd.map(lambda t: t[0]).take(1000)
# with open("./out/wikipedia_words.txt", "w", encoding="utf-8") as f:
#     for word in final_words_list:
#         f.write(f"{word}\n")