In [1]:
import re 

In [3]:
def binary_search(arr, x):
        """
        Realiza una búsqueda binaria en un array ordenado.
        Args:
        arr (list): El array ordenado donde se realizará la búsqueda.
        x: El elemento a buscar en el array.
        Returns:
        int: La posición del elemento en el array si se encuentra, -1 si no se encuentra.
        """
        left = 0
        right = len(arr) - 1

        while left <= right:
            mid = (left + right) // 2

            # Si el elemento está en el medio
            if arr[mid] == x:
                return mid
            # Si el elemento es mayor que el valor medio, se busca en la mitad derecha
            elif arr[mid] < x:
                left = mid + 1
            # Si el elemento es menor que el valor medio, se busca en la mitad izquierda
            else:
                right = mid - 1

        # Si el elemento no está en el array
        return -1


In [5]:
class PostingList:
    docids: list[int]
    scores: list[float]=None
    skips: list[int]=None
    cursor: int=0
    df: int=0
    search_method="naive"

    def __init__(self,docids,scores=[]):
        self.docids=docids
        self.scores=scores
        self.cursor = 0
        self.df=len(docids)
    
    # Devuelve el docid actual
    def docid(self):
        if self.cursor<0:
            return None
        
        return self.docids[self.cursor]
    
    # Avanza el puntero al siguiente docid(secuencialmente)
    def next_(self):
        if self.cursor < (len(self.docids)-1):
            self.cursor+=1
        else:
            self.cursor=-1
    
    # Devuelve el score que tenga el termino en el docid actual
    def score(self): 
        return self.scores[self.cursor]

    # Devuelve el docid mayor o igual al recibido por parametro aca, optimizamos por skiplist o binary_Search
    def ge(self,docid:int):
        if self.cursor == -1:
            return None

        if self.search_method == "naive":
            while(self.cursor != -1) and (self.docid()<docid):
                self.next_()
            return self.docids[self.cursor] if self.cursor !=-1 else None

        if self.search_method == "binary":
            raise NotImplementedError

        if self.search_method == "skip":
            raise NotImplementedError
    
    def _reset(self):
        self.cursor = 0

    #Retorna la lista de docids
    def get_docids(self):
        return self.docids
    
     #Retorna la lista de scores
    def get_scores(self):
        return self.scores
    
    def get_document_frequency(self): 
        return self.df

    #Actualiza el score de una posting por su docid
    def update_score_posting(self,docid,score):
        self.scores[docid]+=score
    
    #Agrega una nueva posting a la lista
    def add_posting(self,docid,score):
        self.docids.append(docid)
        self.scores.append(score)
        self.df+=1

    #Retorna el indice del docid en la lista de docids
    def search_docid(self,docid):
        index = binary_search(self.docids,docid)
        return index
    

In [6]:
def binary_merge(postings):
        a = postings[0]
        for posting in postings[1:]:
            temp_posting = []
            current = a.docid()
            while current:
                posting.ge(current)
                if posting.docid()==current:
                    temp_posting.append(current)
                a.next_()
                current = a.docid()
            a = PostingList(docids=temp_posting)
        return a

In [7]:
def query_and(posting_one,posting_two):
    intersection = []
    while(posting_one.cursor != -1) and (posting_two.cursor != -1):
        if posting_one.docid() == posting_two.docid():
            intersection.append(posting_one.docid())
            posting_one.next_()
            posting_two.next_()
        elif posting_one.docid() < posting_two.docid():
            posting_one.next_()
        else:
            posting_two.next_()
    return PostingList(docids=intersection)
    
def query_or(posting_one,posting_two):
    union = []
    while(posting_one.cursor != -1) and (posting_two.cursor != -1):
        if posting_one.docid() < posting_two.docid():
            union.append(posting_one.docid())
            posting_one.next_()
        elif posting_one.docid() > posting_two.docid():
            union.append(posting_two.docid())
            posting_two.next_()
        else:
            union.append(posting_one.docid())
            posting_one.next_()
            posting_two.next_()
    
    while posting_one.cursor != -1 :
        union.append(posting_one.docid())
        posting_one.next_()
    
    while posting_two.cursor != -1:
        union.append(posting_two.docid())
        posting_two.next_()
    
    return PostingList(docids=union)
    
def query_not(posting_one,posting_two):
    negative=[]
    
    while posting_one.cursor != -1:
        current =  posting_one.docid()
        if current != posting_two.ge(current):
            negative.append(current)

        posting_one.next_()
 
    
    return PostingList(docids=negative)


In [24]:
import heapq
TOP_K=5
def DocumentAtATime(postings):
    top_k = [(0,0)]*TOP_K
    heapq.heapify(top_k)
    result={}
    uniques = binary_merge(postings).get_docids()
    print("reseteo")
    for p in postings:
        p._reset()
    print("empezo")
    print(uniques)
    for d in uniques:
        print("procesing:",d)
        for p in postings:
            current = p.ge(d)
            if current == d:
                result[d] = result.get(p.docid(),0) + p.score()
                p.next_()
        print("score:",result[d], "min-heap: ",top_k[0][0])
        if result[d] > top_k[0][0]:
            heapq.heappushpop(top_k,(result[d],d))
    
    return heapq.nlargest(TOP_K,top_k)

In [27]:
p1 = PostingList(docids=[1, 2, 3, 4, 6,7,8,9,10,11], scores=[23,2,41,2,10,12,3,45,65,2])
p2 = PostingList(docids=[1,4,6,8,9,10,50,], scores=[2,3,23,45,677,8,23])
p3 = PostingList(docids=[1,4,6,8,9,10], scores=[2,3,4,234,1,45])
# print(binary_merge([p1,p2,p3]).docids)
print(DocumentAtATime([p1,p2,p3]))



reseteo
empezo
[1, 4, 6, 8, 9, 10]
procesing: 1
score: 27 min-heap:  0
procesing: 4
score: 8 min-heap:  0
procesing: 6
score: 37 min-heap:  0
procesing: 8
score: 282 min-heap:  0
procesing: 9
score: 723 min-heap:  0
procesing: 10
score: 118 min-heap:  8
[(723, 9), (282, 8), (118, 10), (37, 6), (27, 1)]


In [35]:
print(query_not(p1,p2).docids)

[2, 3, 4, 6]


In [None]:
def query_processing():
    def inter(l1: list[int],l2:list[int])->list[int]:
        return list(set(l1).union)

In [1]:
def analizar_query(query):
    stack = []
    pares_terminos = []

    for token in query.split():
        if token == '(':
            stack.append(token)
        elif token == ')':
            while stack[-1] != '(':
                operador = stack.pop()
                termino2 = pares_terminos.pop()
                termino1 = pares_terminos.pop()
                pares_terminos.append((termino1, termino2, operador))
            stack.pop()  # Eliminar el '(' correspondiente
        elif token in ['AND', 'OR', 'NOT']:
            stack.append(token)
        else:
            pares_terminos.append(token)

        while len(stack) >= 2 and stack[-1] not in ['(', ')']:
            operador = stack.pop()
            termino2 = pares_terminos.pop()
            termino1 = pares_terminos.pop()
            pares_terminos.append((termino1, termino2, operador))

    return pares_terminos

# Ejemplo de uso:
query = "(perro AND casa) OR (gato AND (pato AND (perro NOT casa)))"
pares_terminos = analizar_query(query)
print("Pares de términos y operadores:")
print(pares_terminos)

Pares de términos y operadores:
[(((('(perro', 'casa)', 'OR'), '(gato', 'AND'), '(pato', 'AND'), '(perro', 'NOT'), 'casa)))']


In [10]:
user_input='(perro AND gato)OR camion'
parenthesis = re.findall(r'\((.*?)\)', user_input)[0]
# parenthesis_resultset = set(self.query(parenthesis))
# rest = user_input.replace("("+parenthesis+")", "")
parenthesis
rest = user_input.replace("("+parenthesis+")", "")
rest

'OR camion'

In [11]:
if "AND" in rest:
    print("Hay un AND")

if "NOT" in rest:
    print("Hay un NOT")
if "OR" in rest:
    print("Hay un OR")

Hay un OR
