# Environment

## Load environment

In [1]:
import os
from dotenv import load_dotenv


# Read variables from a local .env file into os.environ; presumably this is
# where the OPENSEARCH_* credentials read further below come from.
# override=True makes values from .env replace variables already set in the
# process environment.
load_dotenv(override=True)

True

## Import modules

In [2]:
import ollama
from opensearchpy import OpenSearch
from dataclasses import dataclass
from retry import retry
from typing import Union, Literal, Optional, Generator
import json
from IPython.display import Markdown, clear_output, display

## List models

In [16]:
# List the models available on the local Ollama server, numbered from 1.
print("Available models")
model_names = [entry["name"] for entry in ollama.list()["models"]]
for position, model_name in enumerate(model_names, start=1):
    print(f"{position}: {model_name}")

Available models
1: gemma2:2b
2: gemma2:latest
3: all-minilm:latest
4: qwen2:7b-instruct-q6_K
5: llama3.1:latest
6: llava:13b
7: phi:latest
8: gemma:7b-instruct-q6_K
9: mxbai-embed-large:latest


## Configurable parameters

In [61]:
# Chat model used by every prompt handler.
# Other models tried: "gemma2:latest", "phi:latest".
MODEL = "gemma2:2b"
# Model used by get_embedding for vector search.
EMBEDDING_MODEL = "all-minilm:latest"
# Ollama server URL. Honour the OLLAMA_HOST environment variable, which the
# ollama library's module-level helpers (e.g. ollama.chat) also read, so the
# explicit client and those helpers always target the same server.
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
# Number of attempts for @retry-decorated LLM calls.
RETRY_COUNT = 5
# Number of search hits kept per search when building contexts.
SELECT_TOP_RESULTS = 3
# OpenSearch index holding the preprocessed code documents.
INDEX_NAME = "sfc_code_preprocess"

# Ollama client

In [5]:
# Explicit Ollama client bound to the configured OLLAMA_HOST.
ollama_client: ollama.Client = ollama.Client(host=OLLAMA_HOST)


def get_embedding(text: str, embedding_model: str = EMBEDDING_MODEL) -> list[float]:
    """Return the embedding vector of ``text`` computed by ``embedding_model``.

    Args:
        text: Text to embed.
        embedding_model: Ollama embedding model name. Defaults to
            ``EMBEDDING_MODEL``. (Previously this argument was accepted but
            ignored, and the global was always used.)

    Returns:
        The ``"embedding"`` list from the Ollama embeddings response.
    """
    response = ollama_client.embeddings(model=embedding_model, prompt=text)
    return response["embedding"]

# OpenSearch client

In [6]:
OPENSEARCH_USERNAME = os.environ["OPENSEARCH_USERNAME"]
OPENSEARCH_PASSWORD = os.environ["OPENSEARCH_PASSWORD"]
OPENSEARCH_URL = os.environ["OPENSEARCH_URL"]
# NOTE(review): likely a leftover for HuggingFace tokenizers; nothing in this
# notebook visibly uses them.
os.environ["TOKENIZERS_PARALLELISM"] = "false"


def get_open_search(
    cluster_url: str, username: str, password: str, verify_certs: bool = False
) -> OpenSearch:
    """Create an OpenSearch client that authenticates with HTTP basic auth.

    Args:
        cluster_url: Cluster URL, e.g. the value of ``OPENSEARCH_URL``.
        username: Basic-auth user name.
        password: Basic-auth password.
        verify_certs: Whether to verify the server's TLS certificate. Defaults
            to False (the notebook's original behaviour, presumably for a local
            cluster with a self-signed certificate); pass True for any shared
            or remote cluster.

    Returns:
        A configured ``OpenSearch`` client.
    """
    return OpenSearch(
        hosts=[cluster_url], http_auth=(username, password), verify_certs=verify_certs
    )


open_search_client: OpenSearch = get_open_search(
    OPENSEARCH_URL, OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD
)

# Get distinct topics

In [7]:
def _fetch_topic_url_buckets(
    client: OpenSearch, index: str, page_size: int = 1000
) -> list[dict]:
    """Return every distinct (topic_title, file_url) bucket stored in ``index``.

    Composite aggregations are paged: each response holds at most
    ``page_size`` buckets plus an ``after_key`` to resume from. Keep requesting
    until a page comes back empty or without an ``after_key``.
    """
    sources = [
        {"topic_title": {"terms": {"field": "topic_title.keyword"}}},
        {"file_url": {"terms": {"field": "file_url.keyword"}}},
    ]
    collected: list[dict] = []
    after_key = None
    while True:
        composite = {"sources": sources, "size": page_size}
        if after_key is not None:
            composite["after"] = after_key
        response = client.search(
            body={"size": 0, "aggs": {"distinct_sources": {"composite": composite}}},
            index=index,
        )
        aggregation = response["aggregations"]["distinct_sources"]
        page = aggregation["buckets"]
        collected.extend(page)
        after_key = aggregation.get("after_key")
        if not page or after_key is None:
            return collected


# get_open_search and open_search_client are created in the previous cell;
# they are intentionally not redefined here.
buckets = _fetch_topic_url_buckets(open_search_client, INDEX_NAME)
# Map each topic to its source PDF URL. If a topic has several URLs, the last
# bucket (buckets come back in composite-key order) wins.
buckets_topic_to_url = {
    bucket["key"]["topic_title"]: bucket["key"]["file_url"] for bucket in buckets
}
topic_list = list(buckets_topic_to_url.keys())
# 1-based numbered menu of topics, one "N. title" per line.
topic_choices: str = "\n".join(f"{i}. {topic}" for i, topic in enumerate(topic_list, 1))

In [8]:
# Show the numbered topic menu (one "N. title" per line); TopicSelector uses
# the same 1-based numbering.
menu_lines = topic_choices.split("\n")
print("\n".join(menu_lines))

1. Code of Conduct for Persons Providing Credit Rating Services
2. Code of Conduct for Share Registrars
3. Code on Immigration-Linked Investment Schemes
4. Code on Open-ended Fund Companies
5. Code on Pooled Retirement Funds
6. Code on Real Estate Investment Trusts
7. Corporate Finance Adviser Code of Conduct
8. Fund Manager Code of Conduct
9. SFC Code on MPF Products
10. Section I - Overarching Principles Section
11. Section II - Code on Unit Trusts and Mutual Funds
12. Section III - Code on Investment-Linked Assurance Schemes
13. Section IV - Code on Unlisted Structured Investment Products
14. The Codes on Takeovers and Mergers and Share Buy-backs


# Prompt handlers
## Topic selector

In [62]:
@dataclass
class TopicSelector:
    """Ask the chat model which indexed document (topic) fits a question.

    The model is shown a 1-based numbered list of ``topic_choices`` and must
    reply with a single integer: the chosen index, or 0 if no topic fits.
    """

    # Currently unused; kept for interface compatibility.
    verbose: int = 0
    # Instruction placed at the top of the user prompt.
    header: str = (
        "Pick an index of document that you think that it can help answer the following question or pick 0 if you think they are not helpful. Please answer only as a number and do not include prologue, prefix or suffix"
    )
    # System message constraining the reply to a bare number.
    system_prompt: str = (
        "Pick a choice, please answer only a number and do not include prologue, prefix or suffix"
    )
    # Candidate topics; defaults to the topics aggregated from OpenSearch.
    topic_choices: tuple = tuple(topic_list)

    def generate(self, prompt: str) -> str:
        """Send ``prompt`` to ``MODEL`` and return the full (streamed) reply text."""
        stream = ollama_client.chat(
            model=MODEL,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": prompt},
            ],
            stream=True,
        )
        try:
            return "".join(chunk["message"]["content"] for chunk in stream)
        finally:
            stream.close()

    def construct_prompt(self, question: str) -> str:
        """Build the user prompt: header, numbered topic list, then the question."""
        numbered_topics = "\n".join(
            f"{i}. {topic}" for i, topic in enumerate(self.topic_choices, 1)
        )
        return f"""{self.header}

# Available source:
{numbered_topics}

# question:
{question}
"""

    @retry(tries=RETRY_COUNT, exceptions=ValueError)
    def pick_a_choice(self, question: str) -> int:
        """Return the 1-based index of the chosen topic, or 0 if none fits.

        Raises:
            ValueError: if the model still has not produced a valid index
                after ``RETRY_COUNT`` attempts.
        """
        reply = self.generate(self.construct_prompt(question))
        # Tolerate surrounding whitespace and a trailing period (e.g. "3.").
        # int() raises ValueError on anything else, which triggers a retry.
        choice = int(reply.strip().rstrip("."))
        # Raise ValueError rather than assert: @retry only catches ValueError
        # here, and asserts are stripped under ``python -O``.
        if not 0 <= choice <= len(self.topic_choices):
            raise ValueError(f"Invalid generated result {choice!r}, Regenerating...")
        return choice

In [63]:
TopicSelector().pick_a_choice("Tell me about real estate law")

0

## Query builder

In [24]:
## Query builder

@dataclass
class TextQueryBuilder:
    """Ask the chat model for keywords to use in an OpenSearch text query."""

    # Currently unused; kept for interface compatibility.
    verbose: int = 0
    # Instruction placed at the top of the user prompt.
    header: str = (
        "Based on the following question, what keywords should be queried in Opensearch"
    )
    # System message describing the model's role and reply format.
    system_prompt: str = (
        "We have an Opensearch instant storing docuements about code of conduct."
        " You are a data engineer who expertise Opensearch query."
        " Please suggest text query based on user's question"
        " return your answer only  and do not include prologue, prefix or suffix"
    )
    # Currently unused by this class; kept for interface compatibility.
    topic_choices: tuple = tuple(topic_list)

    def generate(self, prompt: str) -> str:
        """Send ``prompt`` to ``MODEL`` and return the full (streamed) reply text."""
        stream = ollama_client.chat(
            model=MODEL,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": prompt},
            ],
            stream=True,
        )
        try:
            return "".join(chunk["message"]["content"] for chunk in stream)
        finally:
            stream.close()

    def construct_prompt(self, question: str) -> str:
        """Build the user prompt: header followed by the question."""
        return f"""{self.header}

# question:
{question}
"""

    @retry(tries=RETRY_COUNT)
    def build(self, question: str) -> str:
        """Return the model's suggested text query for ``question``.

        Raises:
            ValueError: if the model still returns an empty reply after
                ``RETRY_COUNT`` attempts.
        """
        query = self.generate(self.construct_prompt(question)).strip()
        # An empty match query would find nothing; regenerate instead.
        if not query:
            raise ValueError("Empty query generated, regenerating...")
        return query

def get_topic(question: str, verbose: int = 0) -> Optional[str]:
    """Return the topic title the model picks for ``question``, or None.

    Uses the notebook-global ``topic_selector`` (created in the Testing
    section). Returns None when the model answers 0 or an index outside the
    available topics.

    Args:
        question: User's question.
        verbose: If truthy, print the selection (or the lack of one).
    """
    topic_selected_index = topic_selector.pick_a_choice(question)
    # Guard both ends: a negative index would otherwise silently select a
    # topic from the end of the tuple.
    if 1 <= topic_selected_index <= len(topic_selector.topic_choices):
        selected_topic = topic_selector.topic_choices[topic_selected_index - 1]
        if verbose:
            print(
                f'THE QUESTION: "{question}" \nSELECTED TOPIC: {topic_selected_index}. "{selected_topic}"\nFROM {buckets_topic_to_url[selected_topic]}'
            )
        return selected_topic
    if verbose:
        print("Provided sources are not seem related to the question")
    return None

## Source summarizer

In [57]:
@dataclass
class SourceSummarizer:
    """Ask the chat model whether a text snippet helps answer a question.

    ``summarize`` returns a dict with ``"is_useful"`` (bool) and
    ``"summarize"`` (str; empty when the snippet is not useful).
    """

    system_prompt: str = (
        "You are an expert in lawfirm who are assigned to consider whether a text data source "
        "is useful to answer a user question or not. If yes, you will summarize the text "
        "which corespond user's question for another expert to write answer the user , otherwise, do nothing. "
        "You answer must be in JSON format with field:\n"
        '"is_useful": boolean determining whether the source is useful,\n'
        # The "summarize" key was previously missing its closing quote.
        '"summarize": string your summarization refering the part for the text or empty string if not useful\n'
        "     return your answer only and do not include prologue, prefix or suffix"
    )
    # When truthy, print the raw model reply (previously always printed).
    verbose: int = 0

    def generate(self, prompt: str) -> str:
        """Send ``prompt`` to ``MODEL`` and return the full (streamed) reply text."""
        stream = ollama_client.chat(
            model=MODEL,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": prompt},
            ],
            stream=True,
        )
        try:
            return "".join(chunk["message"]["content"] for chunk in stream)
        finally:
            stream.close()

    def construct_prompt(self, question: str, text_source: str) -> str:
        """Build the user prompt: the source text followed by the question."""
        return f"""# Source:
{text_source}
# question:
{question}
"""

    @retry(tries=RETRY_COUNT, exceptions=ValueError)
    def summarize(self, question: str, text_source: str) -> dict:
        """Judge ``text_source`` against ``question`` and summarize it if useful.

        Returns:
            ``{"is_useful": bool, "summarize": str, ...}``.

        Raises:
            ValueError: (including ``json.JSONDecodeError``) if no valid
                reply is produced within ``RETRY_COUNT`` attempts.
        """
        reply = self.generate(self.construct_prompt(question, text_source))
        if self.verbose:
            print(reply)
        return self._parse_reply(reply)

    @staticmethod
    def _parse_reply(reply: str) -> dict:
        """Extract and validate the JSON object embedded in a model reply.

        Handles replies wrapped in Markdown code fences (```json ... ```) or
        padded with extra text by parsing the span from the first "{" to the
        last "}".
        """
        start, end = reply.find("{"), reply.rfind("}")
        if start == -1 or end < start:
            raise json.JSONDecodeError("No JSON object in model reply", reply, 0)
        parsed = json.loads(reply[start : end + 1])
        if not isinstance(parsed, dict) or "is_useful" not in parsed:
            raise ValueError(f"Unexpected summarizer reply: {parsed!r}")
        is_useful = parsed["is_useful"]
        # Models sometimes emit "true"/"false" as strings, and "false" is truthy.
        if isinstance(is_useful, str):
            is_useful = is_useful.strip().lower() == "true"
        parsed["is_useful"] = bool(is_useful)
        parsed["summarize"] = str(parsed.get("summarize") or "")
        return parsed
    

## User interactive

In [45]:
@dataclass
class UserInteractor:
    """Generate the final answer from summarized contexts and append references."""

    system_prompt: str = (
        "You are an humble expert in lawfirm, and your secretary already "
        "prepared gists from the related document for "
        "you to answer user's question. "
        "Your duty is to answer the question "
        "with confidence using the prepared "
        # Trailing space added: the sentences previously ran together
        # ("reference.Please").
        "data source as a reference. "
        "Please also add the reference of data source with URL to PDF file with page number "
        "and encourage user to find out more information with it"
    )

    def generate(self, prompt: str, stream: bool = False):
        """Send ``prompt`` to ``MODEL``.

        Returns the raw chat response: a response mapping when ``stream`` is
        False, or an iterator of response chunks when ``stream`` is True.
        """
        return ollama_client.chat(
            model=MODEL,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": prompt},
            ],
            stream=stream,
        )

    def stream_text(
        self,
        generator: Generator[dict, None, None],
        additional_text: str = "",
    ) -> Generator[str, None, None]:
        """Yield each chunk's message text, then ``additional_text`` if non-empty."""
        for chunk in generator:
            yield chunk["message"]["content"]
        # Emit the trailer as one chunk; ``yield from`` on a str would yield it
        # one character at a time.
        if additional_text:
            yield additional_text

    def construct_prompt(
        self, question: str, topic: str, contexts: list[str], source_url: str
    ) -> str:
        """Build the user prompt: question, document title, contexts and URL."""
        context_prompt = "- " + "\n- ".join(contexts)
        return f"""# question:
{question}

# Prepared data source:
Document: {topic}
{context_prompt}
URL: {source_url}
"""

    def answer(
        self, question: str, topic: str, contexts: list[str], stream: bool = False
    ) -> Union[str, Generator[str, None, None]]:
        """Answer ``question`` from ``contexts`` and append a Markdown reference list.

        Args:
            question: User's question.
            topic: Topic title; must be a key of ``buckets_topic_to_url``.
            contexts: Strings ending in ``" - Page: <n>"`` (as built from
                search hits), used as both prompt material and references.
            stream: If True, return a generator of text chunks.

        Returns:
            The answer text plus references, or a generator yielding the same.
        """
        source_url = buckets_topic_to_url[topic]
        # Text and vector search can surface the same passage; drop exact
        # duplicates while keeping first-seen order.
        unique_contexts = list(dict.fromkeys(contexts))
        prompt = self.construct_prompt(question, topic, unique_contexts, source_url)
        response = self.generate(prompt, stream)
        references = (
            "<br><br>**Reference**\n"
            f"> From: {source_url}"
            "\n* "
            + "\n* ".join(sorted(unique_contexts, key=self._page_of))
        )
        if not stream:
            return response["message"]["content"] + references
        return self.stream_text(response, references)

    @staticmethod
    def _page_of(context: str) -> int:
        """Page number parsed from a context ending in ``" - Page: <n>"``."""
        return int(context.rsplit(" ", 1)[1])

# Useful functions

In [46]:
def search_data_in_opensearch(
    query: str,
    search_method: Union[Literal["text"], Literal["vector"]],
    topic_title: Optional[str],
    k: int = 5,
) -> dict:
    """Search ``INDEX_NAME`` by keywords or by embedding, restricted to one topic.

    Args:
        query: Keyword text (``"text"``) or text to embed (``"vector"``).
        search_method: ``"text"`` for a full-text match on ``text``, or
            ``"vector"`` for a k-NN search on ``embedding``.
        topic_title: Only return chunks whose ``topic_title`` equals this
            exactly; ``None``/empty disables the restriction.
        k: Number of nearest neighbours requested for vector search.

    Returns:
        The raw OpenSearch response, with hits carrying the requested ``fields``.

    Raises:
        ValueError: for an unknown ``search_method``.
    """
    if search_method == "vector":
        # Embed only when needed, and embed the query actually passed in
        # (previously the notebook-global ``question`` was read here).
        query_embedding = get_embedding(query, EMBEDDING_MODEL)
        must = [{"knn": {"embedding": {"vector": query_embedding, "k": k}}}]
    elif search_method == "text":
        must = [{"match": {"text": {"query": query}}}]
    else:
        raise ValueError("Invalid search method")

    bool_query: dict = {"must": must}
    if topic_title:
        # An exact, non-scoring filter on the keyword sub-field (the same field
        # the topic aggregation uses). The previous `match` on the analysed
        # field was an OR over words, so any topic sharing a word (e.g. "Code")
        # also matched.
        bool_query["filter"] = [{"term": {"topic_title.keyword": topic_title}}]

    query_body = {
        "query": {"bool": bool_query},
        "_source": False,
        "fields": ["id", "topic_title", "text", "file_url", "page_number"],
    }
    return open_search_client.search(body=query_body, index=INDEX_NAME)


def extract_search_results(
    search_results_raw: dict, top_n: Optional[int] = None
) -> list[dict]:
    """Flatten the top OpenSearch hits into simple dicts.

    Args:
        search_results_raw: Response from ``open_search_client.search`` whose
            hits carry ``fields`` lists (the first value of each is used).
        top_n: Maximum number of hits to keep; defaults to ``SELECT_TOP_RESULTS``.

    Returns:
        A list of ``{"text", "topic", "url", "page"}`` dicts in hit order.
    """
    limit = SELECT_TOP_RESULTS if top_n is None else top_n
    hits = search_results_raw["hits"]["hits"][:limit]
    return [
        {
            "text": hit["fields"]["text"][0],
            "topic": hit["fields"]["topic_title"][0],
            "url": hit["fields"]["file_url"][0],
            "page": hit["fields"]["page_number"][0],
        }
        for hit in hits
    ]


def summarize_into_contexts(
    source_summarizer: "SourceSummarizer",
    search_results: list,
    question: Optional[str] = None,
) -> list[str]:
    """Summarize each useful search hit into a ``'"<summary>" - Page: <n>'`` string.

    Args:
        source_summarizer: Object whose ``summarize(question, text)`` returns
            a dict with ``"is_useful"`` and ``"summarize"``.
        search_results: Dicts with at least ``"text"`` and ``"page"`` keys.
        question: The user's question. If omitted, falls back to the
            notebook-global ``question`` (the previous behaviour), which may
            be stale; pass it explicitly.

    Returns:
        Context strings for the hits the summarizer judged useful, in order.
    """
    if question is None and search_results:
        # Backward compatibility with callers that relied on the global.
        question = globals()["question"]
    contexts = []
    for search_result in search_results:
        summarized = source_summarizer.summarize(question, search_result["text"])
        if summarized["is_useful"]:
            contexts.append(
                f'"{summarized["summarize"]}" - Page: {search_result["page"]}'
            )
    return contexts


def apologize(question: str, stream: bool = False) -> Union[str, Generator]:
    """Ask the model for a polite refusal when no relevant information exists.

    Args:
        question: The user's question being declined.
        stream: If True, return the raw stream of response chunks (which
            ``display_answer`` unwraps); otherwise return the reply text.
    """
    response = ollama_client.chat(
        model=MODEL,
        messages=[
            {
                # Instructions belong in the system message; previously both
                # messages were sent with role "user".
                "role": "system",
                "content": (
                    "Apologize to the inquiring user "
                    "because we don't have any information or duty to "
                    "answer the user's question. "
                    "If possible, suggest another helpful resource where "
                    "they may find the answer."
                ),
            },
            {
                "role": "user",
                "content": (
                    f'Let the user know that you cannot answer the question "{question}" '
                    "because you don't have the information they asked about."
                ),
            },
        ],
        stream=stream,
    )
    if not stream:
        return response["message"]["content"]
    return response


# Testing

In [58]:
# Shared prompt-handler instances used by answer_the_question and get_topic.
topic_selector, text_query_builder, source_summarizer, user_interactor = (
    TopicSelector(),
    TextQueryBuilder(),
    SourceSummarizer(),
    UserInteractor(),
)



def answer_the_question(question: str, stream: bool = False, debug: bool = False) -> Union[Generator[str, None, None], str]:
    """Run the responding flow to answer the user's question.

    Flow: pick a topic, then build a keyword query, then run text and vector
    search within that topic, then summarize useful hits into contexts, then
    generate the answer. Falls back to ``apologize`` when no topic fits or no
    hit is useful.

    Args:
        question (str): User's question.
        stream (bool): If True, return the answer as a generator of text chunks.
        debug (bool): If True, print intermediate results.

    Returns:
        - Generator[str, None, None] if stream = True
        - str if stream = False
    """
    topic_selected_index = topic_selector.pick_a_choice(question)
    if not 1 <= topic_selected_index <= len(topic_selector.topic_choices):
        # 0 (or an out-of-range index): we don't have information to answer.
        return apologize(question, stream)
    topic_title = topic_selector.topic_choices[topic_selected_index - 1]
    if debug:
        print(f"{topic_title=}")
    query_text = text_query_builder.build(question)
    if debug:
        print(f"{query_text=}")

    # Keyword search uses the generated query; the vector search is given the
    # user's own question.
    search_text_results = extract_search_results(
        search_data_in_opensearch(query_text, search_method="text", topic_title=topic_title)
    )
    search_vector_results = extract_search_results(
        search_data_in_opensearch(question, search_method="vector", topic_title=topic_title)
    )

    def summarize_hits(search_results: list) -> list[str]:
        # Summarize against this call's ``question`` explicitly, not a
        # notebook-global variable of the same name.
        contexts = []
        for search_result in search_results:
            summarized = source_summarizer.summarize(question, search_result["text"])
            if summarized["is_useful"]:
                contexts.append(
                    f'"{summarized["summarize"]}" - Page: {search_result["page"]}'
                )
        return contexts

    # Summarize useful resources
    if debug:
        print("summarizing contexts")
    context_text_results = summarize_hits(search_text_results)
    if debug:
        print(f"{context_text_results=}")
    context_vector_results = summarize_hits(search_vector_results)
    # Both searches can return the same passage; keep each context once.
    contexts = list(dict.fromkeys(context_text_results + context_vector_results))
    if not contexts:
        # The retrieved resources are not useful.
        return apologize(question, stream)
    # Generate the answer
    if debug:
        print("start generate the answer")
    return user_interactor.answer(question, topic=topic_title, contexts=contexts, stream=stream)


def display_answer(answer: Union[Generator[str, None, None], str]):
    """Render an answer as Markdown, streaming it live first if it is an iterator.

    Streamed chunks may be plain strings (from ``UserInteractor.stream_text``)
    or raw chat chunks (from ``apologize(..., stream=True)``). Raw chunks are
    unwrapped via ``chunk["message"]["content"]``. Text is printed as it
    arrives, then replaced by the rendered Markdown once the stream ends.
    """
    if isinstance(answer, str):
        display(Markdown(answer))
        return
    # Any other iterable is treated as a stream. The previous
    # isinstance(..., Generator) check sent non-generator iterators straight
    # to Markdown().
    pieces = []
    for chunk in answer:
        text = chunk if isinstance(chunk, str) else chunk["message"]["content"]
        print(text, end="", flush=True)
        pieces.append(text)
    clear_output(wait=True)
    display(Markdown("".join(pieces)))
        
        
# Display output


# Example
## Example 1

In [59]:
# In-scope question, streamed with debug output of the intermediate steps.
question: str = "I want to invest in real estates. What detail should I know?"
answer = answer_the_question(question=question, stream=True, debug=True)
display_answer(answer=answer)

It's great you're interested in real estate investment!  A key detail to know is that successful real estate investments often focus on generating recurring income.

According to the Code on Real Estate Investment Trusts, at least 75% of a scheme's assets **must** be invested in properties that produce consistent rental income. This ensures a steady flow of cash for investors.  You can read more about this requirement on page 35 and 37 of the document.

It's also crucial to understand the specific types of real estate being considered by the investment scheme. The offering document, which you can find at [https://www.sfc.hk/-/media/EN/files/COM/Reports-and-surveys/REIT-Code_Aug2022_en.pdf?rev=572cff969fc344fe8c375bcaab427f3b](https://www.sfc.hk/-/media/EN/files/COM/Reports-and-surveys/REIT-Code_Aug2022_en.pdf?rev=572cff969fc344fe8c375bcaab427f3b), details the investment strategy, including the types of properties (residential, commercial, industrial) and the market conditions they operate in (page 78).

I encourage you to thoroughly review the offering document to make an informed decision.


<br><br>**Reference**
> From: https://www.sfc.hk/-/media/EN/files/COM/Reports-and-surveys/REIT-Code_Aug2022_en.pdf?rev=572cff969fc344fe8c375bcaab427f3b
* "The scheme primarily invests in income-generating real estate. At least 75% of the gross asset value must be invested in real estate generating recurrent rental income. The scheme may acquire up to 25% of its gross asset value in uncompleted units subject to certain conditions and disclosures." - Page: 35
* "At least 75% of a scheme's gross asset value shall be invested in real estate that generates recurrent rental income at all times." - Page: 37
* "The offering document of the scheme shall clearly include: (c) a discussion of the business plan for property investment and management covering the scope and type of investments made or intended to be made by the scheme, including the type(s) of real estate (e.g. residential/commercial/industrial);  (d) the general character and competitive conditions of all real estate now held or intended to be acquired by the scheme and how it" - Page: 78

## Example 2

In [60]:
# In-scope question, streamed without debug output.
question: str = "I want to design product about pooled Retirement funds, any suggestion?"
answer = answer_the_question(question=question, stream=True)
display_answer(answer=answer)

Designing a product about pooled retirement funds is an exciting venture!  

Here are some suggestions based on the "Code on Pooled Retirement Funds":

* **Focus on transparency:** The code emphasizes clear and comprehensive information for investors. Your product should have a well-structured "principal brochure" (offering document) that outlines fees, investment strategies, termination conditions, and transfer/withdrawal rules clearly and accessibly.  Think about using visuals and simple language to make complex financial information understandable.
* **Tailor your offering:** Consider different investor needs and risk appetites. You could offer various PRF options with diverse investment strategies – some conservative, others more aggressive.  

Remember, the "Code on Pooled Retirement Funds" is a valuable resource for developing your product. It provides detailed guidance on regulations and best practices. 


For more in-depth information, I encourage you to explore the document yourself: [https://www.sfc.hk/-/media/EN/assets/components/codes/files-current/web/codes/code-on-pooled-retirement-funds/code-on-pooled-retirement-funds.pdf?rev=9badf81950734ee08c799832be6ff92b](https://www.sfc.hk/-/media/EN/assets/components/codes/files-current/web/codes/code-on-pooled-retirement-funds/code-on-pooled-retirement-funds.pdf?rev=9badf81950734ee08c799832be6ff92b) (Page 12, 45).  


Good luck with your product development!<br><br>**Reference**
> From: https://www.sfc.hk/-/media/EN/assets/components/codes/files-current/web/codes/code-on-pooled-retirement-funds/code-on-pooled-retirement-funds.pdf?rev=9badf81950734ee08c799832be6ff92b
* "This text defines several terms related to Pooled Retirement Funds (PRFs) such as 'principal brochure', 'Product Code', 'Product Provider' and refers to relevant legislation like Occupational Retirement Schemes Ordinance. It outlines the process of having PRFs authorized by the Commission." - Page: 12
* "  3.10 “pooled retirement fund” or “PRF” has the same meaning as “pooling agreement” in the Occupational Retirement Schemes Ordinance (Chapter 426 of Laws of Hong Kong). 

3.11 “principal brochure” or “offering document” means documents (including any other documents issued together) containing information on a PRF and investment portfolio(s) as stipulated in Appendix A. 

3.11A “Product Code” means any of the following codes administered by the Commission: 
 (a) Code on Unit Trusts and Mutual Funds 
(b) Code on Investment-Linked Assurance Schemes 
(c) Code on Pooled Retirement Funds 
(d) SFC Code on MPF Products" - Page: 12
* "The text describes various aspects of a Pooled Retirement Fund (PRF) including fees, investment strategy, termination conditions, and transfer/withdrawal rules. This information could be helpful in designing a product about pooled retirement funds." - Page: 45

## Example 3 (Unrelated question)

In [18]:
# Out-of-scope question: expected to take the apology path.
question: str = "How to cook fried chicken?"
answer = answer_the_question(question=question, stream=True)
display_answer(answer=answer)

I apologize, but I'm afraid I won't be able to provide an answer to your question on how to cook fried chicken as we don't have any relevant information or duty to answer your query.

However, if you're looking for a delicious recipe, there are many other helpful resources available online that can guide you through the process. You might want to check out reputable cooking websites such as Food.com, Allrecipes, or Epicurious, which offer a wide range of recipes and tutorials on how to cook fried chicken.

Additionally, you could also consider searching for videos on YouTube or Pinterest, where many home cooks and chefs share their favorite recipes and cooking techniques. You might find some inspiration and guidance from there!

If you have any more specific questions or need further assistance in finding the information you're looking for, feel free to ask!