# Upload and transform RAW files with PhariaDataPlatform
<a id="data-setup"></a>

This section describes how to establish a complete document ingestion pipeline in PhariaAI. The ingestion pipeline is a crucial foundation for RAG applications, as it transforms source RAW documents into searchable, AI-ready processed documents.

## Pipeline components

<img src="../Visualizations/E2E-Tutorial-data-pipeline.png" alt="Ingestion workflow" style="width:85%"/>


## What you will learn

1. How to configure your environment and connection parameters
2. How to create an ingestion pipeline with the PhariaData API
3. How to upload documents and monitor their processing
4. How to use PhariaSearch on the uploaded files

## Prerequisites

Before starting, ensure you have completed the previous tutorial, <a href="./Setup collections and indexex with PhariaSearch.ipynb">Setup collections and indexes with PhariaSearch</a>.


## Procedure

Below, you can see all concepts involved in the creation of the pipeline and their relationships.

<img src="../Visualizations/E2E-Tutorial-data-pipeline-relationships.png" alt="Resources relationships" style="width:70%;"/>

### Import dependencies and configure the environment

We begin by importing necessary dependencies and setting up the environment. We use standard Python libraries such as `requests` for API communication, `pandas` for data handling, as well as specialised libraries such as `tenacity` for robust error handling with retry mechanisms.

In [13]:
from dotenv import load_dotenv
from os import (
    getenv, 
    path,
    listdir
)
import json
import requests
import pandas as pd
import warnings
import concurrent.futures
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
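# Silence all warnings, including the InsecureRequestWarning that requests
# emits for every call made with verify=False in the helpers below.
warnings.filterwarnings("ignore")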

In [4]:
## Load connection settings from the environment (.env file)
load_dotenv(override=True)

TOKEN = getenv("PHARIA_AI_TOKEN")
NAMESPACE = getenv("PHARIA_DATA_NAMESPACE")
COLLECTION = getenv("PHARIA_DATA_COLLECTION")
INDEX = getenv("INDEX")

PHARIA_API_BASE_URL = getenv("PHARIA_API_BASE_URL")

DATA_PLATFORM_URL = f"{PHARIA_API_BASE_URL}/studio/data"
DOCUMENT_INDEX_API_URL = f"{PHARIA_API_BASE_URL}/studio/search"

STAGE_NAME = getenv("STAGE_NAME")
REPOSITORY_NAME = getenv("REPOSITORY_NAME")
TRANSFORMATION_NAME = getenv("TRANSFORMATION_NAME")
TRIGGER_NAME = getenv("TRIGGER_NAME")
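
Because `getenv` returns `None` for an unset variable, a missing setting only surfaces later as a confusing request error. The following is a minimal sketch of an early check, not part of the original notebook; the helper name `find_missing_settings` is hypothetical, and the list of names simply mirrors the variables read above.

```python
import os

# Names mirror the variables read in the configuration cell above.
REQUIRED_VARS = [
    "PHARIA_AI_TOKEN",
    "PHARIA_DATA_NAMESPACE",
    "PHARIA_DATA_COLLECTION",
    "INDEX",
    "PHARIA_API_BASE_URL",
    "STAGE_NAME",
    "REPOSITORY_NAME",
    "TRANSFORMATION_NAME",
    "TRIGGER_NAME",
]


def find_missing_settings(env=None, required=REQUIRED_VARS):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Example with an explicit mapping instead of the real environment.
example_env = {name: "value" for name in REQUIRED_VARS}
example_env["INDEX"] = ""
print(find_missing_settings(example_env))  # ['INDEX']
```

In the notebook, calling `find_missing_settings()` right after `load_dotenv` and raising an error if the list is non-empty would stop the run before any API call is made.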


### Create a document repository

A repository in PhariaData is a storage container that organises processed documents. 

The `get_or_create_repository` function checks if a repository with the specified name already exists and creates one if it does not. The function returns the repository ID, which is referenced in later steps when configuring the ingestion pipeline.

In [5]:
## Helper function 

def get_or_create_repository(repository: dict) -> str:
    """Get or create a repository in the Data Platform."""
    dataplatform_base_url = DATA_PLATFORM_URL
    name = repository["name"]
    url = f"{dataplatform_base_url}/repositories?name={name}"

    token = TOKEN
    response = requests.get(
        url=url, headers={"Authorization": f"Bearer {token}"}, verify=False
    )
    response.raise_for_status()
    page = response.json()

    if page["total"] > 0:
        return page["repositories"][0]["repositoryId"]
    else:
        url = f"{dataplatform_base_url}/repositories"
        response = requests.post(
            url=url,
            json=repository,
            headers={"Authorization": f"Bearer {token}"},
            verify=False,
        )
        response.raise_for_status()
        repo_created = response.json()
        return repo_created["repositoryId"]

In [7]:
## Create the repository

repository_payload = {
    "name": REPOSITORY_NAME,
    "mediaType": "jsonlines",
    "modality": "text",
    "schema": None,
}

repository_id = get_or_create_repository(repository_payload)
print(f"Repository ID: {repository_id}")

Repository ID: 64155b2d-86d1-495b-979d-b0e64f317693
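
The lookup helper interpolates the repository name directly into the query string. Names that contain spaces or characters such as `&` are safer when percent-encoded first. The sketch below shows one way to do this with the standard library; `build_lookup_url` is a hypothetical helper, the base URL is a placeholder, and how the API matches the decoded name is an assumption.

```python
from urllib.parse import urlencode


def build_lookup_url(base_url: str, resource: str, name: str) -> str:
    """Build a lookup URL with the name safely encoded as a query parameter."""
    query = urlencode({"name": name})
    return f"{base_url.rstrip('/')}/{resource}?{query}"


print(
    build_lookup_url(
        "https://example.invalid/studio/data", "repositories", "my repo & more"
    )
)
# https://example.invalid/studio/data/repositories?name=my+repo+%26+more
```

With `requests`, passing `params={"name": name}` to `requests.get` performs the same encoding without building the URL by hand.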


### Configure a document upload stage

A stage provides storage for source documents before they are processed.

The stage configuration includes a trigger that defines what happens when source documents are uploaded. This trigger specifies the transformation to apply and where to store the results.

The `get_or_create_stage` function returns a stage ID that is used when uploading documents in later steps.

In [8]:
## Helper function 

def get_or_create_stage(stage: dict) -> str:
    """Get or create a stage in the Data Platform."""
    dataplatform_base_url = DATA_PLATFORM_URL
    name = stage["name"]
    url = f"{dataplatform_base_url}/stages?name={name}"

    token = TOKEN
    response = requests.get(
        url=url, headers={"Authorization": f"Bearer {token}"}, verify=False
    )
    response.raise_for_status()
    page = response.json()

    if page["total"] > 0:
        return page["stages"][0]["stageId"]
    else:
        url = f"{dataplatform_base_url}/stages"
        response = requests.post(
            url=url,
            json=stage,
            headers={"Authorization": f"Bearer {token}"},
            verify=False,
        )
        response.raise_for_status()
        stage_created = response.json()
        return stage_created["stageId"]

In [9]:
## Setup stage

stage_payload = {
    "name": STAGE_NAME,
    "triggers": [
        {
            "transformationName": TRANSFORMATION_NAME,
            "destinationType": "DataPlatform:Repository",
            "connectorType": "DocumentIndex:Collection",
            "name": TRIGGER_NAME,
        }
    ],
}

stage_id = get_or_create_stage(stage_payload)
print(f"Stage ID: {stage_id}")

Stage ID: 350dcaf6-fff0-4ad0-a69b-bf16eecb0d0b
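
The repository and stage helpers follow the same get-or-create pattern and differ only in the resource name and ID field. As a sketch of how the shared logic could be factored out, the hypothetical `get_or_create` function below takes the HTTP calls as injected callables, so it can be exercised here with an in-memory stand-in rather than the real API. The response shape (`total`, a list keyed by the resource name) is taken from the helpers above.

```python
from typing import Callable


def get_or_create(
    resource: str,
    id_field: str,
    payload: dict,
    http_get: Callable[[str, dict], dict],
    http_post: Callable[[str, dict], dict],
) -> str:
    """Return the ID of the resource named payload['name'], creating it if absent."""
    page = http_get(resource, {"name": payload["name"]})
    if page["total"] > 0:
        return page[resource][0][id_field]
    return http_post(resource, payload)[id_field]


# In-memory stand-ins for the HTTP calls (illustration only).
store = []


def fake_get(resource: str, params: dict) -> dict:
    matches = [item for item in store if item["name"] == params["name"]]
    return {"total": len(matches), resource: matches}


def fake_post(resource: str, payload: dict) -> dict:
    created = {**payload, "repositoryId": f"repo-{len(store) + 1}"}
    store.append(created)
    return created


first = get_or_create("repositories", "repositoryId", {"name": "demo"}, fake_get, fake_post)
second = get_or_create("repositories", "repositoryId", {"name": "demo"}, fake_get, fake_post)
print(first, second)  # repo-1 repo-1
```

The second call finds the existing entry and returns the same ID, which is the behaviour that makes the notebook cells safe to re-run.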


### Set up automated document processing

The trigger configuration defines what happens when source documents are uploaded to the stage. The `ingestion_context` object combines three key elements:

1. The trigger name that identifies which trigger to activate
2. The destination repository where processed documents are stored
3. The collection and namespace where processed documents are indexed

This context is included with source document uploads to instruct the system on how to process each document. When a source document is uploaded, the specified trigger automatically applies the transformation and indexes the processed document.

In [10]:
ingestion_context = {
    "triggerName": TRIGGER_NAME,
    "destinationContext": {"repositoryId": repository_id},
    "connectorContext": {
        "collection": COLLECTION,
        "namespace": NAMESPACE,
    },
}
print(f"Ingestion context: {ingestion_context}")

Ingestion context: {'triggerName': 'testTrigger - DocumentStorageTutorial', 'destinationContext': {'repositoryId': '64155b2d-86d1-495b-979d-b0e64f317693'}, 'connectorContext': {'collection': 'pharia-tutorial-rag', 'namespace': 'Studio'}}
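
If one of the environment variables is unset, the context silently carries a `None` value, and the upload may be accepted while processing fails later. The following sketch checks the three elements listed above before any upload. The function name `validate_ingestion_context` is hypothetical, and treating every field as mandatory is an assumption based on this tutorial rather than on the API specification.

```python
def validate_ingestion_context(context: dict) -> list:
    """Return a list of problems; an empty list means the context looks complete."""
    required_paths = [
        ("triggerName",),
        ("destinationContext", "repositoryId"),
        ("connectorContext", "collection"),
        ("connectorContext", "namespace"),
    ]
    problems = []
    for key_path in required_paths:
        node = context
        # Walk down the nested dictionaries one key at a time.
        for key in key_path:
            node = node.get(key) if isinstance(node, dict) else None
        if not node:
            problems.append("missing or empty: " + ".".join(key_path))
    return problems


complete = {
    "triggerName": "example-trigger",
    "destinationContext": {"repositoryId": "example-repository-id"},
    "connectorContext": {"collection": "example-collection", "namespace": "example"},
}
incomplete = {**complete, "connectorContext": {"collection": "example-collection", "namespace": None}}

print(validate_ingestion_context(complete))    # []
print(validate_ingestion_context(incomplete))  # ['missing or empty: connectorContext.namespace']
```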


### Upload and process documents

With our infrastructure set-up complete (repository, stage, index, and trigger), we can now upload source documents to the PhariaAI platform. This section demonstrates how to upload source documents and initiate the document ingestion process.

The document ingestion workflow transforms source documents into searchable processed documents through several steps: uploading to the stage, applying transformations, storing in the repository, and indexing for search.

The `ingest_all_documents` helper function returns a DataFrame with details on each upload attempt, making it easy to track successes and failures.

In [14]:
## Helper functions

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(requests.RequestException),
)
def ingest_document(
    document_path: str, ingestion_context: dict, name: str, stage_id: str
) -> dict:
    """Attempts to ingest a document and returns the ingestion result."""
    with open(document_path, mode="rb") as file_reader:
        dataplatform_base_url = DATA_PLATFORM_URL
        url = f"{dataplatform_base_url}/stages/{stage_id}/files"
        token = TOKEN
        response = requests.post(
            url=url,
            headers={"Authorization": f"Bearer {token}"},
            verify=False,
            files={
                "name": name,
                "sourceData": file_reader,
                "ingestionContext": json.dumps(ingestion_context),
            },
        )
        response.raise_for_status()

        file_uploaded = response.json()
        return {
            "file_id": file_uploaded["fileId"],
            "status": "Success",
            "error_type": None,
            "error_message": None,
        }
    


def ingest_all_documents(
    directory_path: str, ingestion_context: dict, stage_id: str, max_workers: int = 3
):
    """Ingest all files in a directory concurrently and store results in a DataFrame."""

    results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(
                ingest_document,
                path.join(directory_path, file),
                ingestion_context,
                file,
                stage_id,
            ): file
            for file in listdir(directory_path)
        }

        for future in concurrent.futures.as_completed(future_to_file):
            file_name = future_to_file[future]
            file_path = path.join(directory_path, file_name)
            try:
                result = future.result()
                results.append(
                    {
                        "file_path": file_path,
                        "file_id": result["file_id"],
                        "status": result["status"],
                        "error_type": result["error_type"],
                        "error_message": result["error_message"],
                    }
                )
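            except Exception as e:
                # Use the same keys as the success branch so the DataFrame columns line up.
                print(f"An error occurred while ingesting {file_path}: {e}")
                results.append(
                    {
                        "file_path": file_path,
                        "file_id": None,
                        "status": "Ingestion Failed",
                        "error_type": type(e).__name__,
                        "error_message": str(e),
                    }
                )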

    df_results = pd.DataFrame(results)
    return df_results

In [15]:
# Ingesting the files
directory_path = "files_to_upload"
df_results = ingest_all_documents(directory_path, ingestion_context, stage_id)
df_results

Unnamed: 0,file_path,file_id,status,error_type,error_message
0,files_to_upload/Azure Cognitive Search_ Outper...,687496e4-5d0b-4abd-a35a-30790dd2d7ba,Success,,
1,files_to_upload/What is RAG_ - Retrieval-Augme...,7f4f5e3a-02a6-437d-bcfe-22d87d337b9a,Success,,
2,files_to_upload/RAG.pdf,9112b8af-caf7-4274-a46a-e7c57e4f3c06,Success,,
3,files_to_upload/Attention is all you need.pdf,7e472330-eebe-4420-8c22-67bbb587e9a5,Success,,
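
`listdir` returns every entry in the directory, including subdirectories and hidden files such as `.DS_Store`, and each of these would be submitted for upload. A possible pre-filter is sketched below. The helper `list_uploadable_files` is hypothetical, and restricting uploads to `.pdf` is an assumption based on the sample files in this tutorial, not a statement about which formats the transformation supports.

```python
import tempfile
from pathlib import Path


def list_uploadable_files(directory: str, extensions=(".pdf",)) -> list:
    """Return the sorted names of regular, non-hidden files with an allowed extension."""
    allowed = {ext.lower() for ext in extensions}
    return sorted(
        entry.name
        for entry in Path(directory).iterdir()
        if entry.is_file()
        and not entry.name.startswith(".")
        and entry.suffix.lower() in allowed
    )


# Demonstration on a temporary directory with mixed content.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "paper.pdf").write_bytes(b"%PDF-")
    (Path(tmp) / ".DS_Store").write_bytes(b"")
    (Path(tmp) / "notes.txt").write_text("not uploaded")
    (Path(tmp) / "nested").mkdir()
    found = list_uploadable_files(tmp)

print(found)  # ['paper.pdf']
```

In `ingest_all_documents`, iterating over the result of such a helper instead of `listdir(directory_path)` would keep unwanted entries out of the results table.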


### Monitor the source document processing status

After uploading source documents, you need to verify their processing status. The code in this section does the following:

1. Extracts IDs of successfully uploaded source documents
2. Retrieves the transformation ID
3. Checks the status of each source document's transformation
4. Extracts dataset IDs from completed transformations

The `check_files_status` function combines all this information into a comprehensive report that shows which files completed processing and which encountered errors. The dataset IDs are particularly important as they are used to access your processed documents in subsequent operations.


In [16]:
def get_successful_document_ids(df: pd.DataFrame) -> list:
    """Retrieve a list of successful file_ids from the DataFrame."""
    return df[df["status"] == "Success"]["file_id"].tolist()

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(requests.RequestException),
)
def check_status_of_ingestion(transformation_id: str, file_id: str) -> dict:
    """Query the status of the ingestion for a given transformation and file_id."""
    dataplatform_base_url = DATA_PLATFORM_URL
    url = f"{dataplatform_base_url}/transformations/{transformation_id}/runs?file_id={file_id}"

    token = TOKEN
    response = requests.get(
        url=url, headers={"Authorization": f"Bearer {token}"}, verify=False
    )
    response.raise_for_status()
    page = response.json()

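    # Raise an explicit error instead of asserting: asserts are skipped under `python -O`.
    if page["total"] == 0:
        raise LookupError(f"No transformation run found yet for file {file_id}")
    return page["runs"][0]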

def get_transformation_id(name: str) -> str:
    """Get the transformation ID from the Data Platform."""
    dataplatform_base_url = DATA_PLATFORM_URL
    url = f"{dataplatform_base_url}/transformations?name={name}"

    token = TOKEN
    response = requests.get(
        url=url, headers={"Authorization": f"Bearer {token}"}, verify=False
    )
    response.raise_for_status()
    page = response.json()

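    # Raise an explicit error instead of asserting: asserts are skipped under `python -O`.
    if page["total"] == 0:
        raise LookupError(f"No transformation named {name!r} was found")
    return page["transformations"][0]["transformationId"]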

def check_files_status(transformation_id: str, df: pd.DataFrame, max_workers: int = 3):
    """Check the status of ingested files and store the results in a DataFrame."""

    successful_file_ids = get_successful_document_ids(df)
    status_results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(
                check_status_of_ingestion, transformation_id, file_id
            ): file_id
            for file_id in successful_file_ids
        }

        for future in concurrent.futures.as_completed(future_to_file):
            file_id = future_to_file[future]
            try:
                run = future.result()
                output = json.dumps(run.get("output", {}), indent=4)
                status_results.append(
                    {
                        "file_id": file_id,
                        "run_id": run["runId"],
                        "status": run["status"],
                        "output": output,
                        "error": run["errors"],
                    }
                )
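            except Exception as e:
                # `run` is not defined if the lookup itself failed, so do not reference it here.
                status_results.append(
                    {
                        "file_id": file_id,
                        "run_id": None,
                        "status": "status_check_failed",
                        "output": None,
                        "error": str(e),
                    }
                )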

    return df.merge(
        pd.DataFrame(status_results),
        on="file_id",
        how="left",
        suffixes=("_ingestion", ""),
    )

def get_successful_dataset_ids(df: pd.DataFrame) -> list:
    """Retrieve the dataset IDs from the `output` column of the DataFrame."""
    # Iterate over values rather than positions: a filtered DataFrame keeps its
    # original index labels, so df["output"][i] can raise a KeyError.
    return [json.loads(output).get("datasetId") for output in df["output"]]

In [21]:
transformation_id = get_transformation_id(TRANSFORMATION_NAME)
status_df = check_files_status(transformation_id, df_results)
status_df.to_csv("ingestion_status.csv", index=False)
successful_dataset_ids = get_successful_dataset_ids(status_df[status_df["status"] == "completed"])
status_df

Unnamed: 0,file_path,file_id,status_ingestion,error_type,error_message,run_id,status,output,error
0,files_to_upload/Azure Cognitive Search_ Outper...,687496e4-5d0b-4abd-a35a-30790dd2d7ba,Success,,,7ea6222d-f831-48d2-bc40-51d3d82653ab,completed,"{\n ""type"": ""DataPlatform:Repository:Datase...",
1,files_to_upload/What is RAG_ - Retrieval-Augme...,7f4f5e3a-02a6-437d-bcfe-22d87d337b9a,Success,,,30a1a78f-b283-4f0f-baf0-ec37cb4b3330,completed,"{\n ""type"": ""DataPlatform:Repository:Datase...",
2,files_to_upload/RAG.pdf,9112b8af-caf7-4274-a46a-e7c57e4f3c06,Success,,,d0bd1fdd-2e05-4588-9177-28d0c2c46c09,completed,"{\n ""type"": ""DataPlatform:Repository:Datase...",
3,files_to_upload/Attention is all you need.pdf,7e472330-eebe-4420-8c22-67bbb587e9a5,Success,,,b2a399a8-c7ae-42af-b887-8451acaa849b,completed,"{\n ""type"": ""DataPlatform:Repository:Datase...",
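
The check above queries each run once. If a transformation has not finished at that moment, the report shows an intermediate status and no dataset ID. The sketch below polls until every file reaches a terminal status or a timeout expires. The function `wait_for_runs` is hypothetical, and the status names other than `completed` (here `running` and `failed`) are assumptions; adjust them to the values your platform actually returns.

```python
import time
from typing import Callable


def wait_for_runs(
    file_ids: list,
    fetch_status: Callable[[str], str],
    terminal_statuses=("completed", "failed"),
    poll_interval: float = 5.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until every file reaches a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    statuses = {file_id: None for file_id in file_ids}
    while True:
        # Only re-query files that have not reached a terminal status yet.
        for file_id, status in statuses.items():
            if status not in terminal_statuses:
                statuses[file_id] = fetch_status(file_id)
        pending = [f for f, s in statuses.items() if s not in terminal_statuses]
        if not pending or time.monotonic() >= deadline:
            return statuses
        sleep(poll_interval)


# Stand-in status source: each file reports "running" once, then "completed".
calls = {}


def fake_fetch(file_id: str) -> str:
    calls[file_id] = calls.get(file_id, 0) + 1
    return "completed" if calls[file_id] >= 2 else "running"


result = wait_for_runs(["a", "b"], fake_fetch, sleep=lambda _seconds: None)
print(result)  # {'a': 'completed', 'b': 'completed'}
```

In the notebook, `fetch_status` could wrap the existing helper, for example `lambda file_id: check_status_of_ingestion(transformation_id, file_id)["status"]`.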


### Interact with processed documents

As in the previous tutorial, you can use PhariaSearch (previously Document Index) to search the collection that now contains the uploaded documents. For example, the following semantic search query checks that the newly uploaded documents can be retrieved.

In [22]:
from pharia_data_sdk.connectors import DocumentIndexClient
from pharia_data_sdk.connectors.retrievers import DocumentIndexRetriever

search_client = DocumentIndexClient(
    token=TOKEN,
    base_url=DOCUMENT_INDEX_API_URL,
)

document_index_retriever = DocumentIndexRetriever(
    document_index=search_client,
    index_name=INDEX,
    namespace=NAMESPACE,
    collection=COLLECTION,
    k=5,
)

document_index_retriever.get_relevant_documents_with_scores(
    query="what is attention?"
)

[SearchResult(id=DocumentPath(collection_path=CollectionPath(namespace='Studio', collection='pharia-tutorial-rag'), document_name='7c9e1645-b0cd-4198-9ed9-bcaca5f82cfd'), score=0.66146505, document_chunk=DocumentChunk(text='Self-attention, sometimes called intra-attention is an attention mechanism relating different positions of a single sequence in order to compute a representation of the sequence.', start=5535, end=5712, metadata=None)),
 SearchResult(id=DocumentPath(collection_path=CollectionPath(namespace='Studio', collection='pharia-tutorial-rag'), document_name='d3178aad-7e4f-4116-82f9-594a59a650f4'), score=0.6613786, document_chunk=DocumentChunk(text='Self-attention, sometimes called intra-attention is an attention mechanism relating different positions of a single sequence in order to compute a representation of the sequence.', start=5535, end=5712, metadata=None)),
 SearchResult(id=DocumentPath(collection_path=CollectionPath(namespace='Studio', collection='pharia-tutorial-rag'
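
The first two results in the output above contain the same chunk text under different document names, which is what you would see if, for example, the same file had been ingested more than once. The sketch below is one way to collapse such duplicates on the client side. The helper `deduplicate_by_text` is hypothetical; it relies only on the `score` and `document_chunk.text` attributes visible in the output and uses simple stand-in objects instead of real `SearchResult` instances.

```python
from types import SimpleNamespace


def deduplicate_by_text(results: list) -> list:
    """Keep the highest-scoring result for each distinct chunk text, best first."""
    best = {}
    for result in results:
        text = result.document_chunk.text
        if text not in best or result.score > best[text].score:
            best[text] = result
    return sorted(best.values(), key=lambda r: r.score, reverse=True)


def make_result(text: str, score: float) -> SimpleNamespace:
    """Build a stand-in object with the same attribute layout as a search result."""
    return SimpleNamespace(score=score, document_chunk=SimpleNamespace(text=text))


sample = [
    make_result("chunk A", 0.65),
    make_result("chunk A", 0.66),
    make_result("chunk B", 0.50),
]
unique = deduplicate_by_text(sample)
print([(r.document_chunk.text, r.score) for r in unique])
# [('chunk A', 0.66), ('chunk B', 0.5)]
```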

## Conclusion

In this section, you set up a complete document ingestion pipeline that ingests RAW files and indexes them with PhariaSearch.