In [1]:
import re
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Build (or reuse) the Spark session used by every cell of this notebook.
spark = (
    SparkSession.builder
    .appName("data03")
    .config("spark.driver.memory", "8g")
    # A threshold of -1 turns automatic broadcast joins off.
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/16 17:08:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Topic preprocessing

This notebook cleans the topic list: it removes useless topics and merges topics that are too similar.

Read the text file that contains all the topics (one entry per line).

In [2]:
# Load the raw topic list; spark.read.text exposes each line as one row
# in a single string column named "value".
df = spark.read.text("topics.txt")

# NOTE(review): this is a plain row count, so it equals the number of unique
# topics only if topics.txt contains no repeated lines — confirm upstream.
print("Number of unique topics: ", df.count())
df.show(n=5, truncate=False)

Number of unique topics:  1457234
+-------------------------+
|value                    |
+-------------------------+
|P2 Ípsilon Ímpar Fugas P3|
|laboratório              |
|advogado                 |
|descrever                |
|Soup Nazi                |
+-------------------------+
only showing top 5 rows



Split entries that hold several topics into a list of topics (topics are separated by more than one space, e.g. "Saúde $\ $ Covid-19").

In [3]:
# Split every entry on runs of two or more spaces, so an entry holding several
# topics becomes an array of topics in the new "tokens" column.
df = df.withColumn("tokens", F.split(df["value"], " {2,}"))

df.show(n=5, truncate=False)

+-------------------------+---------------------------+
|value                    |tokens                     |
+-------------------------+---------------------------+
|P2 Ípsilon Ímpar Fugas P3|[P2 Ípsilon Ímpar Fugas P3]|
|laboratório              |[laboratório]              |
|advogado                 |[advogado]                 |
|descrever                |[descrever]                |
|Soup Nazi                |[Soup Nazi]                |
+-------------------------+---------------------------+
only showing top 5 rows



Map each existing topic to a canonical form (lemma):

- by checking the spelling of the word in Natura Dictionary (https://natura.di.uminho.pt/download/sources/Dictionaries/wordlists/)

- otherwise, by using the most common spelling of the word in the data (e.g. "Galp" vs. "galp" -> whichever of the two occurs more often)

In [4]:
# Read the dictionary word list; each line of the file becomes one row
# in the "value" column.
dicitionary = spark.read.text("wordlist_utf8.txt")

# Map each lowercased dictionary word to the spelling stored in the word list.
def get_word_map(df):
    """Collect a one-column DataFrame of words into a lookup dictionary.

    Args:
        df: DataFrame whose rows expose the word through the ``value`` attribute.

    Returns:
        dict mapping ``word.lower()`` to the whitespace-stripped word. Blank
        lines are skipped; when two words share a lowercase form, the one that
        appears later in ``df`` wins.
    """
    stripped_words = (row.value.strip() for row in df.collect())
    return {entry.lower(): entry for entry in stripped_words if len(entry) > 0}
# Build the lowercase -> dictionary-spelling map on the driver.
word_map = get_word_map(dicitionary)
# Broadcast it; the token functions in later cells read it through
# word_map_broadcast.value.
word_map_broadcast = spark.sparkContext.broadcast(word_map)

CodeCache: size=131072Kb used=18680Kb max_used=18687Kb free=112391Kb
 bounds [0x000000010a98c000, 0x000000010bbec000, 0x000000011298c000]
 total_blobs=7551 nmethods=6627 adapters=837
 compilation: disabled (not enough contiguous free space left)




In [5]:
# Explode the per-topic token arrays into one row per token occurrence.
tokens_df = df.select(F.explode(F.col("tokens")).alias("token"))

# For every lowercased token keep the spelling that occurs most often.
# Ties on the count are broken by the spelling itself: row_number() over rows
# with equal sort keys is otherwise arbitrary, which made the chosen form
# (e.g. "Galp" vs. "galp" with equal counts) vary between runs.
window_spec = Window.partitionBy("std_token").orderBy(F.desc("count"), F.asc("token"))
tokens_df = (
    tokens_df
    .withColumn("std_token", F.lower(F.col("token")))
    .groupBy("std_token", "token")
    .agg(F.count("*").alias("count"))
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(F.col("rank") == 1)
    .orderBy("std_token")
    .select("std_token", "token", "count")
)

# Collect to the driver as {lowercased token: most common spelling}.
std_to_token_dict = {
    row['std_token']: row['token']
    for row in tokens_df.collect()
}

                                                                                

In [6]:
# apply the maps to the tokens
def map_token(word):
    """Return the canonical spelling of a token.

    Lookup order (both keyed by the lowercased token):
      1. the broadcast dictionary word map,
      2. the most-common-spelling map built from the data.

    Args:
        word: token string as it appears in the topic list.

    Returns:
        The canonical spelling. If neither map knows the token, a message is
        printed and the token is returned unchanged. The previous fallback
        returned None, which made the later ``token.split(" ")`` in
        squeeze_tokens fail.
    """
    lowered = word.lower()
    dictionary = word_map_broadcast.value
    if lowered in dictionary:
        return dictionary[lowered]
    if lowered in std_to_token_dict:
        return std_to_token_dict[lowered]

    print(f"Token not found in dictionary: {lowered}")
    return word

# One output row per input row: keep the topic string (first column) and
# replace every token of the second column by its canonical spelling.
df = (
    df.rdd
    .map(lambda row: [row[0], [map_token(token) for token in row[1]]])
    .toDF(["topic", "token"])
)
df.show(n=5, truncate=False)

                                                                                

+-------------------------+---------------------------+
|topic                    |token                      |
+-------------------------+---------------------------+
|P2 Ípsilon Ímpar Fugas P3|[P2 Ípsilon Ímpar Fugas P3]|
|laboratório              |[laboratório]              |
|advogado                 |[advogado]                 |
|descrever                |[descrever]                |
|Soup Nazi                |[Soup Nazi]                |
+-------------------------+---------------------------+
only showing top 5 rows



                                                                                

get all possible combinations of topics within a topic (ex.: Banco de Portugal -> Banco, Portugal, Banco de Portugal)

- avoid splitting names (Leonor Pereira -> Leonor, Pereira, Leonor Pereira)[*BAD*]

- only split if any of the words is in the Natura Dictionary but not as a name ("Banco de Portugal" -> banco, Portugal, Banco de Portugal)[*OK*]

In [7]:
def is_first_letter_upper(s):
    """Return True when ``s`` is non-empty and its first character is uppercase."""
    return bool(s) and s[0].isupper()
    
def squeeze_tokens(tokens):
    """Expand multi-word tokens into their words plus the full token.

    A token containing spaces is expanded only when at least one of its words
    (three or more characters, not one of "uns", "por", "dos", "das") is in the
    broadcast dictionary with a spelling that does not start with an uppercase
    letter. Each word of an expanded token is emitted in its canonical
    spelling (dictionary first, then the most-common-spelling map, otherwise
    unchanged). The original token is always emitted, after its words.

    Args:
        tokens: list of token strings.

    Returns:
        New list of token strings.
    """
    dictionary = word_map_broadcast.value
    never_decisive = {"uns", "por", "dos", "das"}

    def may_split(words):
        # True as soon as one eligible word is a non-capitalised dictionary entry.
        for piece in words:
            key = piece.lower()
            if len(key) < 3 or key in never_decisive or key not in dictionary:
                continue
            if not is_first_letter_upper(dictionary[key]):
                return True
        return False

    def canonical(piece):
        # Dictionary spelling, else most common spelling, else the word itself.
        key = piece.lower()
        if key in dictionary:
            return dictionary[key]
        return std_to_token_dict.get(key, piece)

    expanded = []
    for token in tokens:
        words = token.split(" ")
        if len(words) > 1 and may_split(words):
            expanded.extend(canonical(piece) for piece in words)
        expanded.append(token)
    return expanded

# One output row per input row: expand the token list of every topic.
df = (
    df.rdd
    .map(lambda row: [row[0], squeeze_tokens(row[1])])
    .toDF(["topic", "token"])
)

df.show(n=5, truncate=False)

25/04/16 17:08:43 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 21 (TID 58): Attempting to kill Python Worker
[Stage 22:>                                                         (0 + 1) / 1]

+-------------------------+----------------------------------------------------------+
|topic                    |token                                                     |
+-------------------------+----------------------------------------------------------+
|P2 Ípsilon Ímpar Fugas P3|[P2, ípsilon, ímpar, fugas, P3, P2 Ípsilon Ímpar Fugas P3]|
|laboratório              |[laboratório]                                             |
|advogado                 |[advogado]                                                |
|descrever                |[descrever]                                               |
|Soup Nazi                |[soup, nazi, Soup Nazi]                                   |
+-------------------------+----------------------------------------------------------+
only showing top 5 rows



                                                                                

Check for badly encoded characters: repair the token when possible, otherwise remove it.

In [8]:
# Characters accepted in a token in addition to ASCII letters, digits and the
# space (valid_word_detection builds its character class from this list).
# NOTE(review): uppercase "À", "Â", "Ô" and "Ç" are not listed, so tokens
# containing them are rejected by valid_word_detection — confirm this is intended.
custom_valid = [
    "ã", "á", "à", "â", "Á", "Ã",
    "é", "ê", "É", "Ê",
    "í", "Í",
    "ó", "ô", "õ", "Ó", "Õ",
    "ú", "Ú",
    "ç", "-", "º", "ª"
]

# Substring replacements for common encoding damage (wrong text -> intended
# text); manual_fix applies them with str.replace.
# NOTE(review): the key "Ã‰" appears twice below; both entries map to "É", so
# the duplicate has no effect.
fix_map = {
    "ő": "õ", "Ő": "Õ", "ť": "ç", "ťo": "ção",
    "Ã£": "ã", "Ã¡": "á", "Ãª": "ê", "Ã³": "ó",
    "Ã­": "í", "Ã©": "é", "Ã§": "ç", "Ã‰": "É",
    "Ãµ": "õ", "Ãº": "ú", "Ã‰": "É", "ů": "ó",
    "ă": "ã", "ę": "ê", "¾": "ó",
}

def valid_word_detection(text):
    """Return True when ``text`` contains only accepted characters.

    Accepted characters are ASCII letters, digits, the space and every entry
    of ``custom_valid``. ``None`` and the empty string count as valid.
    """
    extra_chars = "".join(map(re.escape, custom_valid))
    invalid_char = rf"[^a-zA-Z0-9 {extra_chars}]"
    return re.search(invalid_char, text or "") is None

def fix_encoding_issues(text):
    """Try to undo UTF-8 text that was decoded with a single-byte codec.

    The text is re-encoded with latin1, then cp1252, and the resulting bytes
    are decoded as UTF-8; the first codec for which both steps succeed gives
    the result. Non-string input, and text that neither codec can repair, is
    returned unchanged.
    """
    if isinstance(text, str):
        for codec in ("latin1", "cp1252"):
            try:
                raw_bytes = text.encode(codec)
                repaired = raw_bytes.decode("utf-8")
            except Exception:
                pass
            else:
                return repaired
    return text

def manual_fix(text):
    """Apply every ``fix_map`` replacement to ``text`` and return the result.

    Replacements are applied longest key first. In insertion order the
    one-character rule "ť" -> "ç" ran before "ťo" -> "ção" and consumed its
    first character, so the longer rule could never match ("informaťo" became
    "informaço"). The sort is stable, so keys of equal length keep their
    ``fix_map`` order.
    """
    longest_first = sorted(fix_map.items(), key=lambda pair: len(pair[0]), reverse=True)
    for wrong, right in longest_first:
        text = text.replace(wrong, right)
    return text

def fixed_words(word):
    """Return a clean version of ``word``, or "" when it cannot be repaired.

    A word that already passes valid_word_detection is returned as is.
    Otherwise it goes through fix_encoding_issues and then manual_fix; the
    repaired word is returned only if it now passes valid_word_detection.
    """
    if valid_word_detection(word):
        return word

    repaired = manual_fix(fix_encoding_issues(word))
    return repaired if valid_word_detection(repaired) else ""

# Repair every token and drop the ones that come back empty. fixed_words is
# evaluated once per token; the previous comprehension called it twice (once
# in the filter and once for the value).
df = (
    df.rdd
    .map(lambda row: (
        row[0],
        [fixed for fixed in (fixed_words(token) for token in row[1]) if fixed],
    ))
    .toDF(["topic", "token"])
)

df.show(n=5, truncate=False)

[Stage 24:>                                                         (0 + 1) / 1]

+-------------------------+----------------------------------------------------------+
|topic                    |token                                                     |
+-------------------------+----------------------------------------------------------+
|P2 Ípsilon Ímpar Fugas P3|[P2, ípsilon, ímpar, fugas, P3, P2 Ípsilon Ímpar Fugas P3]|
|laboratório              |[laboratório]                                             |
|advogado                 |[advogado]                                                |
|descrever                |[descrever]                                               |
|Soup Nazi                |[soup, nazi, Soup Nazi]                                   |
+-------------------------+----------------------------------------------------------+
only showing top 5 rows



                                                                                

remove duplicated tokens and invalid tokens such as "2018", "Abr 2020", "", "de", ...

In [9]:
def hours_detection(token):
    """Return True when ``token`` contains a time written as 12:30 or 12h30."""
    time_pattern = r"\b(?:\d{1,2}:\d{2}|\d{1,2}h\d{2})\b"
    return re.search(time_pattern, token) is not None

def date_detection(token):
    """Return True when ``token`` looks like a date, weekday or time.

    Each entry of the table below is a (regex, flags) pair; the token is
    flagged as soon as one of them is found in it.
    """
    checks = (
        # abbreviated month as a whole word (case-sensitive); "Mar" is handled separately
        (r"\b(?:Jan|Fev|Abr|Mai|Jun|Jul|Ago|Set|Out|Nov|Dez)\b", 0),
        # "Mar" only next to a number
        (r"\b(?:\d{1,2} Mar|Mar \d{2,4})\b", 0),
        # token starting with a full month name
        (r"^(janeiro|fevereiro|março|abril|maio|junho|julho|agosto|setembro|outubro|novembro|dezembro)\b", re.IGNORECASE),
        # month name followed by 2-4 digits
        (r"\b(Janeiro|Fevereiro|Março|Abril|Maio|Junho|Julho|Agosto|Setembro|Outubro|Novembro|Dezembro)[ ]?\d{2,4}\b", re.IGNORECASE),
        # day number (optionally followed by "de") and a month name
        (r"\b\d{1,2}(?:\s+de)?\s+(janeiro|fevereiro|março|abril|maio|junho|julho|agosto|setembro|outubro|novembro|dezembro)\b", re.IGNORECASE),
        # weekday names
        (r"\b(segunda-feira|terça-feira|quarta-feira|quinta-feira|sexta-feira|sábado|domingo)\b", re.IGNORECASE),
        # times such as 12:30 and 12h30
        (r"\b\d{1,2}:\d{2}\b", 0),
        (r"\b\d{1,2}h\d{2}\b", 0),
        # durations such as 3m20
        (r"\b\d{1,2}m\d{2}\b", 0),
    )
    return any(re.search(pattern, token, flags=flags) for pattern, flags in checks)

def invalid_tokens_detection(token):
    """Return True when ``token`` should be discarded.

    A token is discarded when it is a listed stop word (compared in lower
    case), is shorter than two characters (which includes the empty string),
    consists only of digits, or is flagged by hours_detection or
    date_detection.
    """
    stop_words = {
        "de", "da", "do", "dos", "das", "e", "ou", "a", "o", "as", "os",
        "para", "com", "em", "na", "no", "por", "pelo", "pelos", "uma",
        "pelas", "sem", "sobre", "entre", "até", "um",
        "antes", "depois", "durante", "após", "segundo", "junto",
    }
    return bool(
        token.lower() in stop_words
        or len(token) < 2
        or token.isdigit()
        or hours_detection(token)
        or date_detection(token)
    )


# Two steps per row: first drop the tokens flagged by invalid_tokens_detection,
# then remove duplicates (going through a set, so token order is not preserved).
df = (
    df.rdd
    .map(lambda row: (row[0], [token for token in row[1] if not invalid_tokens_detection(token)]))
    .map(lambda pair: (pair[0], list(set(pair[1]))))
    .toDF(["topic", "token"])
)

df.show(n=5, truncate=False)

25/04/16 17:08:52 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 25 (TID 62): Attempting to kill Python Worker
25/04/16 17:08:52 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 25 (TID 62): Attempting to kill Python Worker
[Stage 26:>                                                         (0 + 1) / 1]

+-------------------------+----------------------------------------------------------+
|topic                    |token                                                     |
+-------------------------+----------------------------------------------------------+
|P2 Ípsilon Ímpar Fugas P3|[P2, ípsilon, P3, ímpar, fugas, P2 Ípsilon Ímpar Fugas P3]|
|laboratório              |[laboratório]                                             |
|advogado                 |[advogado]                                                |
|descrever                |[descrever]                                               |
|Soup Nazi                |[Soup Nazi, nazi, soup]                                   |
+-------------------------+----------------------------------------------------------+
only showing top 5 rows



25/04/16 17:08:56 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 26 (TID 63): Attempting to kill Python Worker
                                                                                

remove stop words that *introduce* the topic, such as "O Banco de Portugal" -> "Banco de Portugal"

In [10]:
def remove_introductory_stopword(token):
    """Strip one leading article or preposition from ``token``.

    The comparison is case-insensitive and the prefix must be followed by a
    space. At most one prefix is removed; a token without a listed prefix is
    returned unchanged.
    """
    leading_stopwords = (
        "a ", "o ",
        "as ", "os ", "um ", "de ", "da ", "do ", "em ",
        "uma ", "uns ",
        "umas ",
    )
    lowered = token.lower()
    for prefix in leading_stopwords:
        if lowered.startswith(prefix):
            return token[len(prefix):]
    return token

# One output row per input row: strip a leading article/preposition from
# every token of the topic.
df = (
    df.rdd
    .map(lambda row: (row[0], [remove_introductory_stopword(token) for token in row[1]]))
    .toDF(["topic", "token"])
)

df.show(n=5, truncate=False)

25/04/16 17:09:00 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 27 (TID 64): Attempting to kill Python Worker
[Stage 28:>                                                         (0 + 1) / 1]

+-------------------------+----------------------------------------------------------+
|topic                    |token                                                     |
+-------------------------+----------------------------------------------------------+
|P2 Ípsilon Ímpar Fugas P3|[P2, ípsilon, P3, ímpar, fugas, P2 Ípsilon Ímpar Fugas P3]|
|laboratório              |[laboratório]                                             |
|advogado                 |[advogado]                                                |
|descrever                |[descrever]                                               |
|Soup Nazi                |[Soup Nazi, nazi, soup]                                   |
+-------------------------+----------------------------------------------------------+
only showing top 5 rows



25/04/16 17:09:05 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 28 (TID 65): Attempting to kill Python Worker
25/04/16 17:09:05 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 28 (TID 65): Attempting to kill Python Worker
                                                                                

save the topics mapping to a file

In [11]:
# Persist the topic -> tokens mapping as JSON, replacing any previous output
# at the same path.
df.write.json("topics.json", mode="overwrite")

                                                                                