# Load

In [3]:
# Notebook setup: third-party imports, import path tweak, environment
# loading, and Panel initialisation.
import panel as pn
from dotenv import load_dotenv
import param

import sys
# Put the parent directory on the import path (presumably so the local
# `pyllments` checkout is importable from this notebook — confirm).
sys.path.append('..')

# Load environment variables from a `.env` file, if one is found.
load_dotenv()

# Initialise the Panel extension for this session.
pn.extension()

# Main

In [8]:
from pyllments.base.model_base import Model
from pyllments.payloads.file import FilePayload

class ChunkModel(Model):
    """Data model describing a single chunk of text taken from a source file.

    Every field is declared as a ``param`` parameter so it is validated on
    assignment and can be watched.
    """
    text = param.String(doc='Text of chunk')
    # NOTE(review): ChunkPayload annotates ``source_file`` as Optional[Path],
    # while this selector only accepts FilePayload instances (or None) —
    # confirm which type is intended.
    source_file = param.ClassSelector(class_=FilePayload, doc='Source payload of chunk')
    strategy = param.String(doc='Strategy used to create chunk', allow_None=True)
    start_idx = param.Integer(allow_None=True, doc="""
        Start index of chunk in source file""")
    end_idx = param.Integer(allow_None=True, doc="""
        End index of chunk in source file""")
    # TODO: Type this properly
    # (kept as a comment so it no longer leaks into the parameter's doc text)
    embedding = param.Parameter(doc="""
        Embedding of chunk""")

from typing import Optional
from pathlib import Path
from pyllments.base.payload_base import Payload
from pyllments.payloads.chunk.chunk_model import ChunkModel

class ChunkPayload(Payload):
    """Payload that carries one text chunk in a :class:`ChunkModel`.

    All chunk data is stored on ``self.model``; any remaining keyword
    arguments are forwarded to ``Payload.__init__``.

    Args:
        text: Text content of the chunk.
        source_file: Origin of the chunk, passed through to the model.
        strategy: Name of the strategy used to create the chunk.
        start_idx: Start index of the chunk in the source, if known.
        end_idx: End index of the chunk in the source, if known.
        embedding: Embedding of the chunk, if already computed.
        **params: Extra parameters for the ``Payload`` base class.
    """

    def __init__(
        self,
        text: str = '',
        source_file: Optional[Path] = None,
        strategy: Optional[str] = None,
        start_idx: Optional[int] = None,
        end_idx: Optional[int] = None,
        embedding=None,
        **params
    ):
        super().__init__(**params)
        self.model = ChunkModel(
            text=text,
            source_file=source_file,
            strategy=strategy,
            start_idx=start_idx,
            end_idx=end_idx,
            embedding=embedding,
        )

In [9]:
# Smoke test: build a payload with every argument left at its default and
# let the notebook display its repr.
ChunkPayload()

ChunkPayload(css_cache={}, id='9538e6c0-df1b-4007-8786-5e09facf02db', model=ChunkModel(embedding=None, end_idx=None, name='ChunkModel00120', source_file=None, start_idx=None, strategy=None, text=''), name='ChunkPayload00119', view_cache={})

In [11]:
# Smoke test: build a default payload and let the notebook display its repr.
ChunkPayload()

ChunkPayload(css_cache={}, id='4e91b88a-62c6-40b7-be6e-fd65f99a06de', model=ChunkModel(name='ChunkModel00121', source_file=None, start_end_indices=None, strategy=None, text=''), name='ChunkPayload00120', view_cache={})

In [2]:
from collections import namedtuple
from io import TextIOWrapper

from langchain_text_splitters import RecursiveCharacterTextSplitter


def base_text_splitter(
        file: TextIOWrapper,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        length_function=len,
        keep_separator=False):
    """Split the text of ``file`` into chunks annotated with their offsets.

    Args:
        file: Source of the text. Objects exposing ``read_text()`` (e.g.
            ``pathlib.Path``) and open text streams exposing ``read()`` are
            both accepted.
        chunk_size: Maximum chunk size as measured by ``length_function``.
            Defaults to langchain's own default so the splitter can be
            called with only a file.
        chunk_overlap: Overlap between consecutive chunks. Defaults to
            langchain's own default.
        length_function: Callable used to measure text length.
        keep_separator: Forwarded to ``RecursiveCharacterTextSplitter``.

    Returns:
        A list of ``Chunk`` named tuples ``(text, start_index, end_index)``,
        where ``end_index`` is ``start_index + len(text)``.
    """
    Chunk = namedtuple('Chunk', ['text', 'start_index', 'end_index'])
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        add_start_index=True,
        length_function=length_function,
        keep_separator=keep_separator)

    # Path-like objects provide read_text(); text streams only provide read().
    if hasattr(file, 'read_text'):
        file_text = file.read_text()
    else:
        file_text = file.read()

    # create_documents expects a list of texts; a bare string would be
    # iterated character by character, yielding one document per character.
    documents = text_splitter.create_documents([file_text])

    chunk_list = []
    for doc in documents:
        text = doc.page_content
        start_index = doc.metadata['start_index']
        chunk_list.append(
            Chunk(
                text=text,
                start_index=start_index,
                end_index=start_index + len(text),
            )
        )
    return chunk_list


from pyllments.base.model_base import Model

class TextChunkerModel(Model):
    """Model that turns file payloads into chunk payloads.

    Assigning ``file_payloads`` runs ``splitter_fn`` on each payload, wraps
    every returned chunk in a ``ChunkPayload`` appended to
    ``chunk_payloads``, triggers ``chunk_payloads`` once per file, and then
    resets ``file_payloads`` without firing further events.

    ``file_types`` and ``multi_proc`` are declared here but are not
    referenced by the code in this class.
    """

    # For ease of modification, the splitter returns named tuples which this
    # model converts into ChunkPayloads.
    splitter_fn = param.Callable(default=base_text_splitter, doc="""
        Should return a list of Chunk named tuples
        (text, start_index, end_index) given a File Payload""")
    file_types = param.List(default=['txt', 'md'])
    multi_proc = param.Boolean(default=False, doc="""
        When True, multiple processes will handle the files simultaneously""")
    file_payloads = param.List(default=[], doc="""List of files to chunk""")
    chunk_payloads = param.List(default=[], doc="""List of chunk payloads""")

    def __init__(self, **params):
        # The Parameterized machinery must be initialised before
        # ``self.param.watch`` can be used by the watcher setup below.
        super().__init__(**params)
        self._set_watchers()

    def _set_watchers(self):
        """Register all parameter watchers of this model."""
        self._set_file_list_watcher()

    def _set_file_list_watcher(self):
        """Chunk every file whenever ``file_payloads`` is assigned."""
        def fn(event):
            for file_payload in self.file_payloads:
                for chunk in self.splitter_fn(file_payload):
                    self.chunk_payloads.append(
                        ChunkPayload(
                            text=chunk.text,
                            source_file=file_payload.path,
                            start_idx=chunk.start_index,
                            end_idx=chunk.end_index
                        )
                    )
                # In-place appends do not fire watchers, so announce the
                # update explicitly (once per processed file).
                self.param.trigger('chunk_payloads')
            # Reset the input list without re-triggering this watcher.
            with param.parameterized.discard_events(self):
                self.file_payloads = []  # Clean up
        self.param.watch(fn, 'file_payloads')
          


from typing import Union

from pyllments.base.element_base import Element


class TextChunkerElement(Element):
    """Element wiring a ``TextChunkerModel`` to an input and an output port.

    ``file_input`` receives one file payload or a list of them and assigns
    them to the model; ``chunk_output`` emits the chunk payloads the model
    produces.
    """

    def __init__(self, **params):
        super().__init__(**params)
        self.model = TextChunkerModel()

        self._file_input_setup()
        self._chunk_output_setup()
        # Without this watcher nothing is ever staged on the output port.
        self._set_chunk_payloads_watcher()

    def _file_input_setup(self):
        """Create the ``file_input`` port feeding ``model.file_payloads``."""
        def unpack(payload: Union[FilePayload, list[FilePayload]]):
            # Normalise a single payload to a one-element list.
            file_payload_list = payload if isinstance(payload, list) else [payload]
            self.model.file_payloads = file_payload_list

        self.ports.add_input(name='file_input', unpack_payload_callback=unpack)

    def _chunk_output_setup(self):
        """Create the ``chunk_output`` port."""
        def pack(chunk_payloads: list[ChunkPayload]) -> list[ChunkPayload]:
            # Rebinding the model attribute leaves the list being returned
            # intact; events are discarded so the watcher is not re-run.
            with param.parameterized.discard_events(self.model):
                self.model.chunk_payloads = []  # Clean up
            return chunk_payloads

        self.ports.add_output(name='chunk_output', pack_payload_callback=pack)

    def _set_chunk_payloads_watcher(self):
        """Stage the model's chunk payloads on the output port when they change."""
        def fn(event):
            self.ports.output['chunk_output'].stage_emit(chunk_payloads=self.model.chunk_payloads)
            with param.parameterized.discard_events(self.model):
                self.model.chunk_payloads = []  # Clean up

        self.model.param.watch(fn, 'chunk_payloads')
    


    


ModuleNotFoundError: No module named 'pyllments'

# Embedder

In [11]:
from sentence_transformers import SentenceTransformer

def base_sentence_transformer_encode(
        sentences: list[str],
        model_name: str = 'Alibaba-NLP/gte-base-en-v1.5'):
    """Encode ``sentences`` with a SentenceTransformer model.

    A new ``SentenceTransformer(model_name)`` is constructed on every call
    and the result of its ``encode`` method is returned unchanged.

    Args:
        sentences: Sentences to encode.
        model_name: Name of the model handed to ``SentenceTransformer``.
    """
    return SentenceTransformer(model_name).encode(sentences)

  from tqdm.autonotebook import tqdm, trange


In [17]:
from pyllments.base.model_base import Model


class EmbedderModel(Model):
    """Model that attaches embeddings to chunk payloads.

    Assigning ``chunks`` runs ``embedder_fn`` on the chunk texts, stores each
    returned embedding on the matching chunk's model, publishes the chunks
    through ``processed_chunks``, and then resets ``chunks`` without firing
    further events.
    """
    # The default must name the encoder function actually defined in this
    # file (``base_sentence_transformer_encode``).
    embedder_fn = param.Callable(default=base_sentence_transformer_encode, doc="""
        Should return a list of embeddings given a list of sentences""")
    chunks = param.List(default=[], doc="""List of chunks""")
    processed_chunks = param.List(default=[], doc="""List of processed chunks""")

    def __init__(self, **params):
        super().__init__(**params)
        self._set_watchers()

    def _set_watchers(self):
        """Register all parameter watchers of this model."""
        self._set_chunks_watcher()

    def _set_chunks_watcher(self):
        """Embed the chunks whenever ``chunks`` is assigned."""
        def fn(event):
            # ChunkPayload keeps its data on ``.model`` (a ChunkModel), so
            # text is read from, and the embedding written to, that model.
            embed_list = self.embedder_fn(
                [chunk.model.text for chunk in self.chunks])
            for chunk, embedding in zip(self.chunks, embed_list):
                chunk.model.embedding = embedding
            self.processed_chunks = self.chunks
            # Reset the input list without re-triggering this watcher.
            with param.parameterized.discard_events(self):
                self.chunks = []

        self.param.watch(fn, 'chunks')

from pyllments.base.element_base import Element


class EmbedderElement(Element):
    """Element wiring an ``EmbedderModel`` to an input and an output port.

    ``chunk_input`` receives one chunk payload or a list of them and assigns
    them to ``model.chunks``; ``embedding_output`` emits the chunks once the
    model has published them through ``processed_chunks``.
    """

    def __init__(self, **params):
        super().__init__(**params)
        self.model = EmbedderModel()
        self._chunk_input_setup()
        self._embedding_output_setup()
        # Without the watcher nothing is ever staged on the output port.
        self._set_watchers()

    def _chunk_input_setup(self):
        """Create the ``chunk_input`` port feeding ``model.chunks``."""
        def unpack(chunk_payloads: Union[list[ChunkPayload], ChunkPayload]):
            # ``chunks`` is the parameter the model watches; normalise a
            # single payload to a one-element list.
            self.model.chunks = (chunk_payloads
                                 if isinstance(chunk_payloads, list)
                                 else [chunk_payloads])

        self.ports.add_input(name='chunk_input', unpack_payload_callback=unpack)

    def _embedding_output_setup(self):
        """Create the ``embedding_output`` port."""
        def pack(processed_chunks: list[ChunkPayload]) -> list[ChunkPayload]:
            # Pass the processed chunks through instead of returning None.
            return processed_chunks

        self.ports.add_output(name='embedding_output', pack_payload_callback=pack)

    def _set_watchers(self):
        """Register all watchers on the model."""
        self._set_processed_chunks_watcher()

    def _set_processed_chunks_watcher(self):
        """Stage the processed chunks on the output port when they change."""
        def fn(event):
            self.ports.output['embedding_output'].stage_emit(processed_chunks=self.model.processed_chunks)
            # Reset without re-triggering this watcher.
            with param.parameterized.discard_events(self.model):
                self.model.processed_chunks = []  # Clean up
        self.model.param.watch(fn, 'processed_chunks')

# (stray keyboard input removed)