## Sentiment Analyzer Testing

In [54]:
import spacy
from spacy import displacy
import math
from typing import Dict, List
from dataclasses import dataclass
import networkx as nx
from functional import seq
from statistics import mean

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Shared VADER analyzer; get_entity_sentiments reads the 'compound' value of
# its polarity_scores() result for every sentence.
sentiment_analyzer = SentimentIntensityAnalyzer()

# Shared spaCy pipeline; the code below relies on its sentence boundaries,
# named entities, part-of-speech tags and dependency parse.
# NOTE(review): the 'en' shortcut name only resolves on spaCy 2.x. spaCy 3
# removed shortcut links and expects the full package name (for example
# 'en_core_web_sm') -- confirm which spaCy version this notebook targets.
nlp = spacy.load('en')

@dataclass(frozen=True)
class Politician:
    """Immutable, hashable record identifying one politician.

    Equality and hashing are generated from both fields (``eq`` is on by
    default for dataclasses, and ``frozen=True`` adds the matching hash).
    """

    # Numeric identifier; the analysis functions in this file key their
    # result dictionaries by this value.
    num: int
    # Full display name; the matching helpers split it on whitespace and
    # compare the individual parts against text found in a statement.
    name: str


def get_entity_sentiments(statements: List[str],
                         subjects: List[Politician] = None) -> Dict[int, float]:
    """Attribute sentence-level sentiment to the politicians a sentence is about.

    Every statement is parsed with the module-level ``nlp`` pipeline.  For each
    sentence that contains at least one PERSON entity:

    1. the sentence's VADER ``compound`` score is computed;
    2. an undirected graph is built from the dependency parse (``conj`` edges
       cost 0, every other edge costs 1);
    3. for each PERSON entity the graph distances from every VERB/ADJ/NOUN
       word of the sentence to the entity are summed;
    4. the entity (or entities, on a tie) with the smallest sum is treated as
       the sentence's target and the score is credited to every politician in
       ``subjects`` that ``_match_politicians`` associates with it.

    Args:
        statements: Raw texts to analyse.
        subjects: Politicians to look for.  ``None`` or an empty list means
            nothing can be matched, so an empty result is returned.

    Returns:
        Mapping from ``Politician.num`` to the mean compound score over all
        sentences, across *all* statements, credited to that politician.
        Politicians that were never credited are absent from the mapping.
    """
    # Nothing to match against: return early instead of failing later when
    # iterating over ``None``.
    if not subjects:
        return {}

    # Accumulated across every document so that the result reflects all
    # statements rather than only the last one; this also keeps the function
    # well defined when ``statements`` is empty.
    scores_by_politician = {}

    for doc in nlp.pipe(statements):
        for sent in doc.sents:
            # Ordered de-duplication: a person mentioned twice in a sentence
            # must not have their path length counted twice.
            entities = list(dict.fromkeys(
                ent.text for ent in sent.ents if ent.label_ == 'PERSON'))
            if not entities:
                continue

            score = sentiment_analyzer.polarity_scores(sent.text)['compound']
            pos_words = _get_pos_subjects(sent, ['VERB', 'ADJ', 'NOUN'])

            # NOTE(review): nodes are keyed by the token's string form, so
            # repeated words within one sentence presumably collapse into a
            # single node -- confirm whether that is intended.
            graph = nx.Graph()
            for token in sent:
                for child in token.children:
                    # Conjuncts are free to traverse so that coordinated
                    # words are equally close to whatever they attach to.
                    weight = 0 if child.dep_ == 'conj' else 1
                    graph.add_edge('{0}'.format(token), '{0}'.format(child), weight=weight)

            path_lengths = {}
            for entity in entities:
                names = entity.split()
                path_lengths[entity] = sum(
                    _shortest_distance(graph, word, names) for word in pos_words)

            shortest_length = min(path_lengths.values())

            # Ordered set of politician nums credited by this sentence; each
            # politician receives the sentence score at most once, even when
            # several tied entities (or several name parts) point to them.
            credited = {}
            for entity, length in path_lengths.items():
                if length != shortest_length:
                    continue
                for politician in _match_politicians(entity, subjects):
                    credited[politician.num] = None

            for num in credited:
                scores_by_politician.setdefault(num, []).append(score)

    return {num: mean(scores) for num, scores in scores_by_politician.items()}


def _shortest_distance(graph, source, targets) -> float:
    """Return the smallest weighted distance from ``source`` to any of ``targets``.

    Targets that are unreachable or missing from ``graph`` are skipped
    individually; ``math.inf`` is returned when none of them can be reached.
    """
    best = math.inf
    for target in targets:
        try:
            distance = nx.dijkstra_path_length(graph, source=source, target=target)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            # NodeNotFound covers words that never became graph nodes, e.g. a
            # single-token sentence or a name part tokenised differently.
            continue
        best = min(best, distance)
    return best


def _get_pos_subjects(doc, pos_list) -> List[str]:
    """Collect the text of every token whose part-of-speech tag is wanted.

    Args:
        doc: Iterable of tokens exposing ``pos_`` and ``text``.
        pos_list: Container of accepted ``pos_`` values.

    Returns:
        The matching tokens' texts, in iteration order (duplicates kept).
    """
    return [token.text for token in doc if token.pos_ in pos_list]


def _match_politicians(text, politicians) -> "List[Politician]":
    """Return the politicians with at least one name part occurring in ``text``.

    A politician's ``name`` is split on whitespace and each part is checked
    with a substring test against ``text``.

    Args:
        text: Text to search, e.g. the surface form of a PERSON entity.
        politicians: Candidates exposing ``name``; ``None`` or empty yields
            an empty result.

    Returns:
        The matching politicians in input order.  Each politician appears at
        most once, even when several parts of the name occur in ``text``
        (previously "Donald Trump" returned the same politician twice, which
        made callers count that politician's score twice).
    """
    if not politicians:
        return []

    return [
        politician
        for politician in politicians
        if any(name in text for name in politician.name.split())
    ]


In [55]:
# Sample subjects for the experiments below; the first field is the `num`
# under which each politician's results are reported.
politicians = [
    Politician(1, 'Donald Trump'),
    Politician(2, 'Bernie Sanders')
]

In [59]:
# Alternative sample input, kept for manual experiments (currently disabled):
# tweet = 'When Trump accuses Bernie Sanders of Murder Trump is actually admitting he’s a murderer'
tweet = 'Bernie Sanders: Donald Trump is shit'
# Score the single sample tweet against the sample politicians.
result = get_entity_sentiments([tweet], politicians)

In [60]:
# Display the mapping of politician num -> mean compound score computed above.
result

{2: 0.0, 1: -0.5574}

In [58]:
# Parse the sample tweet and render its dependency parse for visual
# inspection; `doc` is reused by the get_pos_subjects experiment below.
doc = nlp(tweet)
displacy.render(doc, style='dep')

In [435]:
def get_pos_subjects(doc, pos_list, politicians):
    """Print, per politician, the tokens of interest that politician is the subject of.

    Every token of ``doc`` whose ``pos_`` is in ``pos_list`` is attributed to
    the politicians found among its direct ``nsubj`` children (plus any
    politicians chained to such a subject through ``conj`` children).  When a
    token has no such child, its ancestors are searched instead via
    ``traverse_up``.

    Args:
        doc: Iterable of parsed tokens exposing ``pos_``, ``children``,
            ``head``, ``text`` and ``dep_``.
        pos_list: Accepted ``pos_`` values.
        politicians: Politicians to look for; each contributes one key
            (its ``num``) to the printed dictionary.

    Returns:
        None.  The ``{num: [tokens]}`` dictionary is printed.
    """
    verbs = {politician.num: [] for politician in politicians}
    for possible_verb in doc:
        if possible_verb.pos_ not in pos_list:
            continue

        found_child = False
        for child in possible_verb.children:
            match = match_politician(child.text, politicians)
            if match is not None and child.dep_ == 'nsubj':
                # ``verbs`` is keyed by ``num`` (as in traverse_up and
                # traverse_subject_conjs); indexing with the Politician
                # object itself raised KeyError on the first match.
                verbs[match.num].append(possible_verb)
                traverse_subject_conjs(child, possible_verb, verbs, politicians)
                found_child = True

        # No politician is a direct subject of this token: look for one
        # further up the dependency tree.
        if not found_child:
            traverse_up(possible_verb, possible_verb, verbs, politicians)
    print(verbs)
    
def match_politician(text, politicians):
    """Find the first politician one of whose name parts equals ``text``.

    Args:
        text: A single word, compared for exact equality against the
            whitespace-separated parts of each politician's ``name``.
        politicians: Candidates exposing ``name``, searched in order.

    Returns:
        The first matching politician, or ``None`` when there is no match.
    """
    return next(
        (candidate for candidate in politicians
         if text in candidate.name.split()),
        None,
    )

def traverse_up(possible_verb, current, verbs, politicians):
    """Attribute ``possible_verb`` to politician subjects found above ``current``.

    Walks from ``current`` towards the root of the dependency tree.  At every
    ancestor, each child that is an ``nsubj`` and matches a politician gets
    ``possible_verb`` appended to ``verbs[politician.num]``; politicians
    chained to that subject through ``conj`` are handled by
    ``traverse_subject_conjs``.  The walk ends at the token that is its own
    head.

    Args:
        possible_verb: Token being attributed.
        current: Token from which the upward walk starts.
        verbs: ``{num: [tokens]}`` dictionary, updated in place.
        politicians: Politicians to look for.
    """
    node = current
    while True:
        parent = node.head
        # A token that is its own head is the root: nothing left to inspect.
        if node == parent:
            return

        for dependent in parent.children:
            politician = match_politician(dependent.text, politicians)
            if politician is None or dependent.dep_ != 'nsubj':
                continue
            verbs[politician.num].append(possible_verb)
            traverse_subject_conjs(dependent, possible_verb, verbs, politicians)

        node = parent
    
    
def traverse_subject_conjs(subj, possible_verb, verbs, politicians):
    """Attribute ``possible_verb`` to politicians chained to ``subj`` via ``conj``.

    Each child of ``subj`` whose ``dep_`` is ``'conj'`` and whose text matches
    a politician gets ``possible_verb`` appended to ``verbs[politician.num]``;
    the search then continues recursively below that child.  Children that are
    not ``conj`` or do not match a politician end that branch.

    Args:
        subj: Subject token whose ``conj`` children are inspected.
        possible_verb: Token being attributed.
        verbs: ``{num: [tokens]}`` dictionary, updated in place.
        politicians: Politicians to look for.
    """
    for dependent in subj.children:
        if dependent.dep_ != 'conj':
            continue

        politician = match_politician(dependent.text, politicians)
        if politician is None:
            continue

        verbs[politician.num].append(possible_verb)
        traverse_subject_conjs(dependent, possible_verb, verbs, politicians)

In [436]:
# NOTE(review): politician_names is not read by any code visible in this
# notebook -- confirm whether it is still needed.
politician_names = list(map(lambda x: x.name, politicians))
# Prints, per politician num, the VERB/ADJ/NOUN tokens of `doc` attributed to
# that politician.
get_pos_subjects(doc, ['VERB', 'ADJ', 'NOUN'], politicians)

{1: [shit, compared], 2: []}
