# UNSTRUCTURED DATA INTEGRATION

The `ibm-udi` SDK for Python enables developers and data engineers to programmatically interact with the Unstructured Data Integration (UDI) service, IBM’s next-generation data integration platform built to support modern, hybrid-cloud data pipelines.

With this SDK, you can automate and manage the Flow lifecycle directly in code: creating, configuring, starting, stopping, and monitoring Flows.

## 0. INSTALL UDI PACKAGE

Install the required packages.

In [None]:
# Install the SDK and the third-party packages used in this notebook
%pip install ibm-udi
%pip install requests
%pip install pydantic
# datetime is part of the Python standard library and needs no install
# Confirm that the SDK is installed
%pip list | grep udi

# 1. Create a client for Unstructured Data Integration 


### Authentication Requirements

To create a `UDIClient`, **at least one** of the following authentication methods is required depending on the environment:

1. **For CPD -** 
    * Bearer Token
    * Username and Password
    * Username and API Key
2. **For SaaS -** 
    * Bearer Token
    * API Key
3. **For AWS -**
    * Bearer Token
    * API Key
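
The rules above can be checked before a client is created. The following is a minimal sketch, not part of the SDK: `has_valid_auth` is a hypothetical helper, the key names follow the `config` dictionary used in this notebook, and treating every `env` value other than `cpd` as SaaS or AWS is an assumption. The SDK may apply its own, different validation.

```python
def has_valid_auth(config: dict) -> bool:
    """Return True if the config carries at least one complete credential set.

    Hypothetical helper for illustration; it is not part of the ibm-udi SDK.
    """
    token = config.get("token")
    user = config.get("user_name")
    password = config.get("password")
    api_key = config.get("api_key")

    if token:
        # A bearer token is accepted in every environment listed above.
        return True
    if config.get("env") == "cpd":
        # CPD: the username must be paired with a password or an API key.
        return bool(user and (password or api_key))
    # Assumption: any other env value is SaaS or AWS, where an API key suffices.
    return bool(api_key)
```

Calling `has_valid_auth(config)` before `UDIClient(config=config)` surfaces a missing credential early, instead of at the first request.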

### Base URL

You must also provide the URL of your environment:

- For **SaaS**, provide the API endpoint URL:  
  
  Example: `https://api.ca-tor.dai.cloud.ibm.com/`

- For **Cloud Pak for Data (Watsonx)**, provide the cluster URL:  
  
  Example: `https://<cp4d-cluster-hostname>`

This URL serves as the `base_url` used to authenticate and to manage your Unstructured Data Integration flows.


### Create a New Project

To work with Unstructured data integration flows, you need a project:

1. Create a new project in your Watsonx or SaaS environment.
2. Allocate the required **storage** for the project.
3. Once the project is created, obtain the `project_id` from the **project URL**.  
   For example:  

https://..../projects/<project_id>/...

4. Import the documents you want to ingest into your project and note their Asset ID(s). They are required when defining the flow operators for pipeline generation.

Once these elements are in place, you're ready to instantiate your `UDIClient` and start building and managing Unstructured data integration flows.
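
Copying the `project_id` out of the project URL by hand is error-prone, so it can be extracted programmatically. The sketch below assumes only the URL shape shown above (a `projects` path segment followed by the ID). `project_id_from_url` is a hypothetical helper, and any URL passed to it in an example is made up.

```python
from urllib.parse import urlparse


def project_id_from_url(url: str) -> str:
    """Return the path segment that follows 'projects' in a project URL.

    Hypothetical helper for illustration; it is not part of the ibm-udi SDK.
    """
    # Split the URL path into its non-empty segments.
    segments = [segment for segment in urlparse(url).path.split("/") if segment]
    try:
        return segments[segments.index("projects") + 1]
    except (ValueError, IndexError):
        raise ValueError(f"No project ID found in URL: {url}") from None
```

The returned value can be used directly as the `project_id` entry of the client configuration.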

In [None]:
from udi import UDIClient
from udi.utils import get_attributes, get_features
import json
from datetime import datetime

In [None]:
import logging
import sys

# Configure logging to display in notebook
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)


In [None]:
# env options: cpd | cloud-prod | mcsp-prod
config = {
    'base_url' : "<ENTER-URL>",
    'token' : None,
    'project_id' : "<ENTER-PROJECT-ID>",
    'user_name' : None,
    'password' : None,
    'api_key' : None,
    'env' : "cpd"
}

uc = UDIClient(config=config)
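
To keep credentials out of the notebook itself, the same dictionary can be assembled from environment variables. This is a sketch under stated assumptions: `config_from_env` is a hypothetical helper and the `UDI_*` variable names are invented for illustration; the SDK does not read them on its own.

```python
import os


def config_from_env(env: str = "cpd") -> dict:
    """Build the client config from environment variables.

    The UDI_* variable names are hypothetical; choose names that suit your setup.
    """
    return {
        # Required values: a missing variable raises KeyError immediately.
        "base_url": os.environ["UDI_BASE_URL"],
        "project_id": os.environ["UDI_PROJECT_ID"],
        # Optional credentials: unset variables become None.
        "token": os.environ.get("UDI_TOKEN"),
        "user_name": os.environ.get("UDI_USER_NAME"),
        "password": os.environ.get("UDI_PASSWORD"),
        "api_key": os.environ.get("UDI_API_KEY"),
        "env": env,
    }
```

The result has the same keys as the `config` dictionary above, so it can be passed as `UDIClient(config=config_from_env())`.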

# 2. Get List of Available Operators

This step returns a **structured view** of all available operators, grouped by functionality.


In [None]:
ordered_operators = uc.get_available_operators()
print(json.dumps(ordered_operators, indent=2))

# 3. Get all Operator Metadata

The metadata is required to build operators. It lists every operator name together with its attributes, features, and required values.


In [None]:
# Get all Operator Metadata
metadata = uc.get_metadata()
metadata

# 4. Get Operator-Specific Details

Fetch detailed metadata for a specific operator to view its attributes and features. This is essential for correctly building and validating flow components.


## 4.1 Get Attributes


In [None]:
operator_attributes = get_attributes(metadata, "extract_cpd")
print(json.dumps(operator_attributes, indent=2))

## 4.2 Get Features

In [None]:
operator_features = get_features(metadata, "extract_cpd")
print(json.dumps(operator_features, indent=2))

# 5. Generate Pipeline for Execution

## 5.1 Create a sequence of operators based on operator metadata

The `operators` list defines the full sequence of steps your UDI flow will follow — from data ingestion to vector storage.

### How to Use It

You will need to update two main parts in the `operators` list:

- **Asset list**: add your asset ID(s) to the `data_assets` list, which is then used to retrieve the asset information. `(Optional: required only for the ingest_cpd_assets operator)`

- **Connection** details in the `milvusdb_cp4d` operator

**The other operators can be added or removed based on your requirements.**

Example: the given operator list does not include the `sql_filter` operator. This can be included if you wish to filter your documents based on some constraints.
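
Adding an optional operator amounts to placing one more dictionary at the right position in the list. The sketch below uses a hypothetical helper, `insert_operator_after`. Placing `sql_filter` after `lang_detect` is only an assumption for the demo, and the empty `parameters` dictionary is a stand-in: look up the real attributes with `get_attributes(metadata, "sql_filter")` before using it.

```python
def insert_operator_after(operators: list, after_type: str, new_operator: dict) -> list:
    """Return a new list with new_operator placed after the first operator of after_type.

    Hypothetical helper for illustration; it is not part of the ibm-udi SDK.
    """
    for index, operator in enumerate(operators):
        if operator.get("type") == after_type:
            # Build a new list so the original sequence stays unchanged.
            return operators[: index + 1] + [new_operator] + operators[index + 1 :]
    raise ValueError(f"No operator of type {after_type!r} in the list")


# Demo on a shortened operator list; the parameters are placeholders.
demo_operators = [{"type": "extract_cpd"}, {"type": "lang_detect"}, {"type": "chunker"}]
updated = insert_operator_after(
    demo_operators, "lang_detect", {"type": "sql_filter", "parameters": {}}
)
print([operator["type"] for operator in updated])
```

Returning a new list rather than modifying the input makes it easy to keep several flow variants side by side.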


In [None]:
# list of input data assets
data_assets = ['<ENTER-YOUR-ASSET-ID(s)>']

In [None]:
asset_info = []

for data in data_assets:
    try:
        asset = uc.get_data_asset(data)
        # print(asset)
        asset_info.append(asset)
    except Exception as e:
        print(e)

print("Final collected assets:", asset_info)


In [None]:
operators = [
    {
        "type": "ingest_cpd_assets",
        "parameters": {
            "input_assets": asset_info
        }
    },
    {
        "type": "extract_cpd",
        "parameters": {
            "ocr_mode": "enabled",           # Available Values: [enabled, disabled, forced]    
            "extract_entity": True,           
            "custom_schema": "disabled"        
        }
    },
    {
        "type": "lang_detect"
    },
    {
        "type": "chunker",
        "parameters": {
            'chunk_type': 'watsonx',
            'chunk_size': 4000,
            'chunk_overlap': 200
        }
    },
    {
        "type": "embeddings",
        "parameters": {
            "embeddings_type": "watsonx",
            "embeddings_model_id": "<ENTER-MODEL-ID>"  # Example: ibm/slate-30m-english-rtrvr
        }
    },
    {
        "type": "milvusdb_cp4d",
        "parameters": {
            "connection_name": "<ENTER-CONNECTION-NAME>",
            "connection_id": "<ENTER-CONNECTION-ID>",
            "collection_name": "<ENTER-COLLECTION-NAME>",   # Provide a new collection name
            "milvus_feature_mappings": [
                {
                    "feature_name": "name",
                    "mapped_column_name": "document_name"
                },
                {
                    "feature_name": "doc_id_hash",
                    "mapped_column_name": "pk"
                },
                {
                    "feature_name": "id",
                    "mapped_column_name": "document_id"
                },
                {
                    "feature_name": "embeddings",
                    "mapped_column_name": "vector_embeddings"
                },
                {
                    "feature_name": "sparse_embeddings",
                    "mapped_column_name": "sparse_embeddings"
                },
                {
                    "feature_name": "content",
                    "mapped_column_name": "text"
                }
            ]
        }
    },
    {
        "type": "document_set",
        "parameters": {
            "cp4d_connection_id": "<ENTER-YOUR-CONNECTION-ID>",
            "catalog_name": "<ENTER-YOUR-CATALOG-NAME>",
            "schema_name": "<ENTER-YOUR-SCHEMA-NAME>",
            "connection_name": "<ENTER-YOUR-CONNECTION-NAME>",
            "document_set_name": "<ENTER-YOUR-DOCUMENT-SET-NAME>",
            "document_set_description": "The output document set",
            "table_name": "<ENTER-YOUR-TABLE-NAME>"
        }
    }     
]

# To use a different ingest type, replace the ingest operator above with one of the examples below
# {
#     "type": "ingest_cpd_connections",
#     "parameters": {
#         "connection_id": "409fe4ee-d80f-4f94-a9bc-6585660c6544",
#         "paths": [
#             "/tm-wkc-storage-1/2_small_files",
#             "/tm-wkc-storage-1/8_pdf_small_files",
#             "/tm-wkc-storage-1/_PLT_Demo_/test_invoice_01.pdf"
#         ],
#         "include_filter": ["pdf", "txt", "md"],
#         "max_file_size": 100,
#         "max_files": 100
#     }
# }

# {
#     "type": "ingest_document_set",
#     "parameters": {
#         "document_set_id": "128e2fdf-e109-4bac-931f-2f8e9702ce4a",
#         "input_assets": {
#             "document_set_name": "shivani_doc_set_11",
#             "created_on": "2025-05-16T13:33:25Z",
#             "document_set_id": "128e2fdf-e109-4bac-931f-2f8e9702ce4a"
#         }
#     }
# }
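
The operator list contains several `<ENTER-...>` placeholders, and a forgotten one is easy to miss. The following sketch walks any nested structure and reports where placeholders remain. `find_placeholders` is a hypothetical helper that relies only on the `<ENTER-` prefix used in this notebook.

```python
def find_placeholders(value, path="operators"):
    """Collect the paths of string values that still start with '<ENTER-'.

    Hypothetical helper for illustration; it is not part of the ibm-udi SDK.
    """
    found = []
    if isinstance(value, dict):
        for key, item in value.items():
            found += find_placeholders(item, f"{path}.{key}")
    elif isinstance(value, list):
        for index, item in enumerate(value):
            found += find_placeholders(item, f"{path}[{index}]")
    elif isinstance(value, str) and value.startswith("<ENTER-"):
        found.append(path)
    return found
```

Running `find_placeholders(operators)` before building the pipeline returns an empty list once every placeholder has been replaced.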

## 5.2 CREATE PIPELINE

Use the configured `operators` list to create the full pipeline dictionary, including the `flow_name`, `project_id`, `orchestrator`, and `global_config`. Set the `orchestrator` to `python` for small to medium flows, or to `spark` if you're working with large documents.

`global_config` keeps reusable pipeline settings (such as storage paths and types) separate from the code. Its values are injected into the pipeline so that downstream operators know where to store outputs and which storage method to use.
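
The pipeline dictionary can also be assembled by a small function that rejects obvious mistakes early. This is a sketch, not SDK functionality: `build_pipeline` is a hypothetical helper, and restricting `orchestrator` to `python` and `spark` is an assumption based on the two values named above.

```python
# Assumption: only the two orchestrator values named in this notebook are valid.
ALLOWED_ORCHESTRATORS = {"python", "spark"}


def build_pipeline(flow_name, project_id, operators, global_config, orchestrator="python"):
    """Assemble the pipeline dictionary with the keys used in this notebook.

    Hypothetical helper for illustration; it is not part of the ibm-udi SDK.
    """
    if orchestrator not in ALLOWED_ORCHESTRATORS:
        raise ValueError(f"Unsupported orchestrator: {orchestrator!r}")
    if not operators:
        raise ValueError("The flow needs at least one operator")
    return {
        "flow_name": flow_name,
        "project_id": project_id,
        "orchestrator": orchestrator,
        "flow": operators,
        "global_config": global_config,
    }
```

Keeping `global_config` as a separate argument mirrors its purpose: the same storage settings can be reused across several flows.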


In [None]:
global_config = {
    "data_local_config": {
        "output_folder": "./test/flows/output"
    },
    "data_storage_type": "local"
}

In [None]:
flow_name = f"SDK_FLOW_{datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}"
pipeline = {
    "flow_name": flow_name,
    "project_id": config.get('project_id'),
    "orchestrator": "python",
    "flow": operators,
    "global_config": global_config
}
print("pipeline : ",json.dumps(pipeline, indent=2))


# 6. Create and Run Flow


Use the `Flow` class to create and run the pipeline by passing the `UDIClient` instance. Once executed, you can monitor its status and retrieve logs. Optional methods like `cancel()` and `delete()` are also available for managing the flow lifecycle.

In [None]:
from udi.flows import Flow
flow = Flow(uc)

try:
    flow.create(pipeline=pipeline)
    flow.run()
    print("Status:", flow.status())
    # flow.cancel()
    # flow.delete()
except Exception as e:
    print("Error:", e)

In [None]:
# Use this to get the execution logs of the flow
print("Logs:", flow.logs())

## 6.1 Poll Execution Status


In [None]:
flow.poll_flow_status()
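
When more control over polling is needed, for example a custom interval or a hard timeout, a generic loop around `flow.status()` is one option. The sketch below is not SDK functionality: `wait_for_status` is a hypothetical helper, it assumes that `flow.status()` returns a value that can be compared against a set of terminal states, and the state names passed in by the caller are assumptions to be checked against real output.

```python
import time


def wait_for_status(get_status, terminal_states, interval=5.0, timeout=600.0):
    """Call get_status() until it returns a terminal state or the timeout expires.

    Hypothetical helper for illustration; it is not part of the ibm-udi SDK.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Flow still in state {status!r} after {timeout} seconds")
        # Pause between checks to avoid flooding the service with requests.
        time.sleep(interval)
```

A call could look like `wait_for_status(flow.status, {"completed", "failed"})`, where both state names are placeholders for the values your environment actually reports.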