In [1]:
from uuid import uuid4
from typing import List, TYPE_CHECKING
from functools import reduce, wraps
from operator import itemgetter
from pathlib import Path
from dataclasses import dataclass
import os

import boto3
from pydantic import Field
from faststream.redis.fastapi import RedisRouter
from elasticsearch import Elasticsearch
from langchain.chains import (
    StuffDocumentsChain,
    LLMChain,
    ReduceDocumentsChain,
    MapReduceDocumentsChain,
)
from langchain_text_splitters.character import CharacterTextSplitter
from langchain_community.chat_models import ChatLiteLLM
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_elasticsearch import ApproxRetrievalStrategy, ElasticsearchStore
from langchain.schema import Document
from langchain_core.runnables import RunnableLambda, Runnable, chain, RunnablePassthrough, RunnableBranch
from langchain_core.runnables.config import RunnableConfig
from langchain.schema import StrOutputParser, Document
from langchain_core.runnables.base import RunnableEach
from unstructured.partition.auto import partition
from unstructured.chunking.basic import chunk_elements

from core_api.src.publisher_handler import FilePublisher
from redbox.storage import ElasticsearchStorageHandler
from redbox.models import File
from redbox.models.settings import Settings
from redbox.models.file import Metadata, UUID, PersistableModel
from redbox.models.chat import ChatRequest, ChatResponse
from redbox.storage import ElasticsearchStorageHandler
from redbox.llm.prompts.core import _core_redbox_prompt
from redbox.storage.storage_handler import BaseStorageHandler
from redbox.models.file import Chunk, File
from redbox.model_db import SentenceTransformerDB


In [2]:
creator_user_uuid=UUID('673f53f0-15e5-4ca1-be4b-41adcf602ab8')

In [3]:
env = Settings(_env_file=".env")
es_root_index = "summarisation"

output_max_tokens=256

es = Elasticsearch(
    hosts=[
        {
            "host": "localhost",
            "port": env.elastic.port,
            "scheme": env.elastic.scheme,
        }
    ],
    basic_auth=(env.elastic.user, env.elastic.password),
)
if env.elastic.subscription_level == "basic":
    strategy = ApproxRetrievalStrategy(hybrid=False)
elif env.elastic.subscription_level in ["platinum", "enterprise"]:
    strategy = ApproxRetrievalStrategy(hybrid=True)

sentence_transformer_db = SentenceTransformerDB(env.embedding_model)

vector_store = ElasticsearchStore(
    es_connection=es,
    index_name="redbox-data-chunk",
    embedding=env.embedding_model,
    strategy=strategy,
    vector_query_field="embedding",
)

s3_client = boto3.client("s3", endpoint_url=f"http://{env.minio_host}:{env.minio_port}", aws_access_key_id=env.aws_access_key, aws_secret_access_key=env.aws_secret_key)

storage_handler = ElasticsearchStorageHandler(
    es_client=es,
    root_index=es_root_index
)

api_base="https://oai-i-dot-ai-playground-sweden.openai.azure.com/"

print(os.environ["AZURE_OPENAI_API_KEY"])

llm = ChatLiteLLM(
    model="azure/gpt-35-turbo",
    api_base=api_base,
    max_tokens=output_max_tokens
)


  warn_deprecated(
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-mpnet-base-v2
INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cpu


a877928388654d678aab2e5014c96c3a


In [4]:

### Ingest Pipeline ###

if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client
else:
    S3Client = object

@dataclass
class LocalFile:
    creator_user_uuid: UUID
    filepath: Path


def upload_file(
        storage_handler: BaseStorageHandler, 
        s3: S3Client,
        env: Settings
    ):
    @chain
    def wrapped(local_file: LocalFile) -> File:
        file_uuid = str(uuid4())
        s3.put_object(Bucket=env.bucket_name, Key=str(file_uuid), Body=open(local_file.filepath, 'rb'))
        file = File(uuid=file_uuid, creator_user_uuid=local_file.creator_user_uuid, key=file_uuid, bucket=env.bucket_name)
        storage_handler.write_item(file)
        return file
    return wrapped


def file_chunker(env: Settings, s3_client: S3Client, max_chunk_size: int = 20000):
    @chain
    def wrapped(file: File) -> List[Chunk]:
        authenticated_s3_url = s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": file.bucket, "Key": file.key},
            ExpiresIn=3600,
        )
        elements = partition(url=authenticated_s3_url, strategy=env.partition_strategy)
        raw_chunks = chunk_elements(
            elements, 
            new_after_n_chars=max_chunk_size, 
            max_characters=max_chunk_size+32
        )
        print(f"Elements chunked")
        return [
            Chunk(
                parent_file_uuid=file.uuid,
                index=i,
                text=raw_chunk.text,
                metadata=Metadata(
                    parent_doc_uuid=file.uuid,
                    page_number=raw_chunk.metadata.page_number,
                    languages=raw_chunk.metadata.languages,
                    link_texts=raw_chunk.metadata.link_texts,
                    link_urls=raw_chunk.metadata.link_urls,
                    links=raw_chunk.metadata.links,
                ),
                creator_user_uuid=file.creator_user_uuid,
            )
            for i, raw_chunk in enumerate(raw_chunks)
        ]
    return wrapped


def local_embedder(model: SentenceTransformerDB):
    @chain
    def wrapped(chunks: List[Chunk]) -> List[Chunk]:
        print(f"Starting Embedding")
        embedded_sentences = model.embed_sentences([c.text for c in chunks])
        for i, c in enumerate(chunks):
            c.embedding = embedded_sentences.data[i].embedding
        return chunks
    return wrapped

def chunk_writer(storage_handler: BaseStorageHandler):
    @chain
    def wrapped(chunks: List[Chunk]) -> UUID:
        print(f"Writing Chunks")
        storage_handler.write_items(chunks)
        return chunks[0].parent_file_uuid
    return wrapped

def summarisation_ingest_chain(n=20000):
    chain =(
        upload_file(storage_handler, s3_client, env)
        | file_chunker(env, s3_client, max_chunk_size=4000)
        | local_embedder(sentence_transformer_db)
        | chunk_writer(storage_handler)
    )
    return chain


In [5]:
### Execution Ingest ###

es.options(ignore_status=[400,404]).indices.delete(index=f"{es_root_index}-file")
es.options(ignore_status=[400,404]).indices.delete(index=f"{es_root_index}-chunk")

summarisation_ingest = summarisation_ingest_chain()

ingest_result = summarisation_ingest.invoke(
    LocalFile(
        filepath=Path("../data/TS_Rules_Deluxe.pdf"),
        creator_user_uuid=creator_user_uuid
    )
)

file_uuid = ingest_result

INFO:elastic_transport.transport:DELETE http://localhost:9200/summarisation-file [status:200 duration:0.125s]
INFO:elastic_transport.transport:DELETE http://localhost:9200/summarisation-chunk [status:200 duration:0.098s]
INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-file/_doc/5a5ff2df-dff8-46d8-af43-12f7c99db429 [status:201 duration:0.435s]
INFO:pikepdf._core:pikepdf C++ to Python logger bridge initialized


Elements chunked
Starting Embedding


Batches:   0%|          | 0/2 [00:00<?, ?it/s]

Writing Chunks


INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-chunk/_doc/6f235222-2d4e-4f5a-a265-3f4099b7afc9 [status:201 duration:0.296s]
INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-chunk/_doc/8d8da0c6-04bf-44c9-9646-9d44cfe1bdc1 [status:201 duration:0.002s]
INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-chunk/_doc/a8fb7c5f-27bb-4c79-85c3-3a1c82b82d8f [status:201 duration:0.001s]
INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-chunk/_doc/57dc5cb6-9614-431d-bd00-a4b9586bdce1 [status:201 duration:0.003s]
INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-chunk/_doc/de130ebc-c93f-4414-9ad3-147e099261c3 [status:201 duration:0.002s]
INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-chunk/_doc/c575023b-63a1-43e9-8e31-94f1876cae29 [status:201 duration:0.001s]
INFO:elastic_transport.transport:PUT http://localhost:9200/summarisation-chunk/_doc/c45a20ec-642e-47

In [4]:

### Summarisation Pipeline ###

def document_reader(storage_handler: BaseStorageHandler, user_uuid):
    @chain
    def wrapped(parent_file_uuid):
        chunks = storage_handler.get_file_chunks(
            parent_file_uuid=parent_file_uuid,
            user_uuid=user_uuid
        )
        return [
            Document(page_content=chunk.text, metadata={"source": "local"})
            for chunk in chunks
        ]
    return wrapped



max_prompt_size = 4096 - output_max_tokens - 32

@chain
def summarise(file_uuid):
    docs = document_reader(storage_handler, creator_user_uuid).invoke(file_uuid)
    mapreduce_loops = 0
    while mapreduce_loops < 3:
        summaries = (
            ChatPromptTemplate.from_template("Summarize this content: {context}")
            | llm
        ).batch(
            docs, 
            config=RunnableConfig(
                max_concurrency=64
            )
        )
        prompt_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            encoding_name="cl100k_base", chunk_size=max_prompt_size, chunk_overlap=0
        )

        def combine_summaries(summaries):
            return " ; ".join([s.content for s in summaries[:24]])

        combined = combine_summaries(summaries)
        summarise_prompt = ChatPromptTemplate.from_template("Combine these summaries: {docs}").invoke(combined)
        if (len(prompt_splitter.split_text(str(summarise_prompt))) == 1):
            # Stop summarising the summaries we can go to final summary
            break
        else:
            # We can't do a summary of all docs due to length so combine them into two docs and summarise again
            number_summaries = int(len(summaries)/2)
            docs = [
                summarise_prompt.invoke(combine_summaries(summaries[:number_summaries])),
                summarise_prompt.invoke(combine_summaries(summaries[number_summaries:]))
            ]
            mapreduce_loops += 1
    else:
        # Panic because we're looping a long time to get this down to a reasonable size?
        print("Too many loops")
    result = llm.invoke(summarise_prompt)
    return StrOutputParser().invoke(result)


In [12]:
file_uuid="5a5ff2df-dff8-46d8-af43-12f7c99db429"
answer = summarise.invoke(file_uuid)

print(f"[{file_uuid}] {answer}")

INFO:elastic_transport.transport:POST http://localhost:9200/summarisation-chunk/_search?scroll=5m [status:200 duration:0.013s]
INFO:elastic_transport.transport:POST http://localhost:9200/_search/scroll [status:200 duration:0.001s]
INFO:elastic_transport.transport:DELETE http://localhost:9200/_search/scroll [status:200 duration:0.001s]
[92m14:39:40 - LiteLLM:INFO[0m: utils.py:1307 - [92m

POST Request Sent from LiteLLM:
curl -X POST \
https://oai-i-dot-ai-playground-sweden.openai.azure.com/ \
-H 'api_key: a87792838865********************' -H 'azure_ad_token: None' \
-d '{'model': 'gpt-35-turbo', 'messages': [{'role': 'user', 'content': "Summarize this content: page_content='Twilight Struggle —2nd Edition—\\n\\nDeluxe Edition by Jason Matthews & Ananda Gupta\\n\\nRU L E BOOK\\n\\nT A B L E O F C O N T E N T S\\n\\n\\x18. Introduction ........................................................... 2\\n\\n\\x180. Scoring and Victory .............................................. \\x180\\n\\

[5a5ff2df-dff8-46d8-af43-12f7c99db429] The combined summary describes the rulebook and components of the board game "Twilight Struggle." It provides an overview of the game's setup, gameplay sequence, card play, events, and operations. The game simulates the Cold War conflict between the Soviet Union and the United States and takes place on a world map divided into six regions. The content also includes a quote from John F. Kennedy's inaugural speech and a design note about the inclusion of certain countries for political purposes. The game is quick-playing and low-complexity, based on card-driven classics We the People and Hannibal: Rome vs. Carthage. The event cards cover historical events, and the map represents countries or blocs with stability numbers. Battleground states have special rules, and players can control countries through Influence points. Some cards have special effects when played as events, and the game has a headline phase where players select and reveal cards. Scor