In [1]:
from pyspark import SparkContext, SparkConf

# Spark configuration for this notebook: "local" master, application name
# "News Processing", and a 9g executor-memory setting.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("News Processing")
conf.set("spark.executor.memory", "9g")

# The single SparkContext shared by every cell below.
sc = SparkContext(conf=conf)

In [2]:
import re
import nltk
import string
import pandas as pd
import news_config as config
from operator import add
from pymystem3 import Mystem
from NewsMongoMiner import NewsMongoHelper

import sys
# Python 2-only hack: reload(sys) makes setdefaultencoding available again so
# that implicit str <-> unicode conversions use UTF-8 instead of ASCII.
# NOTE(review): code below mixes byte and unicode strings and appears to rely
# on this; the order of these two lines matters.
reload(sys)
sys.setdefaultencoding('utf8')

# Re-import edited project modules automatically before each cell execution.
%load_ext autoreload
%autoreload 2

# Shared helper instances used by the cells below: the lemmatizer and the
# project's MongoDB access helper.
mystem = Mystem()
mongo_helper = NewsMongoHelper()

In [3]:
# Load the raw news entries from the collection named by config.pravda_collection.
news = mongo_helper.get_news(config.pravda_collection)
# Keep only the 'text' field of the first 1000 entries as a working sample.
# NOTE(review): both `news` and `articles` are rebound by a later cell.
articles = [item['text'] for item in news[:1000]]

In [3]:
# Hand-picked stop words (plus a few whitespace / dash tokens).
# Every literal is a unicode string: mixing byte strings such as 'если' into
# the list makes set() de-duplication miss them (byte and unicode strings with
# non-ASCII content hash differently in Python 2) and makes membership tests
# depend on the process default encoding.
stop_words = [
    u'', u' ', u'-', u'\n', u'–', u'это', u'еще', u'него', u'сказать', u'а', u'ж', u'нее', u'со', u'без', u'же', u'ней',
    u'совсем', u'более', u'жизнь', u'нельзя', u'так', u'больше', u'за', u'нет',
    u'такой', u'будет', u'зачем', u'ни', u'там', u'будто', u'здесь', u'нибудь', u'тебя',
    u'бы', u'и', u'никогда', u'тем', u'был', u'из', u'ним', u'теперь', u'была', u'из-за',
    u'них', u'то', u'были', u'или', u'ничего', u'тогда', u'было', u'им', u'но', u'того',
    u'быть', u'иногда', u'ну', u'тоже', u'в', u'их', u'о', u'только', u'вам', u'к', u'об',
    u'том', u'вас', u'кажется', u'один', u'тот', u'вдруг', u'как', u'он', u'три', u'ведь',
    u'какая', u'она', u'тут', u'во', u'какой', u'они', u'ты', u'вот', u'когда', u'опять',
    u'у', u'впрочем', u'конечно', u'от', u'уж', u'все', u'которого', u'перед', u'уже', u'всегда',
    u'которые', u'по', u'хорошо', u'всего', u'кто', u'под', u'хоть', u'всех', u'куда', u'после',
    u'чего', u'всю', u'ли', u'потом', u'человек', u'вы', u'лучше', u'потому', u'чем', u'г', u'между',
    u'почти', u'через', u'где', u'меня', u'при', u'что', u'говорил', u'мне', u'про', u'чтоб', u'да',
    u'много', u'раз', u'чтобы', u'даже', u'может', u'разве', u'чуть', u'два', u'можно', u'с', u'эти',
    u'для', u'мой', u'сам', u'этого', u'до', u'моя', u'свое', u'этой', u'другой', u'мы', u'свою',
    u'этом', u'его', u'на', u'себе', u'этот', u'ее', u'над', u'себя', u'эту', u'ей', u'надо', u'сегодня',
    u'я', u'ему', u'наконец', u'сейчас', u'если', u'нас', u'есть', u'не', u'также']

# Extend with NLTK's Russian stop-word list, then drop duplicates.
# The result stays a list, so callers that concatenate or index it keep working.
stop_words = stop_words + nltk.corpus.stopwords.words('russian')
stop_words = list(set(stop_words))

# Matches any single character from string.punctuation (ASCII punctuation only).
punctuation_regex = re.compile('[%s]' % re.escape(string.punctuation))

def get_stop_words_from_tokens(tokens, threshold):
    """Return the tokens that occur fewer than ``threshold`` times.

    :param tokens: iterable of tokens (one entry per occurrence).
    :param threshold: a token is returned when its count is strictly below this.
    :return: list of tokens, in the order produced by ``Series.value_counts``
             (most frequent first).
    """
    frequencies = pd.Series(tokens).value_counts()
    is_below_threshold = frequencies < threshold
    return frequencies[is_below_threshold].index.values.tolist()

def remove_punctuation(text):
    """Return ``text`` with every character matched by ``punctuation_regex`` deleted.

    Matched characters are removed, not replaced by a space, so the
    characters on either side of them become adjacent.
    """
    return re.sub(punctuation_regex, '', text)

def lemmatized_formatter(text):
    """Lemmatize ``text`` with ``mystem`` and return the non-blank lemmas.

    Each kept lemma is lower-cased and stripped of surrounding whitespace;
    items that are empty or whitespace-only are skipped.
    """
    lemmas = []
    for raw in mystem.lemmatize(text):
        if not raw.strip():
            continue
        lemmas.append(raw.lower().strip())
    return lemmas

def clean_formatter(text):
    """Turn raw ``text`` into a list of cleaned tokens.

    Punctuation is removed first, the remainder is lemmatized, and every
    lemma found in ``stop_words`` is dropped.
    """
    lemmas = lemmatized_formatter(remove_punctuation(text))
    return [lemma for lemma in lemmas if lemma not in stop_words]

def clean_text(text):
    """Return the cleaned tokens of ``text`` joined by single spaces.

    A falsy ``text`` (``None`` or an empty string) yields ``''``.
    """
    if text:
        return ' '.join(clean_formatter(text))
    return ''

class TextHelper:
    """Tokenization and n-gram helpers that work on a single text."""

    @staticmethod
    def get_tokens(text):
        """Tokenize ``text`` with NLTK's ``word_tokenize``."""
        tokenize = nltk.tokenize.word_tokenize
        return tokenize(text)

    @staticmethod
    def get_ngrams(text, ngrams):
        """Return all n-grams of size ``ngrams`` found in ``text`` as a list."""
        words = TextHelper.get_tokens(text)
        grams = nltk.ngrams(words, ngrams)
        return list(grams)

    @staticmethod
    def get_bigrams(text):
        """Return the list of 2-grams of ``text``."""
        return TextHelper.get_ngrams(text, ngrams=2)

    @staticmethod
    def get_trigrams(text):
        """Return the list of 3-grams of ``text``."""
        return TextHelper.get_ngrams(text, ngrams=3)
    
class CorporaHelper:
    """Tokenization and n-gram helpers that work on a collection of texts."""

    @staticmethod
    def get_tokens(articles):
        """Tokenize every article and return all tokens as one flat list."""
        return [token
                for article in articles
                for token in nltk.tokenize.word_tokenize(article)]

    @staticmethod
    def get_ngrams(articles, ngrams):
        """Return the n-grams of size ``ngrams`` of all articles as one flat list.

        N-grams are built per article, so none spans two articles.
        """
        return [gram
                for article in articles
                for gram in nltk.ngrams(nltk.word_tokenize(article), ngrams)]

    @staticmethod
    def get_bigrams(articles):
        """Return the 2-grams of all articles."""
        return CorporaHelper.get_ngrams(articles, ngrams=2)

    @staticmethod
    def get_trigrams(articles):
        """Return the 3-grams of all articles."""
        return CorporaHelper.get_ngrams(articles, ngrams=3)

### Basic cleaning of news

In [9]:
def clean_news(entry):
    """Clean the 'title', 'text' and 'summary' fields of a news entry.

    The entry is modified in place (each field is replaced by the result of
    ``clean_text``) and the same object is returned. Other keys are untouched.
    """
    for field in ('title', 'text', 'summary'):
        entry[field] = clean_text(entry[field])
    return entry

# Distribute the news entries and clean title / text / summary of each one.
news_cleaned = sc.parallelize(news).map(clean_news)

# Mark the cleaned RDD for caching; the returned RDD is the cell's output.
news_cleaned.cache()

PythonRDD[2] at RDD at PythonRDD.scala:43

In [8]:
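# Disabled: store the cleaned news in the 'pravda_cleaned' collection.
# NOTE: news_cleaned is a Spark RDD, so it has to be collected into a list of
# documents before it can be handed to pymongo.
# from pymongo import MongoClient
# mongo_client = MongoClient(config.mongo_db_connection_string)
# collection = mongo_client[config.db]['pravda_cleaned']
# collection.insert(news_cleaned.collect())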

### Tokenization and searching for stop words

In [4]:
# Rebind `news` to the entries of the collection named by
# config.pravda_cleaned_collection (replaces the raw news loaded earlier).
news = mongo_helper.get_news(config.pravda_cleaned_collection)
# Texts of all loaded entries; this also replaces the earlier 1000-article sample.
articles = [entry["text"] for entry in news]

In [5]:
def word_count(words_rdd):
    """Count occurrences of each element of ``words_rdd``.

    :param words_rdd: RDD whose elements are the words to count.
    :return: RDD of ``(word, count)`` pairs.
    """
    def with_unit_count(word):
        return (word, 1)

    pairs = words_rdd.map(with_unit_count)
    return pairs.reduceByKey(add)

# Distribute both the full news entries and their texts.
news_rdd = sc.parallelize(news)
articles_rdd = sc.parallelize(articles)

# Corpus-wide token frequencies, cached because later cells reuse them.
tokens = articles_rdd.flatMap(TextHelper.get_tokens)
tokens_count = word_count(tokens).cache()

# The 15 (token, count) pairs with the highest counts.
most_frequent = tokens_count.takeOrdered(15, key=lambda pair: -pair[1])

In [33]:
def tokens_statistics(tokens_count):
    """Print summary statistics for an RDD of ``(token, count)`` pairs.

    Prints four lines: the total number of token occurrences, the number of
    distinct tokens, and the number of rare tokens (count == 1) reported both
    as occurrences and as distinct tokens.

    :param tokens_count: RDD of ``(token, count)`` pairs.
    :return: None; the statistics are written to stdout.
    """
    # values().sum() instead of map(...).reduce(add): reduce() raises on an
    # RDD without elements, sum() yields 0.
    all_words_count = tokens_count.values().sum()
    # Counting the pairs directly equals counting the keys.
    all_unique_words_count = tokens_count.count()
    # A rare token occurs exactly once, so the number of rare occurrences and
    # the number of distinct rare tokens are the same value by construction;
    # one Spark job is enough for both.
    rare_unique_words_count = tokens_count.filter(lambda pair: pair[1] == 1).count()
    rare_words_count = rare_unique_words_count
    # Single-argument print with %-formatting gives the same output under the
    # Python 2 print statement and the Python 3 print function.
    print('Total number of words %s' % all_words_count)
    print('Total number of unique words %s' % all_unique_words_count)
    print('Total number of rare words %s' % rare_words_count)
    print('Number of unique rare words %s' % rare_unique_words_count)
    
# Print the statistics for the corpus-wide (token, count) RDD built above.
tokens_statistics(tokens_count) 

In [31]:
# Tokens that occur exactly once in the corpus. They stay as (token, count)
# pairs because subtractByKey below only needs the keys.
rare_tokens_rdd = tokens_count.filter(lambda pair: pair[1] == 1)

# Flatten the news into (token, (_id, position)) records and remove every
# record whose token is rare.
used_tokens = (news_rdd
               .map(lambda item: (item['_id'], TextHelper.get_tokens(item['text'])))
               .flatMap(lambda doc: [(token, (doc[0], position))
                                     for position, token in enumerate(doc[1])])
               .subtractByKey(rare_tokens_rdd))

# Convert (token, (_id, position)) back to (_id, text): group the tokens of
# each article, order them by their original position and join with spaces.
texts_without_rare_words = (used_tokens
                            .map(lambda record: (record[1][0], (record[1][1], record[0])))
                            .groupByKey()
                            .mapValues(lambda placed: u' '.join(
                                [token for _, token in sorted(placed)])))

# Every news entry with its 'text' field removed.
news_without_text = news_rdd.map(
    lambda article: {key: value for key, value in article.items() if key != u'text'})

In [None]:
# Clean every article text on Spark and keep the result cached.
articles_rdd = sc.parallelize(articles)
articles_clean = articles_rdd.map(clean_text).cache()

# One token list per cleaned article.
article_tokens = articles_clean.map(TextHelper.get_tokens)

# Flatten the token lists into (token, 1) pairs, cached before aggregation.
tokens = (article_tokens
          .flatMap(lambda token_list: token_list)
          .map(lambda token: (token, 1))
          .cache())

# Sum the ones per token to get (token, frequency) pairs.
tokens_frequency = tokens.reduceByKey(add)

In [None]:
# Take the 15 most frequent (token, frequency) pairs ...
frequent = tokens_frequency.takeOrdered(15, key=lambda pair: -pair[1])
# ... and print the token of the first 10 of them, one per line.
for item in frequent[:10]:
    print(item[0])