In [5]:
from box import Box
from rich import print as rprint


# Create a nested Box object for better demonstration
# Add a list to the variable

a = Box({
    "length": 10,
    "width": 5,
    "height": 3,
    "color": "blue",
    "meta": {
        "owner": "Alice",
        "location": {"city": "Paris", "country": "France"}
    },
    "tags": ["storage", "blue", {"nam":{"large":0,"a":9}}]
})

rprint(a.to_yaml())

In [6]:
from box import Box

def merge_boxes(a: Box, b: Box, unique_strings_in_lists: bool = True) -> Box:
    """
    Deep merge two Box objects:
    - b overrides a.
    - Dicts/Boxes are merged recursively.
    - Lists:
        * Single-key dicts/Boxes with same key → merge recursively
        * Strings → append only if not already present (controlled by unique_strings_in_lists)
        * Other elements → append
    - Scalars/other values: overwritten by b
    """
    result = Box(a.to_dict())

    for key, b_val in b.items():
        if key not in result:
            result[key] = b_val
            continue

        a_val = result[key]

        # Case 1: Both dicts/Boxes → merge recursively
        if isinstance(a_val, (Box, dict)) and isinstance(b_val, (Box, dict)):
            result[key] = merge_boxes(Box(a_val), Box(b_val), unique_strings_in_lists)

        # Case 2: Both lists → merge elements
        elif isinstance(a_val, list) and isinstance(b_val, list):
            # Build id_map for single-key dicts/Boxes
            id_map = {next(iter(item)): i for i, item in enumerate(a_val)
                      if isinstance(item, (Box, dict)) and len(item) == 1}

            for b_item in b_val:
                if isinstance(b_item, (Box, dict)) and len(b_item) == 1:
                    b_id = next(iter(b_item))
                    if b_id in id_map:
                        idx = id_map[b_id]
                        a_val[idx] = merge_boxes(Box(a_val[idx]), Box(b_item), unique_strings_in_lists)
                    else:
                        a_val.append(b_item)
                elif isinstance(b_item, str) and unique_strings_in_lists:
                    if b_item not in a_val:
                        a_val.append(b_item)
                else:
                    a_val.append(b_item)

        # Case 3: Otherwise → overwrite
        else:
            result[key] = b_val

    return result


In [7]:
# merge2 
import copy
import json
from collections.abc import Mapping
from typing import Any, cast


def _clone(value: Any) -> Any:
    """Return a deep copy limited to dict/list primitives."""
    if isinstance(value, dict):
        return {k: _clone(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_clone(item) for item in value]
    return copy.deepcopy(value)


def _identify_list_item(item, identifier_keys):
    """Return a stable identifier for list entries so we can merge compatible items."""
    if isinstance(item, dict):
        for key in identifier_keys:
            if key in item:
                return ("key", key, item[key])
        if len(item) == 1:
            sole_key = next(iter(item))
            return ("single-key", sole_key)
        try:
            return ("shape", json.dumps(item, sort_keys=True))
        except TypeError:
            return None
    return None


def _merge_lists(
    target: list,
    source: list,
    identifier_keys: tuple[str, ...],
) -> list:
    """Merge two lists with identifier-aware reconciliation."""
    id_map: dict[tuple, int] = {}
    for idx, existing in enumerate(target):
        ident = _identify_list_item(existing, identifier_keys)
        if ident is not None and ident not in id_map:
            id_map[ident] = idx  # Remember where matching dict entries live

    for item in source:
        ident = _identify_list_item(item, identifier_keys)
        if ident is not None and ident in id_map:
            idx = id_map[ident]
            existing = target[idx]
            if isinstance(existing, dict) and isinstance(item, dict):
                # Merge matching dict entries in place
                target[idx] = merge2(
                    existing,
                    item,
                    identifier_keys,
                )
                continue
        if isinstance(item, str):
            # Deduplicate string tags by default
            if item not in target:
                target.append(item)
        else:
            # Numbers or nested structures fall here → clone and append
            appended = _clone(item)
            target.append(appended)
            if ident is not None and ident not in id_map and isinstance(appended, dict):
                id_map[ident] = len(target) - 1  # Enable future merges for appended dicts
    return target


def merge2(
    a: Mapping,
    b: Mapping,
    identifier_keys: tuple[str, ...] = ("id", "name", "uuid"),
) -> dict:
    """
    Deep merge two dictionaries with deterministic list reconciliation and string deduplication.
    """
    if not isinstance(a, Mapping) or not isinstance(b, Mapping):
        raise TypeError("merge2 expects mapping inputs")

    result: dict[str, Any] = cast(dict[str, Any], _clone(dict(a)))  # preserve caller input by cloning
    other = dict(b)  # shallow copy is fine; we immediately clone elements

    for key, b_val in other.items():
        if key not in result:
            result[key] = _clone(b_val)
            continue

        a_val = result[key]

        if isinstance(a_val, dict) and isinstance(b_val, Mapping):
            # Merge nested dictionaries recursively
            result[key] = merge2(
                a_val,
                dict(b_val),
                identifier_keys,
            )
        elif isinstance(a_val, list) and isinstance(b_val, list):
            result[key] = _merge_lists(
                a_val,
                b_val,
                identifier_keys,
            )
        else:
            # Scalars or mismatched types → overwrite with b
            result[key] = _clone(b_val)

    return result

In [8]:
import copy
from box import Box


def _new_box() -> Box:
    """Convenience factory to enable dot-based assignments."""
    return Box(default_box=True, box_dots=True)


def _assert_merge(name: str, base: Box, incoming: Box, expected: Box) -> None:
    """Helper asserting merge2 results and immutability when driven by Box inputs."""
    base_snapshot = copy.deepcopy(base)
    incoming_snapshot = copy.deepcopy(incoming)
    result = merge2(base, incoming)
    assert result == expected.to_dict(), f"{name} → unexpected merge result: {result}"
    assert base == base_snapshot, f"{name} → base mutated"
    assert incoming == incoming_snapshot, f"{name} → incoming mutated"


# 1. Deduplicate string tags while preserving originals
tags_base = _new_box()
tags_base.meta.tags = ["blue", "storage"]
tags_incoming = _new_box()
tags_incoming.meta.tags = ["new", "blue"]
tags_expected = _new_box()
tags_expected.meta.tags = ["blue", "storage", "new"]
_assert_merge("tags_merge", tags_base, tags_incoming, tags_expected)


# 2. Merge dict items by identifier and recurse into nested dicts
service_base = _new_box()
service_api = _new_box()
service_api.id = "api"
service_api.port = 80
service_api.env.DEBUG = False
service_base.services = [service_api]

service_incoming = _new_box()
incoming_api = _new_box()
incoming_api.id = "api"
incoming_api.env.DEBUG = True
incoming_api.env.CACHE = False
incoming_worker = _new_box()
incoming_worker.id = "worker"
incoming_worker.port = 9000
service_incoming.services = [incoming_api, incoming_worker]

service_expected = _new_box()
expected_api = _new_box()
expected_api.id = "api"
expected_api.port = 80
expected_api.env.DEBUG = True
expected_api.env.CACHE = False
expected_worker = _new_box()
expected_worker.id = "worker"
expected_worker.port = 9000
service_expected.services = [expected_api, expected_worker]

_assert_merge("service_merge", service_base, service_incoming, service_expected)


# 3. Single-key dicts merge via their key name even without explicit ids
feature_base = _new_box()
feature_entry = _new_box()
feature_entry.feature.enabled = False
feature_entry.feature.level = "beta"
feature_base.policies = [feature_entry]

feature_incoming = _new_box()
feature_update = _new_box()
feature_update.feature.level = "stable"
feature_incoming.policies = [feature_update]

feature_expected = _new_box()
feature_expected_entry = _new_box()
feature_expected_entry.feature.enabled = False
feature_expected_entry.feature.level = "stable"
feature_expected.policies = [feature_expected_entry]

_assert_merge("feature_single_key", feature_base, feature_incoming, feature_expected)


# 4. Scalars without identifiers append in order (duplicates allowed)
numeric_base = _new_box()
numeric_base.thresholds = [1, 2]
numeric_incoming = _new_box()
numeric_incoming.thresholds = [2, 3]
numeric_expected = _new_box()
numeric_expected.thresholds = [1, 2, 2, 3]
_assert_merge("numeric_append", numeric_base, numeric_incoming, numeric_expected)


# 5. Identical dict entries are deduplicated via identifier or shape matching
duplicate_rule = _new_box()
duplicate_rule.rule.allow = True
duplicate_base = _new_box()
duplicate_base.rules = [duplicate_rule]
duplicate_incoming = _new_box()
duplicate_incoming.rules = [duplicate_rule]
duplicate_expected = _new_box()
expected_rule = _new_box()
expected_rule.rule.allow = True
duplicate_expected.rules = [expected_rule]
_assert_merge("duplicate_dict_dedup", duplicate_base, duplicate_incoming, duplicate_expected)


# 6. Nested mapping replaced when incoming type differs
override_base = _new_box()
override_base.settings.timeout = 30
override_base.settings.retries = 2
override_incoming = _new_box()
override_incoming.settings = 42
override_expected = _new_box()
override_expected.settings = 42
_assert_merge("type_override", override_base, override_incoming, override_expected)


# 7. Non-identifiable dict entries append when content diverges
unidentifiable_base = _new_box()
item_one = _new_box()
item_one.x = 1
item_one.y = 2
unidentifiable_base.items = [item_one]

unidentifiable_incoming = _new_box()
item_two = _new_box()
item_two.x = 2
item_two.y = 3
unidentifiable_incoming.items = [item_two]

unidentifiable_expected = _new_box()
expected_item_one = _new_box()
expected_item_one.x = 1
expected_item_one.y = 2
expected_item_two = _new_box()
expected_item_two.x = 2
expected_item_two.y = 3
unidentifiable_expected.items = [expected_item_one, expected_item_two]
_assert_merge("unidentifiable_append", unidentifiable_base, unidentifiable_incoming, unidentifiable_expected)


print("merge2 sanity checks passed ✔")

BoxKeyError: 'Key name "items" is protected'

In [9]:
from __future__ import annotations
from typing import Any, TypeVar
from pydantic import BaseModel

ModelT = TypeVar("ModelT", bound=BaseModel)


def merge_basemodels(
    base: ModelT,
    incoming: ModelT,
    identifier_keys: tuple[str, ...] = ("id", "name", "uuid"),
) -> ModelT:
    """Merge two Pydantic models by delegating to merge2 on their Python payloads."""
    target_type = type(incoming) if type(base) is not type(incoming) else type(base)
    merged_payload = merge2(
        base.model_dump(mode="python"),
        incoming.model_dump(mode="python"),
        identifier_keys,
    )
    return target_type.model_validate(merged_payload)


class Service(BaseModel):
    id: str
    port: int
    env: dict[str, Any] = {}


class AppConfig(BaseModel):
    services: list[Service]
    tags: list[str] = []


defaults = AppConfig(
    services=[Service(id="api", port=80, env={"DEBUG": False})],
    tags=["prod"],
)

override = AppConfig(
    services=[
        Service(id="api", port=80, env={"DEBUG": True, "CACHE": False}),
        Service(id="worker", port=9000, env={}),
    ],
    tags=["prod", "blue"],
)

merged_config = merge_basemodels(defaults, override)
merged_config

AppConfig(services=[Service(id='api', port=80, env={'DEBUG': True, 'CACHE': False}), Service(id='worker', port=9000, env={})], tags=['prod', 'blue'])

In [10]:
# Deep merge two Pydantic BaseModel instances while preserving model attributes.
import copy
from typing import Any
from pydantic import BaseModel



def _clone_model_value(value: Any) -> Any:
    if isinstance(value, BaseModel):
        return value.model_copy(deep=True)
    if isinstance(value, list):
        return [_clone_model_value(item) for item in value]
    if isinstance(value, dict):
        return {key: _clone_model_value(val) for key, val in value.items()}
    return copy.deepcopy(value)



def _identify_model_item(item: Any, identifier_keys: tuple[str, ...]) -> Any:
    if isinstance(item, BaseModel):
        return _identify_model_item(item.model_dump(mode="python"), identifier_keys)
    if isinstance(item, dict):
        for key in identifier_keys:
            if key in item:
                return ("id", key, item[key])
        if len(item) == 1:
            return ("single", next(iter(item)))
    return None



def _merge_model_list(
    current: list[Any],
    incoming: list[Any],
    identifier_keys: tuple[str, ...],
) -> list[Any]:
    merged = [_clone_model_value(item) for item in current]
    indexed: dict[Any, int] = {}
    for idx, item in enumerate(merged):
        ident = _identify_model_item(item, identifier_keys)
        if ident is not None and ident not in indexed:
            indexed[ident] = idx

    for item in incoming:
        ident = _identify_model_item(item, identifier_keys)
        if ident is not None and ident in indexed:
            existing_idx = indexed[ident]
            merged[existing_idx] = _merge_model_value(
                merged[existing_idx],
                item,
                identifier_keys,
            )
            continue
        if isinstance(item, str):
            if item not in merged:
                merged.append(item)
            continue
        cloned = _clone_model_value(item)
        merged.append(cloned)
        if ident is not None and ident not in indexed:
            indexed[ident] = len(merged) - 1
    return merged



def _merge_model_value(
    current: Any,
    incoming: Any,
    identifier_keys: tuple[str, ...],
) -> Any:
    if incoming is None:
        return None
    if current is None:
        return _clone_model_value(incoming)
    if isinstance(current, BaseModel) and isinstance(incoming, BaseModel):
        return _merge_models_preserving(current, incoming, identifier_keys)
    if isinstance(current, dict) and isinstance(incoming, dict):
        return merge2(current, incoming, identifier_keys)
    if isinstance(current, list) and isinstance(incoming, list):
        return _merge_model_list(current, incoming, identifier_keys)
    return _clone_model_value(incoming)



def _merge_models_preserving(
    base: BaseModel,
    incoming: BaseModel,
    identifier_keys: tuple[str, ...],
) -> BaseModel:
    if type(base) is not type(incoming):
        return incoming.model_copy(deep=True)
    result = base.model_copy(deep=True)
    for field_name in type(result).model_fields:
        if not hasattr(incoming, field_name):
            continue
        base_value = getattr(result, field_name)
        incoming_value = getattr(incoming, field_name)
        merged_value = _merge_model_value(
            base_value,
            incoming_value,
            identifier_keys,
        )
        setattr(result, field_name, merged_value)
    return result



def merge_basemodels_preserving(
    base: ModelT,
    incoming: ModelT,
    identifier_keys: tuple[str, ...] = ("id", "name", "uuid"),
) -> ModelT:
    """Deep merge two BaseModel instances while keeping model attributes intact."""
    merged = _merge_models_preserving(base, incoming, identifier_keys)
    return type(merged).model_validate(merged.model_dump(mode="python"))



merged_config_preserving = merge_basemodels_preserving(defaults, override)
merged_config_preserving

AppConfig(services=[Service(id='api', port=80, env={'DEBUG': True, 'CACHE': False}), Service(id='worker', port=9000, env={})], tags=['prod', 'blue'])

In [12]:
from typing import Any
from pydantic import BaseModel
import yaml



def _dump_model(model: BaseModel) -> dict[str, Any]:
    return model.model_dump(mode="python")



def _assert_model_merge(
    name: str,
    base_model: BaseModel,
    incoming_model: BaseModel,
    expected_dump: dict[str, Any],
) -> None:
    base_snapshot = _dump_model(base_model)
    incoming_snapshot = _dump_model(incoming_model)
    merged = merge_basemodels_preserving(base_model, incoming_model)
    merged_dump = _dump_model(merged)
    assert merged_dump == expected_dump, f"{name}: unexpected merge result: {merged_dump}"
    assert _dump_model(base_model) == base_snapshot, f"{name}: base mutated"
    assert _dump_model(incoming_model) == incoming_snapshot, f"{name}: incoming mutated"



class Service(BaseModel):
    id: str
    port: int
    env: dict[str, Any] = {}



class Rollout(BaseModel):
    feature: dict[str, Any]



class AppSettings(BaseModel):
    payload: dict[str, Any] | int



class ComplexConfig(BaseModel):
    services: list[Service]
    policies: list[Rollout]
    thresholds: list[int]
    tags: list[str]
    settings: AppSettings
    metadata: dict[str, Any]
    alerts: list[dict[str, Any]]
    rules: list[dict[str, Any]]



COMPLEX_DEFAULTS_YAML = """
services:
  - id: api
    port: 80
    env:
      DEBUG: false
      RETRIES: 1
policies:
  - feature:
      enabled: false
      level: beta
thresholds: [1, 2]
tags: [prod, legacy]
settings:
  payload:
    timeout: 30
    retries: 2
metadata:
  region:
    primary: us-east
    backup: us-west
  owners:
    - Alice
alerts:
  - severity: low
    code: A
rules:
  - rule:
      allow: true
      threshold: 5
"""



COMPLEX_OVERRIDE_YAML = """
services:
  - id: api
    port: 80
    env:
      DEBUG: true
      CACHE: false
  - id: worker
    port: 9000
    env: {}
policies:
  - feature:
      level: stable
      extra: true
thresholds: [2, 3]
tags: [prod, blue]
settings:
  payload: 42
metadata:
  region:
    backup: eu-west
  owners:
    - Alice
    - Bob
  notes:
    cycling: true
alerts:
  - severity: medium
    code: B
rules:
  - rule:
      allow: true
      notes: monitored
  - rule:
      allow: false
      severity: critical
"""



EXPECTED_COMPLEX_YAML = """
services:
  - id: api
    port: 80
    env:
      DEBUG: true
      RETRIES: 1
      CACHE: false
  - id: worker
    port: 9000
    env: {}
policies:
  - feature:
      enabled: false
      level: stable
      extra: true
thresholds: [1, 2, 2, 3]
tags: [prod, legacy, blue]
settings:
  payload: 42
metadata:
  region:
    primary: us-east
    backup: eu-west
  owners:
    - Alice
    - Bob
  notes:
    cycling: true
alerts:
  - severity: low
    code: A
  - severity: medium
    code: B
rules:
  - rule:
      allow: false
      threshold: 5
      notes: monitored
      severity: critical
"""



complex_defaults = ComplexConfig.model_validate(yaml.safe_load(COMPLEX_DEFAULTS_YAML))


complex_override = ComplexConfig.model_validate(yaml.safe_load(COMPLEX_OVERRIDE_YAML))


expected_complex = yaml.safe_load(EXPECTED_COMPLEX_YAML)


_assert_model_merge(
    "complex_configuration",
    complex_defaults,
    complex_override,
    expected_complex,
)


print("merge_basemodels_preserving comprehensive scenario passed ✔")

merge_basemodels_preserving comprehensive scenario passed ✔


In [3]:
# Create another Box object
b2 = Box({
    "length": 20,
    "width": 8,
    "material": "wood",
    "meta": {
        "owner": "Bob",
        "location": {"city": "Berlin"}
    },
    "tags": ["new", "wooden"]
})

# Merge b2 into a, overwriting in case of collision
#c= Box(a)
#c.merge_update(b2)
c= a + b2  # This will concatenate lists instead of overwriting
d = merge_boxes(a, b2)  # This will do a deep merge with custom logic
rprint(c.to_yaml())
rprint(d.to_yaml()) 

In [4]:
tempbox=Box(default_box=True,box_dots=True)

tempbox["a.e.f"]=11
rprint(tempbox.to_yaml())
