In [None]:
import boto3
from typing import Optional
from pyarrow.filesystem import FileSystem
from pyarrow import fs
from sycamore.connectors.file.file_scan import JsonManifestMetadataProvider
from sycamore.functions import HuggingFaceTokenizer
from sycamore.llms import OpenAI, OpenAIModels
from sycamore.materialize_config import MaterializeSourceMode
from sycamore.reader import DocSetReader
from sycamore.transforms.embed import SentenceTransformerEmbedder
from sycamore.transforms import COALESCE_WHITESPACE
from sycamore.transforms.merge_elements import GreedyTextElementMerger
from sycamore.transforms.partition import ArynPartitioner
import sycamore
from time import time
from pathlib import Path

from ray.data import ActorPoolStrategy

In [None]:
# retrieve AWS credentials to return S3 filesystem
def get_s3_fs():
    """Build a pyarrow ``S3FileSystem`` from the default boto3 session.

    Returns:
        A ``pyarrow.fs.S3FileSystem`` constructed with the session's access
        key, secret key, session token and region name.

    Raises:
        RuntimeError: if the boto3 session cannot resolve any credentials.
    """
    session = boto3.session.Session()
    credentials = session.get_credentials()
    if credentials is None:
        # get_credentials() yields None when no credential source is configured;
        # fail with an actionable message instead of an AttributeError below.
        raise RuntimeError(
            "No AWS credentials found; configure them (environment variables, "
            "~/.aws/credentials, or a role/instance profile) before reading from S3."
        )

    # Take a single snapshot so the key, secret and token all belong to the
    # same credential set even if the underlying credentials are refreshable.
    frozen = credentials.get_frozen_credentials()

    # Kept as a function-local import, as in the original cell.
    from pyarrow.fs import S3FileSystem

    # Returned directly: no local named `fs`, which would shadow the
    # module-level `from pyarrow import fs` import inside this function.
    return S3FileSystem(
        access_key=frozen.access_key,
        secret_key=frozen.secret_key,
        session_token=frozen.token,
        region=session.region_name,
    )

In [None]:
# Client settings handed to the OpenSearch writer (local node on port 9200,
# TLS enabled but certificate/hostname verification switched off).
os_client_args = dict(
    hosts=[dict(host="localhost", port=9200)],
    http_compress=True,
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
    timeout=120,
)

# Index definition: k-NN enabled, with a single `embedding` knn_vector field.
# NOTE(review): dimension 768 presumably matches the embedding model's output
# size - confirm if the model is changed.
index_settings = dict(
    body=dict(
        settings={
            "index.knn": True,
            "number_of_shards": 5,
            "number_of_replicas": 1,
        },
        mappings=dict(
            properties=dict(
                embedding=dict(
                    dimension=768,
                    method=dict(
                        engine="faiss",
                        space_type="l2",
                        name="hnsw",
                        parameters={},
                    ),
                    type="knn_vector",
                ),
            ),
        ),
    ),
)

In [None]:
# Name of the OpenSearch index that the ingestion cell writes to.
index = "financebench-etl"
# NOTE(review): absolute, user-specific path - adjust before running on another machine.
manifest_path = "/Users/aanyapratapneni/Documents/Aryn/manifest.json" # Note: AWS credentials were not working for the S3 manifest path, so this is a local file

# One Hugging Face model name is shared by the tokenizer and the embedder below.
hf_model = "sentence-transformers/all-mpnet-base-v2"

# NOTE(review): openai_llm is not referenced by any other visible cell - confirm it is still needed.
openai_llm = OpenAI(OpenAIModels.GPT_4O.value)
# tokenizer is passed to the element merger; embedder is used for both ingestion and evaluation.
tokenizer = HuggingFaceTokenizer(hf_model)
embedder = SentenceTransformerEmbedder(model_name=hf_model, batch_size=100)

In [None]:
# --- Stage 1: build the ingestion DocSet ---------------------------------
# NOTE(review): if DocSet transforms are evaluated lazily, this first timer
# only measures plan construction and the real work happens during the write
# in stage 2 - confirm before relying on the first "Took ..." figure.
start = time()

ctx = sycamore.init()
reader = DocSetReader(ctx)
# Pipeline: read PDFs described by the manifest (via the S3 filesystem) ->
# partition -> materialize to a local directory -> coalesce whitespace ->
# merge elements (512 is presumably the merger's token budget - confirm) ->
# spread selected properties -> explode -> embed.
ds = (
    reader.manifest(binary_format="pdf", metadata_provider=JsonManifestMetadataProvider(manifest_path), filesystem=get_s3_fs())
    .partition(partitioner=ArynPartitioner(extract_table_structure=True, threshold=0.35, use_ocr=True))
    # NOTE(review): absolute, user-specific materialize path; IF_PRESENT presumably
    # reuses previously materialized output when it exists - confirm.
    .materialize(path="/Users/aanyapratapneni/Documents/Aryn/materialize", source_mode=MaterializeSourceMode.IF_PRESENT)
    .regex_replace(COALESCE_WHITESPACE)
    .merge(merger=GreedyTextElementMerger(tokenizer, 512))
    .spread_properties(["path", "company", "year", "doc-type"])
    .explode()
    .embed(embedder=embedder)
)

end = time()
print(f"Took {(end - start) / 60} mins")

###########################

# --- Stage 2: write the DocSet to OpenSearch -----------------------------
# `start`/`end` are reused; this second figure times the write call.
start = time()

ds.write.opensearch(
    os_client_args=os_client_args,
    index_name=index,
    index_settings=index_settings,
)

end = time()
print(f"Took {(end - start) / 60} mins")

In [None]:
import json
import os
from pathlib import Path
from typing import Any
import datasets

from datasets import Dataset

from sycamore.connectors.file.file_writer import JSONEncodeWithUserDict
from sycamore.data import Element
from sycamore.data.document import Document
from sycamore.evaluation import EvaluationDataPoint
from sycamore.evaluation.datasets import EvaluationDataSetReader
from sycamore.evaluation.pipeline import EvaluationPipeline
from sycamore.transforms.query import OpenSearchQueryExecutor

In [None]:
# Evaluation questions are loaded from a local CSV into a Hugging Face dataset (split "train").
# The full 150-question CSV was uploaded to s3://aryn-datasets-us-east-1/financebench/financebench_sample_150.csv
# The 10-question CSV loaded below is available at https://www.notion.so/FinanceBench-Documents-f19756a506184caf8491d5ad54b29862?pvs=4#d0055e19ebd9443f877b62a069ddeba8
# NOTE(review): absolute, user-specific path - adjust before running on another machine.
hf_dataset = datasets.load_dataset("csv", data_files='/Users/aanyapratapneni/Documents/Aryn/financebench_sample_10.csv', split="train") # local path

In [None]:
# Year extraction

import re

def extract_year(question, company):
    """Pick a four-digit year string out of ``question`` for use as a filter.

    Matches ``FYnn``, ``FYnnnn`` and bare four-digit tokens, keeps the last two
    digits of each, takes the largest and prefixes it with ``20``. The
    candidate is then passed to ``doc_exists`` together with ``company``.

    Returns:
        ``''`` when the question contains no matching token; otherwise
        whatever ``doc_exists(year, company)`` returns.
    """
    year_pattern = r'\bFY\d{2}\b|\b\d{4}\b|\bFY\d{4}\b'
    two_digit_suffixes = [token[-2:] for token in re.findall(year_pattern, question)]

    # Nothing year-like in the question: no year filter.
    if not two_digit_suffixes:
        return ''

    candidate = '20' + max(two_digit_suffixes)
    return doc_exists(candidate, company)

# check all documents to see if filters are valid
def doc_exists(year, company):
    """Return ``year`` if some ``hf_dataset`` row has a matching document name.

    A row matches when its ``doc_name`` starts with ``"<company>_<year>"``.

    Args:
        year: year string to validate (e.g. ``"2018"``).
        company: company prefix as it appears at the start of ``doc_name``.

    Returns:
        ``year`` when at least one row matches, otherwise ``''``.
    """
    # str.join is a method of the separator, not of the list; the original
    # `[company, year].join("_")` raised AttributeError on every call.
    prefix = "_".join([company, year])
    matches = hf_dataset.filter(lambda entry: entry["doc_name"].startswith(prefix))

    return '' if len(matches) == 0 else year



In [None]:
# convert each question in FinanceBench to an EvaluationDataPoint for future query construction
def _hf_to_qa_datapoint(datapoint: dict[str, Any]) -> dict[str, Any]:
    """Convert one dataset row into a serialized ``EvaluationDataPoint``.

    Args:
        datapoint: a row providing the keys ``question``, ``answer``,
            ``doc_link``, ``doc_name`` and ``page_number``. ``page_number`` may
            be a comma-separated string (``"51, 52"``), a single integer, or
            ``None``.

    Returns:
        ``{"doc": <serialized data point>}``. The data point carries the
        question, the ground-truth answer, one ground-truth source element
        (location and page numbers), a company filter, a year filter when
        ``extract_year`` yields one, and the raw row under ``"raw"``.
    """
    document = EvaluationDataPoint()

    # The CSV loader may infer an integer column when every row cites a single
    # page, so normalise through str() before splitting; blank entries
    # (missing value, trailing comma) are skipped instead of crashing int().
    raw_pages = datapoint["page_number"]
    if raw_pages is None:
        page_numbers = []
    else:
        page_numbers = [int(num.strip()) for num in str(raw_pages).split(",") if num.strip()]

    document.question = datapoint["question"]
    document.ground_truth_answer = datapoint["answer"]
    document.ground_truth_source_documents = [Element({
        "properties": {
            "_location": datapoint["doc_link"],
            "page_numbers": page_numbers
        }
    })]

    # The company is the part of doc_name before the first underscore.
    company = datapoint["doc_name"].split("_")[0]

    document.filters = {
        "properties.company": company,
    }
    # Only add a year filter when extract_year returns a non-empty value.
    year = extract_year(document.question, company)
    if year:
        document.filters["properties.year"] = year

    document["raw"] = datapoint
    return {"doc": document.serialize()}

# Index queried by the evaluation pipeline.
INDEX = "financebench-etl"

# The presence of /.dockerenv is used to decide which OpenSearch hostname to use.
if not os.path.exists("/.dockerenv"):
    opensearch_host = "localhost"
    print("Assuming we are running outside of a container, using localhost for opensearch host")
else:
    opensearch_host = "opensearch"
    print("Assuming we are in a sycamore jupyter container, using opensearch for opensearch host")

# Client settings for the query executor.
OS_CLIENT_ARGS = dict(
    hosts=[dict(host=opensearch_host, port=9200)],
    http_compress=True,
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
    timeout=120,
)

# Query/RAG settings passed to the evaluation pipeline.
OS_CONFIG = dict(
    size=10,
    neural_search_k=200,
    embedding_model_id="7-KAu5EBeZTJZhOyaQFc",
    search_pipeline="hybrid_rag_pipeline",
    llm="gpt-4o",
    context_window="10",
)

In [None]:
output_path = "/Users/aanyapratapneni/Documents/Aryn/myresultdir/etl.json" # local path to save results
# Create the results directory before the (expensive) evaluation runs, so a
# missing directory cannot cause the results to be lost at write time.
Path(output_path).parent.mkdir(parents=True, exist_ok=True)

# Note: `reader` is rebound here from the ingestion DocSetReader to an
# EvaluationDataSetReader; both are built on the same `ctx`.
reader = EvaluationDataSetReader(ctx)
input_docset = reader.huggingface(hf_dataset, doc_extractor=_hf_to_qa_datapoint)

# Experiment metadata stored alongside the per-query results.
data = {
    "experiment_name": "FinanceBench gpt-4o ocr + filters",
    "description": "gpt-4o",
    "created_by": "aanyapratapneni",
    "index": INDEX,
    "os_client_args": OS_CLIENT_ARGS,
    "os_config": OS_CONFIG,
    "qa_path": ["huggingface: PatronusAI/financebench"]
}

pipeline = EvaluationPipeline(
    index=INDEX,
    os_config=OS_CONFIG,
    metrics=[],
    query_executor=OpenSearchQueryExecutor(OS_CLIENT_ARGS),
    embedder=embedder
)

start = time()
# Only the first element of execute()'s result is used here.
query_level_metrics = pipeline.execute(input_docset)[0]
data["query_level_data"] = query_level_metrics.take_all()
# Elapsed wall-clock time, formatted to two decimals (same text as before).
data["evaluation_time"] = f"{time() - start:.2f} seconds"
with open(output_path, "w") as outfile:
    json.dump(data, outfile, cls=JSONEncodeWithUserDict)