In [1]:
class Config:
    def __init__(self, for_git=True):
        self.gap_too_large_threshold = 1000
        self.savetime_on_fulltext = False   # If True, operations on fulltext will be kept to a minimum

config = Config()

In [2]:
import time

def time_function(func):
    def wrapper(*args, **kwargs):
        appendix = ""
        # if instances in args:
        if "instances" in kwargs:
            # append len of instances
            appendix = f"({len(kwargs['instances'])} instances"
        if "papers" in kwargs:
            if appendix:
                appendix += ", "
            appendix += f"{len(kwargs['papers'])} papers"
        if appendix:
            appendix += ")"
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} executed in {end_time - start_time} seconds" + appendix)
        return result
    return wrapper

In [3]:
def split_string(input_string, delimiters = [" ", "-", "_"]):
    for delimiter in delimiters:
        input_string = " ".join(input_string.split(delimiter))
    return input_string.split()

In [4]:
# Step 2: find occurrences of instances in full text of papers
import sys
from bisect import bisect_left
from sortedcontainers import SortedSet
import json
from typing import List, Tuple
from collections import deque

class PosInPaper:
    def __init__(self):
        self.data = {}
        self.min_distances = {}
        """
        Initialize the PosInPaper class with empty dictionaries for storing data and minimum distances.
        """

    def save_to_file(self, path=None):
        if path is None:
            path = "pos_in_paper"
        data_to_save = {paper: {literal: list(positions) for literal, positions in literals.items()}
                        for paper, literals in self.data.items()}
        with open(path + ".json", 'w', encoding="utf-8") as f:
            json.dump(data_to_save, f, ensure_ascii=False)
        data_to_save = {outer_key: {str(inner_key): value for inner_key, value in inner_dict.items()}
                        for outer_key, inner_dict in self.min_distances.items()}

        with open(path + "_min_distances.json", 'w', encoding="utf-8") as f:
            json.dump(data_to_save, f, ensure_ascii=False)

    def load_from_file(self, path=None):
        if path is None:
            path = "pos_in_paper"
        with open(path + ".json", 'r', encoding="utf-8") as f:
            self.data = json.load(f)
        with open(path + "_min_distances.json", 'r', encoding="utf-8") as f:
            min_distances_str_keys = json.load(f)
            # Convert string keys back to frozensets
            self.min_distances = {outer_key: {frozenset(eval(inner_key)): value for inner_key, value in inner_dict.items()}
                                for outer_key, inner_dict in min_distances_str_keys.items()}
        self.optimize_data()
        
    def add_occurrence(self, paper, literal, pos):
        """
        Add an occurrence of a literal in a paper.

        Parameters:
        - paper (str): The identifier for the paper.
        - literal (hashable): A literal that occurs in the paper. string or frozenset
        - pos (int): The position of the literal's occurrence in the paper.
        """
        if paper not in self.data:
            self.data[paper] = {}
        if literal not in self.data[paper]:
            self.data[paper][literal] = []
        self.data[paper][literal].append(pos)

    def optimize_data(self):
        """
        Optimize the data structure for memory efficiency.
        """
        for paper in self.data:
            for literal, positions in self.data[paper].items():
                # self.data[paper][literal] = np.array(self.data[paper][literal], dtype=int)
                self.data[paper][literal] = SortedSet(positions)

    @time_function
    def find(self, config:Config, papers, paper_full_text, instances, paper_instance_occurrence_matrix):
        """
        Find all occurrences of instances in the full text of papers.

        Parameters:
        - config (Config): Configuration object with settings.
        - papers (list): A list of paper identifiers.
        - paper_full_text (dict): A mapping from paper identifiers to their full text file paths.
        - instances (list): A list of instances to find in the papers.
        - paper_instance_occurrence_matrix (list of lists): A matrix indicating whether an instance is in a paper.
        """
        # find all occurrences of instances in text files
        for paperID, paper in enumerate(papers):
            if paperID % 100 == 0:
                # print(f"Processing paper {paperID} of {len(papers)}")
                pass
            if paper in paper_full_text:
                # Full text of paper is available
                with open(paper_full_text[paper], 'r', encoding="utf8") as f:
                    text = f.read().lower()
                    for i, instance in enumerate(instances):
                        # if this instance is not in this document, move on.
                        if config.savetime_on_fulltext:
                            if not paper_instance_occurrence_matrix[paperID][i]:
                                # assume instance is not in this paper
                                continue
                            
                        pieces = split_string(instance) 
                        for piece in pieces:
                            piece = piece.lower()
                            pos = text.find(piece)
                            while pos != -1:
                                self.add_occurrence(paper, piece, pos)
                                pos = text.find(piece, pos + 1)
                                # Idea: store the sentence in which the instance was found
                if not self.data.get(paper):
                    print(f"Paper {paper} has no instances in full text.")
            else:
                print(f"Paper {paper} has no full text available.")
   
    def set_min_distance(self, paper, literals, distance):
        """
        Set the minimum distance between occurrences of literals in a paper.

        Parameters:
        - paper (str): The identifier for the paper.
        - literals (list): A list of literals for which the distance is calculated.
        - distance (int): The calculated minimum distance.
        """
        key = frozenset(literals)
        if paper not in self.min_distances:
            self.min_distances[paper] = {}
        self.min_distances[paper][key] = distance

    def get_min_distance(self, paper, literals, allow_call = True):
        """
        Get the minimum distance between occurrences of literals in a paper, if previously calculated.

        Parameters:
        - paper (str): The identifier for the paper.
        - literals (list): A list of literals to find the minimum distance between.
        - allow_call (bool): Flag to allow recursive call to find_min_distance.

        Returns:
        - int or None: The minimum distance if found, otherwise None.
        """
        key = frozenset(literals)
        if paper in self.min_distances and key in self.min_distances[paper]:
            return self.min_distances[paper][key]
        if allow_call:
            return self.find_min_distance(paper, literals, allow_call = False)
        else:
            return None

    def find_min_distance(self, paper, literals, allow_call = True):
        if allow_call:
            min_distance = self.get_min_distance(paper, literals, allow_call = False)
            if min_distance:
                return min_distance
        literals = sorted(literals, key=len, reverse=True)
        added = []
        for literal in literals:
            if literal in added:
                continue
            added.append(literal)
            if literal not in self.data[paper]:
                return -1  # Literal not found in paper
        lit_len = [len(literal) for literal in added]
            
        inputs = [[(x, i) for x in self.data[paper][literal]] for i, literal in enumerate(added)]

        indices = [lst[0][0] for lst in inputs]
        best = float('inf')

        for item in sorted(sum(inputs, [])):
            indices[item[1]] = item[0]
            arr_min = min(indices)
            best = min(max(indices) - arr_min - lit_len[indices.index(arr_min)], best)

        return best

    # Begin testing here. find_min_distance needs to be improved. the new method needs to have the same results, but be faster.

    def find_min_distance_test(self, paper, literals):
        raise NotImplementedError("You can implement and test a new function here.")

    

In [5]:
import numpy as np

@time_function
def calculate_proximity_matrix_test(config:Config, pos_in_paper:PosInPaper, instances, mode = "sqrt", try_to_save_time = False, check_against:PosInPaper = None, test = False, stop_at=None, start_at=0):
    instance_instance_proximity_matrix = np.zeros((len(instances), len(instances)), dtype=float)
    if check_against:
        test = True
    for paperID, paper in enumerate(pos_in_paper.data):
        if paperID < start_at:
            continue
        if stop_at:
            if paperID >= stop_at:
                break
        for id1, instance1 in enumerate(instances):
            for id2, instance2 in enumerate(instances):
                if id1 < id2:
                    continue
                if instance1 != instance2:
                    literals = []
                    for instance in [instance1, instance2]:
                        pieces = split_string(instance)
                        literals += pieces
                    if test:
                        distance = pos_in_paper.find_min_distance_test(paper, literals)
                        if check_against:
                            distance_old = check_against.find_min_distance(paper, literals)
                            if distance_old != distance:
                                print(f"Error: new: {distance} != old: {distance_old}")
                                print(f"Paper: {paper}, Instances: {instance1}, {instance2}")
                                print(f"Literals: {literals}")
                                raise ValueError("Error in distance calculation")
                    else:
                        distance = pos_in_paper.get_min_distance(paper, literals)
                    if distance < 0:
                        # print(f"Error: {instance1} and {instance2} not found in {paper}")
                        continue
                    result = 0.0
                    if distance == 0:
                        result = 1
                    elif distance == 1:
                        result = 1
                    elif mode == "sqrt":
                        result = 1 / np.sqrt(distance)
                    elif mode == "linear":
                        result = 1 / distance
                    elif mode == "binary":
                        result = 1 if distance < config.gap_too_large_threshold else 0
                    elif mode == "log":
                        result = 1 / np.log(distance)
                    else:
                        print("Error: unknown mode")
                        break
                    if result > 0.0:
                        instance_instance_proximity_matrix[id1][id2] += result
                        instance_instance_proximity_matrix[id2][id1] += result

    
    return instance_instance_proximity_matrix

In [6]:
pos_in_paper = PosInPaper()
pos_in_paper.load_from_file("pos_in_paper")
# there are some bugs in old min_distances
pos_in_paper.min_distances = {}

instances = []
with open("instances.txt", 'r', encoding="utf-8") as f:
    instances = f.read().split("\n")

In [7]:
import copy
stop_at=5

known_pos_in_paper = PosInPaper()
known_pos_in_paper.data = {
    "paper1": {
        "instance1": [1, 550, 1031],
        "ins2": [50, 85, 1512]
    }
}
known_pos_in_paper.optimize_data()
check_against = copy.deepcopy(known_pos_in_paper)
known_literals = ["instance1", "ins2"]


instance_instance_proximity_matrix = calculate_proximity_matrix_test(config, known_pos_in_paper, instances, check_against=check_against, test = True, stop_at=stop_at)


calculate_proximity_matrix_test executed in 0.21204781532287598 seconds


In [8]:
import copy
start_at=0
stop_at=10

# instances = [
#     'knowledge based engineering',
#     'engine analysis'
# ]

print("Now testing new vs. old")
test_pos_in_paper = copy.deepcopy(pos_in_paper)
check_against = copy.deepcopy(pos_in_paper)
instance_instance_proximity_matrix = calculate_proximity_matrix_test(config, test_pos_in_paper, instances, check_against=check_against, test = True, stop_at=stop_at, start_at=start_at)

print("Now testing only old")
test_pos_in_paper = copy.deepcopy(pos_in_paper)
instance_instance_proximity_matrix = calculate_proximity_matrix_test(config, test_pos_in_paper, instances, stop_at=stop_at)

print("Now testing the new version")
test_pos_in_paper = copy.deepcopy(pos_in_paper)
instance_instance_proximity_matrix = calculate_proximity_matrix_test(config, test_pos_in_paper, instances, test = True, stop_at=stop_at)

# Debug barrier
raise Exception("This is the end of the script. The following code is not executed.")

Now testing new vs. old
calculate_proximity_matrix_test executed in 47.845316886901855 seconds
Now testing only old
calculate_proximity_matrix_test executed in 13.13222050666809 seconds
Now testing the new version
calculate_proximity_matrix_test executed in 33.89843773841858 seconds


In [None]:
### code graveyard
raise Exception("This is the end of the script. The following code is not executed.")



# further down = older






### All functions below have some issues. They are not used in the current implementation.


def find_min_distance_v3(self, paper, literals, allow_call = True):
    # Step 1: Merge and tag all occurrences
    merged_occurrences = []
    added = []
    for literal in literals:
        if literal in added:
            continue
        added.append(literal)
        for pos in self.data[paper].get(literal, []):
            merged_occurrences.append((pos, literal))
    # merged_occurrences.sort(key=lambda x: (x[0], len(x[1])))
    merged_occurrences.sort(key=lambda x: (x[0], -len(x[1])))
    # merged_occurrences.sort()

    # Step 2: Initialize sliding window
    window = deque()
    seen_literals = set()
    min_distance = float('inf')

    for pos, literal in merged_occurrences:
        window.append((pos, literal))
        seen_literals.add(literal)

        # Remove elements from the left of the window if they are no longer needed
        while window and len(seen_literals) == len(added):
            if literal in seen_literals:  # Check if current literal is still in the window
                current_distance = window[-1][0] - window[0][0] - len(window[0][1])
                min_distance = min(min_distance, current_distance)
                _, left_literal = window.popleft()
                if not any(lit == left_literal for _, lit in window):
                    seen_literals.remove(left_literal)

    return min_distance if min_distance != float('inf') else -1
    raise NotImplementedError("You can implement and test a new function here.")


## -------- broken functions. Reference functions for future improvements --------
def find_min_distance_v2(self, paper, literals, allow_call = True):
    #TODO: currently, this does not consider stemmed words
    if allow_call:
        min_distance = self.get_min_distance(paper, literals, allow_call = False)
        if min_distance:
            return min_distance
        
    # Step 1: Ensure all literals are present

    for literal in literals:
        if literal not in self.data[paper]:
            return -1  # Literal not found in paper

    # Step 2: Aggregate positions with tags
    literals = sorted(literals, key=len, reverse=True)
    aggregated_positions = []
    added = []
    for literal in literals:
        if literal in added:
            continue
        added.append(literal)
        for position in self.data[paper][literal]:
            if position not in aggregated_positions:
                aggregated_positions.append((position, literal))
    
    # Step 3: Sort the aggregated list by positions
    aggregated_positions.sort()

    # Step 4: Find the minimum range using a sliding window approach
    from collections import Counter
    count = Counter()
    min_range = float('inf')
    left = 0  # Window start
    for right, (position, literal) in enumerate(aggregated_positions):
        count[literal] += 1
        # Try to contract the window from the left if it still contains all literals
        while all(count[lit] > 0 for lit in literals):
            _, left_literal = aggregated_positions[left]
            current_range = aggregated_positions[right][0] - aggregated_positions[left][0] - len(left_literal)
            # current_range = aggregated_positions[right][0] - aggregated_positions[left][0]
            min_range = min(min_range, current_range)
            # Move the window leftwards
            count[left_literal] -= 1
            left += 1

    return min_range if min_range != float('inf') else -1


def find_min_distance_v1(self, paper, literals = [], allow_call = True, ignore_errors = False):
    """
    Find the minimum distance between occurrences of literals in a paper.

    Parameters:
    - paper (str): The identifier for the paper.
    - literals (list, optional): A list of literals to find the minimum distance between. Defaults to an empty list.
    - allow_call (bool): Flag to allow recursive call to get_min_distance.

    Returns:
    - int: The minimum distance between the occurrences of the literals, or -1 if not all literals are found.
    """
    if not ignore_errors:
        raise NotImplementedError("This function is not used in the current implementation. It does not reliably subtract the leftmost literal from the distance.")
    

    #TODO: currently, this does not consider stemmed words
    if allow_call:
        min_distance = self.get_min_distance(paper, literals, allow_call = False)
        if min_distance:
            return min_distance
    keys = [key for key in set(literals)]
    lit_len = [len(key) for key in keys]
    
    # Initialize pointers for each of the lists
    pointers = [0] * len(keys)
    min_distance = sys.maxsize
    for key in keys:
        if key not in self.data[paper]:
            self.set_min_distance(paper, literals, -1)
            return -1
        if not self.data[paper][key]:
            self.set_min_distance(paper, literals, -1)
            return -1
    while True:
        # Get the current elements from the lists
        # self.data[paper][keys[i]][pointers[i]] =
        # self.data (dict of papers)
        # [paper] (current paper)
        # [keys[i]] (current instance)
        # [pointers[i]] (current position in the list of positions for the instance)
        # current_elements = list of some positions of all instances from literals
        current_elements = [self.data[paper][keys[i]][pointers[i]] for i in range(len(keys))]
        
        # Calculate the current distance
        current_min = min(current_elements)
        current_max = max(current_elements)

        # FIXME: This is not 100% reliable, as current_min could be shared by multiple instances
        # completely discarding this function now.
        min_index = current_elements.index(current_min)
        # reduce the distance by the length of the first word
        current_distance = current_max - (current_min + lit_len[min_index] + 1)
        
        # Update the minimum distance
        if current_distance < min_distance:
            min_distance = current_distance
            if min_distance <= 0:
                min_distance = 0
                break
            
        # Check if we can move forward in the list containing the minimum element
        # If the pointer exceeds its list length, exit the loop
        for i in range(len(keys)):
            if pointers[i] < len(self.data[paper][keys[i]]):
                break
        if pointers[min_index] + 1 >= len(self.data[paper][keys[min_index]]):
            break
        
        # Otherwise, increment the pointer
        pointers[min_index] += 1
    
    self.set_min_distance(paper, literals, min_distance)
    return min_distance





# Step 3: find the gap between the pieces of an instance
import sys

def find_min_distance(lists, literals = []):
    """
    Old version of the function, kept for reference.
    """
    #TODO: currently, this does not consider stemmed words
    # implemented: consider the length of the words for measuring the distance.
    ## e.g. "four" will always have a distance of at least 5 (4 characters + 1 space) to any consecutive word
    ## We must subtract the length of the words from the distance later.

    # Initialize pointers for each of the lists
    pointers = [0] * len(lists)
    min_distance = sys.maxsize
    for list in lists:
        if not list:
            # There are cases where e.g. "system integration" is not found in full text
            # This happens when NLP converts e.g. "integrated" to "integration"
            # example:
            # "Liu und Hu - 2013 - A reuse oriented representation model for capturin"
            # "system integration" -> "integration" is not found in the full text
            return -1
    while True:
        # Get the current elements from the lists
        current_elements = [lists[i][pointers[i]] for i in range(len(lists))]
        
        # Calculate the current distance
        current_min = min(current_elements)
        current_max = max(current_elements)
        current_distance = current_max - current_min
        
        # reduce the distance by the length of the first word
        if literals:
            min_element_index = current_elements.index(current_min)
            current_distance -= len(literals[min_element_index]) + 1 # +1 for the space
        
        # Update the minimum distance
        if current_distance < min_distance:
            min_distance = current_distance
            if min_distance <= 0:
                min_distance = 0
                break
            
        # Check if we can move forward in the list containing the minimum element
        min_index = current_elements.index(current_min)
        
        # If the pointer exceeds its list length, exit the loop
        for i in range(len(lists)):
            if pointers[i] < len(lists[i]) - 1:
                break
        if pointers[min_index] + 1 >= len(lists[min_index]):
            break
        
        # Otherwise, increment the pointer
        pointers[min_index] += 1
    
    return min_distance


def find_min_distance_keep(lists, literals = [], keep_close_matches = False):

    #TODO: currently, this does not consider stemmed words
    # implemented: consider the length of the words for measuring the distance.
    ## e.g. "four" will always have a distance of at least 5 (4 characters + 1 space) to any consecutive word
    ## We must subtract the length of the words from the distance later.
    positions = []

    # Initialize pointers for each of the lists
    pointers = [0] * len(lists)
    min_distance = sys.maxsize
    
    pointer_map = [i for i in range(len(lists))]
    has_lists = []
    for i, item in enumerate(lists):
        if not item:
            # There are cases where e.g. "system integration" is not found in full text
            # This happens when NLP converts e.g. "integrated" to "integration"
            # example:
            # "Liu und Hu - 2013 - A reuse oriented representation model for capturin"
            # "system integration" -> "integration" is not found in the full text
            return -1
        
        if isinstance(item[0], list):
            has_lists.append(i)
            pointer_map = pointer_map[:i] + [pointer_map[i]] * len(item[0]) + pointer_map[i+1:]
            break