# Example testing notebook

This notebook provides an example of how you can test LLM-assisted document conversion against conversion without an LLM. Before running it, set up your Python environment using the code in `initial-setup.ipynb` and configure the `.ini` file as discussed below.

The notebook begins by loading credentials and configuration from an `.ini` file stored in `~/.hbai/ai-workflows.ini`. The `~` in the path refers to the current user's home directory, and the `.ini` file contents should follow this format (with keys, models, and paths as appropriate):

    [openai]
    openai-api-key=keyhere-with-sk-on-front
    openai-model=gpt-4o
    azure-api-key=keyhere-or-blank
    azure-api-base=azure-base-url-here
    azure-api-engine=gpt-4o
    azure-api-version=2024-02-01

    [anthropic]
    anthropic-api-key=keyhere
    anthropic-model=

    [aws]
    aws-profile=
    bedrock-model=
    bedrock-region=us-east-1

    [langsmith]
    langsmith-api-key=leave-blank-unless-you're-using-langsmith

    [files]
    input-dir=~/Files/ai-workflows/inputs
    output-dir=~/Files/ai-workflows/outputs

You can set up any one of OpenAI, Azure, Anthropic, or Bedrock as the LLM, leaving the settings for the other providers blank. You also don't need to supply a LangSmith API key unless you're using LangSmith. The `input-dir` and `output-dir` settings specify the directories where input and output files are stored, respectively.
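
As a quick sanity check before running the notebook, the sketch below parses configuration text in the format above and reports which providers look configured. The helper name `configured_providers`, the `PROVIDER_KEYS` table, and the rule that a provider counts as configured when one particular setting is non-blank are all assumptions made for illustration; `LLMInterface` may decide differently.

```python
import configparser

# Hypothetical rule: provider name -> (section, option) that must be non-blank.
# This is an illustrative assumption, not necessarily how LLMInterface decides.
PROVIDER_KEYS = {
    "openai": ("openai", "openai-api-key"),
    "azure": ("openai", "azure-api-key"),
    "anthropic": ("anthropic", "anthropic-api-key"),
    "bedrock": ("aws", "bedrock-model"),
}

def configured_providers(ini_text: str) -> list:
    """Return the providers whose key setting is present and non-blank."""
    parser = configparser.RawConfigParser()
    parser.read_string(ini_text)
    found = []
    for provider, (section, option) in PROVIDER_KEYS.items():
        # fallback="" covers a missing section or option
        value = parser.get(section, option, fallback="").strip()
        if value:
            found.append(provider)
    return found

sample = """
[openai]
openai-api-key=
openai-model=gpt-4o
azure-api-key=

[anthropic]
anthropic-api-key=keyhere
anthropic-model=

[aws]
aws-profile=
bedrock-model=
bedrock-region=us-east-1
"""

print(configured_providers(sample))  # ['anthropic']
```

To check your real file, you could pass it the contents of `~/.hbai/ai-workflows.ini` instead of `sample`.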

## Installing dependencies

This next code block installs the dependencies required for this notebook.

In [None]:
# install requirements specific to this notebook (only need to run once in a given environment)
# (nltk and openpyxl are imported or used by the comparison code further down)
%pip install pandas rapidfuzz nltk openpyxl

## Initializing

This next code block initializes the notebook, reading parameters from the configuration file and initializing an LLM interface.

In [None]:
# for convenience, auto-reload modules when they've changed
%load_ext autoreload
%autoreload 2

import logging
import configparser
import os
from ai_workflows.llm_utilities import LLMInterface 

# set log level to INFO
logging.basicConfig(level=logging.INFO)

# load credentials and other configuration from a local ini file
inifile_location = os.path.expanduser("~/.hbai/ai-workflows.ini")
inifile = configparser.RawConfigParser()
inifile.read(inifile_location)

# load configuration
openai_api_key = inifile.get("openai", "openai-api-key")
openai_model = inifile.get("openai", "openai-model")
azure_api_key = inifile.get("openai", "azure-api-key")
azure_api_base = inifile.get("openai", "azure-api-base")
azure_api_engine = inifile.get("openai", "azure-api-engine")
azure_api_version = inifile.get("openai", "azure-api-version")
anthropic_api_key = inifile.get("anthropic", "anthropic-api-key")
anthropic_model = inifile.get("anthropic", "anthropic-model")
aws_profile = inifile.get("aws", "aws-profile")
bedrock_model = inifile.get("aws", "bedrock-model")
bedrock_region = inifile.get("aws", "bedrock-region")
input_dir = os.path.expanduser(inifile.get("files", "input-dir"))
output_dir = os.path.expanduser(inifile.get("files", "output-dir"))
langsmith_api_key = inifile.get("langsmith", "langsmith-api-key")

# initialize LangSmith API (if key specified)
if langsmith_api_key:
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_PROJECT"] = "local"
    os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
    os.environ["LANGCHAIN_API_KEY"] = langsmith_api_key

# initialize the LLM
llm = LLMInterface(
    openai_api_key=openai_api_key,
    openai_model=openai_model,
    azure_api_key=azure_api_key,
    azure_api_base=azure_api_base,
    azure_api_engine=azure_api_engine,
    azure_api_version=azure_api_version,
    langsmith_api_key=langsmith_api_key,
    anthropic_api_key=anthropic_api_key,
    anthropic_model=anthropic_model,
    bedrock_model=bedrock_model,
    bedrock_region=bedrock_region,
    bedrock_aws_profile=aws_profile
)

# report results
print("Local configuration loaded, LLM initialized.")

## Converting input documents to Markdown

This next code block runs through all the files in the configured input directory and converts them to Markdown, saving the Markdown files in the output directory. It also converts each file without the LLM (adding a "-no-llm" suffix to the base name of the output file) so that you can see the difference between LLM-assisted and regular conversion.

In [None]:
# use document_utilities to convert all files in the input directory
from ai_workflows.document_utilities import DocumentInterface

# initialize the document interface
doc = DocumentInterface(llm_interface=llm)
doc_no_llm = DocumentInterface()

# convert all files in the input directory
for filename in os.listdir(input_dir):
    if os.path.isfile(os.path.join(input_dir, filename)) and not filename.startswith('.') and not filename.endswith('.md'):
        print(f"Converting {filename} to markdown...")
        input_path = os.path.join(input_dir, filename)
        markdown = doc.convert_to_markdown(input_path)

        # write the LLM-assisted markdown to the output directory
        base_name = os.path.splitext(os.path.basename(filename))[0]
        llm_output_path = os.path.join(output_dir, base_name + '.md')
        with open(llm_output_path, 'w') as f:
            f.write(markdown)

        # now convert again without the LLM
        markdown_no_llm = doc_no_llm.convert_to_markdown(input_path)
        no_llm_output_path = os.path.join(output_dir, base_name + '-no-llm.md')
        with open(no_llm_output_path, 'w') as f:
            f.write(markdown_no_llm)

        print(f"Conversion complete. Markdown saved to {llm_output_path} and {no_llm_output_path}")
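
The output naming used in the conversion loop can be sketched in isolation. The helper `output_paths` is hypothetical, introduced here only to illustrate how each input filename maps to an LLM path and a no-LLM path.

```python
import os

def output_paths(filename: str, output_dir: str) -> tuple:
    """Return (llm_path, no_llm_path) for a given input filename."""
    # strip any directory and the final extension, e.g. "report.v2.pdf" -> "report.v2"
    base_name = os.path.splitext(os.path.basename(filename))[0]
    llm_path = os.path.join(output_dir, base_name + ".md")
    no_llm_path = os.path.join(output_dir, base_name + "-no-llm.md")
    return llm_path, no_llm_path

llm_path, no_llm_path = output_paths("report.pdf", "outputs")
print(os.path.basename(llm_path))     # report.md
print(os.path.basename(no_llm_path))  # report-no-llm.md
```

Note that under this scheme, two inputs that share a base name (say, `report.pdf` and `report.docx`) map to the same output files, so the later conversion overwrites the earlier one.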

In [None]:
# Customized version of "the Laterite method" — thanks, Laterite!

import pandas as pd
from rapidfuzz import fuzz, process
import unicodedata
from nltk.tokenize import sent_tokenize
import os
import re

def normalize_text(text):
    """
    Normalizes text by:
    - Converting to NFC form.
    - Replacing specific problematic characters.
    """
    
    # normalize Unicode characters to NFC form
    text = unicodedata.normalize('NFC', text)
    
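    # replace specific problematic characters: typographic punctuation and the
    # mojibake left behind when its UTF-8 bytes are mis-decoded as Windows-1252
    # (order matters: "â€" is a prefix of the other mojibake sequences, so it goes last)
    replacements = {
        "â€™": "'", "\u2019": "'",  # right single quotation mark
        "â€œ": '"', "\u201c": '"',  # left double quotation mark
        "â€“": '-', "\u2013": '-',  # en dash
        "â€”": '-', "\u2014": '-',  # em dash
        "\u2026": '...',            # ellipsis
        "â€": '"', "\u201d": '"',   # right double quotation mark
    }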
    for wrong, correct in replacements.items():
        text = text.replace(wrong, correct)
    
    return text

def segment_text(text, segment_type='paragraph'):
    """
    Segments text into paragraphs or sentences and ensures each segment has at least two words.
    """
    
    if segment_type == 'sentence':
        segments = sent_tokenize(text)
    elif segment_type == 'paragraph':
        paragraphs = re.split(r'\n{2,}', text)
        if len(paragraphs) == 1:
            paragraphs = text.split('\n')
        segments = [para.strip() for para in paragraphs if para.strip()]
    else:
        raise ValueError("segment_type must be 'sentence' or 'paragraph'")
    
    # filter segments to include only those with at least two words
    filtered_segments = [seg for seg in segments if len(seg.split()) >= 2]
    return filtered_segments

def find_best_match(word_seg, n_grams, similarity_threshold):
    """
    Finds the best matching n-gram above the given similarity threshold.
    """
    
    match = process.extractOne( # type: ignore
        word_seg,
        n_grams,
        scorer=fuzz.ratio,
        score_cutoff=similarity_threshold
    )
    return match

def compare_text(baseline_text: str, comparison_text: str, output_dir: str, base_filename: str):
    """
    Compares comparison_text against baseline_text using tiered similarity thresholds.
    """
    
    # start by normalizing both strings
    baseline_text = normalize_text(baseline_text)
    comparison_text = normalize_text(comparison_text)
    
    # output normalized text to files
    with open(os.path.join(output_dir, f"{base_filename}-baseline-normalized.txt"), "w", encoding="utf-8") as f:
        f.write(baseline_text)
    with open(os.path.join(output_dir, f"{base_filename}-comparison-normalized.txt"), "w", encoding="utf-8") as f:
        f.write(comparison_text)

    # segment the baseline text into paragraphs
    # (segment_text already drops segments with fewer than two words)
    word_segments = segment_text(baseline_text, segment_type='paragraph')
    print(f"Number of segments in baseline text (with at least two words): {len(word_segments)}")

    # tokenize the comparison text into words for n-gram generation
    comparison_words = comparison_text.split()

    # initialize mapping table and track unmatched segments
    mapping_table = []
    unmatched_segments = [(i + 1, seg) for i, seg in enumerate(word_segments)]
    
    # define similarity thresholds for tiered matching
    similarity_thresholds = [100, 95, 90, 80, 70]
    
    # process each similarity threshold in turn
    for threshold in similarity_thresholds:
        if not unmatched_segments:
            # if we're done, break out of the loop
            break
            
        still_unmatched = []        
        for seg_num, word_seg in unmatched_segments:
            # count the number of words in the baseline segment
            n = len(word_seg.split())
            
            # calculate the range for 10% variation
            min_length = max(1, int(n * 0.9))  # ensure minimum length is at least 1
            max_length = int(n * 1.1)
            
            # generate n-grams of varying lengths within the 10% range
            n_grams = []
            for length in range(min_length, max_length + 1):
                n_grams.extend([' '.join(comparison_words[j:j+length]) for j in range(len(comparison_words) - length + 1)])
            
            # remove empty n-grams
            n_grams = [ng for ng in n_grams if ng.strip()]

            # find best match at current threshold
            match = find_best_match(word_seg, n_grams, threshold)
            
            if match:
                matched_text = match[0]
                similarity = match[1]
                
                # remove the matched text from comparison_words
                matched_words = matched_text.split()
                start_index = None
                for i in range(len(comparison_words) - len(matched_words) + 1):
                    if comparison_words[i:i+len(matched_words)] == matched_words:
                        start_index = i
                        break
                if start_index is None:
                    # if no match is found, raise exception
                    raise ValueError("Matched words not found in comparison_words")
                del comparison_words[start_index:start_index+len(matched_words)]                
                             
                # add mapping to output table           
                mapping_table.append({
                    'Segment Number': seg_num,
                    'Baseline Segment': word_seg,
                    'Matched Comparison Text': matched_text,
                    'Similarity Score (%)': similarity
                })
            else:
                still_unmatched.append((seg_num, word_seg))
        
        # keep track of unmatched segments for next iteration
        unmatched_segments = still_unmatched

    # add any remaining unmatched segments to mapping table
    for seg_num, word_seg in unmatched_segments:
        mapping_table.append({
            'Segment Number': seg_num,
            'Baseline Segment': word_seg,
            'Matched Comparison Text': '',
            'Similarity Score (%)': 0
        })

    # sort mapping table by segment number
    mapping_table.sort(key=lambda x: x['Segment Number'])

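    # summarize matches in non-overlapping buckets bounded by the similarity thresholds
    print("\nMatching summary:\n")
    upper_bound = None  # exclusive upper bound of the current bucket (None for the top bucket)
    for threshold in similarity_thresholds:
        count = sum(1 for mapping in mapping_table
                    if threshold <= mapping['Similarity Score (%)']
                    and (upper_bound is None or mapping['Similarity Score (%)'] < upper_bound))
        if count > 0:
            if upper_bound is None:
                print(f"Segments with {threshold}% similarity: {count}")
            else:
                print(f"Segments with {threshold}% to <{upper_bound}% similarity: {count}")
        upper_bound = threshold
    no_match_count = sum(1 for mapping in mapping_table if mapping['Similarity Score (%)'] == 0)
    if no_match_count > 0:
        print(f"Segments with no match: {no_match_count}")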

    # compute overall similarity score as weighted average across all segments, weighting by segment length
    total_size = 0
    total_weighted_similarity = 0
    for mapping in mapping_table:
        # add mapping to running totals
        total_size += len(mapping['Baseline Segment'])
        total_weighted_similarity += len(mapping['Baseline Segment']) * mapping['Similarity Score (%)']
    # calculate final weighted score
    overall_similarity = total_weighted_similarity / total_size if total_size > 0 else 0
    
    # output total similarity score
    print(f"\nOverall Similarity Score: {overall_similarity:.2f}%")
    
    # reconstruct comparison text without matched segments
    remaining_comparison_text = ' '.join(word for word in comparison_words if word)
    
    # report and add unmatched text to mapping table
    if remaining_comparison_text:
        print(f"\nFound {len(remaining_comparison_text)} characters of unmatched text: {remaining_comparison_text}")
        mapping_table.append({
            'Segment Number': '99999',  # sentinel row for unmatched comparison text
            'Baseline Segment': '',
            'Matched Comparison Text': remaining_comparison_text,
            'Similarity Score (%)': 0
        })
    
    # export mapping table
    if mapping_table:
        df = pd.DataFrame(mapping_table)
        output_table_path = os.path.join(output_dir, f"{base_filename}-mapping-table.xlsx")
        try:
            df.to_excel(output_table_path, index=False, engine='openpyxl')
            print(f"\nMapping table has been exported to {output_table_path}")
        except Exception as e:
            print(f"\nError exporting mapping table: {e}")
    else:
        print("\nNo mapping information to export.")

## Comparing Markdown documents

This section compares the LLM and no-LLM versions of the Markdown outputs, using the `compare_text` function defined in the preceding code block. The comparison code is a work in progress.

Many thanks to Laterite for contributing ideas and code for this evaluation process.
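
The tiered matching idea behind `compare_text` can be sketched with the standard library alone. This is a simplified illustration, not the notebook's implementation: it swaps in `difflib.SequenceMatcher` for rapidfuzz's `fuzz.ratio` (so scores will differ somewhat), and the function names `similarity` and `tiered_match` are hypothetical.

```python
from difflib import SequenceMatcher

def similarity(a: str, b: str) -> float:
    """Similarity on a 0-100 scale (difflib stand-in for rapidfuzz's fuzz.ratio)."""
    return 100 * SequenceMatcher(None, a, b).ratio()

def tiered_match(segments, comparison_text, thresholds=(100, 95, 90, 80, 70)):
    """Match each segment to its best word n-gram, strictest threshold first."""
    words = comparison_text.split()
    results = {}
    unmatched = list(enumerate(segments))
    for threshold in thresholds:
        still_unmatched = []
        for idx, segment in unmatched:
            n = len(segment.split())
            # allow candidate n-grams within roughly 10% of the segment's word count
            lengths = range(max(1, int(n * 0.9)), int(n * 1.1) + 1)
            windows = [(j, k) for k in lengths for j in range(len(words) - k + 1)]
            best = max(
                ((similarity(segment, " ".join(words[j:j + k])), j, k) for j, k in windows),
                default=None,
            )
            if best and best[0] >= threshold:
                score, j, k = best
                results[idx] = (" ".join(words[j:j + k]), score)
                del words[j:j + k]  # consume matched words so they can't match again
            else:
                still_unmatched.append((idx, segment))
        unmatched = still_unmatched
    # anything still unmatched after the loosest threshold gets a score of 0
    for idx, _ in unmatched:
        results[idx] = ("", 0)
    return results, " ".join(words)

baseline = ["The quick brown fox", "jumps over the lazy dog"]
comparison = "Extra heading The quick brown fox jumps over the lazy dog"
matches, leftover = tiered_match(baseline, comparison)
print(matches[0][0], "|", matches[1][0], "|", leftover)
# The quick brown fox | jumps over the lazy dog | Extra heading
```

Running the strictest threshold first lets exact matches claim their text before looser thresholds are tried, which reduces the chance that a fuzzy match consumes words belonging to another segment.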

For each compared filename, this comparison process will output:

1. **In the output window**: Summary results, including a weighted similarity score and "extra text" in the LLM version
2. **filename-mapping-table.xlsx**: A mapping table showing, for each segment of the no-LLM version, the best-matching text in the LLM version and its similarity score, with any unmatched LLM text ("extra text") in a final row
3. **filename-baseline-normalized.txt**: The normalized text of the no-LLM version
4. **filename-comparison-normalized.txt**: The normalized text of the LLM version
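
The weighted similarity score in the summary results is an average of per-segment scores weighted by each baseline segment's length in characters, with unmatched segments counted as 0. A small worked example, using made-up segments and scores rather than real output:

```python
# Hypothetical (baseline segment, similarity score) pairs for illustration only;
# in the notebook these values come from the mapping table.
rows = [
    ("A short heading", 100),                       # 15 characters
    ("A much longer paragraph of body text.", 90),  # 37 characters
    ("Unmatched line", 0),                          # 14 characters
]

total_size = sum(len(segment) for segment, _ in rows)
weighted_sum = sum(len(segment) * score for segment, score in rows)
overall = weighted_sum / total_size if total_size > 0 else 0

print(f"{overall:.2f}%")  # 73.18%
```

Because the weights are character counts, long paragraphs dominate the overall score, and a single unmatched long paragraph pulls it down more than several unmatched short headings would.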

In [None]:
# loop through input files and compare the -no-llm version to the LLM version of the output files
for filename in os.listdir(input_dir):
    if os.path.isfile(os.path.join(input_dir, filename)) and not filename.startswith('.') and not filename.endswith('.md'):
        output_path1 = os.path.join(output_dir, os.path.splitext(os.path.basename(filename))[0] + '.md')
        output_path2 = os.path.join(output_dir, os.path.splitext(os.path.basename(filename))[0] + '-no-llm.md')

        print(f"\n\nComparing {output_path1} to {output_path2}...")
        print()
        with open(output_path1, 'r') as f:
            md1 = f.read()
        with open(output_path2, 'r') as f:
            md2 = f.read()

        # compare using the Laterite method
        compare_text(DocumentInterface.markdown_to_text(md2), DocumentInterface.markdown_to_text(md1), output_dir, os.path.splitext(os.path.basename(filename))[0])