In [1]:
import os
import time
from google.cloud import documentai_v1beta3 as documentai
from google.cloud import storage
from google.api_core.client_options import ClientOptions

In [2]:
# --------------- CONFIGURATION -------------------
PROJECT_ID = "prj-app-wh-dev"
LOCATION = "us"  # or "eu" depending on your processor location

PROCESSOR_ID = 'de482f2a75ceb96'
# Provide paths
LOCAL_DOCUMENTS_DIR = "path_to_documents"
LOCAL_ANNOTATIONS_DIR = "path_to_annotations"  # JSON annotation files

# Schema
SCHEMA_FIELDS = [
    {"name": "Patient Name", "type": "text"},
    {"name": "DOB", "type": "date"},
    {"name": "Medical Record Number", "type": "number"},
    # Add more fields according to your form
]

In [13]:
def create_processor(client, display_name):
    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}"
    processor = documentai.Processor(
        type_="CUSTOM_EXTRACTION_PROCESSOR",  # Very important
        display_name=display_name,
    )
    response = client.create_processor(parent=parent, processor=processor)
    print(f"Processor created: {response.name}")
    return response.name


client = documentai.DocumentProcessorServiceClient()
processor_name = create_processor(client, display_name="automation_test_processor")
processor_id = processor_name.split("/")[-1]





Processor created: projects/905161924890/locations/us/processors/de482f2a75ceb96


In [7]:
URIS = [
    "gs://prj-app-wh-dev-backend-data/docai/1736724814.pdf",
    "gs://prj-app-wh-dev-backend-data/docai/Sabbagh, Dalal.pdf",
]

In [1]:
dataset_path = 'prj-app-wh-dev-backend-data/docai_samples'

In [None]:
URIS = [
    "gs://prj-app-wh-dev-backend-data/docai/1736724814.pdf",
    "gs://prj-app-wh-dev-backend-data/docai/Sabbagh, Dalal.pdf",
]

In [5]:
# from google.cloud import documentai_v1beta3 as documentai
# from google.api_core.client_options import ClientOptions

# Your settings
project_id = "prj-app-wh-dev"
location = "us"  # based on your endpoint
processor_id = "de482f2a75ceb96"
gcs_uri_prefix = "gs://prj-app-wh-dev-backend-data/docai_dataset/"

# Setup the endpoint correctly
client_options = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")

# Initialize the Document AI client
client = documentai.DocumentServiceAsyncClient(client_options=client_options)

# Build the full dataset resource name
dataset_name = f"projects/{project_id}/locations/{location}/processors/{processor_id}/dataset"

# Prepare the dataset configuration
dataset = documentai.Dataset(
    name=dataset_name,
    gcs_managed_config=documentai.Dataset.GCSManagedConfig(
        gcs_prefix=documentai.GcsPrefix(
            gcs_uri_prefix=gcs_uri_prefix
        )
    ),
    spanner_indexing_config=documentai.Dataset.SpannerIndexingConfig()
)

# Prepare the update request
update_request = documentai.UpdateDatasetRequest(
    dataset=dataset
)

# Call the update_dataset API
operation = client.update_dataset(request=update_request)

response = (await operation).result()





In [None]:
from google.cloud import documentai_v1 as documentai
from google.api_core.client_options import ClientOptions

# Your settings
PROJECT_ID = "PROJECT_ID"
LOCATION = "LOCATION"  # based on your endpoint
PROCESSOR_ID = "PROCESSOR_ID"
gcs_uri_prefix = "GCS_URI"

# Setup the endpoint correctly
client_options = ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")

# Initialize the Document AI client
client = documentai.DocumentServiceAsyncClient(client_options=client_options)

# Build the full dataset resource name
dataset_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/processors/{PROCESSOR_ID}/dataset"

# Prepare the dataset configuration
dataset = documentai.Dataset(
    name=dataset_name,
    gcs_managed_config=documentai.Dataset.GCSManagedConfig(
        gcs_prefix=documentai.GcsPrefix(
            gcs_uri_prefix=gcs_uri_prefix
        )
    ),
    spanner_indexing_config=documentai.Dataset.SpannerIndexingConfig()
)

# Prepare the update request
update_request = documentai.UpdateDatasetRequest(
    dataset=dataset
)

# Call the update_dataset API
operation = client.update_dataset(request=update_request)

response = (await operation).result()



In [8]:
type(response)

coroutine

In [8]:
def create_schema(project_id, location, processor_id, schema_def):
    client_options = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentServiceClient(client_options=client_options)

    # Parent
    parent = f"projects/{project_id}/locations/{location}/processors/{processor_id}/dataset/schema"

    # Define the schema (mapping user inputs)
    schema = documentai.DatasetSchema(
        document_schema=documentai.DocumentSchema(
            display_name="Custom Schema",
            entity_types=[
                documentai.DocumentSchema.EntityType(
                    display_name=field["name"],
                    entity_type=field["type"],
                    properties=[],  # Optional: sub-fields
                    base_types=[],  # Optional
                    # Auto-extend this if your schema has nested fields
                )
                for field in schema_def
            ]
        )
    )

    request = documentai.UpdateDatasetSchemaRequest(
        dataset_schema=schema,
        name=parent,
    )

    updated_schema = client.update_dataset_schema(request=request)
    print(f"Schema created successfully!")
    return updated_schema

res = create_schema(PROJECT_ID, LOCATION, PROCESSOR_ID, SCHEMA_FIELDS)



ValueError: Unknown field for EntityType: entity_type

In [3]:
from google.cloud.documentai_v1 import DocumentProcessorServiceClient
from google.cloud.documentai_v1.types import DocumentSchema, Processor
from google.protobuf.field_mask_pb2 import FieldMask

def set_schema(client, project_id, location, processor_id, schema_fields):
    parent = client.processor_path(project_id, location, processor_id)

    # 1) Build your list of Property messages
    properties = []
    for field in schema_fields:
        prop = DocumentSchema.EntityType.Property(
            name=field["name"].lower().replace(" ", "_"),
            display_name=field["name"],
            value_type=field["type"],  # just a string
            occurrence_type=(
                DocumentSchema.EntityType.Property.OccurrenceType.OPTIONAL_ONCE
            )
        )
        properties.append(prop)

    # 2) Create a single top-level EntityType with those properties
    entity_type = DocumentSchema.EntityType(
        name="custom_extraction_document_type",
        display_name="Custom Extraction Document Type",
        base_types=["document"],
        properties=properties
    )

    # 3) Wrap it into a DocumentSchema
    return DocumentSchema(entity_types=[entity_type])

    # # 4) Send it to the API
    # mask = FieldMask(paths=["document_schema"])
    # client.update_processor(
    #     processor=Processor(name=parent, document_schema=schema),
    #     update_mask=mask,
    # )

    # print("Schema updated successfully.")


client = DocumentProcessorServiceClient()
schema = set_schema(client, PROJECT_ID, LOCATION, PROCESSOR_ID, SCHEMA_FIELDS)



In [None]:
def create_schema(project_id, location, processor_id, schema_def):
    client_options = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentServiceClient(client_options=client_options)

    # Parent
    parent = f"projects/{project_id}/locations/{location}/processors/{processor_id}/dataset/schema"

    # Define the schema (mapping user inputs)
    schema = documentai.DatasetSchema(
        document_schema=documentai.DocumentSchema(
            display_name="Custom Schema",
            entity_types=[
                documentai.DocumentSchema.EntityType(
                    display_name=field["display_name"],
                    entity_type=field["entity_type"],
                    properties=[],  # Optional: sub-fields
                    base_types=[],  # Optional
                    # Auto-extend this if your schema has nested fields
                )
                for field in schema_def
            ]
        )
    )

    request = documentai.UpdateDatasetSchemaRequest(
        dataset_schema=schema,
        name=parent,
    )

    updated_schema = client.update_dataset_schema(request=request)
    print(f"Schema created successfully!")
    return updated_schema


In [None]:

# ---------------------------------------------------


def create_processor(client, display_name):
    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}"
    processor = documentai.Processor(
        type_="CUSTOM_DOCUMENT_EXTRACTOR_PROCESSOR",  # Very important
        display_name=display_name,
    )
    response = client.create_processor(parent=parent, processor=processor)
    print(f"Processor created: {response.name}")
    return response.name


def set_schema(client, processor_id, schema_fields):
    schema_parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/processors/{processor_id}"

    fields = []
    for field in schema_fields:
        field_schema = documentai.DocumentSchema.EntityType(
            display_name=field["name"],
            entity_type=field["name"].lower().replace(" ", "_"),
            value_type=documentai.DocumentSchema.EntityType.ValueType.TEXT
            if field["type"] == "text"
            else documentai.DocumentSchema.EntityType.ValueType.DATE
            if field["type"] == "date"
            else documentai.DocumentSchema.EntityType.ValueType.NUMBER,
        )
        fields.append(field_schema)

    schema = documentai.DocumentSchema(entity_types=fields)
    request = documentai.UpdateProcessorRequest(
        processor=documentai.Processor(name=schema_parent, document_schema=schema),
        update_mask={"paths": ["document_schema"]}
    )
    client.update_processor(request=request)
    print("Schema set successfully.")


def upload_to_gcs(local_path, gcs_path):
    storage_client = storage.Client()
    bucket = storage_client.bucket(GCS_BUCKET)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(local_path)
    print(f"Uploaded {local_path} to gs://{GCS_BUCKET}/{gcs_path}")


def batch_import_documents(client, processor_id, gcs_documents_prefix, gcs_annotations_prefix):
    dataset_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/processors/{processor_id}/dataset"

    input_config = documentai.BatchDocumentsInputConfig(
        gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=f"gs://{GCS_BUCKET}/{gcs_documents_prefix}")
    )

    import_documents_request = documentai.ImportDocumentsRequest(
        dataset=dataset_name,
        batch_documents_input_config=input_config,
        document_schema=None,  # Optional if already set schema
        import_config=documentai.ImportDocumentsRequest.ImportConfig(
            gcs_source=f"gs://{GCS_BUCKET}/{gcs_annotations_prefix}",
            mime_type="application/json"
        )
    )

    operation = client.import_documents(request=import_documents_request)
    print("Importing documents... (This might take a while)")
    operation.result(timeout=600)
    print("Documents imported successfully.")


def train_processor(client, processor_id):
    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/processors/{processor_id}"
    train_request = documentai.TrainProcessorVersionRequest(
        parent=parent,
        processor_version=documentai.ProcessorVersion(
            display_name="v1",
            document_schema=None  # Already assigned
        )
    )

    operation = client.train_processor_version(request=train_request)
    print("Training started...")
    result = operation.result(timeout=3600)  # up to 1 hour
    print("Training completed.")
    print(f"Trained Processor Version: {result.name}")


def main():
    client = documentai.DocumentProcessorServiceClient()

    # 1. Create Processor
    processor_name = create_processor(client, display_name="User Custom Processor")
    processor_id = processor_name.split("/")[-1]

    # 2. Set Schema
    set_schema(client, processor_id, SCHEMA_FIELDS)

    # 3. Upload Documents and Annotations
    for file_name in os.listdir(LOCAL_DOCUMENTS_DIR):
        upload_to_gcs(
            os.path.join(LOCAL_DOCUMENTS_DIR, file_name),
            f"documents/{file_name}"
        )

    for file_name in os.listdir(LOCAL_ANNOTATIONS_DIR):
        upload_to_gcs(
            os.path.join(LOCAL_ANNOTATIONS_DIR, file_name),
            f"annotations/{file_name}"
        )

    # 4. Import documents to Processor Dataset
    batch_import_documents(client, processor_id, "documents", "annotations")

    # 5. Train Processor
    train_processor(client, processor_id)


if __name__ == "__main__":
    main()




ValueError: Unknown field for EntityType: type_