In [113]:
import bs4
import click
import json
import logging
import os
import os
import pandas as pd
import pickle
import re
import string
import warnings
from IPython.core.display import display, HTML
from bs4 import BeautifulSoup
from datetime import date
from nltk.stem.snowball import SnowballStemmer
from nltk.tokenize import TreebankWordTokenizer
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from pathlib import Path
from pymongo import MongoClient
from pymystem3 import Mystem
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm import tqdm


logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
warnings.filterwarnings('ignore')
raw_data_path = os.path.join("..", "..", "data", "raw")
processed_data_path = os.path.join("..", "..", "data", "processed")
external_data_path = os.path.join("..", "..", "data", "external")


def paras(post):
    paras_tags = ["h1", "h2", "h3", "h4", "h5", "h6", "p", "li"]
    try:
        soup = bs4.BeautifulSoup(post, "lxml")
        for element in soup.find_all(paras_tags):
            yield element.text
        soup.decompose()
    except:
        logging.info(f"paras soup exception on text: {post}.")
        yield ""


def sents(post):
    for para in paras(post):
        for sentence in sent_tokenize(para):
            yield sentence


def load_stop_words():
    with open(os.path.join(external_data_path, 'russian_stop_words.txt'), 'r') as handle:
        stop_words_ru = [line.rstrip('\n') for line in handle]
    with open(os.path.join(external_data_path, 'english_stop_words.txt'), 'r') as handle:
        stop_words_en = [line.rstrip('\n') for line in handle]
    return set(stop_words_ru + stop_words_en)


def lemmatize(text, morphological_analyzer, stemmer, stop_words):
    text = re.sub(r"http\S+", "", text)
    lemmatizing_paras = []
    for para in paras(text):
        lemmatizing_sents = []
        for sentence in sents(para):
            lemmatizing_words = []
            for token_analize in morphological_analyzer.analyze(sentence):
                if "analysis" in token_analize:
                    if token_analize["text"] in stop_words:
                        continue
                    if token_analize["analysis"]:
                        find = re.compile("([^,=]+)")
                        lexem = token_analize["analysis"][0]["lex"]
                        if lexem in stop_words:
                            continue
                        grammemes = token_analize["analysis"][0]["gr"]
                        grammeme = re.search(find, grammemes).group(0)
                        lemmatizing_words.append(f"{lexem}_{grammeme}")
                    else:
                        lexem = stemmer.stem(token_analize["text"])
                        if lexem in stop_words:
                            continue
                        lemmatizing_words.append(lexem)
            if len(lemmatizing_words):
                lemmatizing_sents.append(lemmatizing_words)
        print(len(lemmatizing_sents))
        if len(lemmatizing_sents):
            lemmatizing_paras.append(lemmatizing_sents)
    return lemmatizing_paras

In [114]:
logging.info("Create MongoDB connection...")
client = MongoClient('localhost', 27017)
db = client.publicru_test
collection = db.documents_collection

logging.info(
    f"Collection has {collection.count_documents({'pt_body': None})} documents without preprocessing 'body'.")

morphological_analyzer = Mystem()
stemmer = SnowballStemmer("english")
stop_words = load_stop_words()

2020-03-29 01:20:04,575 : INFO : Create MongoDB connection...
2020-03-29 01:20:05,488 : INFO : Collection has 518456 documents without preprocessing 'body'.


In [115]:
n = 16
# result = collection.find({}).skip(0 if n == 0 else (n - 1)).limit(1)
result = collection.find({}).skip(n).limit(1)
document = result[0]

In [116]:
t_body = lemmatize(document["body"], morphological_analyzer, stemmer, stop_words)

2
1
3
3
6
0
1
2
6
5
1
0
1
1
1


In [117]:
len(t_body)

13

In [118]:
document["body"]

'<body><p>Мнения спикеров. Итоги</p>\n<p>Ширлин Лим, генеральный менеджер отеля Radisson Sonya*, Санкт Петербург, - амбассадор программы «Женщины-лидеры» (Women in Leadership*) компании Carlson Rezidor*</p>\n<p>В современном бизнес-сообществе уже давно нет разделений на мужчин и женщин: все в равной степени являются партнерами. Подобную концепцию мы постоянно слышим и читаем в известных журналах, посвященных бизнесу. Однако, согласно исследованию портала hh.ru (http://spb.hh.ru/article/17038), каждая пятая женщина в России полагает, что гендерные стереотипы так или иначе повлияли на их карьерный рост.</p>\n<p>По словам Татьяны Старковой, технического специалиста компании, занимающейся разработкой программного обеспечения для автоматизации грузовых авиаперевозок, гендерные предубеждения - это повседневная сторона ее работы. Она отмечает, что среди основной массы клиентов технические специалисты мужского пола пользуются большим доверием, чем женщины. Она отмечает, что женщины тем не мене

In [119]:
t_body

[[['мнение_S', 'спикер_S'], ['итог_S']],
 [['ширлин_S',
   'лим_S',
   'генеральный_A',
   'менеджер_S',
   'отель_S',
   'radisson',
   'sonya',
   'санкт_COM',
   'петербург_S',
   'амбассадор_S',
   'программа_S',
   'женщина_S',
   'лидер_S',
   'women',
   'leadership',
   'компания_S',
   'carlson',
   'rezidor']],
 [['современный_A',
   'бизнес_S',
   'сообщество_S',
   'разделение_S',
   'мужчина_S',
   'женщина_S',
   'равный_A',
   'степень_S',
   'являться_V',
   'партнер_S'],
  ['подобный_A',
   'концепция_S',
   'постоянно_ADV',
   'слышать_V',
   'читать_V',
   'известный_A',
   'журнал_S',
   'посвящать_V',
   'бизнес_S'],
  ['согласно_PR',
   'исследование_S',
   'портал_S',
   'hh',
   'ru',
   'женщина_S',
   'россия_S',
   'полагать_V',
   'гендерный_A',
   'стереотип_S',
   'иначе_CONJ',
   'повлиять_V',
   'карьерный_A',
   'рост_S']],
 [['слово_S',
   'татьяна_S',
   'старкова_S',
   'технический_A',
   'специалист_S',
   'компания_S',
   'заниматься_V',
   'разра

In [50]:
def run(document):
    collection.update_one({"doc_id": document["doc_id"]}, {"$set": {
        "t_title": lemmatize(document["title"], morphological_analyzer, stemmer, stop_words),
        "t_body": lemmatize(document["body"], morphological_analyzer, stemmer, stop_words),
    }})

In [30]:
import multiprocessing as mp

In [19]:
mp.cpu_count()

24

In [None]:
with mp.Pool(cpu_count()) as p:
    print(p.map(run, collection.find({"pt_body": {"$exists": False}}, no_cursor_timeout=True).limit(1000)))

Process ForkPoolWorker-28:
2020-03-28 22:56:13,815 : INFO : paras soup exception on text: Новости за рубежом: цветные металлы | Спрос на медь будет расширяться, но не везде.
2020-03-28 22:56:13,814 : INFO : paras soup exception on text: У нас другой маневр - все внимание сфокусировано на перспективных рынках, «голубых океанах», которые только формируются и размер которых в ближайшие 20 лет превысит 100 миллиардов долларов. На их покорение у нас такие же шансы, как и у других стран. Успех будет зависеть от степени концентрации усилий бизнеса, государства и общества на строительстве в России экономики знаний..
2020-03-28 22:56:13,815 : INFO : paras soup exception on text: Разрешают голосовать гражданам, проживающим за пределами страны, 111 государств, в частности Германия, Канада, США, Франция, Сербия, Италия, Белоруссия. А 64 государства разрешают участвовать в выборах даже тем, кто не проживает, а просто находится за пределами страны в день выборов - в отпуске, туристической поездке, п

In [17]:
collection.count_documents({})

588456

In [16]:
collection.count_documents({"pt_body": {"$exists": False}})

588456

In [21]:
import time

In [22]:
%%time

documents = [{"a number": i} for i in range(1000000)]

time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
    col.insert_one(doc)

print(time.time()-time1s)

457.7988154888153
CPU times: user 4min 45s, sys: 29.9 s, total: 5min 15s
Wall time: 7min 38s


In [None]:
import multiprocessing as mp
import time
from pymongo import MongoClient

In [27]:
%%time

documents = [{"number": i} for i in range(1000000)]

def insert_doc(chunk):
    client = MongoClient()
    db = client.mydb
    col = db.mycol
    col.insert_many(chunk)

chunk_size = 10000

def chunks(sequence):
    # Chunks of 1000 documents at a time.
    for j in range(0, len(sequence), chunk_size):
        yield sequence[j:j + chunk_size]

pool = mp.Pool(processes=mp.cpu_count())
pool.map(insert_doc, chunks(documents))
pool.close()
pool.join()

CPU times: user 512 ms, sys: 417 ms, total: 930 ms
Wall time: 2.25 s


In [None]:
len_sequence = collection.count_documents({"pt_body": {"$exists": False}})
chunk_size = 10000

def chunks(sequence):
    for j in range(0, len_sequence, chunk_size):
        yield sequence[j:j + chunk_size]

pool = mp.Pool(processes=mp.cpu_count())
pool.map(insert_doc, chunks(collection.find({"pt_body": {"$exists": False}}, no_cursor_timeout=True)))

In [32]:
%%time

client = MongoClient('localhost', 27017)
db = client.publicru_test
collection = db.documents_collection

documents = []
for document in collection.find({"pt_body": {"$exists": False}}, no_cursor_timeout=True).limit(1000):
    document["t_title"] = lemmatize(document["title"], morphological_analyzer, stemmer, stop_words),
    document["t_body"] = lemmatize(document["body"], morphological_analyzer, stemmer, stop_words),
    documents.append(document)

CPU times: user 29.3 s, sys: 1.45 s, total: 30.7 s
Wall time: 1min 10s


In [39]:
collection.count_documents({}) / 1000 * 70 / 60 / 60 

11.442199999999998

In [86]:
import bs4
import click
import json
import logging
import os
import os
import pandas as pd
import pickle
import re
import string
import warnings
from IPython.core.display import display, HTML
from bs4 import BeautifulSoup
from datetime import date
from nltk.stem.snowball import SnowballStemmer
from nltk.tokenize import TreebankWordTokenizer
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from pathlib import Path
from pymongo import MongoClient
from pymystem3 import Mystem
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm import tqdm


logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
warnings.filterwarnings('ignore')
raw_data_path = os.path.join("..", "..", "data", "raw")
processed_data_path = os.path.join("..", "..", "data", "processed")
external_data_path = os.path.join("..", "..", "data", "external")


def paras(post):
    paras_tags = ["h1", "h2", "h3", "h4", "h5", "h6", "p", "li"]
    try:
        soup = bs4.BeautifulSoup(post, "lxml")
        for element in soup.find_all(paras_tags):
            yield element.text
        soup.decompose()
    except:
        logging.info(f"paras soup exception on text: {post}.")
        yield ""


def sents(post):
    for para in paras(post):
        for sentence in sent_tokenize(para):
            yield sentence


def load_stop_words():
    with open(os.path.join(external_data_path, 'russian_stop_words.txt'), 'r') as handle:
        stop_words_ru = [line.rstrip('\n') for line in handle]
    with open(os.path.join(external_data_path, 'english_stop_words.txt'), 'r') as handle:
        stop_words_en = [line.rstrip('\n') for line in handle]
    return set(stop_words_ru + stop_words_en)


def lemmatize(text, morphological_analyzer, stemmer, stop_words):
    lemmatizing_paras = []
    for para in paras(text):
        lemmatizing_sents = []
        for sentence in sents(para):
            lemmatizing_words = []
            for token_analize in morphological_analyzer.analyze(sentence):
                if "analysis" in token_analize:
                    if token_analize["text"] in stop_words:
                        continue
                    if token_analize["analysis"]:
                        find = re.compile("([^,=]+)")
                        lexem = token_analize["analysis"][0]["lex"]
                        if lexem in stop_words:
                            continue
                        grammemes = token_analize["analysis"][0]["gr"]
                        grammeme = re.search(find, grammemes).group(0)
                        lemmatizing_words.append(f"{lexem}_{grammeme}")
                    else:
                        lexem = stemmer.stem(token_analize["text"])
                        if lexem in stop_words:
                            continue
                        lemmatizing_words.append(lexem)
            lemmatizing_sents.append(lemmatizing_words)
        lemmatizing_paras.append(lemmatizing_sents)
    return lemmatizing_paras

In [120]:
morphological_analyzer = Mystem()
stemmer = SnowballStemmer("english")
stop_words = load_stop_words()

client = MongoClient('localhost', 27017)
db = client.publicru_test
collection = db.documents_collection

In [124]:
collection.update_many({}, { "$unset": { "t_body": "", "t_title": "", "pt_body": "", "pt_title": "" } })

<pymongo.results.UpdateResult at 0x7f3289887780>

In [56]:
collection.count_documents({}) / 10000 * 37 / 60 / 60 

0.604802

In [58]:
%%time

for document in documents:
    collection.update_one({"doc_id": document["doc_id"]}, {"$set": {
            "pt_title": document["pt_title"],
            "pt_body": document["pt_body"],
        }})

CPU times: user 11 s, sys: 676 ms, total: 11.7 s
Wall time: 1min 5s


In [69]:
from tqdm.notebook import tqdm

In [64]:
import multiprocessing as mp

morphological_analyzer = Mystem()
stemmer = SnowballStemmer("english")
stop_words = load_stop_words()

client = MongoClient('localhost', 27017)
db = client.publicru_test
collection = db.documents_collection

In [65]:
%%time

def lem_doc(document):
    document["pt_title"] = lemmatize(document["title"], morphological_analyzer, stemmer, stop_words),
    document["pt_body"] = lemmatize(document["body"], morphological_analyzer, stemmer, stop_words),
    return document

    
def update_doc(chunk):
    client = MongoClient('localhost', 27017)
    db = client.publicru_test
    collection = db.documents_collection
    for document in chunk:
        collection.update_one({"doc_id": document["doc_id"]}, {"$set": {
                "pt_title": document["pt_title"],
                "pt_body": document["pt_body"],
            }})

        
def chunks(sequence):
    # Chunks of 1000 documents at a time.
    for j in range(0, len(sequence), chunk_size):
        yield sequence[j:j + chunk_size]

CPU times: user 9 µs, sys: 1 µs, total: 10 µs
Wall time: 14.5 µs


In [67]:
%%time

chunk_size = 1000
docs_for_update = 50000  
    
with mp.Pool(mp.cpu_count()) as p:
    documents = p.map(lem_doc, collection.find({"pt_body": {"$exists": False}}, no_cursor_timeout=True).limit(docs_for_update))

with mp.Pool(mp.cpu_count()) as p:
    p.map(update_doc, chunks(documents))

CPU times: user 1min 18s, sys: 18.5 s, total: 1min 37s
Wall time: 7min 18s


In [74]:
collection.count_documents({}) / docs_for_update * 8

94.15296

In [None]:
%%time

chunk_size = 1000
docs_for_update = 50000

with mp.Pool(mp.cpu_count()) as p:
    documents = p.map(lem_doc, collection.find({"pt_body": {"$exists": False}}, no_cursor_timeout=True).limit(docs_for_update))

with mp.Pool(mp.cpu_count()) as p:
    p.map(update_doc, chunks(documents))

In [72]:
%%time

chunk_size = 1000
docs_for_update = 50000

for i in tqdm(range(round(collection.count_documents({}) / docs_for_update))):
    with mp.Pool(mp.cpu_count()) as p:
        documents = p.map(lem_doc, collection.find({"pt_body": {"$exists": False}}, no_cursor_timeout=True).limit(docs_for_update))

    with mp.Pool(mp.cpu_count()) as p:
        p.map(update_doc, chunks(documents))

HBox(children=(FloatProgress(value=0.0, max=38456.0), HTML(value='')))




In [71]:
collection.count_documents({}) % docs_for_update

38456

In [79]:
for i in tqdm(range(round(collection.count_documents({}) / docs_for_update))):
    pass

HBox(children=(FloatProgress(value=0.0, max=12.0), HTML(value='')))




In [57]:
len(documents)

10000

In [52]:
documents[0]

{'_id': ObjectId('5e7854ebc59124ce04becf39'),
 'title': 'Эффект слабого рубля',
 'annotation': 'Мы взяли две компании с сопоставимой выручкой и оценили влияние курса рубля на их финансы. Первый участник - это производитель удобрений - «ФосАгро», который большую часть выручки получает от экспорта.',
 'body': '<body><p><b>Мы взяли две компании с сопоставимой выручкой и оценили влияние курса рубля на их финансы. Первый участник - это производитель удобрений - «ФосАгро», который большую часть выручки получает от экспорта. Второй участник ориентирован на внутренний рынок - это компания «М.видео», продающая электронику и технику. В качестве начала отчета возьмем 2013 год, когда курс рубля к доллару оставался стабильным и находился в диапазоне 30 - 33,5.</b></p>\n<p><img alt="" src="325/1.jpg" title=""/></p>\n</body>',
 'authors': nan,
 'issue_id': 9,
 'edition_id': 1,
 'issue_date': '2016-10-01',
 'edition_type': 'magazine',
 'edition_periodicity': 'monthly',
 'edition_description': 'Журнал 