# Iterative Prompt Research

## Data sources and their cache

### Node and Leaf definitions

In [27]:
from typing import List
import pickle
import os

class Node:
    def __init__(self, name : str, predecessor : 'Node' = None, alternative_name : str = None):
        self.name = name
        self.predecessor = predecessor
        self.alternative_name : str = alternative_name
        self.children = []

    def get_child(self, index : int):
        if (index >= len(self.children)):
            assert IndexError
        return self.children[index]
    
    def add_children(self, children : List['Node']):
        for child in children:
            child.predecessor = self
            self.children.append(child)

    def __str__(self) -> str:
        return f"{self.name}, #children: {len(self.children)}"
    
    def __repr__(self) -> str:
        return self.__str__()

class Leaf(Node):
    def __init__(self, name : str, predecessor : 'Node' = None, alternative_name : str = None):
        super().__init__(name, predecessor, alternative_name)
    
    def __str__(self) -> str:
        return f"{self.name}"
    
class CorrectPath:
    def __init__(self, root : Node, query : str, child_sequence : List['int']):
        ## sequence must be given in reversed order, having the first move at the last position
        self.root = root
        self.query = query
        self.child_sequence = child_sequence 
    
    def is_next_move_correct(self, move_index : int) -> bool:
        if (len(self.child_sequence) == 0): return False ## if no child given, I am already at the correct node
        if (move_index == self.child_sequence[-1]): return True
        else: return False
        
    def make_correct_step(self) -> Node | None:
        if (len(self.child_sequence) == 0): return None
        self.root = self.root.get_child(self.child_sequence[-1])
        self.child_sequence = self.child_sequence[:-1]
        return self

def create_correct_sequence(root : Node, query : str, child_sequence_str : str) -> 'CorrectPath':
    child_sequence = [int(x) for x in child_sequence_str.split()]
    child_sequence.reverse()
    return CorrectPath(root, query, child_sequence)

#### WorldBank - XML

In [2]:
import xml.etree.ElementTree as ET

worldbank_namespaces =  {
        'nt': 'urn:eu.europa.ec.eurostat.navtree'
    }

def parse_worldbank_xml(path : str) -> Node:
    """Parses the xml tree from the given path and returns the root node."""
    tree = ET.parse(path)
    root = tree.getroot()
    dataset : Node = Node("WorldBank")
    dataset.add_children(get_node_children(root, dataset))
    return dataset

def get_node_children(root : ET.Element, predecessor_node : Node) -> List['Node']:
    """Parses the given root node and returns the corresponding DataSet object."""
    datasets = []
    for branch in root.findall('nt:branch', worldbank_namespaces):
        title = branch.find('nt:title/[@language="en"]', worldbank_namespaces).text
        branch_dataset = Node(title, predecessor_node)
        for child in branch.findall('nt:children', worldbank_namespaces):
            branch_dataset.children = get_node_children(child, branch_dataset)
        datasets.append(branch_dataset)

    for leaf in root.findall('nt:leaf', worldbank_namespaces):
        title = leaf.find('nt:title/[@language="en"]', worldbank_namespaces).text
        datasets.append(Leaf(title, predecessor_node))

    return datasets

In [6]:
worldbank : Node = None
if (os.path.exists("worldbank.pkl")):
    worldbank = pickle.load(open("worldbank.pkl", "rb"))
else:
    worldbank = parse_worldbank_xml("worldBank_content.xml")
    pickle.dump(worldbank, open("worldbank.pkl", "wb"))

worldbank

WorldBank, #children: 3

#### WebPage - HTML

In [10]:
from bs4 import BeautifulSoup
import requests
import re

TIMEOUT = 10
DEPTH = 5

def parse_html_webpage(path : str) -> Node:
    """Parses the html webpage into a tree and returns the root node."""
    dataset = Node("MFF home page", None, path)
    dataset.add_children(get_html_children(dataset, DEPTH))
    return dataset

def get_html_children(predecessor_node : Node, remaining_depth : int) -> List['Node']:
    """Parses the given soup and returns the corresponding DataSet object."""
    datasets = []
    if (remaining_depth <= 0): return datasets

    print(f"Requesting {predecessor_node.alternative_name}")
    page = requests.get(predecessor_node.alternative_name, timeout=TIMEOUT)
    soup = BeautifulSoup(page.content, 'html.parser')
    
    for link in soup.find_all('a'):
        if ('href' in link.attrs and link.text.strip() != '' and not url_is_blacklisted(link.attrs['href'])):
            url = url_get_absolute(link.attrs['href'], predecessor_node.alternative_name)
            title = re.sub('[\\n\\s]+',' ',link.text.strip())
            branch_dataset = Node(title, predecessor_node, url)
            branch_dataset.children = get_html_children(branch_dataset, remaining_depth - 1)
            datasets.append(branch_dataset)
    return datasets

def url_is_blacklisted(url : str, base_url : str | None = None) -> bool:
    """Checks if the url is blacklisted. Blacklist is very basic."""
    ## keep relatives
    if (url.startswith('./') or (url != "/" and url.startswith('/'))):
        return False
    
    ## filter out some basic stuff
    if (url == "/" 
        or url.startswith('#') 
        or url.startswith('mailto:') 
        or url.startswith('javascript:')
        or url.startswith('tel:')
        or (base_url is not None and not url.startswith(base_url))):
        return True

    ## passed
    return False

def url_get_absolute(input : str, current_url : str | None) -> str:
        """Tries to merge the relative input url with the current url prefix to get the absolute url. 
        If no current url is provided, the input is returned."""
        result = ""
        
        if (current_url is None): return input
        ## remove the query string from the url
        if ('?' in current_url): current_url = current_url.split('?')[0]
        ## remove any anchors from the url
        if ('#' in input): input = input.split('#')[0]

        ## try to cut as much from the current url as possible
        if (input.startswith('/')): 
            current_split = [x for x in current_url.split('/') if x != '']
            input_split = [x for x in input.split('/') if x != '']
            for i in range(len(current_split)):
                if (len(input_split) == 0): return current_url
                if (current_split[i] == input_split[0]): input_split.pop(0)
            input = '/'.join(input_split)

            if (current_url.endswith('/')): result = current_url + input
            else: result = current_url + '/' + input
        ## just append the relative to the current
        elif (input.startswith('./')): result = current_url + input.strip('./')
        ## otherwise legit URL given
        else: result = input
        ## always add the trailing slash if not present
        if (not result.endswith('/') and not "." in result.split('/')[-1]): ## only except if file is given
            result += '/'

        return result

In [11]:
webpage : Node = None
if (os.path.exists("webpage.pkl")):
    webpage = pickle.load(open("webpage.pkl", "rb"))
else:
    webpage = parse_html_webpage("https://www.mff.cuni.cz/")
    pickle.dump(webpage, open("webpage.pkl", "wb"))

webpage

Requesting https://www.mff.cuni.cz/
Requesting https://www.mff.cuni.cz/casLogin?callbackUrl=%2Fcs%2F&_locale=cs/
Requesting https://ldapuser.cuni.cz/reset/
Requesting https://ldapuser.cuni.cz/reset/login/cert?destpage=account/index/
Requesting https://ldapuser.cuni.cz/reset/login/cert/account/password/nomenu/1/
Requesting https://ldapuser.cuni.cz/reset/login/cert/account/password/nomenu/1/
Requesting https://ldapuser.cuni.cz/reset/login/cert/renew/nomenu/1/
Requesting https://ldapuser.cuni.cz/reset/login/cert/search/
Requesting https://ldapuser.cuni.cz/reset/login/cert/domain/index/
Requesting https://ldapuser.cuni.cz/reset/login/cert/domain/eduroam/nomenu/1/
Requesting https://ldapuser.cuni.cz/reset/login/cert/account/contact/nomenu/1/
Requesting https://ldapuser.cuni.cz/reset/login/cert/account/validate/nomenu/1/
Requesting https://ldapuser.cuni.cz/reset/login/cert/ext/
Requesting https://ldapuser.cuni.cz/reset/login/cert/doc/
Requesting https://is.cuni.cz/webapps/index.php?controlle

ConnectionError: HTTPConnectionPool(host='www.jinonice.cuni.cz', port=80): Max retries exceeded with url: /user/ (Caused by NameResolutionError("<urllib3.connection.HTTPConnection object at 0x0000027B88A10080>: Failed to resolve 'www.jinonice.cuni.cz' ([Errno 11001] getaddrinfo failed)"))

#### FileSystem - HTML

In [2]:
from bs4 import BeautifulSoup
from collections import deque

def parse_html_file(path : str) -> Node:
    """Parses the html file into a tree and returns the root node."""
    with open(path, 'r', encoding="utf-8") as file:
        soup = BeautifulSoup(file, 'html.parser')
        return get_html_file_children(soup)
    
def get_html_file_children(soup : BeautifulSoup) -> Node:
    stack = deque()
    initial_node : Node = None
    previous_node : Node = None
    for link in soup.find_all('a'):
        if ('href' in link.attrs and link.text.strip() != ''):
            url : str = link.attrs['href']
            dir : str = url[:url.rindex('/')]
            file : str = url[url.rindex('/')+1:]
            text : str = link.text.strip()

            if (initial_node is None):
                initial_node = Node(text, None, dir)
                previous_node = initial_node
                continue

            if (file == "" and  dir.startswith(previous_node.alternative_name)): ## when the file is empty, it is a directory
                current_node = Node(text, previous_node, dir)
                previous_node.children.append(current_node)
                stack.append(previous_node)
                previous_node = current_node
            elif (file != "" and dir.startswith(previous_node.alternative_name)): 
                previous_node.children.append(Leaf(file, previous_node, dir))
            else:
                while (len(stack) > 0 and not dir.startswith(previous_node.alternative_name)):
                    previous_node = stack.pop()
                if (file == ""): ## when the file is empty, it is a directory
                    current_node = Node(text, previous_node, dir)
                    previous_node.children.append(current_node)
                    stack.append(previous_node)
                    previous_node = current_node
                elif (file != "" and dir.startswith(previous_node.alternative_name)): 
                    previous_node.children.append(Leaf(file, previous_node, dir))
    return initial_node

In [4]:
filesystem : Node = None
if (os.path.exists("filesystem.pkl")):
    filesystem = pickle.load(open("filesystem.pkl", "rb"))
else:
    filesystem = parse_html_file("filesystem_content.html")
    pickle.dump(filesystem, open("filesystem.pkl", "wb"))

filesystem

DOCUMENTS, #children: 43

## Prompt engineering

### AI Assistant - ChatGPT

In [33]:
from openai import OpenAI
from openAI_secret import API_KEY
from consts import MODEL

class OpenAIWrapper:
    def __init__(self, keep_whole_context : bool, system_message : str):
        self.client = OpenAI(api_key=API_KEY)
        self.keep_whole_context = keep_whole_context
        self.system_message = system_message
        self.messages = [self.__create_system_message(self.system_message)]

    def __create_user_message(self, content):
        return {"role": "user", "content": content}
    
    def __create_system_message(self, content):
        return {"role": "system", "content": content}
    
    def __create_assistant_message(self, content):
        return {"role": "assistant", "content": content}
    
    def __add_message(self, content):
        if (self.keep_whole_context):
            self.messages.append(self.__create_user_message(content))
        else:
            self.messages = [
                self.__create_system_message(self.system_message),
                self.__create_user_message(content) 
            ]

    def __get_response(self):
        full_response = self.client.chat.completions.create(
            model = MODEL,
            messages=self.messages,
            temperature=0,
        )

        response_content = full_response.choices[-1].message.content
        self.messages.append(self.__create_assistant_message(response_content))
        return response_content
    
    def get_response_to_prompt(self, prompt) -> str:
        self.__add_message(prompt)
        response = self.__get_response()
        return response if response is not None else ""

ImportError: cannot import name 'MODEL' from 'consts' (c:\Users\mikol\Documents\IterativePromptResearch\research\consts.py)

### Prompt generator

In [None]:
from nltk.stem.snowball import SnowballStemmer
from consts import *

class AssistantWorker:
    def __init__(self):
        self.stemmer : SnowballStemmer = SnowballStemmer("english", ignore_stopwords=True)
        pass

    def create_llm_query(self, initial_query : str, path_done : List['Node'], current_node : Node, mode : int) -> str:
        """Creates a query for the LLM model based on the current node."""
        next_moves = []
        prompt = ""

        match mode:
            case WORKER_MODE.STEP_BY_STEP | WORKER_MODE.MATCH_AND_FILTER:
                next_moves = [f"\t{i}: {child.textual_name}\n" for i, child in enumerate(current_node.children)]
                prompt = (
                f"query: {initial_query}\n"
                f"steps done: {steps_so_far}\n"
                f"next possible subsection names:\n"
                f"{''.join(next_moves)}"
                )
            case WORKER_MODE.LOOK_AHEAD:
                for i, child in enumerate(current_node.children):
                    next_moves.append(f"{i}: {self.create_look_ahead_prompt(child, self.params['look_ahead_depth']-1, self.params['look_ahead_depth']-1)}")
                prompt = (
                f"query: {initial_query}\n"
                f"steps done: {steps_so_far}\n"
                f"next possible subsection names:\n"
                f"{''.join(next_moves)}"
                )
            case WORKER_MODE.KEYWORD_GEN_AND_MATCH:
                prompt = (
                f"query: {initial_query}\n"
                f"For the query above, please write {NUM_OF_KEYWORDS} keywords that might be relevant names of subsections to dive into in an upcoming search. Prefere single words. Use the language of the query!\n"
                f"Please separate the keywords with semicolon. Dont write anything else!\n"
                )

        return prompt

    def process_llm_child_pick_response(self, current_node : Node, raw_response : str, mode : int) -> Node | None:
        ...

    def process_llm_keyword_gen_response(self, raw_response : str) -> List['str']:
        ...

    def create_look_ahead_prompt(self, child : Node, depth_total : int, depth_remaining : int) -> str:
        ...