In [1]:
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [2]:
import math
from collections import defaultdict, namedtuple
from dataclasses import dataclass, field
from enum import Enum
from operator import itemgetter

import nltk
import pandas as pd

In [3]:
def trigrams(text):
    for snt in nltk.tokenize.sent_tokenize(text):
        words = [word.lower() for word in nltk.tokenize.word_tokenize(snt) if word.isalpha()]
        for trigram in nltk.ngrams(words, 3):
            yield trigram

In [4]:
class Label(Enum):
    ADJ = "ADJ"
    ADV = "ADV"
    NOUN = "NOUN"
    PRO = "PRO"
    DET = "DET"
    CONJ = "CONJ"
    PREP = "PREP"
    VERB = "VERB"
    # VERB_PAST = "VERB_PAST"
    # VERB_PROG = "VERB_PROG"
    # NOUN_PLRL = "NOUN_PLRL"

> The 28 seed words used for English were the following - pronoun: you, we,
me; verb: come, play, put; preposition: on, out, in; determiner: this, these; noun: baby, car, train,
box, house, boy, man, book; adjective: big, silly, green; adverb: well, very, now; conjunction: and,
or, but.

In [5]:
SEEDS = [
    ("you", Label.PRO),
    ("we", Label.PRO),
    ("me", Label.PRO),
    ("come", Label.VERB),
    ("play", Label.VERB),
    ("put", Label.VERB),
    ("on", Label.PREP),
    ("out", Label.PREP),
    ("in", Label.PREP),
    ("this", Label.DET),
    ("these", Label.DET),
    ("baby", Label.NOUN),
    ("car", Label.NOUN),
    ("train", Label.NOUN),
    ("box", Label.NOUN),
    ("house", Label.NOUN),
    ("boy", Label.NOUN),
    ("man", Label.NOUN),
    ("book", Label.NOUN),
    ("big", Label.ADJ),
    ("silly", Label.ADJ),
    ("green", Label.ADJ),
    ("well", Label.ADV),
    ("very", Label.ADV),
    ("now", Label.ADV),
    ("and", Label.CONJ),
    ("or", Label.CONJ),
    ("but", Label.CONJ)
]

In [142]:
Frame = namedtuple("Frame", ["left", "right"])


@dataclass
class Model:
    frames: dict = field(default_factory=lambda: defaultdict(lambda: defaultdict(int)))  # frame -> label -> score
    lexicon: dict = field(default_factory=lambda: defaultdict(lambda: defaultdict(int))) # word  -> label -> score
    fthresh: int = field(default=15)
    wthresh: int = field(default=15)
    lframes: dict = field(default_factory=dict)
    rframes: dict = field(default_factory=dict)
    cframes: dict = field(default_factory=dict)
    
    def __post_init__(self):
        for word, lbl in SEEDS:
            self.lexicon[word][lbl] = math.inf
    
    def train(self, text):
        for left, target, right in trigrams(text):
            frame = Frame(left, right)
            wlabel, flabel = self.wlabel(target), self.flabel(frame)
            # The target word is part of the trusted lexicon.
            if wlabel:
                # Update frame labels.
                self.frames[frame][wlabel] += 1
                for lbl in self.frames[frame]:
                    if lbl != wlabel:
                        self.frames[frame][lbl] -= 1
            # The frame is a trusted context.
            if flabel:
                # Update word labels.
                self.lexicon[target][flabel] += 1
                for lbl in self.lexicon[target]:
                    if lbl != flabel:
                        self.lexicon[target][lbl] -= 0.75
            # Consider generalizing lexical frames.
            # for lbl in Label:
            #     frms = [frm for frm in self.frames if self.flabel(frm) == lbl]
            #     lfreqs = nltk.FreqDist([(self.wlabel(frm.left), frm.right) for frm in frms])
            #     del lfreqs[None]
            #     rfreqs = nltk.FreqDist([(frm.left, self.wlabel(frm.right)) for frm in frms])
            #     del rfreqs[None]
            #     cfreqs = nltk.FreqDist([(self.wlabel(frm.left), self.wlabel(frm.right)) for frm in frms])
            #     for tup in cfreqs:
            #         if None in tup:
            #             del cfreqs[tup]
            #     # Run the tolerance principle.
            #     self.lframes = {}
            #     for frm in lfreqs:
            #         n = lfreqs.N()
            #         threshold = n - (n / math.log(n))
            #         if lfreqs[frm] > threshold:
            #             self.lframes[frm] = lbl 
            #     self.rframes = {}
            #     for frm in rfreqs:
            #         n = rfreqs.N()
            #         threshold = n - (n / math.log(n))
            #         if rfreqs[frm] > threshold:
            #             self.rframes[frm] = lbl
            #     self.cframes = {}
            #     for frm in cfreqs:
            #         n = cfreqs.N()
            #         threshold = n - (n / math.log(n))
            #         if cfreqs[frm] > threshold:
            #             self.cframes[frm] = lbl
                
    # TODO: This should not be taking the "max"        
    def wlabel(self, word):
        # Retrieve the highest scoring label for the word.
        if word not in self.lexicon:
            return None
        label, score = max(self.lexicon[word].items(), key=itemgetter(1))
        if score <= self.wthresh:
            return None
        return label
    
    def flabel(self, frame):
        if frame not in self.frames:
            # llbl, rlbl = self.wlabel(frame.left), self.wlabel(frame.right)
            # lfrm, rfrm = Frame(llbl, frame.right), Frame(frame.left, rlbl)
            # cfrm = Frame(llbl, rlbl)
            # if llbl and lfrm in self.lframes:
            #     return self.lframes[lfrm]
            # if rlbl and rfrm in self.rframes:
            #     return self.rframes[rfrm]
            # if llbl and rlbl and cfrm in self.cframes:
            #     return self.cframes[cfrm]
            return None
        label, score = max(self.frames[frame].items(), key=itemgetter(1))
        if score <= self.fthresh:
            return None
        return label
    
    def words(self):
        for word in self.lexicon:
            lbl = self.wlabel(word)
            if lbl:
                yield (word, lbl)
                
    def ctxs(self):
        for frm in self.frames:
            lbl = self.flabel(frm)
            if lbl:
                yield (frm, lbl)

In [143]:
mdl = Model()
mdl.train(" ".join([" ".join(snt) for snt in nltk.corpus.brown.sents()]))

In [6]:
# set(mdl.words()) - set(SEEDS)

In [123]:
df = pd.DataFrame([{"word": wrd, "label": lbl, "score": math.inf} for wrd, lbl in SEEDS]).set_index(["word", "label"])

In [8]:
df.loc["you"].loc[Label.PRO].score

inf

In [9]:
("you", Label.PRO) in df.index

True

In [10]:
df.get(("you", Label.PRO))

In [11]:
df.query("label == @Label.CONJ")

Unnamed: 0_level_0,Unnamed: 1_level_0,score
word,label,Unnamed: 2_level_1
and,Label.CONJ,inf
or,Label.CONJ,inf
but,Label.CONJ,inf


In [56]:
pd.DataFrame(columns=["left", "right", "label", "score"]).set_index(["left", "right", "label"])

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,score
left,right,label,Unnamed: 3_level_1


In [133]:
@dataclass
class Model:
    frames: pd.DataFrame = field(
        default_factory=lambda: pd.DataFrame(
            columns=["left", "right", "label", "score"]
        ).set_index(["left", "right", "label"])
    )
    words: pd.DataFrame = field(
        default=pd.DataFrame(
            [{"word": wrd, "label": lbl, "score": math.inf} for wrd, lbl in SEEDS]
        ).set_index(["word", "label"])
    )
    fthresh: int = field(default=15)
    wthresh: int = field(default=15)
    
    @property
    def lexicon(self):
        return self.words.query("score > @self.wthresh")
    
    @property
    def contexts(self):
        return self.frames.query("score > @self.fthresh")
    
    def train(self, text):
        for left, target, right in trigrams(text):
            frame = (left, right)
            if target in self.lexicon.index.get_level_values("word"):
                self.frames.loc[(frame, self.words.loc[target, "label"]), "score"] += 1
            if frame in self.contexts.index:
                pass
            
            # wlabel, flabel = self.wlabel(target), self.flabel(frame)
            # # The target word is part of the trusted lexicon.
            # if wlabel:
            #     # Update frame labels.
            #     self.frames[frame][wlabel] += 1
            #     for lbl in self.frames[frame]:
            #         if lbl != wlabel:
            #             self.frames[frame][lbl] -= 1
            # # The frame is a trusted context.
            # if flabel:
            #     # Update word labels.
            #     self.lexicon[target][flabel] += 1
            #     for lbl in self.lexicon[target]:
            #         if lbl != flabel:
            #             self.lexicon[target][lbl] -= 0.75

In [134]:
mdl = Model()
mdl.train(" ".join([" ".join(snt) for snt in nltk.corpus.brown.sents()]))

KeyError: 'label'

In [131]:
#df.loc[(df.label == Label.PRO) & (df.index == "test"), "score"] += 1

In [53]:
df.loc["test"] = (Label.PRO, 1)

In [121]:
df

Unnamed: 0_level_0,label,score
word,Unnamed: 1_level_1,Unnamed: 2_level_1
you,Label.PRO,inf
we,Label.PRO,inf
me,Label.PRO,inf
come,Label.VERB,inf
play,Label.VERB,inf
put,Label.VERB,inf
on,Label.PREP,inf
out,Label.PREP,inf
in,Label.PREP,inf
this,Label.DET,inf


In [100]:
df.loc["test"].score += 1

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  cacher_needs_updating = self._check_is_chained_assignment_possible()


In [136]:
df.loc["you"]

Unnamed: 0_level_0,score
label,Unnamed: 1_level_1
Label.PRO,inf


In [137]:
# Frame = namedtuple("Frame", ["left", "right"])


@dataclass
class Model:
    frames: list = field(default_factory=list)  # frame -> label -> score
    words: list = field(default_factory=lambda: [Word(txt, lbl, math.inf) for txt, lbl in SEEDS]) # word  -> label -> score
    fthresh: int = field(default=15)
    wthresh: int = field(default=15)
    
    def lexicon(self):
        for wrd in self.words:
            if wrd.score > self.wthresh:
                yield wrd
    
    def trusted_frames(self):
        for frm in self.frames:
            if frm.score > self.fthresh:
                yield frm
                
#     def is_in_lexicon(text):
#         for wrd in self.lexicon():
#             if wrd.text == text:
#                 return True
#         return False
    
#     def is_trusted_frame(left, right):
#         for frm in self.trusted_frames():
#             if frm.left == left and frm.right == right:
#                 return True
#         return False
    def get_from_lexicon(self, text):
        for wrd in self.lexicon():
            if wrd.text == text:
                return wrd
        return None
    
    def get_from_trusted_frames(self, left, right):
        for frm in self.trusted_frames():
            if frm.left == left and frm.right == right:
                return frm
        return None
    
    def wlabel(self, text):
        # Retrieve the highest scoring label for the word.
        if not self.get_from_lexicon(text):
            return None
        wrd = max([wrd for wrd in self.lexicon() if wrd.text == text], key=attrgetter("score"))
        if wrd.score <= self.wthresh:
            return None
        return wrd.label
    
    # Retrieve the highest scoring label for the word.
    # def get_from_lexicon(self, text):
    #     return max([wrd for wrd in self.lexicon() if wrd.text == text], key=itemgetter("score"))
    
    def get_best_frame(self, target, ctx, label):
        pass
    
    def applicable_frames(self, left, right):
        for frm in self.frames:
            if isinstance(frm.left, Label) and self.wlabel(frm.left) != self.wlabel(left):
                continue
            if isinstance(frm.right, Label) and self.wlabel(frm.right) != self.wlabel(right):
                continue
            if isinstance(frm.left, str) and frm.left != left:
                continue
            if isinstance(frm.right, str) and frm.left != right:
                continue
            yield frm
    
    def get_frame(self, left, right, label):
        for frm in self.frames:
            if frm.left == left and frm.right == right and frm.label == label:
                return frm
        self.frames.append(Frame(left, right, label))
        return self.frames[-1]
    
    def get_word(text, label):
        for wrd in self.words:
            if wrd.text == text and wrd.label == label:
                return wrd
        self.words.append(Word(text, label))
        return self.words[-1]
    
    def train(self, text):
        for left, target, right in trigrams(text):
            # The target word is part of the trusted lexicon.
            if self.get_from_lexicon(target):
                wlabel = self.wlabel(target)
                frame = self.get_frame(left, right, wlabel)
                frame.score += 1
                # Update frame labels.
                for frm in self.applicable_frames(left, right):
                    if frm != frame:
                        frm.score -= 1
            # The frame is a trusted context.
            if self.get_from_trusted_frames(left, right):
                flabel = self.get_from_trusted_frames(left, right).label
                word = self.get_word(target, flabel)
                word.score += 1
                # Update word labels.
                for wrd in self.words:
                    if wrd.text == word.text and wrd != word:
                        wrd.score -= 0.75
    
    # def train(self, text):
    #     for left, target, right in trigrams(text):
    #         frame = Frame(left, right)
    #         wlabel, flabel = self.wlabel(target), self.flabel(frame)
    #         # The target word is part of the trusted lexicon.
    #         if wlabel:
    #             # Update frame labels.
    #             self.frames[frame][wlabel] += 1
    #             for lbl in self.frames[frame]:
    #                 if lbl != wlabel:
    #                     self.frames[frame][lbl] -= 1
    #         # The frame is a trusted context.
    #         elif flabel:
    #             # Update word labels.
    #             self.lexicon[target][flabel] += 1
    #             for lbl in self.lexicon[target]:
    #                 if lbl != flabel:
    #                     self.lexicon[target][lbl] -= 0.75
    #         if wlabel:
    #             self.generalize()
    
    def generalize(self):
        # Consider generalizing lexical frames.
        for lbl in Label:
            frms = [frm for frm in self.frames if self.flabel(frm) == lbl]
            lfreqs = nltk.FreqDist([(self.wlabel(frm.left), frm.right) for frm in frms])
            del lfreqs[None]
            rfreqs = nltk.FreqDist([(frm.left, self.wlabel(frm.right)) for frm in frms])
            del rfreqs[None]
            cfreqs = nltk.FreqDist([(self.wlabel(frm.left), self.wlabel(frm.right)) for frm in frms])
            for tup in cfreqs:
                if None in tup:
                    del cfreqs[tup]
            # Run the tolerance principle.
            self.lframes = {}
            for frm in lfreqs:
                n = lfreqs.N()
                threshold = n - (n / math.log(n))
                if lfreqs[frm] > threshold:
                    self.lframes[frm] = lbl 
            self.rframes = {}
            for frm in rfreqs:
                n = rfreqs.N()
                threshold = n - (n / math.log(n))
                if rfreqs[frm] > threshold:
                    self.rframes[frm] = lbl
            self.cframes = {}
            for frm in cfreqs:
                n = cfreqs.N()
                threshold = n - (n / math.log(n))
                if cfreqs[frm] > threshold:
                    self.cframes[frm] = lbl
    
    # def get_best_frame(self, target, ctx, label):
    #     # Look for lexical frames.
    #     for frm in self.frames:
    #         lbl = self.frames[frm]
    #         if frm == ctx and lbl == label:
    #             return frm
    #     for frm in self.get_applicable_frames(target, ctx):
    #         lbl = self.frames[frm]
    #         if isinstance(frm.left, Label) and isinstance(frm.right, str) and lbl == label:
    #             return frm
    #         if isinstance(frm.right, Label) and isinstance(frm.left, str) and lbl == label:
    #             return frm
    #     for frm in self.get_applicable_frames(target, ctx):
    #         lbl = self.frames[frm]
    #         if isinstance(frm.left, Label) and isinstance(frm.right, Label) and lbl == label:
    #             return frm
    
    # def get_applicable_frames(self, target, ctx):
    #     ret = []
    #     for frm in self.frames:
    #         if isinstance(frm.left, Label) and self.wlabel(frm.left) != self.wlabel(ctx.left):
    #             continue
    #         if isinstance(frm.right, Label) and self.wlabel(frm.right) != self.wlabel(ctx.right):
    #             continue
    #         if isinstance(frm.left, str) and frm.left != ctx.left:
    #             continue
    #         if isinstance(frm.right, str) and frm.left != ctx.right:
    #             continue
    #         ret.append(frm)
    #     return ret
            
    
    # TODO: This should not be taking the "max"        
    # def wlabel(self, word):
    #     # Retrieve the highest scoring label for the word.
    #     if word not in self.lexicon:
    #         return None
    #     label, score = max(self.lexicon[word].items(), key=itemgetter(1))
    #     if score <= self.wthresh:
    #         return None
    #     return label
    
    # def flabel(self, frame):
    #     if frame not in self.frames:
    #         # llbl, rlbl = self.wlabel(frame.left), self.wlabel(frame.right)
    #         # lfrm, rfrm = Frame(llbl, frame.right), Frame(frame.left, rlbl)
    #         # cfrm = Frame(llbl, rlbl)
    #         # if llbl and lfrm in self.lframes:
    #         #     return self.lframes[lfrm]
    #         # if rlbl and rfrm in self.rframes:
    #         #     return self.rframes[rfrm]
    #         # if llbl and rlbl and cfrm in self.cframes:
    #         #     return self.cframes[cfrm]
    #         return None
    #     label, score = max(self.frames[frame].items(), key=itemgetter(1))
    #     if score <= self.fthresh:
    #         return None
    #     return label
    
#     def words(self):
#         for word in self.lexicon:
#             lbl = self.wlabel(word)
#             if lbl:
#                 yield (word, lbl)
                
#     def ctxs(self):
#         for frm in self.frames:
#             lbl = self.flabel(frm)
#             if lbl:
#                 yield (frm, lbl)