In [1]:
# Imports

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, MapType
import pandas as pd
import re
import time
import pandas as pd
import pyspark.sql.functions as sqlf
from pyspark.sql.functions import udf, explode, col, when
from pyspark.ml.feature import RegexTokenizer

In [2]:
# Spark Init

# Wall-clock timestamp taken before the session is created.
start_time = time.time()

# getOrCreate() hands back the already-active session when there is one.
spark = (
    SparkSession.builder
    .master("spark://sparkmaster:7077")
    .appName("extract_wiki_dict")
    .getOrCreate()
)
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/05 23:59:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# extract text tags from xml

# Every <page> element of the dump becomes one row.
page_reader = spark.read.format('xml').options(rowTag='page')
initial_df = page_reader.load('./full_wiki.xml')

# Keep only the revision body and expose it as a single `text` column.
df = initial_df.selectExpr("revision.text._VALUE as text")

                                                                                

In [4]:
# df = df.filter(col("text").cast("int").isNull())

In [9]:
# clean text

@udf(returnType=StringType())
def clear_text(text):
    """Strip reference tags, HTML comments and parenthesised asides from wikitext.

    Args:
        text: Raw text of one page. Non-string values are converted with
            ``str()``; ``None`` (a null cell) is treated as empty text.

    Returns:
        The text with every ``<ref ... />`` / ``<ref ...>...</ref>`` tag, every
        ``<!-- ... -->`` comment and every innermost ``(...)`` group (together
        with one preceding whitespace character, if present) removed.
    """
    if text is None:
        # str(None) would inject the literal word "None" into the corpus.
        return ''
    # re.DOTALL lets `.+?` cross line breaks, so references and comments that
    # span several lines are removed as well; the parenthesis alternative
    # already matched newlines through its negated character class.
    return re.sub(
        r'(<ref.+?/(ref)?>)|(<!--.+?-->)|(\s?(\(([^()])*\)))',
        '',
        str(text),
        flags=re.DOTALL,
    )

# Keep the raw text and place its cleaned counterpart next to it.
clean_text_col = clear_text("text").alias("clean_text")
df = df.select("text", clean_text_col)

In [16]:
# extract links to dict

# Result schema of the link-extraction UDF: a list of string -> string maps.
return_type = ArrayType(
    elementType=MapType(keyType=StringType(), valueType=StringType())
)

def is_dict_pair_valid(dict_pair):
    """Decide whether a (base, form) link pair is a usable dictionary entry.

    Args:
        dict_pair: Mapping that holds the strings ``'base'`` and ``'form'``.

    Returns:
        ``True`` only when both strings are non-empty, differ from each other,
        start with the same character and contain the same number of
        space-separated words; ``False`` otherwise.
    """
    base, form = dict_pair['base'], dict_pair['form']
    # An empty side has no first letter to compare; reject it instead of
    # letting the [0] lookup below raise IndexError.
    if not base or not form:
        return False
    # If there is no change, skip
    if base == form:
        return False
    # If the starting letter differs, the pair is not valid
    if base[0] != form[0]:
        return False
    # If the number of words differs, the pair is not valid
    return len(base.split(' ')) == len(form.split(' '))

@udf(returnType=return_type)
def extract_links(text):
    """Collect base-form / inflected-form pairs from the wiki links in ``text``.

    Two link shapes are recognised; in both, ``base`` consists only of ASCII
    letters, digits and dots:

    * piped links ``[[base|form]]`` - kept only when ``is_dict_pair_valid``
      accepts the pair; their ``postfix`` is the empty string;
    * suffixed links ``[[base]]postfix`` followed by a whitespace character,
      where ``postfix`` is a run of lowercase ASCII letters; their ``form`` is
      ``base + postfix``.

    Args:
        text: Cleaned wikitext of one page, or ``None`` for a null cell.

    Returns:
        A list of dicts with the string keys ``'base'``, ``'form'`` and
        ``'postfix'``: piped links first, then suffixed links, each group in
        order of appearance. Empty when nothing matches or ``text`` is ``None``.
    """
    if text is None:
        return []

    dict_results = []

    # Variant without postfix
    for match in re.finditer(r'\[\[([A-Za-z0-9.]+?)\|(.+?)]]', text):
        dict_result = {
            'base': match.group(1),
            'form': match.group(2),
            'postfix': ''
        }
        if is_dict_pair_valid(dict_result):
            dict_results.append(dict_result)

    # Variant with postfix
    # Scanned unconditionally: gating this pass on "no piped link found" would
    # drop every suffixed link on any page that also contains a piped link.
    for match in re.finditer(r'\[\[([A-Za-z0-9.]+?)]]([a-z]+?)\s', text):
        base, postfix = match.groups()
        dict_results.append({
            'base': base,
            'form': base + postfix,
            'postfix': postfix
        })

    return dict_results

# One row per page, holding that page's list of link maps.
links_df = df.select(extract_links("clean_text").alias("links"))
# Flatten the lists: one row per extracted link map.
edf = links_df.select(explode("links").alias("link"))
# Promote the three map entries to real columns.
exprs = [col("link").getItem(key).alias(key) for key in ("base", "form", "postfix")]
edf = edf.select(*exprs)

In [17]:
# Replace the '' placeholder with NULL; when() without otherwise() yields NULL
# for every row that does not satisfy the condition.
postfix_or_null = when(col('postfix') != '', col('postfix'))
edf = edf.withColumn('postfix', postfix_or_null)

In [23]:
# Collapse identical (base, form, postfix) rows into a single row.
nedf = edf.distinct()

In [24]:
# Print up to 300 of the deduplicated entries that carry a postfix.
has_postfix = col("postfix").isNotNull()
nedf.where(has_postfix).show(300)

[Stage 21:>                                                         (0 + 1) / 1]

+-------------+----------------+-------+
|         base|            form|postfix|
+-------------+----------------+-------+
|       prorok|        prorokov|     ov|
|     internet|       internetu|      u|
|    Heidegger|   Heideggerovmu|   ovmu|
|mikroprocesor| mikroprocesorov|     ov|
|         Bonn|           Bonne|      e|
|         punk|        punkovej|   ovej|
|       fanzin|         fanzinu|      u|
|         atol|          atolov|     ov|
|        autor|          autori|      i|
|mikroprocesor|  mikroprocesory|      y|
| Michelangelo|  Michelangelova|     va|
|     Japonsko|       Japonskom|      m|
|        otrok|         otrokov|     ov|
|       vztlak|         vztlaku|      u|
|    interpret|     interpretom|     om|
|     Barbados|      Barbadosom|     om|
|    Heidegger|   Heideggerovho|   ovho|
|   Afganistan|     Afganistane|      e|
|        Egypt|          Egypta|      a|
|         film|           filmu|      u|
|     Gibibajt|      Gibibajtov|     ov|
|        slang| 

                                                                                

In [81]:
# tokenize sentences

@udf(returnType=ArrayType(StringType()))
def tokenize_sentences(text):
    """Split ``text`` into sentence-like chunks with a regular expression.

    A chunk starts at the beginning of the text or at a whitespace character
    (which stays in the chunk as its first character), may open with
    apostrophes, continues with an ASCII capital letter and runs lazily to the
    first ``.``, ``!`` or ``?`` that is followed either by whitespace and a
    token containing an ASCII capital letter, or by the end of the text.

    Args:
        text: Cleaned wikitext of one page, or ``None`` for a null cell.

    Returns:
        The matched chunks in order of appearance; an empty list when nothing
        matches or ``text`` is ``None``.
    """
    if text is None:
        # A null cell has no sentences; re would raise TypeError on None.
        return []
    # NOTE(review): [A-Z] covers ASCII capitals only, so a sentence that opens
    # with an accented capital is not recognised as a start - confirm intent.
    sentence_re = r'((\s|^)\'*[A-Z].+?[.!?])(?=\s+\S*[A-Z]|$)'
    # Group 1 is the whole chunk; group 2 is only the leading separator.
    return [match.group(1) for match in re.finditer(sentence_re, text)]

# Pair every cleaned page with the list of sentences found in it.
sentences_col = tokenize_sentences("clean_text").alias("sentences")
sdf = df.select("clean_text", sentences_col)