# Process Instance Migration Tool

This notebook provides a comprehensive tool for migrating process instances between process definition versions in Operaton/Camunda 7.

## Features
- Select source and target process definition versions
- View running instance counts per version
- Three migration modes:
  - **By Instance**: Select specific instances to migrate
  - **Sync Batch**: Migrate in synchronous batches
  - **Async Batch**: Migrate using async batch jobs
- Add custom migration instructions for activity mappings
- Validate migration plan before execution
- Dry run mode for safe testing

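## Migration Instructions Format
Each instruction maps source activities to target activities. The optional per-instruction flag is `updateEventTrigger` (singular):
```json
[{
  "sourceActivityIds": ["Activity_old"],
  "targetActivityIds": ["Activity_new"],
  "updateEventTrigger": true
}]
```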

## Usage
1. Run all cells in order
2. Select a process definition
3. Choose source (from) and target (to) versions
4. Configure migration mode and options
5. Click "Validate Plan" to check the migration
6. Uncheck "Dry run" and click "Execute Migration" to perform the migration

In [None]:
# Initialize environment and imports
import operaton
from operaton import Operaton
import json

import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

# Initialize environment (loads API configuration from localStorage).
# NOTE: the bare top-level `await` only works where the kernel allows awaiting
# at cell scope; it is a SyntaxError in a plain Python script.
await operaton.load_env()
print("✅ Environment loaded successfully")

In [None]:
# Validation functions for Camunda 7 REST API structures

def validate_migration_instruction(instruction):
    """
    Check one migration instruction against the Camunda 7 REST API shape.

    Expected shape:
    {
      "sourceActivityIds": ["string"],  # Required: list of source activity IDs
      "targetActivityIds": ["string"],  # Required: list of target activity IDs
      "updateEventTrigger": boolean     # Optional: whether to update event triggers
    }

    Returns a list of message strings (empty when nothing was found).
    Messages that start with "Warning" are advisory only.
    """
    if not isinstance(instruction, dict):
        return ["Instruction must be an object/dictionary"]

    problems = []

    # Both activity-id fields are mandatory and follow the same rules.
    for field in ("sourceActivityIds", "targetActivityIds"):
        if field not in instruction:
            problems.append(f"Missing required field '{field}'")
            continue
        activity_ids = instruction[field]
        if not isinstance(activity_ids, list):
            problems.append(f"'{field}' must be an array")
        elif any(not isinstance(activity_id, str) for activity_id in activity_ids):
            problems.append(f"All items in '{field}' must be strings")

    # The flag is optional; the plural spelling is tolerated as a common variant.
    for flag in ("updateEventTrigger", "updateEventTriggers"):
        if flag in instruction and not isinstance(instruction[flag], bool):
            problems.append(f"'{flag}' must be a boolean")

    # Anything outside the known field set is reported as a warning.
    recognised = {"sourceActivityIds", "targetActivityIds", "updateEventTrigger", "updateEventTriggers"}
    extras = set(instruction.keys()) - recognised
    if extras:
        problems.append(f"Warning: Unknown fields will be ignored: {extras}")

    return problems


def validate_migration_instructions(instructions_json):
    """
    Parse and validate a JSON string holding an array of migration instructions.

    Blank input is treated as "no additional instructions". Array items that
    are not JSON objects are reported as errors instead of raising. Only
    instructions whose 'sourceActivityIds' and 'targetActivityIds' are both
    non-empty lists are returned; the returned dicts are copies in which the
    tolerated plural key 'updateEventTriggers' has been renamed to
    'updateEventTrigger' (an explicit singular value wins if both are present).

    Returns: (parsed_instructions, errors)
        parsed_instructions is None when the input is not a JSON array,
        otherwise a list of instruction dicts. errors is a list of message
        strings; warnings from the per-instruction check are not included.
    """
    # An empty text area means there is nothing to add, not malformed JSON.
    if instructions_json is None or not instructions_json.strip():
        return [], []

    # Parse JSON
    try:
        instructions = json.loads(instructions_json)
    except json.JSONDecodeError as e:
        return None, [f"Invalid JSON: {e}"]

    # Must be a list
    if not isinstance(instructions, list):
        return None, ["Instructions must be an array of instruction objects"]

    errors = []
    valid_instructions = []
    for i, instruction in enumerate(instructions):
        # Non-objects have no fields to inspect; report and move on rather
        # than calling dict methods on them.
        if not isinstance(instruction, dict):
            errors.append(f"Instruction {i+1}: Instruction must be an object/dictionary")
            continue

        for err in validate_migration_instruction(instruction):
            if not err.startswith("Warning"):
                errors.append(f"Instruction {i+1}: {err}")

        # Only include instructions with actual activity mappings. The type
        # check keeps malformed values (e.g. a number) from reaching len().
        source_ids = instruction.get("sourceActivityIds")
        target_ids = instruction.get("targetActivityIds")
        if not (isinstance(source_ids, list) and source_ids
                and isinstance(target_ids, list) and target_ids):
            continue

        # Work on a copy so the parsed input is left untouched.
        normalized = dict(instruction)
        if "updateEventTriggers" in normalized:
            plural_value = normalized.pop("updateEventTriggers")
            normalized.setdefault("updateEventTrigger", plural_value)
        valid_instructions.append(normalized)

    return valid_instructions, errors


def validate_variable_value(value):
    """
    Check a variable value object against the Camunda 7 REST API shape.

    Expected shape:
    {
      "value": any,           # The variable value
      "type": "String|Integer|Long|Double|Boolean|Date|Object|Json|...",
      "valueInfo": {}         # Optional: additional type-specific info
    }

    Returns a list of error strings (empty when the value is acceptable).
    """
    if not isinstance(value, dict):
        return ["Variable value must be an object"]

    # Kept as a list because it is rendered verbatim in the error message.
    valid_types = [
        "String", "Integer", "Short", "Long", "Double", "Boolean",
        "Date", "Bytes", "Object", "Json", "Xml", "File", "Null"
    ]
    problems = []

    # 'type' may be omitted, but a supplied type has to be a known one.
    if "type" in value and value["type"] not in valid_types:
        problems.append(f"Invalid variable type '{value['type']}'. Valid types: {valid_types}")

    # 'valueInfo' may be omitted, but a supplied one has to be an object.
    if not isinstance(value.get("valueInfo", {}), dict):
        problems.append("'valueInfo' must be an object")

    return problems


# Status line printed once the validation helpers in this cell are defined
print("✅ Validation functions loaded")

In [None]:
# Fetch the latest version of every process definition and build the unique
# (label, key) pairs for the process dropdown, sorted by label. Definitions
# whose tenant is SYSTEM or TRANSIENT are left out; definitions without a
# tenant are kept.
all_definitions = Operaton.get("/process-definition?latestVersion=true")
definitions = sorted(
    {
        (d["name"] or d["key"], d["key"])
        for d in all_definitions
        if d.get("tenantId") not in ("SYSTEM", "TRANSIENT")
    },
    key=lambda pair: pair[0],
)

print(f"✅ Found {len(definitions)} process definitions")

In [None]:
# State management
# Shared mutable state for the event handlers:
#   'versions'       - version list stored by refresh_versions()
#   'instances'      - instance list stored by on_source_change()
#   'migration_plan' - plan stored by on_validate_click() and read by
#                      on_execute_click(); None until a plan has been built
state = {
    'versions': [],
    'instances': [],
    'migration_plan': None
}

# === WIDGETS ===
# Widgets are only created here; handlers are attached and the layout is
# displayed in later cells.

# Process selection
# Options are (label, key) pairs from `definitions`, preceded by a placeholder
# entry whose value is None.
w_key = widgets.Dropdown(
    options=[('-- Select Process --', None)] + definitions,
    value=None,
    description='Process:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='400px')
)

# NOTE(review): not referenced by the handlers or layout visible in this notebook.
w_key_out = widgets.Output()

# Version selection
# Both dropdowns start empty and disabled; refresh_versions() fills them with
# process definition IDs as values.
w_source = widgets.Dropdown(
    options=[],
    description='From version:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='250px'),
    disabled=True
)

w_target = widgets.Dropdown(
    options=[],
    description='To version:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='250px'),
    disabled=True
)

# Migration mode
# The option values ('instance', 'sync', 'async') are what the handlers branch on.
w_mode = widgets.RadioButtons(
    options=[
        ('By Instance - Select specific instances', 'instance'),
        ('Sync Batch - Migrate in synchronous batches', 'sync'),
        ('Async Batch - Use async batch jobs', 'async')
    ],
    value='instance',
    description='Mode:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='400px')
)

# Instance selection (for 'instance' mode)
# Filled by on_source_change() with process instance IDs as values.
w_instances = widgets.SelectMultiple(
    options=[],
    description='Instances:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='400px', height='150px'),
    disabled=True
)

# Batch settings (for batch modes)
# Read by on_execute_click() in 'sync' and 'async' mode: w_amount caps the
# total number of instances, w_batch_size caps the instances per request.
w_amount = widgets.IntText(
    value=10,
    description='Max instances:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='200px')
)

w_batch_size = widgets.IntText(
    value=5,
    description='Batch size:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='200px')
)

# Migration instructions
# The default value is a template with empty activity lists;
# validate_migration_instructions() leaves out instructions whose activity
# lists are empty, so the untouched template adds nothing to the plan.
w_instructions = widgets.Textarea(
    value='''[{
  "sourceActivityIds": [],
  "targetActivityIds": [],
  "updateEventTrigger": true
}]''',
    description='Instructions:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='400px', height='120px'),
    placeholder='Additional migration instructions (JSON array)'
)

# Options
# Dry run is checked by default; on_execute_click() only posts to the
# migration endpoints when it is unchecked.
w_dry_run = widgets.Checkbox(
    value=True,
    description='Dry run (validate only, do not execute)',
    style={'description_width': 'initial'}
)

# Sent as "skipCustomListeners" in the execute request body.
w_skip_listeners = widgets.Checkbox(
    value=False,
    description='Skip custom listeners',
    style={'description_width': 'initial'}
)

# Sent as "skipIoMappings" in the execute request body.
w_skip_io_mappings = widgets.Checkbox(
    value=False,
    description='Skip IO mappings',
    style={'description_width': 'initial'}
)

# Sent as "updateEventTriggers" when on_validate_click() generates the plan.
w_update_triggers = widgets.Checkbox(
    value=True,
    description='Update event triggers',
    style={'description_width': 'initial'}
)

# Buttons
btn_validate = widgets.Button(
    description='Validate Plan',
    button_style='info',
    icon='check',
    layout=widgets.Layout(width='150px')
)

# Starts disabled; on_validate_click() enables it after a plan validates
# without reported errors.
btn_execute = widgets.Button(
    description='Execute Migration',
    button_style='primary',
    icon='play',
    layout=widgets.Layout(width='180px'),
    disabled=True
)

btn_refresh = widgets.Button(
    description='Refresh',
    button_style='',
    icon='refresh',
    layout=widgets.Layout(width='100px')
)

# Output areas
# One per UI section: version table, validation log, execution log.
output_versions = widgets.Output()
output_validation = widgets.Output()
output_execution = widgets.Output()

print("✅ Widgets created")

In [None]:
# === EVENT HANDLERS ===

def on_process_change(change):
    """Handle process selection change.

    Any previously validated migration plan is discarded and the execute
    button is disabled, because a plan built for one process must not stay
    executable after a different process (or none) is selected.

    When the selection is cleared (change['new'] is None) the version
    dropdowns, the instance selector and the version table are reset.
    Otherwise the version list for the newly selected process is loaded.

    Args:
        change: change dict from the widget observer; only change['new']
            (the selected process key, or None) is read.
    """
    # Invalidate on every change, not only when the selection is cleared.
    state['migration_plan'] = None
    btn_execute.disabled = True

    if change['new'] is None:
        w_source.options = []
        w_target.options = []
        w_source.disabled = True
        w_target.disabled = True
        w_instances.options = []
        w_instances.disabled = True
        with output_versions:
            clear_output()
        return

    refresh_versions()


def _versions_table_html(versions):
    """Render the version overview (version, name, running instances) as an HTML table."""
    from html import escape

    cell_style = 'padding: 8px; border: 1px solid #ddd;'
    parts = ['<table style="border-collapse: collapse; width: 100%;">']
    parts.append('<tr style="background-color: #4CAF50; color: white;">')
    for title in ('Version', 'Name', 'Running Instances'):
        parts.append(f'<th style="{cell_style}">{title}</th>')
    parts.append('</tr>')

    for row_index, v in enumerate(versions):
        bg = '#f9f9f9' if row_index % 2 == 0 else 'white'
        # `or` (not a .get default) so an explicit null name falls back to the key.
        display_name = v.get("name") or v["key"]
        parts.append(f'<tr style="background-color: {bg};">')
        for value in (v["version"], display_name, v["count"]):
            # Values come from the server; escape them before embedding in HTML.
            parts.append(f'<td style="{cell_style}">{escape(str(value))}</td>')
        parts.append('</tr>')

    parts.append('</table>')
    return ''.join(parts)


def refresh_versions(*args):
    """Refresh version list for selected process.

    Fetches all versions of the selected process definition together with
    their running-instance counts, stores them in state['versions'],
    repopulates the source/target dropdowns and renders the overview table.
    Does nothing when no process is selected. Any error is reported in the
    versions output area instead of being raised.

    Args:
        *args: ignored; accepted so the function can also be used directly
            as a button click callback.
    """
    if not w_key.value:
        return

    with output_versions:
        clear_output()
        print(f"Loading versions for {w_key.value}...")

    try:
        versions = Operaton.get(f"/process-definition?key={w_key.value}")
        versions.sort(key=lambda x: x["version"], reverse=True)

        # Get instance counts (one request per version)
        for v in versions:
            count_resp = Operaton.get(f'/process-instance/count?processDefinitionId={v["id"]}')
            v["count"] = count_resp.get("count", 0)

        state['versions'] = versions

        # Source: only versions that still have running instances, oldest first
        source_options = [(f'v{v["version"]} ({v["count"]} instances)', v["id"])
                          for v in sorted(versions, key=lambda x: x["version"])
                          if v["count"] > 0]

        # Target: all versions, newest first
        target_options = [(f'v{v["version"]}', v["id"]) for v in versions]

        w_source.options = source_options
        w_target.options = target_options
        w_source.disabled = len(source_options) == 0
        w_target.disabled = len(target_options) == 0

        # Display version table
        with output_versions:
            clear_output()
            display(HTML(_versions_table_html(versions)))

    except Exception as e:
        # UI boundary: show the problem in the output area rather than raising
        # into the widget callback machinery.
        with output_versions:
            clear_output()
            print(f"❌ Error loading versions: {e}")


def on_source_change(change):
    """Handle source version change - load instances for 'instance' mode.

    With no source version selected the instance selector is emptied and
    disabled. In 'instance' mode the instances of the selected definition are
    fetched, stored in state['instances'] and offered in the selector,
    labelled with their business key (or a shortened instance id when there
    is no business key). In the other modes nothing is loaded.

    Args:
        change: change dict (or a plain dict built by another handler); only
            change['new'] (the source process definition id) is read.
    """
    if not change['new']:
        w_instances.options = []
        w_instances.disabled = True
        return

    if w_mode.value != 'instance':
        return

    try:
        instances = Operaton.get(f'/process-instance?processDefinitionId={change["new"]}')
        # `or` (not a .get default): the key can be present with a null value,
        # which would otherwise render as "None".
        options = [(f'{i.get("businessKey") or i["id"][:8]} ({i["id"][:8]}...)', i["id"])
                   for i in instances]
        state['instances'] = instances
        w_instances.options = options
        w_instances.disabled = False
    except Exception as e:
        # Keep the cached list consistent with the emptied selector.
        state['instances'] = []
        w_instances.options = []
        with output_validation:
            clear_output()
            print(f"❌ Error loading instances: {e}")


def on_mode_change(change):
    """React to a switch of the migration mode.

    The instance selector is enabled only for 'instance' mode, in which case
    the instances of the currently selected source version are (re)loaded.
    Whatever the new mode is, the execute button is disabled afterwards.
    """
    selecting_instances = change['new'] == 'instance'

    # The selector is only meaningful when picking individual instances.
    w_instances.disabled = not selecting_instances
    if selecting_instances:
        on_source_change({'new': w_source.value})

    btn_execute.disabled = True


def _print_plan_summary(plan):
    """Print source, target and the first five activity mappings of a migration plan."""
    print("\n📋 Migration Plan:")
    print(f"   Source: {plan['sourceProcessDefinitionId'][:20]}...")
    print(f"   Target: {plan['targetProcessDefinitionId'][:20]}...")
    print(f"   Instructions: {len(plan['instructions'])}")

    if plan['instructions']:
        print("\n   Activity Mappings:")
        for inst in plan['instructions'][:5]:  # Show first 5
            src = ', '.join(inst.get('sourceActivityIds', []))
            tgt = ', '.join(inst.get('targetActivityIds', []))
            print(f"      {src} → {tgt}")
        if len(plan['instructions']) > 5:
            print(f"      ... and {len(plan['instructions']) - 5} more")


def _print_validation_reports(validation):
    """Print instruction/variable reports; return True when any report exists."""
    has_errors = False

    instruction_reports = validation.get("instructionReports")
    if instruction_reports:
        has_errors = True
        print("\n❌ Instruction Validation Errors:")
        for report in instruction_reports:
            print(f"   - {report}")

    variable_reports = validation.get("variableReports")
    if variable_reports:
        has_errors = True
        print("\n❌ Variable Validation Errors:")
        if isinstance(variable_reports, dict):
            # A mapping would otherwise print only its keys; show the report too.
            for name, report in variable_reports.items():
                print(f"   - {name}: {report}")
        else:
            for report in variable_reports:
                print(f"   - {report}")

    return has_errors


def on_validate_click(button):
    """Validate the migration plan.

    Generates a plan for the selected source/target versions, appends the
    custom instructions from the text area, asks the server to validate it
    and prints the outcome to the validation output area.

    The plan is stored in state['migration_plan'] and the execute button is
    enabled only when validation reports no errors. Every run first discards
    the previously stored plan, so a failed or aborted validation never
    leaves an older plan executable.

    Args:
        button: the clicked button (unused).
    """
    with output_validation:
        clear_output()

        # Whatever happens below, the previous plan is no longer current.
        state['migration_plan'] = None
        btn_execute.disabled = True

        # Check source and target
        if not w_source.value:
            print("❌ Please select a source version")
            return
        if not w_target.value:
            print("❌ Please select a target version")
            return
        if w_source.value == w_target.value:
            print("❌ Source and target versions must be different")
            return

        # Validate instructions JSON
        instructions, errors = validate_migration_instructions(w_instructions.value)
        if errors:
            print("❌ Invalid migration instructions:")
            for err in errors:
                print(f"   - {err}")
            return

        print("🔄 Generating migration plan...")

        try:
            # Generate migration plan
            plan = Operaton.post("/migration/generate", {
                "sourceProcessDefinitionId": w_source.value,
                "targetProcessDefinitionId": w_target.value,
                "updateEventTriggers": w_update_triggers.value
            })

            # Add custom instructions; tolerate a missing/null instruction list.
            plan["instructions"] = list(plan.get("instructions") or []) + list(instructions or [])

            print("🔄 Validating migration plan...")

            # Validate plan
            validation = Operaton.post("/migration/validate", plan) or {}

            _print_plan_summary(plan)

            if _print_validation_reports(validation):
                print("\n❌ Migration plan has validation errors. Please fix before executing.")
                return

            print("\n✅ Migration plan is valid!")

            # Only a plan that passed validation becomes executable.
            state['migration_plan'] = plan

            # Show what will be migrated
            if w_mode.value == 'instance':
                selected = list(w_instances.value)
                print(f"\n📊 Ready to migrate {len(selected)} selected instance(s)")
            else:
                print(f"\n📊 Ready to migrate up to {w_amount.value} instances in batches of {w_batch_size.value}")

            if w_dry_run.value:
                print("\n⚠️ Dry run mode is ON - execution will be simulated")
            else:
                print("\n⚠️ Dry run mode is OFF - execution will be REAL")
            btn_execute.disabled = False

        except Exception as e:
            # UI boundary: report in the output area and keep execution locked.
            print(f"\n❌ Error validating migration: {e}")
            state['migration_plan'] = None
            btn_execute.disabled = True


def _migration_payload(plan, instance_ids):
    """Build the request body for the migration execute endpoints."""
    return {
        "migrationPlan": plan,
        "processInstanceIds": instance_ids,
        "skipCustomListeners": w_skip_listeners.value,
        "skipIoMappings": w_skip_io_mappings.value
    }


def _fetch_instance_ids(definition_id, first, limit):
    """Return up to `limit` instance ids of a definition, starting at offset `first`."""
    # Sorting by instance id keeps firstResult/maxResults paging stable.
    instances = Operaton.get(
        f"/process-instance?processDefinitionId={definition_id}"
        f"&sortBy=instanceId&sortOrder=asc&firstResult={first}&maxResults={limit}"
    )
    return [i["id"] for i in instances or []]


def on_execute_click(button):
    """Execute the migration.

    Runs the plan stored in state['migration_plan'] in the selected mode:
      - 'instance': migrate the instances selected in the instance list
      - 'sync':     fetch and migrate batches synchronously
      - 'async':    collect batches, then start one async batch job per batch
    With dry run checked nothing is posted to the migration endpoints; the
    instances that would be affected are only counted and listed.

    Execution is refused when the source/target dropdowns no longer match the
    validated plan, or when a batch mode is used with a batch size below 1.
    Batch modes read instances from the plan's source definition. Errors are
    printed (with traceback) to the execution output area, not raised.

    Args:
        button: the clicked button (unused).
    """
    with output_execution:
        clear_output()

        plan = state['migration_plan']
        if not plan:
            print("❌ No migration plan. Please validate first.")
            return

        # The dropdowns can change after validation (by the user, or when the
        # version list is refreshed); never run a plan that no longer matches.
        if (w_source.value != plan.get('sourceProcessDefinitionId')
                or w_target.value != plan.get('targetProcessDefinitionId')):
            print("❌ Source/target selection changed since validation. Please validate again.")
            return

        mode = w_mode.value
        dry_run = w_dry_run.value
        source_id = plan['sourceProcessDefinitionId']

        if mode in ('sync', 'async') and w_batch_size.value < 1:
            print("❌ Batch size must be at least 1")
            return

        if dry_run:
            print("🔍 DRY RUN - No actual migration will be performed\n")

        try:
            if mode == 'instance':
                # Migrate specific instances
                instance_ids = list(w_instances.value)
                if not instance_ids:
                    print("❌ No instances selected")
                    return

                print(f"🚀 Migrating {len(instance_ids)} instance(s)...")

                if not dry_run:
                    Operaton.post("/migration/execute", _migration_payload(plan, instance_ids))
                    print(f"✅ Successfully migrated {len(instance_ids)} instance(s)")
                    # Refresh instance list
                    on_source_change({'new': w_source.value})
                else:
                    print(f"✅ [DRY RUN] Would migrate {len(instance_ids)} instance(s)")
                    for iid in instance_ids[:5]:
                        print(f"   - {iid}")
                    if len(instance_ids) > 5:
                        print(f"   ... and {len(instance_ids) - 5} more")

            elif mode == 'sync':
                # Synchronous batch migration
                amount = w_amount.value
                batch_size = w_batch_size.value
                migrated_total = 0
                offset = 0

                print(f"🚀 Migrating up to {amount} instances in sync batches of {batch_size}...")

                while amount > 0:
                    instance_ids = _fetch_instance_ids(source_id, offset, min(batch_size, amount))

                    if not instance_ids:
                        print(f"\n📭 No more instances to migrate")
                        break

                    if not dry_run:
                        # Migrated instances leave the source definition, so
                        # the next page again starts at offset 0.
                        Operaton.post("/migration/execute", _migration_payload(plan, instance_ids))
                        print(f"   ✅ Batch: migrated {len(instance_ids)} instances")
                    else:
                        print(f"   ✅ [DRY RUN] Would migrate batch of {len(instance_ids)} instances")
                        # Nothing moved, so page past the instances already
                        # counted instead of counting the same ones again.
                        offset += len(instance_ids)

                    migrated_total += len(instance_ids)
                    amount -= len(instance_ids)

                print(f"\n✅ Total: {'would migrate' if dry_run else 'migrated'} {migrated_total} instance(s)")

            elif mode == 'async':
                # Asynchronous batch migration
                amount = w_amount.value
                batch_size = w_batch_size.value
                batches = []
                first = 0

                print(f"🚀 Preparing async migration of up to {amount} instances in batches of {batch_size}...")

                # Collect all batches before starting any job
                remaining = amount
                while remaining > 0:
                    instance_ids = _fetch_instance_ids(source_id, first, min(batch_size, remaining))
                    if not instance_ids:
                        break

                    batches.append(instance_ids)
                    remaining -= len(instance_ids)
                    first += len(instance_ids)

                print(f"   📦 Prepared {len(batches)} batch(es)")

                # Execute batches
                for i, batch_ids in enumerate(batches):
                    if not dry_run:
                        result = Operaton.post("/migration/executeAsync", _migration_payload(plan, batch_ids))
                        batch_id = str((result or {}).get("id") or "unknown")
                        print(f"   ✅ Batch {i+1}: started async job {batch_id[:8]}... ({len(batch_ids)} instances)")
                    else:
                        print(f"   ✅ [DRY RUN] Batch {i+1}: would start async job ({len(batch_ids)} instances)")

                total = sum(len(b) for b in batches)
                print(f"\n✅ Total: {'would schedule' if dry_run else 'scheduled'} {total} instance(s) for async migration")

            # Refresh version counts
            refresh_versions()

        except Exception as e:
            # UI boundary: show the failure and its traceback in the output area.
            print(f"\n❌ Error during migration: {e}")
            import traceback
            traceback.print_exc()


# Wire the widgets to their callbacks.
# Button clicks:
btn_refresh.on_click(refresh_versions)
btn_validate.on_click(on_validate_click)
btn_execute.on_click(on_execute_click)
# Value changes of the selection widgets:
w_mode.observe(on_mode_change, 'value')
w_source.observe(on_source_change, 'value')
w_key.observe(on_process_change, 'value')

print("✅ Event handlers attached")

In [None]:
# === DISPLAY UI ===
# Each section is a vertical box: a heading followed by its controls.

# Section 1: pick the process definition; the version table renders below it
section_process = widgets.VBox(children=[
    widgets.HTML('<h2>📋 1. Select Process Definition</h2>'),
    widgets.HBox(children=[w_key, btn_refresh]),
    output_versions,
])

# Section 2: source and target version side by side
section_versions = widgets.VBox(children=[
    widgets.HTML('<h2>🔄 2. Select Source and Target Versions</h2>'),
    widgets.HBox(children=[w_source, w_target]),
])

# Section 3: migration mode with the controls belonging to each mode
section_mode = widgets.VBox(children=[
    widgets.HTML('<h2>⚙️ 3. Migration Mode</h2>'),
    w_mode,
    widgets.HTML('<p><strong>Instance Selection</strong> (for "By Instance" mode):</p>'),
    w_instances,
    widgets.HTML('<p><strong>Batch Settings</strong> (for batch modes):</p>'),
    widgets.HBox(children=[w_amount, w_batch_size]),
])

# Section 4: custom instructions and execution flags
section_options = widgets.VBox(children=[
    widgets.HTML('<h2>🛠️ 4. Options</h2>'),
    widgets.HTML('<p><strong>Migration Instructions</strong> (additional activity mappings):</p>'),
    w_instructions,
    widgets.HTML('<p><strong>Execution Options:</strong></p>'),
    w_update_triggers,
    w_skip_listeners,
    w_skip_io_mappings,
    w_dry_run,
])

# Section 5: action buttons and their output areas
section_execute = widgets.VBox(children=[
    widgets.HTML('<h2>🚀 5. Validate and Execute</h2>'),
    widgets.HBox(children=[btn_validate, btn_execute]),
    widgets.HTML('<h3>Validation Result:</h3>'),
    output_validation,
    widgets.HTML('<h3>Execution Result:</h3>'),
    output_execution,
])

# Main UI: title, then every section preceded by a horizontal rule
display(widgets.VBox(children=[
    widgets.HTML('<h1>🔀 Process Instance Migration Tool</h1>'),
    *(
        child
        for section in (
            section_process,
            section_versions,
            section_mode,
            section_options,
            section_execute,
        )
        for child in (widgets.HTML('<hr>'), section)
    ),
]))