In [12]:
from lxml import etree
import re
from tqdm.notebook import tqdm
import os

class Corpus(object):
    def __init__(self, corpus=None, directory="Corpus/"):
        if corpus is None:
            self._corpus = self._CorpusAsListOfSentences(directory)
        else:
            self._corpus = corpus
        identifier = tuple([1, "vanilla"])
        self._ngrams = {}
        self._ngrams[identifier] = self.NGram(n=1)
        self._models = {}
        self._models[identifier] = self.Model(n=1)

    def __len__(self):
        return len(self._corpus)

    def __iter__(self):
        for word in self._corpus:
            yield word

    def __getitem__(self, index):
        return self._corpus[index]

    # Create functions to free memory once function scope is left
    @staticmethod
    def _readCorpus(root="Corpus/"):
        if not os.access(root, os.R_OK):
            print("Check root!!")

        xml_data = []

        for file in tqdm(os.listdir(root), desc='Reading Files'):
            xml_data.append(open(os.path.join(root, file), 'r', encoding='utf8').read())  # Read file
        return xml_data

    @staticmethod
    def _ParseAsXML(root="Corpus/"):
        parser = etree.XMLParser(recover=True)
        roots = []
        xml_data = Corpus._readCorpus(root)
        for xml in tqdm(xml_data, desc='Parsing XML'):
            roots.append(etree.fromstring(xml, parser=parser))
        return roots

    # Mention garbage collection effort and try outs for pd and numpy and why i settled with list of lists (pd matrix, list of lists no)
    @staticmethod
    def _CorpusAsListOfSentences(root="Corpus/"):
        roots = Corpus._ParseAsXML(root)
        sentences = []
        for root in tqdm(roots, desc='XML File'):
            for i, p in tqdm(enumerate(root), desc='Paragraph'):
                for k, s in enumerate(p):
                    unfiltered_sentence = re.split(r'\n', s.text.lstrip('\n'))
                    sentence = []
                    for unfiltered_word in unfiltered_sentence:
                        if unfiltered_word is not "":
                            filtered_word = unfiltered_word.split('\t')
                            sentence.append(filtered_word[0])
                    if sentence is not []:
                        sentence.insert(0, "<s>")
                        sentences.append(sentence)
                        sentence.append("</s>")
        return sentences

    def X_Counts(self, n):
        x_counts = {}
        for s in tqdm(self, desc='Counting x counts'):
            for i in range(len(s) + 1):
                if i < n - 1 or i == 0:
                    continue
                count = []
                for x in range(n,0,-1):
                    count.append(s[i - x])
                count = tuple(count)

                if count in x_counts:
                    x_counts[count] += 1
                else:
                    x_counts[count] = 1

        return x_counts

    def NGram(self, n=2, model="vanilla"):
        if n < 1:
            raise Exception("Unigrams and up are supported, otherwise no.")

        if model != "vanilla" and \
            model != "laplace" and \
            model != "unk":
            raise Exception("Only 'vanilla'/'laplace'/'unk' models are supported.")

        identifier = tuple([n, model])
        if identifier in self._ngrams:
            if  self._ngrams[identifier]["model"] == model:
                return self._ngrams[identifier]

        x_counts = self.X_Counts(n=n)

        if model == "unk":
            _count = self.X_Counts(n=1)
            tc = []
            for s in self:
                ts = []
                for w in s:
                    if _count[tuple([w])] < 3:
                        ts.append("UNK")
                    else:
                        ts.append(w)
                tc.append(ts)

            temp = Corpus(corpus=tc)
            x_counts = temp.X_Counts(n=n)


        gram = {}
        for x in x_counts:
            gram[x] = {"count":x_counts[x]}

        result =    {
                        "gram": gram,
                        "model": model
                    }

        self._ngrams[identifier] = result
        return self._ngrams[identifier]


    def Model(self, n=2, model="vanilla"):
        identifier = tuple([n, model])
        if identifier in self._models:
            if  self._models[identifier].model == model:
                return self._models[identifier]


        self._models[identifier] = Model(corpus=self,n=n,model=model)
        return self._models[identifier]

    def LinearInterpolation(self, trigram:tuple, model="vanilla"):
        if len(trigram) != 3:
            raise Exception("Only trigrams are supported with this function.")

        l1 = 0.1
        l2 = 0.3
        l3 = 0.6

        probabilities = [self.Model(n=1,model=model),
                         self.Model(n=2,model=model),
                         self.Model(n=3,model=model)
                         ]

        return  l3 * probabilities[2].GetProbability(trigram[2], trigram[:2]) + \
                l2 * probabilities[1].GetProbability(trigram[2], tuple(trigram[1])) + \
                l1 * probabilities[0].GetProbability(trigram[2])


class Model(object):
    def __init__(self, corpus, n=2, model="vanilla"):
        x_counts = corpus.NGram(n,model=model)['gram']
        V = 0
        if model == "laplace":
            V = len(corpus.NGram(n=1)['gram'])

        probabilities = {}

        if n is not 1:
            previous = corpus.NGram(n-1,model=model)['gram']
            for x in x_counts:
                probabilities[x] = {"probability":(x_counts[x]["count"] + int(model=="laplace")) / (previous[x[1:]]["count"] + V)}
        else:
            N = len([w for s in corpus for w in s])
            for x in x_counts:
                probabilities[x] = {"probability":(x_counts[x]["count"] + int(model=="laplace")) / (N + V)}

        self.probabilities = probabilities
        self.model = model

    def GetProbability(self, forX, givenY=tuple()):
        # Add input validation
        sequence =  givenY + tuple(forX)

        if sequence in self.probabilities:
            return self.probabilities[sequence]["probability"]
        else:
            return 0


#http://pages.di.unipi.it/pibiri/papers/NGrams18.pdf

corpus = Corpus(directory="Test Corpus/")
vanilla = corpus.Model(n=3)
laplace = corpus.Model(model="laplace")

y3 = tuple(['a', 'b'])
y2 = tuple(['b'])
print(corpus.Model(n=3, model="vanilla").GetProbability('c', y3))
print(corpus.Model(n=2, model="vanilla").GetProbability('c', y2))
print(corpus.Model(n=1, model="vanilla").GetProbability('c'))
print("**")

i = tuple(['a', 'b', 'c'])
print(corpus.LinearInterpolation(i, model="vanilla"))

Reading Files:   0%|          | 0/1 [00:00<?, ?it/s]

Parsing XML:   0%|          | 0/1 [00:00<?, ?it/s]

XML File:   0%|          | 0/1 [00:00<?, ?it/s]

Paragraph: 0it [00:00, ?it/s]

Counting x counts:   0%|          | 0/1 [00:00<?, ?it/s]

Counting x counts:   0%|          | 0/1 [00:00<?, ?it/s]

Counting x counts:   0%|          | 0/1 [00:00<?, ?it/s]

Counting x counts:   0%|          | 0/1 [00:00<?, ?it/s]

Counting x counts:   0%|          | 0/1 [00:00<?, ?it/s]

1.0
0.2
0.29411764705882354
**
1.0
0.2
0.29411764705882354
0.6894117647058823
