# Getting Started with Unstructured API and IBM watsonx.data
[Unstructured](https://unstructured.io) is an ETL+ platform purpose-built for preprocessing unstructured data for GenAI and retrieval-based applications. It helps teams:

* Connect to a wide range of enterprise systems—from cloud storage providers like Azure Blob Storage or Amazon S3, to collaboration platforms like Confluence and Dropbox, to business tools like Salesforce, Jira, and more.
* Continuously ingest data from these systems in a scalable, automated way.
* Preprocess the raw content using a unified, modular pipeline: partitioning, enriching, chunking, and embedding your documents in a consistent format.
* Output clean, structured results into your downstream stack—such as a vector database, search engine, or data warehouse.

You can manage connectors and workflows via the Unstructured UI or the headless API.

In this hands-on notebook, we’ll walk through how to use the Unstructured Python SDK to define and run a full data processing workflow—taking unstructured files from **Azure Blob Storage** and landing the structured output into **IBM watsonx.data**.

While we’re using Azure Blob Storage as our data source, you can substitute any of the [supported sources](https://docs.unstructured.io/api-reference/workflow/sources/overview) to fit your stack. The pipeline we'll build is modular and extensible by design.

So, let's get started!

## Step 1: Install the Unstructured API Python SDK

All functionality available in the UI of the [Unstructured](https://unstructured.io/) product is also available programmatically via the Unstructured API. You can interact with the Unstructured API either by sending direct requests with curl or Postman, or by using the Unstructured API [Python SDK](https://docs.unstructured.io/api-reference/workflow/overview#unstructured-python-sdk). Here, we'll be using the latter.


> **Note:**
The Unstructured API has two endpoints:
* The Unstructured Partition Endpoint: intended for rapid prototyping of Unstructured's various partitioning strategies. It works only with processing of local files, one file at a time.
* The Unstructured Workflow Endpoint: enables a full range of partitioning, chunking, embedding, and enrichment options for your data. It is designed to batch-process data from any data source to any destination. This is what we're using in this notebook.


Run the following cell to install the Unstructured API Python SDK.

In [None]:
!pip install --upgrade "unstructured-client>=0.30.6"
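
As an optional sanity check, the sketch below reads the installed version from the package metadata and compares it with the 0.30.6 minimum used in the install command. The helper names `parse_version` and `meets_minimum` are illustrative, and the simple parser assumes a plain dotted numeric version string such as `0.34.0`.

```python
from importlib.metadata import PackageNotFoundError, version


def parse_version(text):
    """Turn '0.34.0' into (0, 34, 0); assumes purely numeric dotted parts."""
    return tuple(int(part) for part in text.split("."))


def meets_minimum(installed, minimum="0.30.6"):
    """Return True when the installed version is at least the minimum."""
    return parse_version(installed) >= parse_version(minimum)


try:
    installed = version("unstructured-client")
    print(f"unstructured-client {installed}, ok={meets_minimum(installed)}")
except PackageNotFoundError:
    print("unstructured-client is not installed in this environment")
except ValueError:
    print("Installed version is not a plain dotted number; check it manually")
```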


## Step 2: Create all Connectors

First, let's set up the Unstructured API. For this, you will require an Unstructured API key.

[Learn how to get one](https://docs.unstructured.io/platform-api/api/overview).

In [None]:
import os
import time
from google.colab import userdata
from unstructured_client import UnstructuredClient

os.environ["UNSTRUCTURED_API_KEY"] = userdata.get("UNSTRUCTURED_API_KEY")
client = UnstructuredClient(api_key_auth=os.environ["UNSTRUCTURED_API_KEY"])

# helper function to format outputs
def pretty_print_model(response_model):
    print(response_model.model_dump_json(indent=4))
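
The cell above relies on `google.colab.userdata`, which exists only inside Google Colab. If you run this notebook elsewhere, a small fallback can help. The sketch below is one possible approach, not part of the SDK: the hypothetical helper `get_secret` tries Colab Secrets first and otherwise reads an environment variable of the same name.

```python
import os


def get_secret(name):
    """Read a secret from Colab Secrets when available, else from os.environ."""
    try:
        from google.colab import userdata  # importable only inside Colab
    except ImportError:
        userdata = None

    if userdata is not None:
        return userdata.get(name)

    value = os.environ.get(name)
    if not value:
        raise KeyError(f"Set the {name} environment variable before running this cell")
    return value
```

With this helper, a call such as `get_secret("UNSTRUCTURED_API_KEY")` could stand in for `userdata.get("UNSTRUCTURED_API_KEY")` in the cells of this notebook.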

### Create Microsoft Azure Blob Storage Source Connector

You'll need an **Azure account** with access to **Azure Blob Storage**, along with your **storage account name** and a **shared access signature (SAS)** token for authentication. Make sure you've created a **container** within your storage account and that it has the appropriate access permissions. Upload a few files to your blob container so there's something to play with! 😉 Take a look at [this list](https://docs.unstructured.io/api-reference/supported-file-types) of supported file types and [this video](https://www.youtube.com/watch?time_continue=211&v=Vl3KCphlh9Y&embeds_referring_euri=https%3A%2F%2Fdocs.unstructured.io%2F&source_ve_path=MjM4NTE) on how you can set yours up.


In [None]:
os.environ['AZURE_REMOTE_URL'] = userdata.get('AZURE_REMOTE_URL')
os.environ['AZURE_ACCOUNT_NAME'] = userdata.get('AZURE_ACCOUNT_NAME')
os.environ['AZURE_SAS_TOKEN'] = userdata.get('AZURE_SAS_TOKEN')
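
The `AZURE_REMOTE_URL` value is expected to point at your container. The sketch below assumes the `az://<container-name>/<path>` form described in the Unstructured Azure source documentation; the helper name `build_azure_remote_url` and the container and folder names are made up for illustration, so check the docs if your setup differs.

```python
def build_azure_remote_url(container, path=""):
    """Build an az:// URL for a container and an optional folder path."""
    container = container.strip("/")
    if not container:
        raise ValueError("container name must not be empty")
    path = path.strip("/")
    # Assumption: the container root is written with a trailing slash.
    return f"az://{container}/{path}" if path else f"az://{container}/"


# Hypothetical container and folder names.
print(build_azure_remote_url("my-container", "reports/2024"))
```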

In [None]:
from unstructured_client.models.operations import CreateSourceRequest
from unstructured_client.models.shared import (
    CreateSourceConnector,
    SourceConnectorType,
    AzureSourceConnectorConfigInput
)


response = client.sources.create_source(
    request=CreateSourceRequest(
        create_source_connector=CreateSourceConnector(
            name="azure_source_connector",
            type=SourceConnectorType.AZURE,
            config=AzureSourceConnectorConfigInput(
                remote_url=os.environ['AZURE_REMOTE_URL'],
                recursive=True,
                account_name=os.environ['AZURE_ACCOUNT_NAME'],
                sas_token=os.environ['AZURE_SAS_TOKEN']

            )
        )
    )
)

source_connector_id = response.source_connector_information.id

### Create IBM watsonx.data Destination Connector


To use IBM watsonx.data as your destination in an Unstructured workflow, you’ll need a few components configured ahead of time. This setup allows Unstructured to push processed, structured content directly into a table within your watsonx.data environment.

Here’s what you’ll need to have ready:

- An **IBM Cloud account** with an active **watsonx.data instance**
- An **IBM Cloud API key** for authentication
- A **Cloud Object Storage (COS)** instance, including:
  - A target **bucket** for data storage
  - The **public endpoint**, **region**, and **bucket name**
  - **HMAC credentials** (access key ID and secret key)
- An **Apache Iceberg catalog** associated with your watsonx.data instance and linked to the COS bucket
- A **namespace** (also called a schema) and a **target table** inside the catalog
- A column in the table that uniquely identifies records (usually `record_id`)

You’ll also need to configure your catalog in the watsonx.data console via the Infrastructure Manager, ensuring it's:
- Connected to COS with correct access credentials
- Associated with an engine (e.g., IBM Presto)
- Activated and tested with a successful connection status

Make sure your table matches the expected Unstructured output schema—any extra fields in the data that don’t map to table columns will be dropped.
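
To make the effect of a schema mismatch concrete, here is a small illustration of which fields would survive for a given set of table columns. This is not the connector's implementation, only a sketch of the behavior described above; the column names and the sample record are hypothetical.

```python
def split_by_columns(record, table_columns):
    """Return the fields that map to table columns and the names that do not."""
    kept = {key: value for key, value in record.items() if key in table_columns}
    dropped = sorted(key for key in record if key not in table_columns)
    return kept, dropped


# Hypothetical table columns and record.
columns = {"record_id", "element_id", "type", "text"}
record = {
    "record_id": "r-1",
    "element_id": "e-1",
    "type": "Title",
    "text": "Quarterly Report",
    "custom_note": "has no matching column",
}

kept, dropped = split_by_columns(record, columns)
print("kept:", sorted(kept))
print("dropped:", dropped)
```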

> 📘 **Tip:** For performance, it's recommended to enable regular metadata cleanup on the table using a small Python script (provided in the official docs).

Once your watsonx.data environment is configured, you can create a destination connector using the Python SDK, UI, or API.


| **Key** | **Required** | **Description** |
|--------|------------|----------------|
| `iceberg_endpoint` | ✅ Required | The metastore REST endpoint of the Iceberg catalog (exclude `https://`) |
| `object_storage_endpoint` | ✅ Required | Public endpoint of the COS bucket (exclude `https://`) |
| `object_storage_region` | ✅ Required | Short region ID of the COS bucket (e.g. `us-east`) |
| `iam_api_key` | ✅ Required | API key for your IBM Cloud account |
| `access_key_id` | ✅ Required | HMAC access key ID for the COS instance |
| `secret_access_key` | ✅ Required | HMAC secret access key paired with `access_key_id` |
| `catalog` | ✅ Required | Name of the Iceberg catalog in watsonx.data |
| `namespace` | ✅ Required | The schema (namespace) within the catalog |
| `table` | ✅ Required | Name of the destination table within the namespace |
| `record_id_key` | ❌ Optional | Name of the table column that uniquely identifies records (default: `record_id`) |
| `max_retries` | ❌ Optional | Max upload retries (default: 50; allowed: 2–500) |
| `max_retries_connection` | ❌ Optional | Max connection retries (default: 10; allowed: 2–100) |

> 📘 **Tip:** All endpoint values should be raw (e.g., `s3.us-east.cloud-object-storage.appdomain.cloud`) without any URL schemes (`https://`).
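
Two of the constraints above are easy to check before creating the connector: endpoints must not include a URL scheme, and the retry settings must fall within the allowed ranges from the table. The helpers below are a minimal sketch with hypothetical names (`strip_url_scheme`, `check_retry_setting`); they are not part of the Unstructured SDK.

```python
def strip_url_scheme(endpoint):
    """Remove a leading http:// or https:// and any trailing slash."""
    cleaned = endpoint.strip()
    for scheme in ("https://", "http://"):
        if cleaned.lower().startswith(scheme):
            cleaned = cleaned[len(scheme):]
            break
    return cleaned.rstrip("/")


def check_retry_setting(name, value, low, high):
    """Raise ValueError when a retry setting is outside its allowed range."""
    if not low <= value <= high:
        raise ValueError(f"{name}={value} is outside the allowed range {low}-{high}")
    return value


print(strip_url_scheme("https://s3.us-east.cloud-object-storage.appdomain.cloud"))
check_retry_setting("max_retries", 50, 2, 500)
check_retry_setting("max_retries_connection", 10, 2, 100)
```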

To review the full configuration steps, including the table schema and example connector creation code, refer to the [Unstructured IBM watsonx.data destination documentation](https://docs.unstructured.io/api-reference/workflow/destinations/ibm-watsonxdata).



Fetch all of the credentials from Colab Secrets:

In [None]:
os.environ["IBM_WX_ICEBERG_ENDPOINT"] = userdata.get("IBM_WX_ICEBERG_ENDPOINT")
os.environ["IBM_WX_OBJECT_STORAGE_ENDPOINT"] = userdata.get("IBM_WX_OBJECT_STORAGE_ENDPOINT")
os.environ["IBM_WX_OBJECT_STORAGE_REGION"] = userdata.get("IBM_WX_OBJECT_STORAGE_REGION")
os.environ["IBM_WX_IAM_API_KEY"] = userdata.get("IBM_WX_IAM_API_KEY")
os.environ["IBM_WX_ACCESS_KEY_ID"] = userdata.get("IBM_WX_ACCESS_KEY_ID")
os.environ["IBM_WX_SECRET_ACCESS_KEY"] = userdata.get("IBM_WX_SECRET_ACCESS_KEY")
os.environ["IBM_WX_CATALOG"] = userdata.get("IBM_WX_CATALOG")
os.environ["IBM_WX_NAMESPACE"] = userdata.get("IBM_WX_NAMESPACE")
os.environ["IBM_WX_TABLE"] = userdata.get("IBM_WX_TABLE")
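
Because the destination connector needs nine separate values, it can be worth confirming that none of them is empty before calling the API. The following sketch uses a hypothetical helper, `missing_variables`, together with the variable names from the cell above.

```python
import os

REQUIRED_IBM_WX_VARS = [
    "IBM_WX_ICEBERG_ENDPOINT",
    "IBM_WX_OBJECT_STORAGE_ENDPOINT",
    "IBM_WX_OBJECT_STORAGE_REGION",
    "IBM_WX_IAM_API_KEY",
    "IBM_WX_ACCESS_KEY_ID",
    "IBM_WX_SECRET_ACCESS_KEY",
    "IBM_WX_CATALOG",
    "IBM_WX_NAMESPACE",
    "IBM_WX_TABLE",
]


def missing_variables(env, required=REQUIRED_IBM_WX_VARS):
    """Return the required names that are absent or empty in env."""
    return [name for name in required if not env.get(name)]


missing = missing_variables(os.environ)
if missing:
    print("Missing values:", ", ".join(missing))
else:
    print("All IBM watsonx.data settings are present")
```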

In [None]:
from unstructured_client.models.operations import CreateDestinationRequest
from unstructured_client.models.shared import (
    CreateDestinationConnector,
    DestinationConnectorType,
    IbmWatsonxDestinationConnectorConfigInput
)


response = client.destinations.create_destination(
    request=CreateDestinationRequest(
        create_destination_connector=CreateDestinationConnector(
            name=f"IBM watsonx.data destination {time.time()}",
            type=DestinationConnectorType.IBM_WATSONX_S3,
            config=IbmWatsonxDestinationConnectorConfigInput(
                iceberg_endpoint=os.environ["IBM_WX_ICEBERG_ENDPOINT"],
                object_storage_endpoint=os.environ["IBM_WX_OBJECT_STORAGE_ENDPOINT"],
                object_storage_region=os.environ["IBM_WX_OBJECT_STORAGE_REGION"],
                iam_api_key=os.environ["IBM_WX_IAM_API_KEY"],
                access_key_id=os.environ["IBM_WX_ACCESS_KEY_ID"],
                secret_access_key=os.environ["IBM_WX_SECRET_ACCESS_KEY"],
                catalog=os.environ["IBM_WX_CATALOG"],
                namespace=os.environ["IBM_WX_NAMESPACE"],
                table=os.environ["IBM_WX_TABLE"],
                max_retries=50,
                max_retries_connection=10,
                record_id_key="record_id"
            )
        )
    )
)

destination_connector_id = response.destination_connector_information.id

## Step 3: Designing Your Data Workflow

Once your connectors are in place, it's time to define *how* your data will be processed. This is where workflows come in.

In the Unstructured platform, a **workflow** is a directed acyclic graph (DAG) that connects a series of processing steps—each one represented by a `WorkflowNode`. Think of each node as a small, focused operation in a larger data prep pipeline. These steps can include things like turning a PDF into structured JSON, generating image captions, or creating embeddings for search.

Let’s walk through the most common node types you’ll use to shape your workflow.

### Partitioning the Raw Data

Every workflow starts with a `PARTITION` node. This is the required first step and forms the foundation for everything else. Its job is to take in raw documents—PDFs, markdown files, emails, you name it—and convert them into a standardized JSON format that the rest of the pipeline can understand.

Under the hood, Unstructured offers several partitioning strategies. Here’s a quick overview:

- **Auto**: A smart mode that chooses the best strategy based on the page. It balances performance and cost by dynamically selecting between VLM, High Res, or Fast.
- **VLM**: Uses vision-language models to extract content from hard-to-read documents—like scans with handwriting or complex layouts.
- **High Res**: A solid choice for scanned image-based documents that need strong OCR plus layout understanding.
- **Fast**: Ideal for well-structured text files like markdown or Word docs. Lightweight and efficient.

If you’re curious about the structure of the output, you can explore the JSON schema [here](https://docs.unstructured.io/api-reference/partition/document-elements).
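
As a rough picture of what partitioning produces, the sketch below parses a tiny hand-written sample that follows the documented element layout (`type`, `element_id`, `text`, `metadata`) and counts the elements by type. The sample values are invented for illustration and are not real output from this workflow.

```python
import json
from collections import Counter

# Hand-written sample in the shape of Unstructured document elements.
sample_output = """
[
  {"type": "Title", "element_id": "a1", "text": "Quarterly Report",
   "metadata": {"filename": "report.pdf", "page_number": 1}},
  {"type": "NarrativeText", "element_id": "a2", "text": "Revenue grew this quarter.",
   "metadata": {"filename": "report.pdf", "page_number": 1}},
  {"type": "NarrativeText", "element_id": "a3", "text": "Costs stayed flat.",
   "metadata": {"filename": "report.pdf", "page_number": 2}}
]
"""


def count_element_types(elements):
    """Count how many elements of each type the partitioner produced."""
    return Counter(element["type"] for element in elements)


elements = json.loads(sample_output)
print(count_element_types(elements))
```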


### Breaking It Down with Chunking

The next node type you’ll usually add is the `CHUNK` node. This node helps divide the document into smaller, coherent pieces of text. Why do this? Because most embedding models (and downstream tools like vector databases) work best with bite-sized chunks that fit within token limits.

You can read more about chunking strategies [here](https://docs.unstructured.io/ui/chunking).
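
To build intuition for the two size limits used later in this notebook, here is a simplified sketch of how a soft limit (`new_after_n_chars`) and a hard limit (`max_characters`) can interact. It is not Unstructured's `chunk_by_title` implementation: it ignores titles and overlap, and the function name `chunk_texts` is hypothetical.

```python
def chunk_texts(texts, max_characters=2048, new_after_n_chars=1500):
    """Greedily pack texts into chunks that never exceed max_characters.

    A chunk is closed early once it has reached new_after_n_chars.
    """
    chunks = []
    current = ""
    for text in texts:
        if not text:
            continue
        # A single text longer than the hard limit is cut into pieces first.
        pieces = [text[i:i + max_characters] for i in range(0, len(text), max_characters)]
        for piece in pieces:
            candidate = f"{current}\n\n{piece}" if current else piece
            too_long = len(candidate) > max_characters
            soft_limit_reached = len(current) >= new_after_n_chars
            if current and (too_long or soft_limit_reached):
                chunks.append(current)
                current = piece
            else:
                current = candidate
    if current:
        chunks.append(current)
    return chunks


sizes = [len(chunk) for chunk in chunk_texts(["a" * 1000, "b" * 600, "c" * 600])]
print(sizes)
```

In this toy run, the first two texts fit together under the hard limit, while adding the third would exceed it, so the third text starts a new chunk.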

### Generating Embeddings

Finally, there’s the `EMBED` node. This is where your clean, chunked content gets converted into numerical vector representations—aka *embeddings*. These vectors can then power similarity search, clustering, or RAG pipelines.

To go deeper on how embeddings work in Unstructured, check out the guide [here](https://docs.unstructured.io/ui/embedding).
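
Similarity search over embeddings usually comes down to comparing vectors, most often with cosine similarity. The sketch below uses tiny made-up three-dimensional vectors in place of real model output, and the helper names are illustrative.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank_by_similarity(query, vectors):
    """Return the keys of vectors, most similar to the query first."""
    return sorted(
        vectors,
        key=lambda name: cosine_similarity(query, vectors[name]),
        reverse=True,
    )


# Toy vectors standing in for chunk embeddings.
toy_vectors = {
    "chunk_a": [0.9, 0.1, 0.0],
    "chunk_b": [0.0, 1.0, 0.0],
    "chunk_c": [0.5, 0.5, 0.0],
}
print(rank_by_similarity([1.0, 0.0, 0.0], toy_vectors))
```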

---

Once you’ve defined your workflow steps, it’s time to create the workflow and send it off to the platform.

Run the next cell to spin it up.


In [None]:
from unstructured_client.models.shared import (
    WorkflowNode,
    WorkflowType,
    Schedule
)

# Partition documents with the VLM strategy
partition_node = WorkflowNode(
    name="Partitioner",
    subtype="vlm",
    type="partition",
    settings={
        "provider": "anthropic",
        "model": "claude-3-7-sonnet-20250219",
        }
    )

# Split the partitioned elements into chunks, grouped by title
chunk_node = WorkflowNode(
    name='Chunker',
    subtype='chunk_by_title',
    type="chunk",
    settings={
        'new_after_n_chars': 1500,
        'max_characters': 2048,
        'overlap': 0
        }
    )

# Generate an embedding for each chunk
embedder_node = WorkflowNode(
    name='Embedder',
    subtype='azure_openai',
    type="embed",
    settings={
        'model_name': 'text-embedding-3-small'
        }
    )


response = client.workflows.create_workflow(
    request={
        "create_workflow": {
            "name": "Azure-to-watsonx-data-custom-workflow",
            "source_id": source_connector_id,
            "destination_id": destination_connector_id,
            "workflow_type": WorkflowType.CUSTOM,
            "workflow_nodes": [
                partition_node,
                chunk_node,
                embedder_node
            ],
            "schedule": Schedule("monthly")
        }
    }
)

workflow_id = response.workflow_information.id

## Step 4: Run the workflow

Run the following cell to start the workflow.

In [None]:
res = client.workflows.run_workflow(
    request={
        "workflow_id": workflow_id,
    }
)

## Step 5: Get the workflow run's job ID

Run the next cell to get the workflow run's job ID, which is needed to poll for job completion later. If successful, Unstructured prints the job's ID.

In [None]:
response = client.jobs.list_jobs(
    request={
        "workflow_id": workflow_id
    }
)

last_job = response.response_list_jobs[0]
job_id = last_job.id
print(f"job_id: {job_id}")
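
Taking the first list entry assumes both that the list is not empty and that it is ordered newest first. A slightly more defensive sketch is shown below. It assumes each job object exposes a `created_at` timestamp, and it uses `SimpleNamespace` stand-ins rather than real API responses, so treat the field name as an assumption to verify against your SDK version.

```python
from datetime import datetime
from types import SimpleNamespace


def latest_job(jobs):
    """Return the most recently created job, or raise if there are none."""
    if not jobs:
        raise ValueError("no jobs found for this workflow yet")
    return max(jobs, key=lambda job: job.created_at)


# Stand-in objects with hypothetical IDs and timestamps.
fake_jobs = [
    SimpleNamespace(id="job-old", created_at=datetime(2025, 1, 1, 9, 0)),
    SimpleNamespace(id="job-new", created_at=datetime(2025, 1, 2, 9, 0)),
]
print(latest_job(fake_jobs).id)
```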

## Step 6: Poll for job completion

Run the cell below to confirm the job has finished running. If successful, Unstructured prints `"status": "COMPLETED"` within the information about the job.

In [None]:
def poll_job_status(job_id, wait_time=30):
    while True:
        response = client.jobs.get_job(
            request={
                "job_id": job_id
            }
        )

        job = response.job_information

        if job.status == "SCHEDULED":
            print(f"Job is scheduled, polling again in {wait_time} seconds...")
            time.sleep(wait_time)
        elif job.status == "IN_PROGRESS":
            print(f"Job is in progress, polling again in {wait_time} seconds...")
            time.sleep(wait_time)
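        else:
            # Any other status ends polling; report it rather than assuming success
            print(f"Job finished with status: {job.status}")
            break

    return job

job = poll_job_status(job_id)
pretty_print_model(job)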
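
The polling loop above waits indefinitely. If you would rather give up after a fixed time, one option is to add a timeout. The sketch below is a variation under stated assumptions, not SDK functionality: `wait_for_job` is a hypothetical helper that takes any zero-argument function returning the current status string, which also makes it easy to try out without calling the API.

```python
import time

ACTIVE_STATUSES = {"SCHEDULED", "IN_PROGRESS"}


def wait_for_job(fetch_status, wait_time=30, timeout=3600,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until the job leaves an active state or time runs out."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status not in ACTIVE_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout} seconds")
        sleep(wait_time)


# Simulated status sequence instead of real API calls.
simulated = iter(["SCHEDULED", "IN_PROGRESS", "COMPLETED"])
final_status = wait_for_job(lambda: next(simulated), sleep=lambda seconds: None)
print(final_status)
```

In this notebook, `fetch_status` could be a lambda that wraps the `client.jobs.get_job` call used above and returns `job_information.status`.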

## Step 7: View the processed data


Once the job is completed, your data is processed, and you can find it in your table under the schema you've specified:

![](https://framerusercontent.com/images/V9MIETDCnhERvPsqJmSEaTYEw.png)