Check if in virtual environment

In [1]:
import sys

def get_base_prefix_compat():
    """Get base/real prefix, or sys.prefix if there is none."""
    return getattr(sys, "base_prefix", None) or getattr(sys, "real_prefix", None) or sys.prefix

def in_virtualenv():
    return get_base_prefix_compat() != sys.prefix

in_virtualenv()


True

Check tokenizer

In [2]:
sample_bbc_news_sentences = [
    "China confirms Interpol chief detained",
    "Turkish officials believe the Washington Post writer was killed in the Saudi consulate in Istanbul.",
    "US wedding limousine crash kills 20",
    "Bulgarian journalist killed in park",
    "Kanye West deletes social media profiles",
    "Brazilians vote in polarised election",
    "Bull kills woman at French festival",
    "Indonesia to wrap up tsunami search",
    "Tina Turner reveals wedding night ordeal",
    "Victory for Trump in Supreme Court battle",
    "Clashes at German far-right rock concert",
    "The Walking Dead actor dies aged 76",
    "Jogger in Netherlands finds lion cub",
    "Monkey takes the wheel of Indian bus"
]
#basic tokenization
from nltk.tokenize import TweetTokenizer

tokenizer = TweetTokenizer()
sample_bbc_news_sentences_tokenized = [tokenizer.tokenize(sent) 
                            for sent in sample_bbc_news_sentences]
sample_bbc_news_sentences_tokenized[0]

['China', 'confirms', 'Interpol', 'chief', 'detained']

In [3]:
sample_bbc_news_sentences_tokenized_lower = [[_t.lower() 
                                              for _t in _s] 
                for _s in sample_bbc_news_sentences_tokenized]
sample_bbc_news_sentences_tokenized_lower[0]

['china', 'confirms', 'interpol', 'chief', 'detained']

In [4]:
#get all unique tokens
unique_tokens = set(sum(sample_bbc_news_sentences_tokenized_lower, 
                        []))
unique_tokens

{'.',
 '20',
 '76',
 'actor',
 'aged',
 'at',
 'battle',
 'believe',
 'brazilians',
 'bulgarian',
 'bull',
 'bus',
 'chief',
 'china',
 'clashes',
 'concert',
 'confirms',
 'consulate',
 'court',
 'crash',
 'cub',
 'dead',
 'deletes',
 'detained',
 'dies',
 'election',
 'far-right',
 'festival',
 'finds',
 'for',
 'french',
 'german',
 'in',
 'indian',
 'indonesia',
 'interpol',
 'istanbul',
 'jogger',
 'journalist',
 'kanye',
 'killed',
 'kills',
 'limousine',
 'lion',
 'media',
 'monkey',
 'netherlands',
 'night',
 'of',
 'officials',
 'ordeal',
 'park',
 'polarised',
 'post',
 'profiles',
 'reveals',
 'rock',
 'saudi',
 'search',
 'social',
 'supreme',
 'takes',
 'the',
 'tina',
 'to',
 'trump',
 'tsunami',
 'turkish',
 'turner',
 'up',
 'us',
 'victory',
 'vote',
 'walking',
 'was',
 'washington',
 'wedding',
 'west',
 'wheel',
 'woman',
 'wrap',
 'writer'}

In [5]:
from nltk.tokenize import sent_tokenize
from collections import defaultdict, Counter
from string import punctuation
import os
def preprocess_document(content):
    """
    Returns a list of tokens for a document's content. 
    Tokens should not contain punctuation and should be lower-cased.
    """
    sentences = sent_tokenize(content)
    tokens = []
    for _sent in sentences:
        sent_tokens = tokenizer.tokenize(_sent)
        sent_tokens = [_tok.lower() for _tok in sent_tokens if _tok not in punctuation]
        tokens += sent_tokens
    
    return tokens

def prepare_dataset(documents_dir):
    """
    Returns list of documents in the documents_dir, where each document is a list of its tokens. 
    
    """
    tokenized_documents = []
    for document in os.listdir(documents_dir):
        with open(os.path.join(documents_dir, document), errors='ignore') as outf:
            tokenized_documents.append(preprocess_document(outf.read()))
    print("Found documents: ", len(tokenized_documents))
    return tokenized_documents      
    

In [6]:

print(prepare_dataset('test/'))

Found documents:  3
[['one', 'one'], ['two', 'one', 'two', 'two'], ['two', 'tree']]


In [7]:
from os import scandir # can be used for easier iteration of documents in a folder
# can check is_file() on the objects returned by scan_dir 
# contain whole document path, so no need to join with the directory

def get_token_doc_id_pairs(category_dir):
    """
    Iteratively goes through the documents in the category_dir and constructs/returns:
    1. A list of (token, doc_id) tuples
    2. A dictionary of doc_id:doc_name
    """
    token_docid = []
    doc_ids = {}

    for i, document in enumerate(scandir(category_dir)):
        if document.is_file():
            doc_ids[i] = document
            with open(document) as out_fp:
                document_tokens = preprocess_document(out_fp.read())
                token_docid += [(token, i) for token in document_tokens]
    return token_docid, doc_ids

In [8]:
token_docid, doc_ids = get_token_doc_id_pairs('test/')
print(doc_ids)
token_docid

{0: <DirEntry 'one.txt'>, 1: <DirEntry 'one_two.txt'>, 2: <DirEntry 'two_tree.txt'>}


[('one', 0),
 ('one', 0),
 ('two', 1),
 ('one', 1),
 ('two', 1),
 ('two', 1),
 ('two', 2),
 ('tree', 2)]

In [9]:
from operator import itemgetter
sorted_token_docid = sorted(token_docid, key=itemgetter(0))
sorted_token_docid[-10:]

[('one', 0),
 ('one', 0),
 ('one', 1),
 ('tree', 2),
 ('two', 1),
 ('two', 1),
 ('two', 1),
 ('two', 2)]

In [10]:
def merge_token_in_doc(sorted_token_docid):
    """
    Returns a list of (token, doc_id, term_freq) tuples from a sorted list of (token, doc_id) list, 
    where if a token appears n times in a doc_id, we merge it in a tuple (toke, doc_id, n).
    """
    merged_tokens_in_doc = []
    for token, doc_id in sorted_token_docid:
        if merged_tokens_in_doc:
            prev_tok, prev_doc_id, prev_freq = merged_tokens_in_doc[-1]
            if prev_tok == token and prev_doc_id == doc_id:     
                merged_tokens_in_doc[-1] = (token, doc_id, prev_freq+1)
            else:
                merged_tokens_in_doc.append((token, doc_id, 1))
        else:
            merged_tokens_in_doc.append((token, doc_id, 1))
    return merged_tokens_in_doc

In [11]:
merged_tokens_in_doc = merge_token_in_doc(sorted_token_docid)
merged_tokens_in_doc[-10:]

[('one', 0, 2), ('one', 1, 1), ('tree', 2, 1), ('two', 1, 3), ('two', 2, 1)]

In [12]:
from collections import defaultdict
dictionary = defaultdict(lambda: (0, 0)) # term : doc_freq, tot freq
postings = defaultdict(lambda: []) # term: doc_ids, doc_freq

for token, doc_id, doc_freq in merged_tokens_in_doc:
    dictionary[token] = (dictionary[token][0]+1, dictionary[token][0]+doc_freq)

# usually implemented as linked lists
for token, doc_id, doc_freq in merged_tokens_in_doc:
    postings[token].append((doc_id, doc_freq)) 

In [13]:
doc_ids

{0: <DirEntry 'one.txt'>,
 1: <DirEntry 'one_two.txt'>,
 2: <DirEntry 'two_tree.txt'>}

In [14]:
dictionary["one"],dictionary['two'],dictionary['tree'],dictionary['zero']

((2, 2), (2, 2), (1, 1), (0, 0))

In [15]:
postings["one"],postings['two'],postings['tree'],postings['zero']

([(0, 2), (1, 1)], [(1, 3), (2, 1)], [(2, 1)], [])

Check and query if edit is equivalent

In [16]:
def and_query(postings, word1, word2):
    """
    merging postings lists of two words
    """
    postings_word1 = postings[word1]
    postings_word2 = postings[word2]
    
    documents_results = []
    
    postings_ind1, postings_ind2 = 0, 0
    while postings_ind1 < len(postings_word1) and postings_ind2 < len(postings_word2):
        doc_id1, doc_id2 = postings_word1[postings_ind1][0], postings_word2[postings_ind2][0]
        if doc_id1 == doc_id2:
            documents_results.append(doc_id1)
            postings_ind1 += 1
            postings_ind2 += 1
        elif doc_id1 < doc_id2:
            postings_ind1 += 1
        elif doc_id1 > doc_id2:
            postings_ind2 += 1
    return documents_results

Check or query for correctness

In [17]:
def or_query(postings, word1, word2):
    """
    merging postings lists of two words
    """
    postings_word1 = postings[word1]
    postings_word2 = postings[word2]
    
    documents_results = []
    
    postings_ind1, postings_ind2 = 0, 0
    while postings_ind1 < len(postings_word1) and postings_ind2 < len(postings_word2):
        doc_id1, doc_id2 = postings_word1[postings_ind1][0], postings_word2[postings_ind2][0]
        if doc_id1 == doc_id2:
            documents_results.append(doc_id1)
            postings_ind1 += 1
            postings_ind2 += 1
        elif doc_id1 < doc_id2:
            documents_results.append(doc_id1)
            postings_ind1 += 1
        elif doc_id1 > doc_id2:
            documents_results.append(doc_id2)
            postings_ind2 += 1
    if postings_ind1 == len(postings_word1):
        for i in range(postings_ind2,len(postings_word2)):
            documents_results.append(postings_word2[i][0])
    if postings_ind2 == len(postings_word2):
        for i in range(postings_ind1,len(postings_word1)):
            documents_results.append(postings_word1[i][0])
    return documents_results

In [18]:
doc_id = and_query(postings, 'one', 'two')
print(doc_id)

[1]


In [19]:
doc_id = or_query(postings, 'one', 'two')
print(doc_id)

[0, 1, 2]


In [20]:
doc_id = and_query(postings, 'tree', 'two')
print(doc_id)

[2]


In [21]:
doc_id = or_query(postings, 'tree', 'two')
print(doc_id)

[1, 2]


In [22]:
doc_id = and_query(postings, 'one', 'tree')
print(doc_id)

[]


In [23]:
doc_id = or_query(postings, 'one', 'tree')
print(doc_id)


[0, 1, 2]


In [24]:
doc_id = or_query(postings, 'tree', 'one')
print(doc_id)

[0, 1, 2]


In [25]:
postings["one"]

[(0, 2), (1, 1)]

In [26]:
postings["tree"]

[(2, 1)]

In [27]:
#
# simpleBool.py
#
# Example of defining a boolean logic parser using
# the operatorGrammar helper method in pyparsing.
#
# In this example, parse actions associated with each
# operator expression will "compile" the expression
# into BoolXXX class instances, which can then
# later be evaluated for their boolean value.
#
# Copyright 2006, by Paul McGuire
# Updated 2013-Sep-14 - improved Python 2/3 cross-compatibility
# Updated 2021-Sep-27 - removed Py2 compat; added type annotations
#
from typing import Callable, Iterable

from pyparsing import infixNotation, opAssoc, Keyword, Word, alphas, ParserElement

ParserElement.enablePackrat()


# define classes to be built at parse time, as each matching
# expression type is parsed
class BoolOperand:
    def __init__(self, t):
        self.label = t[0]
        self.value = eval(t[0])
        print("BoolOperand")
        print(t)

    def __bool__(self) -> bool:
        return self.value

    def __str__(self) -> str:
        return self.label

    __repr__ = __str__


class BoolNot:
    def __init__(self, t):
        print("BoolNot")
        print(t)
        self.arg = t[0][1]

    def __bool__(self) -> bool:
        v = bool(self.arg)
        return not v

    def __str__(self) -> str:
        return "~" + str(self.arg)

    __repr__ = __str__


class BoolBinOp:
    repr_symbol: str = ""
    eval_fn: Callable[
        [Iterable[bool]], bool
    ] = lambda _: False

    def __init__(self, t):
        print("BoolBinOp "+self.repr_symbol)
        print(t)
        self.args = t[0][0::2]

    def __str__(self) -> str:
        sep = " %s " % self.repr_symbol
        return "(" + sep.join(map(str, self.args)) + ")"

    def __bool__(self) -> bool:
        return self.eval_fn(bool(a) for a in self.args)

    __repr__ = __str__


class BoolAnd(BoolBinOp):
    repr_symbol = "&"
    eval_fn = all


class BoolOr(BoolBinOp):
    repr_symbol = "|"
    eval_fn = any


# define keywords and simple infix notation grammar for boolean
# expressions
TRUE = Keyword("True")
FALSE = Keyword("False")
NOT = Keyword("not")
AND = Keyword("and")
OR = Keyword("or")
boolOperand = TRUE | FALSE | Word(alphas, max=1)
boolOperand.setParseAction(BoolOperand).setName("bool_operand")

# define expression, based on expression operand and
# list of operations in precedence order
boolExpr = infixNotation(
    boolOperand,
    [
        (NOT, 1, opAssoc.RIGHT, BoolNot),
        (AND, 2, opAssoc.LEFT, BoolAnd),
        (OR, 2, opAssoc.LEFT, BoolOr),
    ],
).setName("boolean_expression")


p = True
q = False
r = True
tests = [
    ("p", True),
    ("q", False),
    ("p and q", False),
    ("p and not q", True),
    ("not not p", True),
    ("not(p and q)", True),
    ("q or not p and r", False),
    ("q or not p or not r", False),
    ("q or not (p and r)", False),
    ("p or q or r", True),
    ("p or q or r and False", True),
    ("(p or q or r) and False", False),
]

print("p =", p)
print("q =", q)
print("r =", r)
print()
for test_string, expected in tests:
    res = boolExpr.parseString(test_string)[0]
    success = "PASS" if bool(res) == expected else "FAIL"
    print(test_string, "\n", res, "=", bool(res), "\n", success, "\n")

p = True
q = False
r = True

BoolOperand
['p']
p 
 p = True 
 PASS 

BoolOperand
['q']
q 
 q = False 
 PASS 

BoolOperand
['p']
BoolOperand
['q']
BoolBinOp &
[[p, 'and', q]]
p and q 
 (p & q) = False 
 PASS 

BoolOperand
['p']
BoolOperand
['q']
BoolNot
[['not', q]]
BoolBinOp &
[[p, 'and', ~q]]
p and not q 
 (p & ~q) = True 
 PASS 

BoolOperand
['p']
BoolNot
[['not', p]]
BoolNot
[['not', ~p]]
not not p 
 ~~p = True 
 PASS 

BoolOperand
['p']
BoolOperand
['q']
BoolBinOp &
[[p, 'and', q]]
BoolNot
[['not', (p & q)]]
not(p and q) 
 ~(p & q) = True 
 PASS 

BoolOperand
['q']
BoolOperand
['p']
BoolNot
[['not', p]]
BoolOperand
['r']
BoolBinOp &
[[~p, 'and', r]]
BoolBinOp |
[[q, 'or', (~p & r)]]
q or not p and r 
 (q | (~p & r)) = False 
 PASS 

BoolOperand
['q']
BoolOperand
['p']
BoolNot
[['not', p]]
BoolOperand
['r']
BoolNot
[['not', r]]
BoolBinOp |
[[q, 'or', ~p, 'or', ~r]]
q or not p or not r 
 (q | ~p | ~r) = False 
 PASS 

BoolOperand
['q']
BoolOperand
['p']
BoolOperand
['r']
BoolBinOp &


In [28]:
def and_query_list(postings_word1, postings_word2):
    """
    merging postings lists of two words
    """

    documents_results = []
    
    postings_ind1, postings_ind2 = 0, 0
    while postings_ind1 < len(postings_word1) and postings_ind2 < len(postings_word2):
        doc_id1, doc_id2 = postings_word1[postings_ind1][0], postings_word2[postings_ind2][0]
        if doc_id1 == doc_id2:
            documents_results.append((doc_id1,0))
            postings_ind1 += 1
            postings_ind2 += 1
        elif doc_id1 < doc_id2:
            postings_ind1 += 1
        elif doc_id1 > doc_id2:
            postings_ind2 += 1
    return documents_results

In [29]:
class BoolRetrievalOperand:
    def __init__(self, t):
        self.label = t[0]
        self.value = postings[t[0]]
        print("BoolRetrievalOperand")
        print(t)
        print(self.value)
    
    def gimme(self) -> list:
        return self.value

    def __list__(self) -> list:
        return self.value

    

    def __str__(self) -> str:
        return self.label

    __repr__ = __str__


In [30]:
class BoolRetrievalBinOp:
    repr_symbol: str = ""
    eval_fn: Callable[
        [Iterable[list]], list
    ] = lambda _: []

    def __init__(self, t):
        print("BoolRetrievalBinOp "+self.repr_symbol)
        print(t)
        self.args = t[0][0::2]

    def __str__(self) -> str:
        sep = " %s " % self.repr_symbol
        return "(" + sep.join(map(str, self.args)) + ")"

   # def __bool__(self) -> bool:
    #    return self.eval_fn(bool(a) for a in self.args)

    def gimme(self) -> list:
        return self.eval_fn(a.gimme() for a in self.args)

    __repr__ = __str__

def wazaa(a,bb) -> list:
    print(a,"yolo",bb)
    b = []
    for i in bb:
        b.append(i)
    prev = b[0]
    for i in range(0,len(b)-1):
        print(prev)
        prev = and_query_list(prev,b[i+1])
    return prev

class BoolRetrievalAnd(BoolRetrievalBinOp):
    repr_symbol = "&"
    eval_fn = wazaa


class BoolRetrievalOr(BoolRetrievalBinOp):
    repr_symbol = "|"
    eval_fn = wazaa


In [31]:
NOT = Keyword("not")
AND = Keyword("and")
OR = Keyword("or")
boolOperand = Word(alphas)
boolOperand.setParseAction(BoolRetrievalOperand).setName("bool_operand")

# define expression, based on expression operand and
# list of operations in precedence order
boolExpr = infixNotation(
    boolOperand,
    [
        (NOT, 1, opAssoc.RIGHT),
        (AND, 2, opAssoc.LEFT,BoolRetrievalAnd),
        (OR, 2, opAssoc.LEFT),
    ],
).setName("boolean_expression")


In [32]:
postings["one"][0]

(0, 2)

In [33]:
p = True
q = False
r = True
one = "one"
tests = [
    ("one", True),
    ("tree", True),
    ("one and tree",True),
    ("one and two",True),
    ("two and two",True),
    ("two and (two and one)",True),
    ("one and tree and two",True),
]

print("p =", p)
print("q =", q)
print("r =", r)
print()
for test_string, expected in tests:
    res = boolExpr.parseString(test_string)[0]
    success = "test"#"PASS" if bool(res) == expected else "FAIL"
    print(test_string, "\n", res, "=", str(res.gimme()), "\n", success, "\n")

p = True
q = False
r = True

BoolRetrievalOperand
['one']
[(0, 2), (1, 1)]
one 
 one = [(0, 2), (1, 1)] 
 test 

BoolRetrievalOperand
['tree']
[(2, 1)]
tree 
 tree = [(2, 1)] 
 test 

BoolRetrievalOperand
['one']
[(0, 2), (1, 1)]
BoolRetrievalOperand
['tree']
[(2, 1)]
BoolRetrievalBinOp &
[[one, 'and', tree]]
(one & tree) yolo <generator object BoolRetrievalBinOp.gimme.<locals>.<genexpr> at 0x00000253C6233AC0>
[(0, 2), (1, 1)]
one and tree 
 (one & tree) = [] 
 test 

BoolRetrievalOperand
['one']
[(0, 2), (1, 1)]
BoolRetrievalOperand
['two']
[(1, 3), (2, 1)]
BoolRetrievalBinOp &
[[one, 'and', two]]
(one & two) yolo <generator object BoolRetrievalBinOp.gimme.<locals>.<genexpr> at 0x00000253C6233AC0>
[(0, 2), (1, 1)]
one and two 
 (one & two) = [(1, 0)] 
 test 

BoolRetrievalOperand
['two']
[(1, 3), (2, 1)]
BoolRetrievalOperand
['two']
[(1, 3), (2, 1)]
BoolRetrievalBinOp &
[[two, 'and', two]]
(two & two) yolo <generator object BoolRetrievalBinOp.gimme.<locals>.<genexpr> at 0x00000253C623