In [None]:
from __future__ import print_function  # must be the first statement of the cell

import pyspark as ps
import numpy as np
import pandas as pd
import time

from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec, Word2VecModel

# Keep these stdlib imports after the pyspark star imports so a same-named
# pyspark function (some versions define e.g. `reduce`) cannot shadow them.
from functools import reduce
from itertools import islice


from bz2 import BZ2File
import xml.etree.ElementTree as etree

## Download the Ukrainian Wikipedia dump (Datalab)

In [None]:
# Fetch the latest Ukrainian Wikipedia pages-articles dump (bz2-compressed XML).
!wget https://dumps.wikimedia.org/ukwiki/latest/ukwiki-latest-pages-articles.xml.bz2

In [None]:
# Optional: read_wiki_dump streams the compressed .bz2 file directly, so this step is
# not needed for parsing. The file name matches the one fetched by the wget cell;
# -k keeps the compressed original.
!bzip2 -dk ukwiki-latest-pages-articles.xml.bz2

In [None]:
def strip_tag_name(t):
    """Return the local part of an ElementTree tag, dropping any ``{namespace}`` prefix.

    ``"{ns}page"`` -> ``"page"``; a tag without a namespace is returned unchanged.
    """
    # rpartition yields ("", "", t) when "}" is absent, so index 2 is t itself.
    return t.rpartition("}")[2]


def read_wiki_dump(bz2_dump_path):
    """Stream ``(title, redirect, ns, text)`` tuples for every <page> in a bz2 XML dump.

    Args:
        bz2_dump_path: path to a bz2-compressed XML dump.

    Yields:
        title: text of the page's <title> element ("" if empty).
        redirect: ``title`` attribute of the page's <redirect> element, "" if none.
        ns: integer value of <ns>, 0 if the page has no <ns> element.
        text: text of the page's <text> element ("" if empty, e.g. ``<text/>``).
    """
    with BZ2File(bz2_dump_path) as xml_file:
        root = None
        for event, elem in etree.iterparse(xml_file, events=("start", "end")):
            if root is None:
                # The first event is the "start" of the document root.
                root = elem
            tname = strip_tag_name(elem.tag)
            if event == "start":
                if tname == "page":
                    # Reset per-page fields so values never leak between pages.
                    title = ""
                    redirect = ""
                    ns = 0
                    text = ""
            else:
                # Element text is complete only at "end"; empty elements have .text None.
                if tname == "title":
                    title = elem.text or ""
                elif tname == "redirect":
                    redirect = elem.attrib["title"]
                elif tname == "ns":
                    ns = int(elem.text)
                elif tname == "text":
                    text = elem.text or ""
                elif tname == "page":
                    yield title, redirect, ns, text
                elem.clear()
                if tname == "page":
                    # Cleared pages would otherwise stay attached to the root and
                    # memory would grow with the size of the dump.
                    root.clear()

## Split the large `.xml` dump into small `.csv` chunks, keeping only article titles and text.

In [None]:
# Maximum number of articles to extract from the dump.
SLICE_SIZE = 2000000
# Compressed dump fetched by the wget cell; read_wiki_dump streams it, no decompression needed.
DUMP_PATH = "ukwiki-latest-pages-articles.xml.bz2"


def is_article(title, redirect, ns, text):
    """Return True for main-namespace (ns == 0) pages that are not redirects."""
    return ns == 0 and len(redirect) == 0


# Commas, newlines, tabs and carriage returns in article text all become spaces.
_FLATTEN_TABLE = str.maketrans(',\n\t\r', '    ')


def _write_batch(rows, batch_id):
    """Write (title, text) rows to ukwiki_<batch_id>.csv (with a 0-based index column)."""
    pd.DataFrame(rows, columns=['Title', 'Text']).to_csv("ukwiki_" + str(batch_id) + ".csv")


batch_size = 10000
batch_id = 0
rows = []

# Generator instead of builtin filter(): a star import may shadow `filter`.
articles = (page for page in read_wiki_dump(DUMP_PATH) if is_article(*page))
for title, redirect, ns, text in islice(articles, SLICE_SIZE):
    # Collect plain tuples and build each DataFrame once; growing a frame
    # row-by-row with .loc is much slower.
    rows.append((title, (text or "").translate(_FLATTEN_TABLE)))

    if len(rows) == batch_size:
        _write_batch(rows, batch_id)
        rows = []
        batch_id += 1

# Flush the final, partially filled batch so trailing articles are not lost.
if rows:
    _write_batch(rows, batch_id)
    batch_id += 1

In [None]:
# Upload the CSV chunks written by the previous cell to the gs://mmds bucket.
!gsutil cp ukwiki_*.csv gs://mmds/

## Use word counts to identify Wikipedia service words

In [None]:
# Disabled exploration: count token frequencies to spot frequent Wikipedia "service" words.
# NOTE(review): relies on `spark` (created in a later cell) and on preproc/preproc_0.csv,
# which this notebook does not produce — confirm both before re-enabling.
# words = spark.read.csv("preproc/preproc_0.csv", header=True).select('vector_no_stopwords')\
#     .rdd.flatMap(lambda line: line[0].strip().split(" ") if line[0] else str(line[0]))

# wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a + b).sortBy(lambda x: -x[1])

# wordCounts.take(1000)

## Copy data from Cloud Storage to the cluster's HDFS

In [None]:
# Copy the CSV chunks from Cloud Storage into the HDFS root, where read_batch loads them from.
# NOTE(review): `gs:///` has no bucket name while the upload cell targets gs://mmds/ — confirm the source path.
!hadoop distcp gs:///ukwiki_*.csv hdfs://

19/07/20 12:10:58 INFO tools.OptionsParser: parseChunkSize: blocksperchunk false
19/07/20 12:10:59 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, overwrite=false, append=false, useDiff=false, useRdiff=false, fromSnapshot=null, toSnapshot=null, skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, mapBandwidth=100, sslConfigurationFile='null', copyStrategy='uniformsize', preserveStatus=[], preserveRawXattrs=false, atomicWorkPath=null, logPath=null, sourceFileListing=null, sourcePaths=[gs:/ukwiki_*.csv], targetPath=hdfs:/, targetPathExists=true, filtersFile='null', blocksPerChunk=0, copyBufferSize=8192, verboseLog=false}
19/07/20 12:11:00 INFO client.RMProxy: Connecting to ResourceManager at cluster-43cf-m/10.128.0.20:8032
19/07/20 12:11:00 INFO client.AHSProxy: Connecting to Application History server at cluster-43cf-m/10.128.0.20:10200
19/07/20 12:11:35 INFO tools.SimpleCopyListing: Paths (fil

# Initialize the Spark session

In [None]:
# Spark session on a local master ("local" = one worker thread); getOrCreate()
# returns the already-active session if there is one.
spark = (
    SparkSession.builder
    .master("local")
    .appName("WikiParse")
    .getOrCreate()
)

# Load stop words and service words from Cloud Storage

In [None]:
# Read both word lists without a header (Spark names the columns _c0, _c1, ...)
# and bring them to the driver as pandas frames, stop words first.
sw_df, sr_df = (
    spark.read.csv(csv_path).toPandas()
    for csv_path in ("gs:///stop_words.csv", "gs:///service_words.csv")
)

In [None]:
# Turn the first auto-named column (_c0) of each frame into a plain Python list.
stop_words, service_words = (frame['_c0'].tolist() for frame in (sw_df, sr_df))

# Helper to read batches of the training data from HDFS

In [None]:
def read_batch(offset, limit):
    """Lazily load the CSV chunks ukwiki_<offset>.csv ... ukwiki_<limit - 1>.csv from HDFS.

    Args:
        offset: index of the first chunk to load (inclusive).
        limit: index one past the last chunk to load (exclusive end, not a count).

    Returns:
        A list of Spark DataFrames, one per chunk, in index order.
    """
    fractions = []
    for i in range(offset, limit):
        print("Downloading fraction number {}...".format(i))
        # pandas.to_csv escapes embedded quotes by doubling them, whereas Spark's
        # default CSV escape character is a backslash; match the writer.
        df = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("encoding", "UTF-8") \
                        .option("escape", '"') \
                        .load("hdfs:///ukwiki_" + str(i) + ".csv")
        fractions.append(df)
    return fractions

# Create the training pipeline for the word2vec model

In [None]:
class Pipeline:
    """Text cleaning, tokenization and word2vec training over a Spark DataFrame of articles.

    ``df`` must contain ``Title`` and ``Text`` string columns. Call ``preprocess()``
    before ``fit()``.
    """

    def __init__(self, df, stop_words, service_words):
        self.stop_words = stop_words        # removed in the first filtering pass
        self.service_words = service_words  # removed in the second filtering pass
        self.df = df                        # raw input DataFrame
        # Single-column DataFrame of token arrays; set by preprocess().
        self.vector_df = None

    def fit(self, sample=0):
        """Train a Word2Vec model on the preprocessed ``text`` column.

        Args:
            sample: 0 (default) trains on all rows of ``vector_df``; otherwise
                trains on at most ``sample`` rows.

        Returns:
            The fitted Word2Vec model.

        Raises:
            RuntimeError: if ``preprocess()`` has not been called yet.
        """
        if self.vector_df is None:
            raise RuntimeError("Pipeline.preprocess() must be called before fit()")

        word2vec = Word2Vec(vectorSize=100, seed=42, inputCol='text', outputCol='model')

        if sample == 0:
            return word2vec.fit(self.vector_df)

        # limit() keeps the subset distributed; take() + createDataFrame would pull
        # every sampled row into the driver and ship it back to the executors.
        return word2vec.fit(self.vector_df.limit(sample))

    def preprocess(self):
        """Clean and tokenize ``Text``, then drop stop words and service words.

        Sets ``self.vector_df`` to a DataFrame with a single ``text`` column of
        token arrays, showing the first 5 rows after each stage.
        """
        # clean data: punctuation, digits, Latin letters and dashes become spaces,
        # runs of three apostrophes become spaces, any remaining character outside
        # the listed Cyrillic letters and whitespace is dropped, whitespace is collapsed.
        df_trip = (
            self.df.select(['Title', 'Text'])
            .withColumn('Text', regexp_replace('Text', r'[§»«·&\~.a-zA-Z^=\-"<>!?:;{}()\[\]/|%0-9\\+\*#_]+', ' '))
            .withColumn('Text', regexp_replace('Text', "'{3}", ' '))
            .withColumn('Text', regexp_replace('Text', '[—−]', ' '))
            .withColumn('Text', regexp_replace('Text', r'[^а-яА-ЯіІіІєЄҐґїЇ\s]', ''))
            .withColumn('Text', regexp_replace('Text', r'\s+', ' '))
            .select([trim(lower(col('Title'))).alias('Title'), trim(lower(col('Text'))).alias('Text')])
        )

        # tokenize data (Tokenizer lowercases and splits on whitespace)
        tokenizer = Tokenizer(inputCol="Text", outputCol="vector")
        self.vector_df = tokenizer.transform(df_trip).select("vector")
        self.vector_df.show(5)

        # remove stop words
        self.vector_df = self.__remove_stop_words("vector", "vector_no_stopwords", self.stop_words)

        # remove service words
        self.vector_df = self.__remove_stop_words("vector_no_stopwords", "text", self.service_words)

    def __remove_stop_words(self, in_col, out_col, words_list):
        """Return ``self.vector_df`` with ``words_list`` tokens removed from ``in_col``.

        The result has only ``out_col``; its first 5 rows are shown.
        """
        remover = StopWordsRemover(inputCol=in_col, outputCol=out_col, stopWords=words_list)
        vector_no_stopw_df = remover.transform(self.vector_df).select(out_col)
        vector_no_stopw_df.show(5)
        return vector_no_stopw_df

    @staticmethod
    def from_batch(batch, stop_words, service_words):
        """Build a Pipeline over ``batch`` (a DataFrame with Title/Text columns)."""
        return Pipeline(batch, stop_words, service_words)

In [None]:
# Preview every preprocessing stage on the first CSV chunk. `fractions` is built
# here so this cell also runs on a fresh kernel; otherwise it is first defined
# in the training loop further down.
fractions = reduce(lambda x, y: x.union(y), read_batch(0, 1))
p = Pipeline.from_batch(fractions, stop_words, service_words)
p.preprocess()

+--------------------+
|              vector|
+--------------------+
|[шапка, головна, ...|
|[файл, фізична, к...|
|[атом, значення, ...|
|[мільярд, число, ...|
|[ядро, основна, ч...|
+--------------------+
only showing top 5 rows

+--------------------+
| vector_no_stopwords|
+--------------------+
|[шапка, головна, ...|
|[файл, фізична, к...|
|[атом, значення, ...|
|[мільярд, число, ...|
|[ядро, основна, ч...|
+--------------------+
only showing top 5 rows

+--------------------+
|                text|
+--------------------+
|[голова, спілкува...|
|[фізична, карта, ...|
|[атом, значення, ...|
|[мільярд, число, ...|
|[ядро, основна, г...|
+--------------------+
only showing top 5 rows



# Train word2vec models on growing fractions of the dataset

In [None]:
# Disabled example: train on 10 rows of an already-preprocessed Pipeline `p`,
# save the model, reload it and query nearest neighbours of a word.
# # example for 10 articles

# n = 10
# print("Take {} rows".format(n))
# model = p.fit(sample=n)
# print("Saving word2vec of {} rows".format(n))
# model.save("gs:///w2v/word2vec_{}".format(n))

# x = Word2VecModel.load("gs:///w2v/word2vec_{}".format(n))
# x.getVectors()

# x.findSynonyms('дата', 5).show()

In [None]:
# Train one model per growing prefix of the CSV chunks: chunks [0, 1), [0, 11), ..., [0, 81).
start = 1
end = 91
ROWS_PER_CHUNK = 10000  # assumed to match batch_size used when the chunks were written

for i in range(start, end, 10):
    chunk_frames = read_batch(0, i)
    fractions = reduce(lambda left, right: left.union(right), chunk_frames)

    p = Pipeline.from_batch(fractions, stop_words, service_words)
    p.preprocess()

    part = ROWS_PER_CHUNK * i
    print("Take {} rows".format(part))
    model = p.fit(sample=part)

    print("Saving word2vec of {} rows".format(part))
    model.save("gs:///w2v/word2vec_{}".format(i))