
### BigData Project on
# NLP for russian language via SparkNLP

Vadim Alperovich <br>
IAD21

[![Go to colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1WpuuDQSxczxT95MxYoOewBnDqKZ9pVOE?authuser=3#scrollTo=TA21Jo5d9SVq)




# 1. Colab-Spark setup

In [1]:
%%time
from IPython.display import clear_output
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

# Install Spark NLP Display for visualization
!pip install --ignore-installed spark-nlp-display
clear_output(wait=True)

CPU times: user 217 ms, sys: 69.8 ms, total: 287 ms
Wall time: 23.5 s


In [159]:
%%time
import sparknlp
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline

spark = sparknlp.start()

CPU times: user 17 µs, sys: 0 ns, total: 17 µs
Wall time: 21 µs


# 2. Data loading

In [2]:
from google.colab import drive
drive.mount('/content/drive')
PROJECT_FOLDER = '/content/drive/MyDrive/spark-project/'
%ls {PROJECT_FOLDER}

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
bigdata-project  [0m[01;34mdata[0m/


In [24]:
%%time
# download the Lenta.ru v1.0 russian news dataset
!wget https://github.com/yutkin/Lenta.Ru-News-Dataset/releases/download/v1.0/lenta-ru-news.csv.gz -P {PROJECT_FOLDER}/data
# download supporting library
!pip install corus
clear_output(wait=True)

CPU times: user 221 ms, sys: 91.9 ms, total: 313 ms
Wall time: 16.9 s


In [32]:
import json
import corus
import pandas as pd
import numpy as np

In [120]:
%%time
from corus import load_lenta

records = load_lenta(path)
l = []
for i, r in enumerate(records):
    if i > 50000:
        break
    l.append(list(r))
df = pd.DataFrame(l, columns=['url', 'title', 'text', 'topic', 'tags', 'date'])
df.to_csv(f'{PROJECT_FOLDER}/data/lenta-ru-news50000.csv', sep='\t', index=None)

CPU times: user 4.35 s, sys: 257 ms, total: 4.61 s
Wall time: 5.07 s


In [88]:
# %%time
# path = f'{PROJECT_FOLDER}/data/lenta-ru-news.csv.gz'

# df = pd.read_csv(path, nrows=50000, compression='gzip')
# df.head()
# df.to_csv(f'{PROJECT_FOLDER}/data/lenta-ru-news50000.csv', sep='\t', index=None)

In [121]:
spark_df = spark.read \
      .option("inferSchema","true") \
      .option("header", True) \
      .csv(f'{PROJECT_FOLDER}/data/lenta-ru-news50000.csv', sep='\t')
print(f'Hell yeah, spark_df is {type(spark_df)}!')
print(f'spark_df has {spark_df.count()} rows')
spark_df

Hell yeah, spark_df is <class 'pyspark.sql.dataframe.DataFrame'>!
spark_df has 50001 rows


DataFrame[url: string, title: string, text: string, topic: string, tags: string, date: string]

In [122]:
spark_df.show(n=10, truncate=30)

+------------------------------+------------------------------+------------------------------+-----------------+---------------+----+
|                           url|                         title|                          text|            topic|           tags|date|
+------------------------------+------------------------------+------------------------------+-----------------+---------------+----+
|https://lenta.ru/news/2018/...|Названы регионы России с са...|Вице-премьер по социальным ...|           Россия|       Общество|null|
|https://lenta.ru/news/2018/...|Австрия не представила дока...|Австрийские правоохранитель...|            Спорт|    Зимние виды|null|
|https://lenta.ru/news/2018/...|Обнаружено самое счастливое...|Сотрудники социальной сети ...|      Путешествия|            Мир|null|
|https://lenta.ru/news/2018/...|В США раскрыли сумму расход...|С начала расследования росс...|              Мир|       Политика|null|
|https://lenta.ru/news/2018/...|Хакеры рассказали о планах ...

# 3. EDA & Preprocessing

In [133]:
column = 'topic'
topic_unique = spark_df.select(column).distinct().count()
print(f'*{column}* columns has {topic_unique} unique values', )
spark_df.select(column).groupBy(column).count().orderBy(F.desc('count')).toPandas()

*topic* columns has 19 unique values


Unnamed: 0,topic,count
0,Мир,6896
1,Россия,6871
2,Спорт,5458
3,Экономика,4915
4,Интернет и СМИ,3886
5,Наука и техника,3511
6,Из жизни,3320
7,Бывший СССР,3260
8,Культура,3171
9,Силовые структуры,2693


In [135]:
column = 'tags'
tags_unique = spark_df.select(column).distinct().count()
print(f'*{column}* columns has {tags_unique} unique values', )
spark_df.select(column).groupBy(column).count().orderBy(F.desc('count')).toPandas()

*tags* columns has 80 unique values


Unnamed: 0,tags,count
0,Политика,5521
1,Общество,4501
2,Происшествия,2933
3,Футбол,2798
4,Украина,2548
...,...,...
75,Наследие,10
76,Страноведение,8
77,Выборы,4
78,Экология,3


In [160]:
def get_text_count(text, token_len=False):
    if token_len:
        return len(text.split(' '))
    return len(text)

spark_get_char_count = F.udf(lambda x: get_text_count(x), T.IntegerType())
spark_get_token_count = F.udf(lambda x: get_text_count(x, token_len=True), T.IntegerType())

In [165]:
%%time
# ```text``` stats
spark_df = spark_df.withColumn("char_count", spark_get_char_count(F.col("text")))
spark_df = spark_df.withColumn("token_count", spark_get_token_count(F.col("text")))
# ```title``` stats
spark_df = spark_df.withColumn("char_count_title", spark_get_char_count(F.col("title")))
spark_df = spark_df.withColumn("token_count_title", spark_get_token_count(F.col("title")))

CPU times: user 9.45 ms, sys: 3.08 ms, total: 12.5 ms
Wall time: 48.8 ms


In [166]:
spark_df.show(n=5, truncate=10)

+----------+----------+----------+----------+----------+----+----------+-----------+----------------+-----------------+
|       url|     title|      text|     topic|      tags|date|char_count|token_count|char_count_title|token_count_title|
+----------+----------+----------+----------+----------+----+----------+-----------+----------------+-----------------+
|https:/...|Названы...|Вице-пр...|    Россия|  Общество|null|       660|         92|              58|                7|
|https:/...|Австрия...|Австрий...|     Спорт|Зимние ...|null|      1072|        137|              65|                6|
|https:/...|Обнаруж...|Сотрудн...|Путешес...|       Мир|null|       961|        120|              44|                5|
|https:/...|В США р...|С начал...|       Мир|  Политика|null|      1347|        189|              65|                8|
|https:/...|Хакеры ...|Хакерск...|       Мир|  Общество|null|      2066|        255|              66|                6|
+----------+----------+----------+------

In [168]:
spark_df.select('char_count', 'token_count', 'char_count_title', 'token_count_title').describe().toPandas()

Unnamed: 0,summary,char_count,token_count,char_count_title,token_count_title
0,count,50001.0,50001.0,50001.0,50001.0
1,mean,1263.7543049139017,172.83338333233334,54.32613347733045,6.3006339873202535
2,stddev,470.6264749594072,63.09961567498512,13.146924158437349,1.474631275286148
3,min,252.0,31.0,12.0,1.0
4,max,9282.0,1246.0,84.0,12.0


# 4. Pipelines & Experiments

## 4.1 Text processing pipeline

In [203]:
%%time 

document_assembler = DocumentAssembler() \
    .setInputCol('text') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setLowercase(True) \
      .setOutputCol("normalized_token")

stopwords_cleaner = StopWordsCleaner.pretrained("stopwords_ru", "ru")\
      .setInputCols("normalized_token")\
      .setOutputCol("clean_token")\
      .setCaseSensitive(False)

lemmatizer = LemmatizerModel.pretrained("lemma", "ru") \
        .setInputCols(["clean_token"]) \
        .setOutputCol("lemma")

preprocessing_pipeline = Pipeline(stages=[
                                        document_assembler, 
                                        tokenizer,
                                        normalizer,
                                        stopwords_cleaner,
                                        lemmatizer])

stopwords_ru download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
lemma download started this may take some time.
Approximate size to download 1.3 MB
[OK!]
CPU times: user 122 ms, sys: 29 ms, total: 151 ms
Wall time: 6.02 s


In [204]:
%%time

preprocessor_model = preprocessing_pipeline.fit(spark_df[['text']])
doc_df = preprocessor_model.transform(spark_df[['text']])

doc_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 

In [209]:
# doc_df[['lemma']].show(n=10, truncate=150)

In [208]:
doc_df.withColumn(
    "tmp", 
    F.explode("lemma"))\
    .select("tmp.*")\
    .show(truncate=False)

+-------------+-----+---+--------------+---------------+----------+
|annotatorType|begin|end|result        |metadata       |embeddings|
+-------------+-----+---+--------------+---------------+----------+
|token        |0    |10 |вицепремьер   |[sentence -> 0]|[]        |
|token        |16   |25 |социальный    |[sentence -> 0]|[]        |
|token        |27   |34 |вопрос        |[sentence -> 0]|[]        |
|token        |36   |42 |татьяна       |[sentence -> 0]|[]        |
|token        |44   |51 |голикова      |[sentence -> 0]|[]        |
|token        |53   |62 |рассказать    |[sentence -> 0]|[]        |
|token        |67   |71 |какой         |[sentence -> 0]|[]        |
|token        |73   |80 |регион        |[sentence -> 0]|[]        |
|token        |82   |87 |россии        |[sentence -> 0]|[]        |
|token        |89   |101|фиксировать   |[sentence -> 0]|[]        |
|token        |112  |118|высокий       |[sentence -> 0]|[]        |
|token        |120  |129|смертность    |[sentenc

In [12]:
%%time
useEmbeddings = UniversalSentenceEncoder.pretrained()\
      .setInputCols("document")\
      .setOutputCol("use_embeddings")

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
CPU times: user 1.1 s, sys: 235 ms, total: 1.34 s
Wall time: 2min 51s


## 3. Select the DL model

In [None]:
# If you change the model, re-run all the cells below.
# Applicable models: wikiner_840B_300, wikiner_6B_300, wikiner_6B_100
MODEL_NAME = "wikiner_840B_300"

## 4. Some sample examples

In [None]:
# Enter examples to be transformed as strings in this list
text_list = [
    """Уильям Генри Гейтс III (родился 28 октября 1955 года) - американский бизнес-магнат, разработчик программного обеспечения, инвестор и филантроп. Он наиболее известен как соучредитель корпорации Microsoft. За время своей карьеры в Microsoft Гейтс занимал должности председателя, главного исполнительного директора (CEO), президента и главного архитектора программного обеспечения, а также был крупнейшим индивидуальным акционером до мая 2014 года. Он является одним из самых известных предпринимателей и пионеров микрокомпьютерная революция 1970-х и 1980-х годов. Гейтс родился и вырос в Сиэтле, штат Вашингтон, в 1975 году вместе с другом детства Полом Алленом в Альбукерке, штат Нью-Мексико, и основал компанию Microsoft. она стала крупнейшей в мире компанией-разработчиком программного обеспечения для персональных компьютеров. Гейтс руководил компанией в качестве председателя и генерального директора, пока в январе 2000 года не ушел с поста генерального директора, но остался председателем и стал главным архитектором программного обеспечения. В конце 1990-х Гейтс подвергся критике за свою деловую тактику, которая считалась антиконкурентной. Это мнение было подтверждено многочисленными судебными решениями. В июне 2006 года Гейтс объявил, что перейдет на неполный рабочий день в Microsoft и будет работать на полную ставку в Фонде Билла и Мелинды Гейтс, частном благотворительном фонде, который он и его жена Мелинда Гейтс создали в 2000 году. [ 9] Постепенно он передал свои обязанности Рэю Оззи и Крейгу Манди. Он ушел с поста президента Microsoft в феврале 2014 года и занял новую должность консультанта по технологиям для поддержки вновь назначенного генерального директора Сатья Наделла.""",
    """Мона Лиза - картина маслом 16-го века, созданная Леонардо. Он проводится в Лувре в Париже."""
]

## 5. Define Spark NLP pipeline

In [None]:
document_assembler = DocumentAssembler() \
    .setInputCol('text') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

# The wikiner_840B_300 is trained with glove_840B_300, so the embeddings in the
# pipeline should match. Same applies for the other available models.
if MODEL_NAME == "wikiner_840B_300":
    embeddings = WordEmbeddingsModel.pretrained('glove_840B_300', lang='xx') \
        .setInputCols(['document', 'token']) \
        .setOutputCol('embeddings')
elif MODEL_NAME == "wikiner_6B_300":
    embeddings = WordEmbeddingsModel.pretrained('glove_6B_300', lang='xx') \
        .setInputCols(['document', 'token']) \
        .setOutputCol('embeddings')
elif MODEL_NAME == "wikiner_6B_100":
    embeddings = WordEmbeddingsModel.pretrained('glove_100d') \
        .setInputCols(['document', 'token']) \
        .setOutputCol('embeddings')

ner_model = NerDLModel.pretrained(MODEL_NAME, 'ru') \
    .setInputCols(['document', 'token', 'embeddings']) \
    .setOutputCol('ner')

ner_converter = NerConverter() \
    .setInputCols(['document', 'token', 'ner']) \
    .setOutputCol('ner_chunk')

nlp_pipeline = Pipeline(stages=[
    document_assembler, 
    tokenizer,
    embeddings,
    ner_model,
    ner_converter
])

glove_840B_300 download started this may take some time.
Approximate size to download 2.3 GB
[OK!]
wikiner_840B_300 download started this may take some time.
Approximate size to download 14.4 MB
[OK!]


## 6. Run the pipeline

In [None]:
empty_df = spark.createDataFrame([['']]).toDF('text')
pipeline_model = nlp_pipeline.fit(empty_df)
df = spark.createDataFrame(pd.DataFrame({'text': text_list}))
result = pipeline_model.transform(df)

## 7. Visualize results

In [None]:
from sparknlp_display import NerVisualizer

NerVisualizer().display(
    result = result.collect()[0],
    label_col = 'ner_chunk',
    document_col = 'document'
)