## Assertions

### assert_type_value()

In [1]:
from nimbro_utils.lazy import assert_type_value
# Value under test; neither check below produced any output in the recorded run.
data = 5
# Candidates given purely as types.
assert_type_value(
    obj=data,
    type_or_value=[int, float],
    name="variable 'a'",
)
# The candidate list may mix types with literal values (here: float, 5, "test", None).
assert_type_value(
    obj=data,
    type_or_value=[float, 5, "test", None],
    name="variable 'a'",
)

### assert_attribute()

In [2]:
from nimbro_utils.lazy import assert_attribute
# An int has no attribute named "test"; with exists=False the call raised
# nothing in the recorded run.
data = 5
assert_attribute(
    obj=data,
    attribute="test",
    exists=False,
)

### assert_keys()

In [3]:
from nimbro_utils.lazy import assert_keys
# Dictionary with three keys; the values are irrelevant for key checks, so all are None.
data = dict.fromkeys(['test', 'name', 'age'])

# All four checks passed silently in the recorded run. The per-mode notes below
# are inferred from the mode names and the arguments used -- TODO confirm
# against the assert_keys() documentation.

# "match": presumably the keys must be exactly the listed ones.
assert_keys(
    obj=data,
    keys=['test', 'name', 'age'],
    mode="match",
)
# "whitelist": presumably every key must be among the listed ones.
assert_keys(
    obj=data,
    keys=['test', 'name', 'age', 'profession'],
    mode="whitelist",
)
# "required": presumably every listed key must be present.
assert_keys(
    obj=data,
    keys=['name', 'age'],
    mode="required",
)
# "blacklist": presumably none of the listed keys may be present.
assert_keys(
    obj=data,
    keys=['profession'],
    mode="blacklist",
)

## JSON files

In [4]:
# Relative path of the scratch file shared by the write_json(), read_json()
# and cleanup cells below.
file_path = "./test.json"

### write_json()

In [5]:
from nimbro_utils.lazy import write_json
# Object to write; write_json() hands back a (success, message) pair.
content = {'number': 0, 'null': None}
success, message = write_json(
    file_path=file_path,
    json_object=content,
)
# Report both fields of the result, one per line.
for label, value in (("success:", success), ("message:", message)):
    print(label, value)

success: True
message: Written file '/home/paetzoldb0/ws/jazzy/main/src/nimbro_utils/notebooks/utility/test.json' in '0.001s'.


### read_json()

In [6]:
from nimbro_utils.lazy import read_json
# read_json() hands back a (success, message, content) triple.
success, message, content = read_json(file_path=file_path)
# Report all three fields of the result, one per line.
for label, value in (
    ("success:", success),
    ("message:", message),
    ("content:", content),
):
    print(label, value)

success: True
message: Read file './test.json' in '0.001s'.
content: {'number': 0, 'null': None}


In [7]:
import os
# Delete the scratch JSON file used by the write_json()/read_json() cells.
# A missing file is tolerated so that re-running this cell on its own (or
# running it after a failed write) does not abort the notebook.
try:
    os.remove(file_path)
except FileNotFoundError:
    pass

## Miscellaneous utilities

### escape

In [8]:
from nimbro_utils.lazy import escape
# Keys of `escape` to demonstrate, in display order.
color_names = [
    'red', 'green', 'yellow', 'blue', 'magenta', 'cyan',
    'gray', 'darkgray',
    'darkred', 'darkgreen', 'darkyellow', 'darkblue', 'darkmagenta', 'darkcyan',
    'bg_red',
]
# Print one sample line per key; escape['end'] closes each sequence.
for key in color_names:
    print(f"{escape[key]}This is a test using color '{key}'.{escape['end']}")

[91mThis is a test using color 'red'.[0m
[92mThis is a test using color 'green'.[0m
[93mThis is a test using color 'yellow'.[0m
[94mThis is a test using color 'blue'.[0m
[95mThis is a test using color 'magenta'.[0m
[96mThis is a test using color 'cyan'.[0m
[37mThis is a test using color 'gray'.[0m
[90mThis is a test using color 'darkgray'.[0m
[31mThis is a test using color 'darkred'.[0m
[32mThis is a test using color 'darkgreen'.[0m
[33mThis is a test using color 'darkyellow'.[0m
[34mThis is a test using color 'darkblue'.[0m
[35mThis is a test using color 'darkmagenta'.[0m
[36mThis is a test using color 'darkcyan'.[0m
[101mThis is a test using color 'bg_red'.[0m


### update_dict()

In [9]:
from nimbro_utils.lazy import update_dict
# Two dictionaries that share the key 'age'. In the recorded result the value
# from new_dict (31) is used, 'name' is kept and 'profession' is added.
old_dict = dict(name="Bastian", age=30)
new_dict = dict(profession="playing with robots", age=31)

result_dict = update_dict(
    old_dict=old_dict,
    new_dict=new_dict,
)
print(result_dict)

{'profession': 'playing with robots', 'age': 31, 'name': 'Bastian'}


### count_duplicates()

In [10]:
from nimbro_utils.lazy import count_duplicates
# 0 and 1 occur twice; 2, 3 and 4 occur once.
data = [0, 0, 1, 1, 2, 3, 4]

# Default call: the recorded output lists only the repeated values.
duplicates_only = count_duplicates(data)
print(duplicates_only)

# include_unique=True: the recorded output also lists values that occur once.
with_unique = count_duplicates(data, include_unique=True)
print(with_unique)

{0: 2, 1: 2}
{0: 2, 1: 2, 2: 1, 3: 1, 4: 1}


### start_jobs()

In [11]:
import time
from nimbro_utils.lazy import start_jobs

def wait(seconds=0):
    """Sleep for `seconds` and return that same value.

    A request equal to 2 is rejected on purpose so the demo has a job that fails.

    Args:
        seconds: Duration passed to time.sleep(); defaults to 0.

    Returns:
        The `seconds` argument, unchanged.

    Raises:
        ValueError: If `seconds` equals 2.
    """
    if seconds != 2:
        time.sleep(seconds)
        return seconds
    raise ValueError("Cannot sleep for two seconds!")

# One argument spec per job; the recorded output shows the outcome of each form.
job_args = [
    None,               # job 0: wait() runs with its default -> result 0
    {'seconds': 2},     # job 1: wait() raises ValueError
    {'seconds': 100},   # job 2: does not finish within the timeout below
    {'seconds': 3},     # job 3: result 3
    ([5], {}),          # job 4: result 5
]

print("Starting jobs...")
results = start_jobs(
    jobs=[wait] * 5,
    job_args=job_args,
    timeout=6,
    max_workers=None,
)
print("Jobs done")

# Each entry of `results` is indexed as (success, message, result).
labels = ("success", "message", "result")
for i, result in enumerate(results):
    print(f"job '{i}':")
    for position, label in enumerate(labels):
        print(f"\t{label}:", result[position])

Starting jobs...
Jobs done
job '0':
	success: True
	message: Job completed after '0.00s'.
	result: 0
job '1':
	success: False
	message: Job raised an exception after '0.00s': ValueError('Cannot sleep for two seconds!')
	result: None
job '2':
	success: False
	message: Job did not finish after timeout of '6.00s'.
	result: None
job '3':
	success: True
	message: Job completed after '3.00s'.
	result: 3
job '4':
	success: True
	message: Job completed after '5.00s'.
	result: 5


### try_callback()

In [12]:
import time
from rclpy.node import Node
from nimbro_utils.lazy import try_callback, start_and_spin_node, stop_node

class TestNode(Node):
    """Minimal node with a 0.2 s timer whose callback raises on its third call."""

    def __init__(self, context=None):
        super().__init__("test_node", context=context)
        # Registering the bare callback would look like:
        #     self.create_timer(0.2, self.callback)
        # Here it is wrapped with try_callback() instead. In the recorded run the
        # ValueError appears as an [ERROR] log entry and the timer keeps firing.
        guarded_callback = try_callback(
            callback=self.callback,
            mode="error",
            logger=self.get_logger(),
        )
        self.create_timer(0.2, guarded_callback)
        # Number of times the timer callback has run so far.
        self.called = 0

    def callback(self):
        """Count and log the invocation; raise ValueError on the third one."""
        self.called += 1
        self.get_logger().info(f"timer callback {self.called}")
        if self.called != 3:
            return
        raise ValueError("Callback cannot be called more then 3 times!")

# Start the node without blocking the cell, let its timer run for two seconds,
# then shut it down. The try/finally ensures stop_node() is still called when
# the sleep is interrupted (e.g. by a kernel interrupt), so no running node is
# left behind in the kernel.
node_env = start_and_spin_node(TestNode, blocking=False)
try:
    time.sleep(2)
finally:
    stop_node(*node_env)

[32m>[36m Starting node 'TestNode'[0m
[0m2025-09-10 17:16:30.890 [INFO] [test_node]: timer callback 1[0m
[0m2025-09-10 17:16:30.990 [INFO] [test_node]: timer callback 2[0m
[0m2025-09-10 17:16:31.190 [INFO] [test_node]: timer callback 3[0m
[31m2025-09-10 17:16:31.192 [ERROR] [test_node]: ValueError('Callback cannot be called more then 3 times!')[0m
[0m2025-09-10 17:16:31.392 [INFO] [test_node]: timer callback 4[0m
[0m2025-09-10 17:16:31.590 [INFO] [test_node]: timer callback 5[0m
[0m2025-09-10 17:16:31.790 [INFO] [test_node]: timer callback 6[0m
[0m2025-09-10 17:16:31.990 [INFO] [test_node]: timer callback 7[0m
[0m2025-09-10 17:16:32.190 [INFO] [test_node]: timer callback 8[0m
[0m2025-09-10 17:16:32.393 [INFO] [test_node]: timer callback 9[0m
[0m2025-09-10 17:16:32.590 [INFO] [test_node]: timer callback 10[0m
[31m> [36mStopped node 'TestNode'[0m


In [13]:
# NOTE(review): InvalidHandle is not referenced by any cell visible in this
# notebook, and it comes from a private rclpy module (leading underscore) --
# confirm whether this import is still needed.
from rclpy._rclpy_pybind11 import InvalidHandle

### convert_stamp()

In [14]:
import time
from nimbro_utils.lazy import convert_stamp
# Chain four conversions, feeding each result into the next call.
# Current time as float seconds -> "iso" (printed as an ISO-style string in the recorded run).
iso = convert_stamp(stamp=time.time(), target_format="iso")
print(iso)
# ISO string -> "msg" (printed as builtin_interfaces.msg.Time in the recorded run).
msg = convert_stamp(stamp=iso, target_format="msg")
print(msg)
# Message -> "rclpy" (printed as Time(nanoseconds=..., clock_type=SYSTEM_TIME) in the recorded run).
rclpy_time = convert_stamp(stamp=msg, target_format="rclpy")
print(rclpy_time)
# rclpy time -> "seconds" (printed as a float in the recorded run).
seconds = convert_stamp(stamp=rclpy_time, target_format="seconds")
print(seconds)

2025-09-10T15:16:33.622260
builtin_interfaces.msg.Time(sec=1757510193, nanosec=622260093)
Time(nanoseconds=1757510193622260093, clock_type=SYSTEM_TIME)
1757510193.62226


### log_lines()

In [15]:
from nimbro_utils.lazy import log_lines
# Sample paragraph that is much longer than the requested line length.
text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

# With logger=None and severity=None the formatted block shows up in the cell
# output of the recorded run; every line after the first starts with the
# line_highlight prefix.
log_lines(
    text=text,
    line_length=70,
    line_highlight="| ",
    block_format=True,
    allow_empty_lines=False,
    max_lines=100,
    logger=None,
    severity=None,
)


Lorem  ipsum  dolor  sit  amet,  consectetur  adipiscing  elit,  sed  do
| eiusmod  tempor  incididunt  ut labore et dolore magna aliqua. Ut enim
| ad  minim  veniam,  quis  nostrud exercitation ullamco laboris nisi ut
| aliquip   ex   ea   commodo   consequat.  Duis  aute  irure  dolor  in
| reprehenderit  in  voluptate  velit esse cillum dolore eu fugiat nulla
| pariatur.  Excepteur  sint  occaecat  cupidatat  non proident, sunt in
| culpa qui officia deserunt mollit anim id est laborum.


### get_package_path()

In [16]:
from nimbro_utils.lazy import get_package_path
# Look up the package by name; the recorded run prints an absolute path on the
# local filesystem, so the output differs between machines.
print(get_package_path("nimbro_utils"))

/home/paetzoldb0/ws/jazzy/main/src/nimbro_utils


### in_jupyter_notebook()

In [17]:
from nimbro_utils.lazy import in_jupyter_notebook

In [18]:
# Prints True in the recorded run, where the cell is executed inside this notebook.
print(in_jupyter_notebook())

True
