In [6]:
#imports
import asyncio
import os
from pypaperless import Paperless
from r2r import R2RClient
import requests
from contextlib import asynccontextmanager
import nest_asyncio
nest_asyncio.apply()

In [None]:
async def getTags(paperless):
    tag_list = []
    async for item in paperless.tags:
        tag_data = item._data
        #print(f"Tag Data: {tag_data}")  # Debug structure
        try:
            tag_id = tag_data['id']  # Get tag ID
            tag_name = tag_data['name']
            if tag_name == "Personal":
                print(f"Skipping tag '{tag_name}'.")
                continue# Get tag name
            tag_list.append((tag_id, tag_name))
        except KeyError as e:
            print(f"Missing key in tag data: {e}. Skipping tag.")
            continue
    return tag_list

In [None]:
    
async def getDocs(paperless,filters):
    document_ids = []
    async with paperless.documents.reduce(**filters) as filtered:
        async for item in filtered:
            try:
                docid = item.id
                document_ids.append(docid)
            except Exception as e:
                print(f"Error processing {docid}: {e}")    
    return document_ids

In [40]:



async def fetch_tag_document_count(paperless, tag_id):
    """
    Fetch the current number of documents for a specific tag in Paperless.
    :param paperless: Initialized Paperless client.
    :param tag_id: The ID of the tag to fetch document count for.
    :return: The number of documents associated with the tag.
    """
    filters = {"tags__id": tag_id}
    document_ids = await getDocs(paperless, filters)
    return len(document_ids)


def fetch_collection_document_count(r2r_client, collection_id):
    """
    Fetch the current number of documents in a specific R2R collection.
    :param r2r_client: Initialized R2R client.
    :param collection_id: The ID of the collection to fetch document count for.
    :return: The number of documents in the collection.
    """
    response = r2r_client.collections.retrieve(collection_id)
    return response['results']['document_count']
    
async def process_documents_in_batches(document_ids, batch_size, paperless, r2r_client, collection_id):
    # Use asyncio.gather to process batches concurrently
    tasks = []
    for i in range(0, len(document_ids), batch_size):
      batch = document_ids[i:i + batch_size]
      tasks.append(process_document_batch(batch, paperless, r2r_client, collection_id))
    await asyncio.gather(*tasks)

async def sync_tag(
    paperless, r2r_client, tag_id, tag_name, r2r_collection_map, batch_size
):
    """
    Sync a single tag and its associated documents from Paperless to R2R.

    Skips the tag if the document counts in Paperless and R2R match.
    :param paperless: Initialized Paperless client.
    :param r2r_client: Initialized R2R client.
    :param tag_id: Paperless tag ID.
    :param tag_name: Paperless tag name.
    :param r2r_collection_map: A map of R2R collection names to IDs.
    :param batch_size: The batch size for processing documents.
    """
    print(f"Processing tag '{tag_name}'...")

    # Check if the tag already exists in the R2R collections
    collection_id = r2r_collection_map.get(tag_name)
    if not collection_id:
        print(f"Collection '{tag_name}' not found in R2R. Creating...")
        create_result = r2r_client.collections.create(name=tag_name)
        collection_id = create_result["id"]
        r2r_collection_map[tag_name] = collection_id  # Update the collection map

    # Fetch the document counts for this tag (from Paperless and R2R)
    paperless_doc_count = await fetch_tag_document_count(paperless, tag_id)
    r2r_doc_count = fetch_collection_document_count(r2r_client, collection_id)

    # Compare document counts
    if paperless_doc_count <= r2r_doc_count:
        print(f"Skipping tag '{tag_name}': Document counts match ({paperless_doc_count}).")
        return  # Skip this tag if the counts match

    print(
        f"Document counts differ for tag '{tag_name}': "
        f"Paperless={paperless_doc_count}, R2R={r2r_doc_count}. Processing..."
    )
    filters = {"tags__id": tag_id}
    document_ids = await getDocs(paperless, filters)
    await process_documents_in_batches(
        document_ids, batch_size, paperless, r2r_client, collection_id
    )


async def process_document_batch(batch, paperless, r2r_client, collection_id, custom_field_name='document_id'):
    """
    Process a batch of documents: either ingest them into R2R or associate existing documents
    with the correct collection.

    :param batch: List of Paperless document IDs to process.
    :param paperless: Initialized Paperless client.
    :param r2r_client: Initialized R2R client.
    :param collection_id: The collection ID in R2R to add documents to.
    :param custom_field_name: The name of the custom field storing the R2R document ID.
    """
    for docid in batch:
        # Check for existing R2RDocumentID in Paperless
        existing_r2r_id = await get_r2r_document_id_from_paperless(paperless, docid, custom_field_name)
        
        if existing_r2r_id:
            print(f"Document {docid} already has R2R ID {existing_r2r_id}.")
            # Add the existing document to the collection in R2R
            try:
                await docToCollect(existing_r2r_id, collection_id, r2r_client)
            except Exception as e:
                print(f"Error adding document {existing_r2r_id} to collection {collection_id}: {e}")
            continue  # Skip ingestion if the document already exists in R2R
        
        # Try to fetch the document file path
        file_path = await getPath(paperless, docid)
        if not file_path:
            print(f"Skipping document {docid}: File path not found.")
            continue

        # Ingest the document into R2R
        try:
            document_id = await ingest(docid, file_path, r2r_client, collection_id)
            print(f"Document {docid} successfully ingested as {document_id}.")
            
            # Add the document to the custom field in Paperless
            custom_field_name='document_id'
            await update_custom_field(paperless, docid, custom_field_name, document_id)
            
            # Add the document to the collection in R2R
            await docToCollect(document_id, collection_id, r2r_client)
        except Exception as e:
            print(f"Error processing document {docid}: {e}")
# Get full file path of a document (optimized)
async def getPath(paperless, docid):
    doc_string = 'ScanToPDF'
    document = await paperless.documents(docid)
    created_date = document.created_date
    date_string = created_date.strftime('%Y')
    doc_title = document.title
    search_paths = [
        f'/home/admin/Paperless/media/documents/archive/Private/{doc_string}/{date_string}/{doc_title}.pdf',
        f'/home/admin/Paperless/media/documents/originals/Private/{doc_string}/{date_string}/{doc_title}.pdf'
    ]
    for path in search_paths:
        if os.path.exists(path):
            return path
    print(f"File not found for document {docid}")
    return None
    
async def ingest(docid, file_path, client, collectid):
    """
    Ingest a single document into R2R and return the document ID.
    Raises an exception if ingestion fails.
    """
    try:
        # Attempt to create the document in R2R
        response = client.documents.create(
            file_path=file_path,
            metadata={"PaperlessID": docid},
            id=None
        )
        document_id = response['results'][0]['document_id']
        return document_id
    except Exception as e:
        raise Exception(f"Error during ingestion: {e}")
        
def extract_document_id_from_error(error_message):
    """
    Extracts the document ID from error messages indicating a document already exists.
    """
    import re

    # Use regex to extract UUID from the error message
    match = re.search(r"/documents/([a-f0-9\-]{36})", error_message)
    if match:
        return match.group(1)  # Return the first matching group (the document ID)
    return None
    
async def docToCollect(document_id, collection_id, r2r_client):
    """
    Assign a document to a collection in R2R.
    """
    try:
        # Add the document to the specified collection
        assign_doc_result = r2r_client.collections.add_document(collection_id, document_id)
        print(f"Document {document_id} successfully added to collection {collection_id}.")
    except Exception as e:
        print(f"Error assigning document {document_id} to collection {collection_id}: {e}")

async def sync_paperless_to_r2r(paperless, r2r_client_url, batch_size=10):
    """
    Sync all Paperless tags and documents with R2R collections in real-time.
    :param paperless: Initialized Paperless client.
    :param r2r_client_url: The R2R API base URL.
    :param batch_size: The batch size for processing documents.
    """
    # await paperless.initialize()

    try:
        # Initialize R2R client
        await paperless.initialize()
        r2r_client = R2RClient(r2r_client_url)

        # Fetch Paperless tags
        paperless_tags = await getTags(paperless)

        # Fetch R2R collections
        r2r_collections = r2r_client.collections.list(offset=0, limit=1000)["results"]
        r2r_collection_map = {collection["name"]: collection["id"] for collection in r2r_collections}

        # Process tags concurrently
        await asyncio.gather(
            *(
                sync_tag(
                    paperless, r2r_client, tag_id, tag_name, r2r_collection_map, batch_size
                )
                for tag_id, tag_name in paperless_tags
            )
        )
        await paperless.close()
    except Exception as e:
        print(f"Critical error in sync_paperless_to_r2r: {e}")
    # finally:
    #     await paperless.close()



# async def get_r2r_document_id_from_paperless(paperless, docid, custom_field_name):
#     """
#     Fetch the `R2RDocumentID` custom field's value for a Paperless document.
    
#     :param paperless: Initialized Paperless client.
#     :param docid: Document ID in Paperless.
#     :param custom_field_name: The name of the custom field to check.
#     :return: The value of the custom field (if it exists) or None.
#     """
#     try:
#         document = await paperless.documents(docid)
#         custom_fields = document.custom_fields
#         for field in custom_fields:
#             if field["name"] == custom_field_name:
#                 return field["value"]
#     except KeyError:
#         print(f"Custom field '{custom_field_name}' not found for document {docid}.")
#     except Exception as e:
#         print(f"Error fetching custom field for document {docid}: {e}")
    
#     return None
async def get_r2r_document_id_from_paperless(paperless, docid, custom_field_name):
    try:
        # Fetch the document by docid
        document = await paperless.documents(docid)
        
        # Iterate through custom fields to find the one with the specified name
        for custom_field in document.custom_fields:
            if custom_field.field == custom_field_name:
                return custom_field.value
        
        # If the custom field is not found, return None
        return None
    except Exception as e:
        print(f"Error fetching custom field for document {docid}: {e}")
        return None

In [45]:
async def update_custom_field(paperless, docid, custom_field_name, custom_field_value):
    try:
        # Fetch the document by docid
        document = await paperless.documents(docid)
        
        # Retrieve the custom field definitions to get the field ID
        custom_fields_definitions = await paperless.custom_fields.list()
        field_id = None
        for field_def in custom_fields_definitions:
            if field_def.name == custom_field_name:
                field_id = field_def.id
                break
        if field_id is None:
            print(f"Custom field '{custom_field_name}' not found.")
            return
        
        # Update existing custom field or add a new one
        custom_field_updated = False
        for custom_field in document.custom_fields:
            if custom_field.field == field_id:
                custom_field.value = str(custom_field_value)
                custom_field_updated = True
                break
        if not custom_field_updated:
            # Append the new custom field
            document.custom_fields.append({
                "field": field_id,
                "value": str(custom_field_value)
            })
        
        # Save the changes
        success = await document.update()
        
        if success:
            print(f"Successfully updated document {docid} with custom field '{custom_field_name}' = {custom_field_value}")
        else:
            print(f"Failed to update document {docid}")
    except Exception as e:
        print(f"Error updating custom field for document {docid}: {e}")

In [43]:
# Main program execution
async def run():
    paperless_url = "https://paperless.escaffinity.com"
    paperless_api_key = "0dad9947edf3a041e4e847160619213d908e3310"
    r2r_url = "https://r2r.escaffinity.com"
    paperless = Paperless(paperless_url, paperless_api_key)
    #await paperless.initialize()
    # try:
    await sync_paperless_to_r2r(paperless, r2r_url, batch_size=20)
    # finally:
    #     await paperless.close()

nest_asyncio.apply()
# Entry point
if __name__ == "__main__":
    asyncio.run(run())



2024-12-18 13:41:17,748 - INFO - pypaperless[paperless.escaffinity.com] - Initialized.


Skipping tag 'Personal'.
Processing tag 'Administrative'...
Processing tag 'Agile\Agility'...
Processing tag 'Ben Franklin'...
Processing tag 'CAD/CAM'...
Processing tag 'CAM\CIM Labs'...
Processing tag 'CELDi'...
Processing tag 'Collaboratory'...
Processing tag 'Coursework'...
Processing tag 'Focus Hope'...
Processing tag 'Iacocca Institute'...
Processing tag 'IBM Research Program'...
Processing tag 'ILR'...
Processing tag 'INFORMS'...
Processing tag 'Interdisciplinary Programs'...
Processing tag 'Leadership'...
Processing tag 'Lehigh EXPOs'...
Processing tag 'Manufacturing'...
Processing tag 'NSLS'...
Processing tag 'PMFI'...
Processing tag 'UEDA'...
Skipping tag 'Administrative': Document counts match (1).
Skipping tag 'Interdisciplinary Programs': Document counts match (0).
Skipping tag 'UEDA': Document counts match (0).
Skipping tag 'PMFI': Document counts match (0).
Document counts differ for tag 'NSLS': Paperless=3, R2R=2. Processing...
Skipping tag 'INFORMS': Document counts ma

KeyboardInterrupt: 

In [46]:

#test program
async def run():
    paperless_url = "https://paperless.escaffinity.com"
    paperless_api_key = "0dad9947edf3a041e4e847160619213d908e3310"
    r2r_url = "https://r2r.escaffinity.com"
    r2r_client = R2RClient(r2r_url)
    paperless = Paperless(paperless_url, paperless_api_key)
    await paperless.initialize()
    try:
        await backfill_r2r_ids_to_paperless(paperless, r2r_client, custom_field_name='document_id')
    except Exception as e:
        print(f"Error processing document: {e}")
    finally:
        await paperless.close()

nest_asyncio.apply()
# Entry point
if __name__ == "__main__":
    asyncio.run(run())

2024-12-18 13:53:58,655 - INFO - pypaperless[paperless.escaffinity.com] - Initialized.


Fetching all documents from R2R...
Updating Paperless document 2755 with R2R document ID 81f2e520-faae-5665-b0ad-6f0b9377f77f...
Error updating custom field for document 2755: 'CustomFieldHelper' object has no attribute 'list'
Updating Paperless document 865 with R2R document ID dcca8c33-69fc-5713-89e8-d50643af85bf...
Error updating custom field for document 865: 'CustomFieldHelper' object has no attribute 'list'
Updating Paperless document 866 with R2R document ID a2e17db8-5c07-521c-86f5-32a69374b814...
Error updating custom field for document 866: 'CustomFieldHelper' object has no attribute 'list'
Updating Paperless document 5344 with R2R document ID f9870517-2505-5c0e-9267-c2658a3a256f...
Error updating custom field for document 5344: 'CustomFieldHelper' object has no attribute 'list'
Updating Paperless document 5159 with R2R document ID 5df7ff9b-8e0c-5eaa-af63-4a56249eea99...
Error updating custom field for document 5159: 'CustomFieldHelper' object has no attribute 'list'
Updating 

2024-12-18 13:54:02,987 - INFO - pypaperless[paperless.escaffinity.com] - Closed.


Error updating custom field for document 7708: 'CustomFieldHelper' object has no attribute 'list'
Updating Paperless document 9173 with R2R document ID 3990327c-aad7-5a30-8cdf-6aba1a999ea3...
Error updating custom field for document 9173: 'CustomFieldHelper' object has no attribute 'list'
Updating Paperless document 7389 with R2R document ID e7e0f2c4-7164-5a9e-b20d-1c13e559f9e5...


KeyboardInterrupt: 

In [39]:
async def ensure_custom_field_exists(paperless, custom_field_name):
    """
    Ensure that the custom field exists in Paperless. If it doesn't, create it.

    :param paperless: Initialized Paperless client.
    :param custom_field_name: The name of the custom field.
    :return: The ID of the custom field.
    """
    try:
        # Fetch existing custom fields
        response = await paperless.get_custom_fields()
        custom_fields = response.get("results", [])

        # Check if the custom field already exists
        for field in custom_fields:
            if field["name"] == custom_field_name:
                return field["id"]

        # If the custom field doesn't exist, create it
        print(f"Custom field '{custom_field_name}' does not exist. Creating it...")
        payload = {
            "name": custom_field_name,
            "type": "string"  # You may need to adjust this type based on your use case
        }
        created_field = await paperless.create_custom_field(payload)
        return created_field["id"]

    except Exception as e:
        print(f"Error ensuring custom field '{custom_field_name}' exists: {e}")
        raise




In [41]:
async def update_custom_field(paperless, docid, custom_field_name, custom_field_value):
    try:
        # Fetch the document by docid
        document = await paperless.documents(docid)
        
        # Update the custom fields list
        document.custom_fields = [
            {
                "value": custom_field_value,
                "field": custom_field_name
            }
        ]
        
        # Save the changes
        success = await document.update()
        
        if success:
            print(f"Successfully updated document {docid} with custom field {custom_field_name} = {custom_field_value}")
        else:
            print(f"Failed to update document {docid}")
    except Exception as e:
        print(f"Error updating custom field for document {docid}: {e}")

async def backfill_r2r_ids_to_paperless(paperless, r2r_client, custom_field_name='document_id'):
    try:
        print("Fetching all documents from R2R...")
        offset = 0
        limit = 100  # Fetch documents in batches to avoid overwhelming the connection
        has_more = True

        while has_more:
            # Fetch a batch of documents from R2R
            response = r2r_client.documents.list(offset=offset, limit=limit)
            r2r_documents = response.get("results", [])
            
            if not r2r_documents:
                print("No more documents found in R2R.")
                break

            # Iterate through R2R documents and update Paperless
            for r2r_doc in r2r_documents:
                try:
                    r2r_document_id = r2r_doc.get("id")
                    paperless_id = r2r_doc["metadata"].get("PaperlessID")
                    
                    if not paperless_id:
                        print(f"Skipping R2R document {r2r_document_id}: Missing 'PaperlessID' metadata.")
                        continue

                    # Update the corresponding Paperless document using the existing update_custom_field function
                    print(f"Updating Paperless document {paperless_id} with R2R document ID {r2r_document_id}...")
                    await update_custom_field(paperless, paperless_id, custom_field_name, r2r_document_id)
                except Exception as e:
                    print(f"Error updating Paperless for R2R document {r2r_doc.get('id')}: {e}")

            # Update offset for the next batch
            offset += limit
            has_more = len(r2r_documents) == limit

        print("Backfilling complete.")
    except Exception as e:
        print(f"Critical error in backfill_r2r_ids_to_paperless: {e}")



In [42]:
async def run():
    paperless_url = "https://paperless.escaffinity.com"
    paperless_api_key = "0dad9947edf3a041e4e847160619213d908e3310"
    r2r_url = "https://r2r.escaffinity.com"
    r2r_client = R2RClient(r2r_url)
    paperless = Paperless(paperless_url, paperless_api_key)
    await paperless.initialize()
    try:
        await backfill_r2r_ids_to_paperless(paperless, r2r_client, custom_field_name='document_id')
    except Exception as e:
        print(f"Error processing document: {e}")
    finally:
        await paperless.close()

nest_asyncio.apply()
# Entry point
if __name__ == "__main__":
    asyncio.run(run())

2024-12-18 13:40:40,477 - INFO - pypaperless[paperless.escaffinity.com] - Initialized.


Fetching all documents from R2R...
Updating Paperless document 866 with R2R document ID a2e17db8-5c07-521c-86f5-32a69374b814...
Error updating custom field for document 866: Paperless [custom_fields -> field]: Incorrect type. Expected pk value, received str.
Updating Paperless document 5344 with R2R document ID f9870517-2505-5c0e-9267-c2658a3a256f...
Error updating custom field for document 5344: Paperless [custom_fields -> field]: Incorrect type. Expected pk value, received str.
Updating Paperless document 5159 with R2R document ID 5df7ff9b-8e0c-5eaa-af63-4a56249eea99...
Error updating custom field for document 5159: Paperless [custom_fields -> field]: Incorrect type. Expected pk value, received str.
Updating Paperless document 7184 with R2R document ID 543d4ca3-374c-5a6e-8a2b-30677c987800...
Error updating custom field for document 7184: Paperless [custom_fields -> field]: Incorrect type. Expected pk value, received str.
Updating Paperless document 5469 with R2R document ID 25f576a9-

2024-12-18 13:40:44,338 - INFO - pypaperless[paperless.escaffinity.com] - Closed.


Error updating custom field for document 7405: Paperless [custom_fields -> field]: Incorrect type. Expected pk value, received str.
Updating Paperless document 7424 with R2R document ID feb8e987-4de6-5730-a420-4d43dd1f59b6...


KeyboardInterrupt: 

In [37]:
#testing getting custom_fields
async def run():
    paperless_url = "https://paperless.escaffinity.com"
    paperless_api_key = "0dad9947edf3a041e4e847160619213d908e3310"
    r2r_url = "https://r2r.escaffinity.com"
    r2r_client = R2RClient(r2r_url)
    paperless = Paperless(paperless_url, paperless_api_key)
    document_id = "f9870517-2505-5c0e-9267-c2658a3a256f"
    await paperless.initialize()
    try:
        document = await paperless.documents(5344)
        custom_fields = document.custom_fields
        print(custom_fields)
        document.custom_fields = [
        {
            "value": document_id,
            "field": 3
        }]
        success = await document.update()
    except Exception as e:
        print(f"Error processing document: {e}")
    finally:
        await paperless.close()

nest_asyncio.apply()
# Entry point
if __name__ == "__main__":
    asyncio.run(run())

2024-12-18 13:29:20,731 - INFO - pypaperless[paperless.escaffinity.com] - Initialized.
2024-12-18 13:29:20,958 - INFO - pypaperless[paperless.escaffinity.com] - Closed.


[CustomFieldValueType(field=3, value='f9870517-2505-5c0e-9267-c2658a3a256f')]
