In [2]:
# coding=utf-8

Константы работы с базой данных <b>Elasticsearch</b>

In [3]:
ES_INDEX = "fb_group_posts"
ES_POST_DOC_TYPE = "post"
ES_NAME_RELATION_DOC_TYPE = "name_relation"
ES_BULK_ACTIONS_SIZE = 500

Вся работа с Elasticsearch скрыта в классе <b>FacebookDBHelper</b>, и предоставляет удобный интерфейс.

In [43]:
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import TransportError
from elasticsearch import helpers
from copy import deepcopy


class PostData(object):
    def __init__(self, id_, messages):
        self.id = id_
        self.message = messages


class FLNameData(object):
    def __init__(self, fl_name, post_id):
        self.fl_name = fl_name
        self.post_id = post_id


class FacebookDBHelper(object):
    def __init__(self):
        self.es = Elasticsearch()

    def save_posts(self, group_name, group_domain, posts):
        actions = []

        for post in posts:
            if 'message' in post.keys():
                action = {
                    "_index": ES_INDEX,
                    "_type": ES_POST_DOC_TYPE,
                    "_id": post['id'],
                    "_source": {
                        "message": post['message'],
                        "group_name": group_name,
                        "group_domain": group_domain
                    }
                }

                actions.append(action)

        self.__bulk_insert(actions)

    def save_name_relations(self, relations):
        actions = []

        for relation in relations:
            action = {
                "_index": ES_INDEX,
                "_type": ES_NAME_RELATION_DOC_TYPE,
                "_source": {
                    "fl_name": relation.fl_name,
                    "post_id": relation.post_id
                }
            }

            actions.append(action)

        self.__bulk_insert(actions)

    def __bulk_insert(self, actions):
        actions_list = split_list(list(actions), ES_BULK_ACTIONS_SIZE)

        for acts in actions_list:
            helpers.bulk(self.es, acts)

    def get_post_by_id(self, id):
        return self.__get(doc_type=ES_POST_DOC_TYPE, id=id)

    def get_all_posts(self):
        return self.__get_all(doc_type=ES_POST_DOC_TYPE)

    def get_all_name_relations(self, doc_type):
        relations = self.__get_all(doc_type=doc_type)

        # noinspection PyTypeChecker
        return [FLNameData(fl_name=r['_source']['fl_name'], post_id=r["_id"]) for r in relations]

    def __get(self, doc_type, id):
        return self.es.get(index=ES_INDEX, doc_type=doc_type, id=id)

    def __get_all(self, doc_type, body=None):
        if body is None:
            body = {}

        result = []

        page = self.es.search(
            index=ES_INDEX,
            doc_type=doc_type,
            scroll='2m',
            search_type='scan',
            size=1000,
            body=body)

        scroll_id = page['_scroll_id']
        scroll_size = page['hits']['total']

        while scroll_size > 0:
            page = self.es.scroll(scroll_id=scroll_id, scroll='2m')

            scroll_id = page['_scroll_id']
            scroll_size = len(page['hits']['hits'])

            result.extend(page['hits']['hits'])

        return result

    def get_all_sources(self, doc_type):
        posts = self.get_all_posts(doc_type=doc_type)
        result = []

        for post in posts:
            if "_source" in post.keys():
                # noinspection PyTypeChecker
                result.append(post["_source"])

        return result

    def get_all_messages(self, doc_type):
        sources = self.get_all_sources(doc_type=doc_type)
        result = []

        for source in sources:
            if 'message' in source.keys():
                # noinspection PyTypeChecker
                result.append(source["message"])

        return result

    def delete_all_posts(self):
        return delete_by_doc_type(
            es=self.es,
            index=ES_INDEX,
            type_=ES_POST_DOC_TYPE)

    def delete_all_name_relations(self):
        return delete_by_doc_type(
            es=self.es,
            index=ES_INDEX,
            type_=ES_NAME_RELATION_DOC_TYPE)

    def get_messages_by_domain(self, domain):
        return self.__get_all(doc_type=ES_POST_DOC_TYPE, body={
            "query": {
                "match": {
                    "domain": {
                        "query": domain,
                        "operator": "and"
                    }
                }
            }
        })

    def get_name_relations_by_fl(self, fl_name):
        return self.__get_all(doc_type=ES_NAME_RELATION_DOC_TYPE, body={
            "query": {
                "match": {
                    "fl_name": {
                        "query": fl_name,
                        "operator": "and"
                    }
                }
            }
        })


def delete_by_doc_type(es, index, type_):
    try:
        count = es.count(index, type_)['count']
        max_count = 5000

        if not count:
            return 0

        tmp_count = count

        while tmp_count > 0:
            tmp_count -= max_count

            response = es.search(
                index=index,
                filter_path=["hits.hits._id"],
                body={"size": max_count,
                      "query": {
                          "filtered": {
                              "filter": {
                                    "type": {"value": type_}
                              }
                          }
                      }})

            if not response:
                return 0

            ids = [x["_id"] for x in response["hits"]["hits"]]

            if not ids:
                return 0

            bulk_body = [
                '{{"delete": {{"_index": "{}", "_type": "{}", "_id": "{}"}}}}'.format(index, type_, x)
                for x in ids]

            es.bulk('\n'.join(bulk_body))
            es.indices.flush_synced([index])

        return count
    except TransportError as ex:
        print("Elasticsearch error: " + ex.error)
        raise ex
        
def split_list(list_, count_):
    result = []

    if len(list_) == 0:
        return result

    if len(list_) == count_:
        result.append(list_)

        return result

    steps = len(list_) / count_
    tmp_list = deepcopy(list_)

    for i in range(0, steps):
        result.append(tmp_list[0: count_])
        tmp_list = tmp_list[count_: len(tmp_list)]

    if len(tmp_list) != 0:
        result.append(tmp_list)

    return result


In [44]:
fdb = FacebookDBHelper()

Так же для измирения времени работы некоторых участоков подключим модуль <b>"time"</b>

In [45]:
import time as t

def time():
    return int(round(t.time() * 1000))

Используемые константы для работы с <b>facebook</b>

In [46]:
FACEBOOK_TOKEN = "1769775703259571|736fc7f9c5dc31707d40709a1d37813b"
FACEBOOK_POSTS_COUNT = 3000

Сущьность описывающая заведомо известные данные о группе в <b>Facebook</b>:
* Имя
* Id на facebook.com
* Домен (класс) тематики группы

In [47]:
class Group(object):
    def __init__(self, name, id, domain):
        self.name = name
        self.id = id
        self.domain = domain

Список групп, которые будут использованы для работы

In [48]:
groups = [
#     Group(name="CNN Politics", id="219367258105115", domain="politics"),
#     Group(name="SinoRuss", id="1565161760380398", domain="politics"),
#     Group(name="Politics & Sociology", id="1616754815303974", domain="politics"),
#     Group(name="CNN Money", id="6651543066", domain="finances"),
#     Group(name="MTV", id="7245371700", domain="music"),
#     Group(name="CNET", id="7155422274", domain="tech"),
#     Group(name="TechCrunch", id="8062627951", domain="tech"),
#     Group(name="Sport Addicts", id="817513368382866", domain="sport"),
    Group(name="Pokemon GO", id="1745029562403910", domain="pokemon go")
]

Для работы с Facebook будем использовать <b>Facebook Graph API</b>.

In [49]:
from facebook import GraphAPI

graph = GraphAPI(access_token=FACEBOOK_TOKEN)

Так же нам понадобится модуль для совершения <b>HTTP</b> запросов

In [50]:
import requests

Функция загружающая последовательно посты пока не достигнет предела группы или ограничения по кол-ву.

In [51]:
def load_next_posts(posts, max_count):
    result = []
    count = 0

    while True:
        if count > max_count:
            break

        try:
            for post_data in posts['data']:
                keys = post_data.keys()  # Так как мы нам нужны сами сообщения
                                         # мы не включаем в результат посты без текстовых сообщений
                if 'message' in keys:
                    result.append(post_data)

            # Меняем размер запрашиваемого пака (страницы) с постами, в уже сформированном запросе 
            # от Facebook Graph API
            request = posts['paging']['next'].replace("limit=25", "limit=100")

            s_time = time()
            posts = requests.get(request).json()
            f_time = time()
            
            posts_count = len(posts['data'])
            count += posts_count
            
            print "Время загрузки пака постов ->", (f_time - s_time), "|", "кол-во:", posts_count, "|", "всего:", count
        except KeyError:
            break

    print "Общее кол-во загруженных постов группы ->", count

    return result

Используя ранее описанный список постов заполняем нашу базу данных

In [52]:
for group in groups:
    # Получаем данные о группе по ее id на facebook.com
    s_time = time()
    group_json = graph.get_object(group.id)
    f_time = time()
    
    print 'Время загрузки данных [', group_json['name'], '] группы ->', (f_time - s_time)
    
    # Получаем первый пак постов, используем уже для этого метод "get_connections"
    # который, грубо говоря, дает возможность запрашивать списки
    s_time = time()
    first_posts_pack = graph.get_connections(group_json['id'], 'feed')
    f_time = time()
    
    print 'Время загрузки первого пака постов ->', (f_time - s_time)
    
    # Последовательно загружаем все последующие посты
    posts = load_next_posts(posts=first_posts_pack, max_count=FACEBOOK_POSTS_COUNT)
    
    # Сохраняем все в базу данных
    s_time = time()
    fdb.save_posts(group.name, group.domain, posts)
    f_time = time()
    
    print "Время сохранение постов в БД ->", (f_time - s_time)
    print "-------------------------"

Время загрузки данных [ Pokémon GO Malta - Official ] группы -> 1282
Время загрузки первого пака постов -> 1152
Время загрузки пака постов -> 1701 | кол-во: 100 | всего: 100
Время загрузки пака постов -> 3077 | кол-во: 100 | всего: 200
Время загрузки пака постов -> 1261 | кол-во: 100 | всего: 300
Время загрузки пака постов -> 3546 | кол-во: 70 | всего: 370
Время загрузки пака постов -> 2197 | кол-во: 0 | всего: 370
Общее кол-во загруженных постов группы -> 370
Время сохранение постов в БД -> 629
-------------------------


Перед тем как двигатся дальше мы должны вытащить все имена, сформировав отношение "имя <-> пост", для последующего использования этих отношений при поиске. Логично, что это следует сделать перед дальнейшей обработкой и работы с постами, так как мы можем утратить часть имен в дальнейшем.

Для того, что бы отискать все имена, мы воспользуемся регулярными выражениями, в данном случае, самой простой реализацией, которая даст много мусора, но в данном случае это не страшно.

In [53]:
import re

FIRST_LAST_NAME_PATTERN = "[A-Z]{1}[a-z]+\s+[A-Z]{1}[a-z]+"

Далее будем пользоваться уже сохраненными данными (тем что сохранили ранее в базу данных).

In [55]:
posts = fdb.get_all_posts() # Вытаскиваем все посты
name_relations = []

s_time = time()

for post in posts:
    message = post['_source']['message']
    names = re.findall(FIRST_LAST_NAME_PATTERN, message)

    if names:
        id = post['_id']

        for name in names:
            name = name.replace(".", "")
            relation = FLNameData(fl_name=name, post_id=id)
            name_relations.append(relation)

f_time = time()

print "Время поиска имен по шаблону ->", (f_time - s_time)
    
len(name_relations)

Время поиска имен по шаблону -> 254


29006

Поскольку все отношения мы так же будем хранить в базе данных, то очистим ранее созданные связи

In [56]:
s_time = time()
deleted_posts = fdb.delete_all_name_relations()
f_time = time()

print "Время удаления старых отношений 'сообщение <=> имя' ->", (f_time - s_time)
print "Удалено старых постов:", deleted_posts

Время удаления старых отношений 'сообщение <=> имя' -> 391
Удалено старых постов: 0


In [57]:
s_time = time()
fdb.save_name_relations(name_relations)
f_time = time()

print "Время сохранения новых отношений 'сообщение <=> имя' ->", (f_time - s_time)

Время сохранения новых отношений 'сообщение <=> имя' -> 24143


Проверим как работает поиск. 

In [58]:
searched_names = [
    "John Kerry",
    "Paul Reichler"
]

def find_messages_by_fl_name(fl_name):
    finded_relations = fdb.get_name_relations_by_fl(fl_name=fl_name)
    messages = []
    finded_post_ids = set() # Необходимо, что бы учитывать ранее найденные посты

    for relation in finded_relations:
        post_id = relation['_source']['post_id']

        if not post_id in finded_post_ids:
            finded_post_ids.add(post_id)
            
            message = fdb.get_post_by_id(post_id)['_source']['message']
            messages.append(message)

    return messages


for name in searched_names:
    messages = find_messages_by_fl_name(name)
    
    print "Имя [", name, "] найдено соответстивий (сообщений):", len(messages)

Имя [ John Kerry ] найдено соответстивий (сообщений): 19
Имя [ Paul Reichler ] найдено соответстивий (сообщений): 9


In [None]:
from stop_words_loader import StopWordsLoader

stop_words = StopWordsLoader("stop_words").get()
len(stop_words)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction import text 

import nltk
from nltk import word_tokenize   
from nltk.stem.porter import *

stemmer = PorterStemmer()

def tokenize(text):
    tokens = nltk.word_tokenize(text)
    tokens = [i for i in tokens if i not in string.punctuation]
    return stem_tokens(tokens, stemmer)

def stem_tokens(tokens, stemmer):
    stemmed = []
    
    for item in tokens:
        stemmed.append(stemmer.stem(item))
    
    return stemmed

vectorizer = CountVectorizer()
vectorizer.stop_words = text.ENGLISH_STOP_WORDS.union(stop_words)
vectorizer

In [None]:
messages = fdb.get_messages_by_domain("politics")
len(messages)

In [None]:
matrix = vectorizer.fit_transform(messages)

len(vectorizer.vocabulary_)

In [None]:
# def find_top_tokens(matrix):
#     rows_count = matrix.getnnz()
    
#     if rows_count == 0:
#         return 
    
#     first_row = matrix[0]
#     columns_count = first_row.getnnz()
    
#     result = []
    
#     for i in range(columns_count):
#         s_time = time()
#         result.append(sum(matrix[:,i])[0,0])
#         f_time = time()
        
#         print "time ->", (f_time - s_time)
    
#     return result

# summ = find_top_tokens(matrix)
# summ

def find_top_tokens(matrix, items, amount):
    s_time = time()
    counts = [(word, matrix.getcol(col_num).sum()) for word, col_num in items]
    tokens = sorted (counts, key = lambda x: -x[1])[:min(amount, len(counts))]
    f_time = time()
    
    print "Время поиска [", amount, "] популярных токенов ->", (f_time - s_time)
    
    return tokens

top_100_tokens = find_top_tokens(matrix, vectorizer.vocabulary_.items(), 100)
top_100_tokens

In [None]:
import operator

sorted_by_count = sorted(tokens.items(), key=operator.itemgetter(1), reverse=True)
sorted_by_count[0:100]


    
#     print "{:10}".format(token), "->", get_words_count(token, vectorizer.vocabulary_, matrix)

In [41]:
from nltk.stem.porter import *

stemmer = PorterStemmer()
stemmer.stem("running")


u'run'