In [None]:
!pip install kfp[kubernetes] -q

In [None]:
import os

import kfp
from kfp import dsl
from kfp import kubernetes

In [None]:
# KFP API client created with default connection settings; used further down
# to submit the ingestion pipeline as a run.
kfp_client = kfp.Client()

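# Pipeline steps

Each step below is a KFP component with its own base image, so every import and helper it needs is defined inside the component function itself.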

In [None]:
@dsl.component(
    base_image='python:3.11',
    packages_to_install=["opensearch-py==2.7.1"]
)
def setup_os(rag_index_name:str = "rag_index", force_recreate: bool = True):
    """Ensure the k-NN OpenSearch index used for RAG exists.

    Connection details are read from the OPENSEARCH_HOST, OPENSEARCH_PORT,
    OPENSEARCH_USER and OPENSEARCH_PASSWORD environment variables.

    Args:
        rag_index_name: Name of the index to create.
        force_recreate: If True, delete any existing index first so the
            pipeline starts from an empty index.

    Raises:
        RuntimeError: If OpenSearch does not acknowledge the index creation
            or the mapping update.
    """
    # Component functions are executed in isolation, so imports and helpers
    # must live inside the function body.
    import os
    from opensearchpy import NotFoundError, OpenSearch

    def delete_opensearch_index(opensearch_client, index_name):
        """Delete ``index_name``; a missing index is not treated as an error."""
        print(f"Trying to delete index {index_name}")
        try:
            response = opensearch_client.indices.delete(index=index_name)
        except NotFoundError:
            # Only "index does not exist" is benign; auth or connection
            # errors must surface instead of being reported as "not found".
            print(f"Index {index_name} not found, nothing to delete")
            return True
        print(f"Index {index_name} deleted")
        return bool(response['acknowledged'])

    def create_index(opensearch_client, index_name):
        """Create ``index_name`` with k-NN search enabled; return the ack flag."""
        settings = {
            "settings": {
                "index": {
                    "knn": True
                }
            }
        }
        response = opensearch_client.indices.create(index=index_name, body=settings)
        return bool(response['acknowledged'])

    def create_index_mapping(opensearch_client, index_name):
        """Map the vector and text fields; return the ack flag.

        ``dimension`` must equal the embedding size of the model used by the
        ingestion step (which defaults to sentence-transformers/all-MiniLM-L6-v2).
        """
        response = opensearch_client.indices.put_mapping(
            index=index_name,
            body={
                "properties": {
                    "vector_field": {
                        "type": "knn_vector",
                        "dimension": 384
                    },
                    "text": {
                        "type": "keyword"
                    }
                }
            }
        )
        return bool(response['acknowledged'])

    host = os.environ['OPENSEARCH_HOST']
    # Environment values are strings; the client expects a numeric port.
    port = int(os.environ['OPENSEARCH_PORT'])
    auth = (
        os.environ['OPENSEARCH_USER'],
        os.environ['OPENSEARCH_PASSWORD']
    )

    # NOTE(review): certificate and hostname verification are disabled —
    # acceptable only for a trusted in-cluster endpoint; confirm.
    client = OpenSearch(
        hosts = [{'host': host, 'port': port}],
        http_compress = True,
        http_auth = auth,
        use_ssl = True,
        verify_certs = False,
        ssl_assert_hostname = False,
        ssl_show_warn = False
    )

    if force_recreate:
        delete_opensearch_index(client, rag_index_name)

    if client.indices.exists(index=rag_index_name):
        print("Opensearch index already exists")
        return

    print("Creating OpenSearch index")
    if not create_index(client, rag_index_name):
        raise RuntimeError(f"OpenSearch did not acknowledge creation of index {rag_index_name}")

    print("Creating OpenSearch index mapping")
    if not create_index_mapping(client, rag_index_name):
        raise RuntimeError(f"OpenSearch did not acknowledge the mapping for index {rag_index_name}")
    print("OpenSearch Index mapping created")

In [None]:
@dsl.component(
    base_image='python:3.11',
    packages_to_install=["minio<7.0"]
)
def download_data(bucket_name: str = "rag-demo-source", data_mount_point: str = "/data", data_folder: str = "raw") -> str:
    """Download the objects of a MinIO bucket onto the shared data volume.

    Each object is written to ``{data_mount_point}/{data_folder}/{object_name}``.
    The server address comes from MINIO_ENDPOINT_URL (``http://host:port``,
    ``https://host:port`` or a bare ``host:port``); credentials come from
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

    Args:
        bucket_name: Bucket to copy from; only its top-level listing is read.
        data_mount_point: Mount path of the shared volume.
        data_folder: Sub-folder of the mount point to download into.

    Returns:
        ``data_folder``, so the next step can locate the files.
    """
    import os

    from minio import Minio

    # The Minio client expects the endpoint without a scheme; the scheme only
    # decides whether TLS is used. A bare "host:port" is treated as http.
    endpoint_url = os.environ["MINIO_ENDPOINT_URL"]
    scheme, separator, endpoint = endpoint_url.partition("://")
    if not separator:
        scheme, endpoint = "http", endpoint_url
    endpoint = endpoint.rstrip("/")

    mc = Minio(
        endpoint=endpoint,
        access_key=os.environ["AWS_ACCESS_KEY_ID"],
        secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        secure=scheme.lower() == "https",
    )

    # Create the folder even for an empty bucket so later steps can list it.
    os.makedirs(f"{data_mount_point}/{data_folder}", exist_ok=True)

    for obj in mc.list_objects(bucket_name):
        # A non-recursive listing can report "sub-folder" prefixes as entries
        # with is_dir set; they have no content to download.
        if getattr(obj, "is_dir", False):
            print("\t", "Skipped prefix", obj.object_name)
            continue
        mc.fget_object(bucket_name, obj.object_name, f"{data_mount_point}/{data_folder}/{obj.object_name}")
        print("\t", "Downloaded", obj.object_name)

    return data_folder

In [None]:
@dsl.component(
    base_image='python:3.11'
)
def remove_unsupported_files(data_source_folder:str, data_mount_point: str = "/data", data_target_folder:str = "cleaned") -> str:
    """Copy only files with a supported extension into a clean folder.

    Extensions are matched case-insensitively and written with a lowercase
    extension, so e.g. ``report.PDF`` is copied as ``report.pdf``.

    Args:
        data_source_folder: Folder (relative to the mount point) holding the
            downloaded files.
        data_mount_point: Mount path of the shared volume.
        data_target_folder: Folder (relative to the mount point) to copy
            supported files into; created if missing.

    Returns:
        ``data_target_folder``, so the ingestion step can locate the files.
    """
    import os
    import shutil

    # Must stay in sync with LOADER_MAPPING in ingest_os; components run in
    # separate containers, so the set cannot be shared as a module constant.
    SUPPORTED_FORMATS = {".csv", ".doc", ".docx", ".enex", ".epub", ".html", ".md", ".odt", ".pdf", ".ppt", ".pptx", ".txt",}
    source_folder = os.path.join(data_mount_point, data_source_folder)
    target_folder = os.path.join(data_mount_point, data_target_folder)

    os.makedirs(target_folder, exist_ok=True)

    if not os.path.isdir(source_folder):
        # e.g. nothing was downloaded because the source bucket was empty.
        print(f"Source folder {source_folder} does not exist, nothing to copy")
        return data_target_folder

    for name in sorted(os.listdir(source_folder)):
        source_path = os.path.join(source_folder, name)
        if not os.path.isfile(source_path):
            # Skip sub-directories, even ones named like "notes.md".
            continue
        stem, ext = os.path.splitext(name)
        ext = ext.lower()
        if ext in SUPPORTED_FORMATS:
            # Lowercase the extension because ingest_os matches files by
            # lowercase extension only. Names differing only in extension
            # case (a.PDF / a.pdf) overwrite each other.
            target_path = os.path.join(target_folder, stem + ext)
            shutil.copy2(source_path, target_path)
            print("Copied file to :", target_path)

    return data_target_folder
    

In [None]:
# NOTE(review): an earlier revision ran this step on python:3.11 with
# packages_to_install=["opensearch-py==2.7.1", "langchain==0.2.16",
# "langchain-community==0.2.16", "sentence-transformers==3.0.1"]; the
# prebuilt image below presumably bundles the same packages — confirm.
@dsl.component(
    base_image='bponieckiklotz/kfp-steps:ingestion-os',
    packages_to_install=["pymupdf", "unstructured"]
)
def ingest_os(
    index_name:str,
    data_source_folder:str,
    data_mount_point: str = "/data",
    use_gpu:bool = False,
    chunk_size:int = 500,
    chunk_overlap:int = 50,
    batch_size:int = 10,
    embeddings_model_name:str = "sentence-transformers/all-MiniLM-L6-v2",
):
    """Load, chunk, embed and index documents into an OpenSearch index.

    Connection details are read from the OPENSEARCH_HOST, OPENSEARCH_PORT,
    OPENSEARCH_USER and OPENSEARCH_PASSWORD environment variables.

    Args:
        index_name: Target OpenSearch index.
        data_source_folder: Folder (relative to the mount point) with the
            input files; only its top level is scanned.
        data_mount_point: Mount path of the shared volume.
        use_gpu: Run the embedding model on CUDA instead of CPU.
        chunk_size: Maximum chunk length in characters.
        chunk_overlap: Characters shared between consecutive chunks.
        batch_size: Number of chunks sent to OpenSearch per request.
        embeddings_model_name: Hugging Face model used to embed the chunks.
    """
    # Component functions are executed in isolation, so imports and helpers
    # must live inside the function body.
    import glob
    import hashlib
    import math
    import os
    from typing import List

    from langchain.document_loaders import (
        CSVLoader,
        EverNoteLoader,
        PyMuPDFLoader,
        TextLoader,
        UnstructuredEPubLoader,
        UnstructuredHTMLLoader,
        UnstructuredMarkdownLoader,
        UnstructuredODTLoader,
        UnstructuredPowerPointLoader,
        UnstructuredWordDocumentLoader,
    )
    from langchain.docstore.document import Document
    from langchain.text_splitter import RecursiveCharacterTextSplitter

    from langchain.embeddings import HuggingFaceEmbeddings
    from langchain.vectorstores import OpenSearchVectorSearch

    # Map file extensions to document loaders and their arguments.
    LOADER_MAPPING = {
        ".csv": (CSVLoader, {}),
        ".doc": (UnstructuredWordDocumentLoader, {}),
        ".docx": (UnstructuredWordDocumentLoader, {}),
        ".enex": (EverNoteLoader, {}),
        ".epub": (UnstructuredEPubLoader, {}),
        ".html": (UnstructuredHTMLLoader, {}),
        ".md": (UnstructuredMarkdownLoader, {}),
        ".odt": (UnstructuredODTLoader, {}),
        ".pdf": (PyMuPDFLoader, {}),
        ".ppt": (UnstructuredPowerPointLoader, {}),
        ".pptx": (UnstructuredPowerPointLoader, {}),
        ".txt": (TextLoader, {"encoding": "utf8"}),
        # Add more mappings for other file extensions and loaders as needed
    }

    def load_single_document(file_path: str) -> List[Document]:
        """Load one file with the loader registered for its extension."""
        ext = "." + file_path.rsplit(".", 1)[-1]
        if ext in LOADER_MAPPING:
            loader_class, loader_args = LOADER_MAPPING[ext]
            loader = loader_class(file_path, **loader_args)
            return loader.load()

        raise ValueError(f"Unsupported file extension '{ext}'")

    def chunk_id(chunk) -> str:
        """Return an id that is stable across runs for the same chunk."""
        # hash() of a str is salted per process (PYTHONHASHSEED), so it cannot
        # produce stable ids; a content digest lets re-ingestion overwrite
        # existing chunks instead of duplicating them.
        digest = hashlib.sha256(
            f"{chunk.metadata.get('source')}\x00{chunk.page_content}".encode("utf-8")
        ).hexdigest()
        return f"{chunk.metadata.get('ID')}_{digest}"

    files = []
    for ext in LOADER_MAPPING:
        files.extend(glob.glob(f"{data_mount_point}/{data_source_folder}/*{ext}"))
    files.sort()

    files_len = len(files)
    docs = []
    for count, f in enumerate(files, start=1):
        print(f"Processing file {f}, {count}/{files_len}")
        docs.extend(load_single_document(f))

    if not docs:
        print("No new documents to load")
        # Return normally: exit() is a `site` convenience helper, and raising
        # SystemExit bypasses whatever the KFP executor does after the
        # function returns.
        return

    print(f"Loaded {len(docs)} new documents")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    texts = text_splitter.split_documents(docs)
    # The splitter measures length with len(), i.e. in characters, not tokens.
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")

    hfe = HuggingFaceEmbeddings(
        model_name=embeddings_model_name,
        model_kwargs={"device": "cuda" if use_gpu else "cpu"},
    )

    host = os.environ['OPENSEARCH_HOST']
    port = os.environ['OPENSEARCH_PORT']
    auth = (
        os.environ['OPENSEARCH_USER'],
        os.environ['OPENSEARCH_PASSWORD']
    )

    # NOTE(review): certificate and hostname verification are disabled —
    # acceptable only for a trusted in-cluster endpoint; confirm.
    opensearch_vector_search = OpenSearchVectorSearch(
        opensearch_url = f"https://{host}:{port}",
        index_name = index_name,
        embedding_function = hfe,
        http_auth = auth,
        verify_certs = False,
        ssl_assert_hostname = False,
        ssl_show_warn = False
    )

    total_batches = math.ceil(len(texts) / batch_size)
    for batch_number, start in enumerate(range(0, len(texts), batch_size), start=1):
        batch = texts[start:start + batch_size]
        print(f"Processing batch {batch_number}/{total_batches}")
        opensearch_vector_search.add_texts(
            texts=[t.page_content for t in batch],
            ids=[chunk_id(t) for t in batch],
            metadatas=[t.metadata for t in batch],
            bulk_size=batch_size
        )


In [None]:
@dsl.pipeline(
    name="Ingestion Pipeline",
    description="Ingest data from S3 to OpenSearch"
)
def ingestion_pipeline(
    rag_index_name:str = "rag_index",
    rag_index_force_recreate:bool = True,
):
    """Prepare the RAG index, then download, filter and ingest documents.

    Steps run in order: setup_os -> download_data -> remove_unsupported_files
    -> ingest_os. File steps share a temporary PVC mounted at /data, which is
    deleted once ingest_os has finished.

    MINIO_ENDPOINT_URL is read from the environment of the process that
    compiles or submits this pipeline, and its value is baked into the spec.

    Args:
        rag_index_name: OpenSearch index to create and ingest into.
        rag_index_force_recreate: Drop and recreate the index before ingesting.
    """
    # Keys of the 'opensearch-secret' secret -> env vars read by the
    # OpenSearch steps (setup_os and ingest_os).
    opensearch_secret_env = {
        'username': 'OPENSEARCH_USER',
        'password': 'OPENSEARCH_PASSWORD',
        'host': 'OPENSEARCH_HOST',
        'port': 'OPENSEARCH_PORT',
    }
    # Must match the components' data_mount_point default.
    data_mount_path = '/data'

    setup_os_task = setup_os(
        rag_index_name = rag_index_name,
        force_recreate = rag_index_force_recreate
    )
    kubernetes.use_secret_as_env(setup_os_task,
                                 secret_name='opensearch-secret',
                                 secret_key_to_env=opensearch_secret_env)

    pvc_data_ingestion = kubernetes.CreatePVC(
        pvc_name_suffix='-data-ingestion',
        access_modes=['ReadWriteMany'],
        size='1Gi',
        storage_class_name='microk8s-hostpath',
    )

    def mount_data_volume(task):
        """Mount the shared ingestion PVC on ``task`` at ``data_mount_path``."""
        kubernetes.mount_pvc(
            task,
            pvc_name=pvc_data_ingestion.outputs['name'],
            mount_path=data_mount_path,
        )

    download_data_task = download_data(
        bucket_name = "rag-demo-source"
    ).set_env_variable(
        "MINIO_ENDPOINT_URL", os.environ["MINIO_ENDPOINT_URL"]
    ).set_caching_options(
        enable_caching = False
    ).after(setup_os_task)
    kubernetes.use_secret_as_env(download_data_task,
                                 secret_name='mlpipeline-minio-artifact',
                                 secret_key_to_env={
                                     'accesskey': 'AWS_ACCESS_KEY_ID',
                                     'secretkey': 'AWS_SECRET_ACCESS_KEY',
                                 })
    mount_data_volume(download_data_task)

    remove_unsupported_files_task = remove_unsupported_files(
        data_source_folder = download_data_task.output
    ).set_caching_options(
        enable_caching = False
    ).after(download_data_task)
    mount_data_volume(remove_unsupported_files_task)

    ingest_os_task = ingest_os(
        index_name = rag_index_name,
        data_source_folder = remove_unsupported_files_task.output
    ).set_caching_options(
        enable_caching = False
    ).after(remove_unsupported_files_task)
    mount_data_volume(ingest_os_task)
    kubernetes.use_secret_as_env(ingest_os_task,
                                 secret_name='opensearch-secret',
                                 secret_key_to_env=opensearch_secret_env)

    # NOTE(review): DeletePVC only runs after ingest_os succeeds, so a failed
    # run leaves the PVC behind for manual cleanup.
    kubernetes.DeletePVC(
        pvc_name=pvc_data_ingestion.outputs['name']
    ).after(ingest_os_task)

In [None]:
# Submit a run of the pipeline to the KFP API; the argument values mirror the
# pipeline's own parameter defaults.
run_arguments = {
    "rag_index_name": "rag_index",
    "rag_index_force_recreate": True,
}
kfp_client.create_run_from_pipeline_func(ingestion_pipeline, arguments=run_arguments)

In [None]:
# Serialize the pipeline definition, with the same parameter values, to
# ./ingestion-pipeline.yaml.
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=ingestion_pipeline,
    package_path='./ingestion-pipeline.yaml',
    pipeline_parameters=dict(
        rag_index_name="rag_index",
        rag_index_force_recreate=True,
    ),
)