In [1]:
from dfttools import *
from dfttools import g
from PyMCP2221A import PyMCP2221A
mcp = PyMCP2221A.PyMCP2221A()
mcp.I2C_Init()

def i2c_write_callback( device_address: int, register_address: int, value: int, register: dict) -> bool:
    if mcp is None:
        print("I2C bridge not available")
        return False
    try:
        # read_data_bytes  = self.i2c_read_callback( device_address, register_address, register)
        # Write the register address to prepare for reading
        mcp.I2C_Write(device_address, bytearray([0x00,register_address]))
        
        # # Read one byte from the device
        read_data_bytes = mcp.I2C_Read(device_address, 1)
        if not read_data_bytes or len(read_data_bytes) < 1:
            print("Failed to read data from I2C device")
            return False
        
        read_data_int = read_data_bytes[0]  # get integer value of first byte

        mask = int(register['Mask'], 16) if 'Mask' in register else 0xFF
        lsb = int(register.get('POS', 0))  # default to 0 if POS not given

        # Prepare new value to write with masked bits updated
        # Shift value by LSB to align bits, then mask and combine
        new_value = (read_data_int & ~mask) | (value  )
        # Write back register address plus new_value to device
        # Depending on device protocol, address might need to be sent before data
        mcp.I2C_Write(device_address, bytearray([0x00,register_address, new_value]))
        mcp.I2C_Write(device_address, bytearray([0x00,register_address]))
        
        # # Read one byte from the device
        read_data = mcp.I2C_Read(device_address, 1)[-1]
        print(f"Writing 0x{new_value:02X} to device 0x{device_address:X}, register 0x{register_address:X}, post read data 0x{read_data:X}")

        return True
    except Exception as e:
        print(f"i2c_write_callback error: {e}")
        return False
# I2C read callback with optional bit masking
def i2c_read_callback( device_address: int, register_address: int, register: dict = None) -> int:
    if mcp is None:
        print("I2C bridge not available")
        return 0
    try:
        # append 0x00 for the 16bit register address 
        mcp.I2C_Write(device_address, bytearray([0x00,register_address]))
        read_data = mcp.I2C_Read(device_address, 1)
        if not read_data:
            print("I2C read returned no data")
            return 0
        raw_value = read_data[0]
        # print(raw_value)
        # if register:
        #     mask = int(register['Mask'], 16)
        #     LSB = register['POS']
        #     value = (raw_value & mask) 
        # else:
        #     value = raw_value
        print(f"Read 0x{raw_value:02X} from device 0x{device_address:X}, register 0x{register_address:X}")
        return raw_value
    except Exception as e:
        print(f"i2c_read_callback error: {e}")
        return 0
g.hardware_callbacks['i2c_write'] = i2c_write_callback
g.hardware_callbacks['i2c_read'] = i2c_read_callback
I2C_REG_WRITE(0x38,0xFE,0,0)
I2C_WRITE(device_address="0x38",field_info={'fieldname': 'island_bias_connect_time', 'length': 2, 'registers': [{'REG': '0x2B', 'POS': 2, 'RegisterName': 'ISLAND_BIAS_SETTINGS_1', 'RegisterLength': 8, 'Name': 'island_bias_connect_time[1:0]', 'Mask': '0xC', 'Length': 2, 'FieldMSB': 1, 'FieldLSB': 0, 'Attribute': '000NNNNN', 'Default': '0x02', 'User': 'YYYYYYYY', 'Clocking': 'SMB', 'Reset': 'C', 'PageName': 'PAG0'}]},write_value=0x2)
ext_offset_comp = I2C_READ(device_address="0x38",field_info={'fieldname': 'island_bias_connect_time', 'length': 2, 'registers': [{'REG': '0x2B', 'POS': 2, 'RegisterName': 'ISLAND_BIAS_SETTINGS_1', 'RegisterLength': 8, 'Name': 'island_bias_connect_time[1:0]', 'Mask': '0xC', 'Length': 2, 'FieldMSB': 1, 'FieldLSB': 0, 'Attribute': '000NNNNN', 'Default': '0x02', 'User': 'YYYYYYYY', 'Clocking': 'SMB', 'Reset': 'C', 'PageName': 'PAG0'}]},expected_value=0x1)
print(ext_offset_comp)

Writing 0x09 to device 0x38, register 0x2B, post read data 0x9
Read 0x09 from device 0x38, register 0x2B
9
1


In [1]:
import sys

if __name__ == '__main__':
    print(sys.argv)

['/home/harkum/Documents/myCode/dfttools/env/lib64/python3.11/site-packages/ipykernel_launcher.py', '--f=/run/user/1000/jupyter/runtime/kernel-v3a682542f63112f7e150d2e09e57f0f49fb59f4a8.json']


In [7]:
import uuid
from collections import defaultdict

class GlobalContext:
    def __init__(self):
        # Main cache: { unique_key (str) : { subkey: subvalue, ... } }
        self.cache = {}
        # Reverse index: { subkey : { subvalue: set_of_keys } }
        self.reverse_index = defaultdict(lambda: defaultdict(set))

    def clear_cache(self):
        """Clear all cached data and reverse indices."""
        self.cache.clear()
        self.reverse_index.clear()

    def set_cache(self, subkey, value, key=None):
        """
        Set a value in cache under a unique key and subkey.
        Generates a unique key if none supplied.
        Updates reverse lookup index.
        Returns the key used (new or given).
        """
        if key is None:
            key = str(uuid.uuid4())
        if key not in self.cache:
            self.cache[key] = {}

        # Remove old reverse index entry if key and subkey existed before
        old_value = self.cache[key].get(subkey)
        if old_value is not None:
            self.reverse_index[subkey][old_value].discard(key)

        # Store new value
        self.cache[key][subkey] = value
        self.reverse_index[subkey][value].add(key)

        return key

    def get_cache(self, key, subkey=None):
        """
        Retrieve cache data by key and optional subkey.
        Returns None if key not found.
        """
        if key not in self.cache:
            return None
        if subkey is None:
            return self.cache[key]
        return self.cache[key].get(subkey)

    def get_keys_by_subkey_value(self, subkey, value):
        """
        Retrieve all keys where given subkey has the specified value.
        Returns an empty list if none.
        """
        return list(self.reverse_index.get(subkey, {}).get(value, []))

    def dump_cache(self):
        """
        Dump current cache to global variable and clear internal cache.
        """
        global dumped_cache
        dumped_cache = self.cache.copy()
        self.clear_cache()


# Example usage
if __name__ == "__main__":
    g = GlobalContext()

    # Create entries with automatic keys
    key1 = g.set_cache('name', 'Alice')
    g.set_cache('instruction', 'run', key=key1)

    key2 = g.set_cache('name', 'Bob')
    g.set_cache('instruction', 'jump', key=key2)
    key3 = g.get_keys_by_subkey_value('name', 'Bob')
    g.set_cache('instruction', 'walk', key=key3[0])

    # User only remembers 'name' == 'Alice', search keys:
    keys_found = g.get_keys_by_subkey_value('name', 'Bob')
    print("Keys matching name='Bob':", keys_found[0])

    # Retrieve full data using keys
    for k in keys_found:
        print(f"Data for key {k}:", g.get_cache(k))

    # Dump cache to global variable and clear
    g.dump_cache()
    print("Dumped cache:", dumped_cache)


Keys matching name='Bob': c14f8f38-a396-4049-9bf5-729355afe590
Data for key c14f8f38-a396-4049-9bf5-729355afe590: {'name': 'Bob', 'instruction': 'walk'}
Dumped cache: {'23373faa-824f-4684-9b84-ec11096369cb': {'name': 'Alice', 'instruction': 'run'}, 'c14f8f38-a396-4049-9bf5-729355afe590': {'name': 'Bob', 'instruction': 'walk'}}


In [8]:
import uuid
from collections import defaultdict

class GlobalContext:
    def __init__(self):
        # Cache: {uuid_key: {subkey: [value_history]}}
        self.cache = {}

        # Reverse index: {subkey: {value: set(keys)}}
        self.reverse_index = defaultdict(lambda: defaultdict(set))

    def clear_cache(self):
        self.cache.clear()
        self.reverse_index.clear()

    def set_cache(self, subkey, value, key=None):
        """
        Store value as a new historical entry under key and subkey.
        If key is None, generate a new uuid key.
        """
        if key is None:
            key = str(uuid.uuid4())
        if key not in self.cache:
            self.cache[key] = {}

        # Initialize history list for subkey if not exists
        history = self.cache[key].setdefault(subkey, [])

        # Add new value to history
        history.append(value)

        # Update reverse index for current value
        self.reverse_index[subkey][value].add(key)

        return key

    def get_cache(self, key, subkey=None):
        """
        If subkey specified, return latest value of that subkey history.
        Else return all subkey histories for the key.
        """
        if key not in self.cache:
            return None
        if subkey is None:
            return self.cache[key]
        history = self.cache[key].get(subkey)
        if not history:
            return None
        return history[-1]  # latest value

    def get_keys_by_subkey_value(self, subkey, value):
        """
        Retrieve all keys where subkey has current value.
        """
        return list(self.reverse_index.get(subkey, {}).get(value, []))

    def dump_cache(self):
        global dumped_cache
        dumped_cache = self.cache.copy()
        self.clear_cache()

# Example usage
if __name__ == "__main__":
    g = GlobalContext()

    key1 = g.set_cache('name', 'Alice')
    g.set_cache('instruction', 'run', key=key1)

    key2 = g.set_cache('name', 'Bob')
    g.set_cache('instruction', 'jump', key=key2)

    # Add a historical update
    g.set_cache('instruction', 'walk', key=key2)

    # Fetch latest 'instruction' for Bob
    print("Latest instruction for Bob:", g.get_cache(key2, 'instruction'))  # prints 'walk'

    # Fetch all history for Bob's 'instruction'
    print("Full instruction history for Bob:", g.cache[key2]['instruction'])  # prints ['jump', 'walk']

    # Fetch keys for name='Bob'
    keys_bob = g.get_keys_by_subkey_value('name', 'Bob')
    print("Keys for name=Bob:", keys_bob)

    # Dump at end
    g.dump_cache()
    print("Dumped cache with history:", dumped_cache)


Latest instruction for Bob: walk
Full instruction history for Bob: ['jump', 'walk']
Keys for name=Bob: ['bc6b763f-52e5-4985-a487-1f3127ebe1a6']
Dumped cache with history: {'e8dc54ad-8af1-49b2-8d2c-d541c11cb0af': {'name': ['Alice'], 'instruction': ['run']}, 'bc6b763f-52e5-4985-a487-1f3127ebe1a6': {'name': ['Bob'], 'instruction': ['jump', 'walk']}}


In [None]:
import uuid
from collections import defaultdict

class GlobalContext:
    def __init__(self):
        # Cache: {uuid_key: {subkey: [history_of_values]}}
        self.cache = {}
        # Reverse index: {subkey: {value: set(keys)}}
        self.reverse_index = defaultdict(lambda: defaultdict(set))

    def clear_cache(self):
        self.cache.clear()
        self.reverse_index.clear()
    
    # ---------- Create ----------
    def create(self, subkey_values):
        key = str(uuid.uuid4())
        self.cache[key] = {}
        for sk, val in subkey_values.items():
            self.cache[key][sk] = [val]
            self.reverse_index[sk][val].add(key)
        return key

    # ---------- Read ----------
    def read(self, key, subkey=None):
        if key not in self.cache:
            return None
        if subkey:
            history = self.cache[key].get(subkey)
            return history[-1] if history else None
        else:
            return {k: v[-1] for k, v in self.cache[key].items()}

    def find_keys(self, subkey, value):
        return list(self.reverse_index.get(subkey, {}).get(value, []))

    # ---------- Update ----------
    def update(self, key, subkey, value):
        if key not in self.cache:
            raise KeyError("Record key not found")
        history = self.cache[key].setdefault(subkey, [])
        history.append(value)
        self.reverse_index[subkey][value].add(key)

    # ---------- Delete ----------
    def delete(self, key, subkey=None):
        if key not in self.cache:
            return False
        if subkey is None:
            record = self.cache.pop(key)
            for sk, hist in record.items():
                current = hist[-1] if hist else None
                if current and key in self.reverse_index[sk][current]:
                    self.reverse_index[sk][current].remove(key)
            return True
        else:
            if subkey in self.cache[key]:
                hist = self.cache[key].pop(subkey)
                current = hist[-1] if hist else None
                if current and key in self.reverse_index[subkey][current]:
                    self.reverse_index[subkey][current].remove(key)
                return True
            return False
    
    # ---------- Dump ----------
    def dump_cache(self):
        global dumped_cache
        dumped_cache = self.cache.copy()
        self.clear_cache()

    # -------- Additional Easy Methods --------
    def add_or_update(self, name, subkey, value):
        """
        Add or update an entry by 'name' subkey.
        If name exists in cache, update subkey; else create new record.
        Returns the key.
        """
        keys = self.find_keys('name', name)
        if keys:
            key = keys[0]
            self.update(key, subkey, value)
        else:
            key = self.create({'name': name, subkey: value})
        return key

    def get_latest_by_name(self, name, subkey=None):
        """
        Get latest subkey value or full record for given 'name' subkey value.
        """
        keys = self.find_keys('name', name)
        if not keys:
            return None
        return self.read(keys[0], subkey)

    def delete_by_name(self, name):
        """
        Delete entire record(s) matching given 'name' subkey.
        """
        keys = self.find_keys('name', name)
        success = False
        for key in keys:
            res = self.delete(key)
            success = success or res
        return success

    def get_all_keys(self):
        """
        Get a list of all keys currently cached.
        """
        return list(self.cache.keys())

    def get_all_records(self):
        """
        Get a list of latest values of all cached records.
        """
        return [self.read(key) for key in self.cache.keys()]

    
# Example usage
if __name__ == "__main__":
    g = GlobalContext()
    
    # Use easier methods
    key1 = g.add_or_update('I2C', 'W', {'Reg':0x0f,'data':0x1E,'comments':''})
    key2 = g.add_or_update('I2C', 'R', {'Reg':0x0E,'data':0x2F,'comments':''})
    g.add_or_update('Force', 'V', {'P+':'IODATA0','P-':'GND','value':0.5e-3,'comments':''})
    g.add_or_update('Measure', 'A', {'P+':'SW','P-':'GNDP','expected_value':0.5e-3,'comments':''})
    g.add_or_update('Trig', 'LH', {'P+':'ADD0','P-':'AGND','Target':0.5e-3,'value':0.45e-3,'comments':''})
    g.add_or_update('I2C', 'W', {'Reg':0x0f,'data':0x1f,'comments':' last bit spk_en=1'})



Alice's instruction: run
Bob's record: {'name': 'Bob', 'instruction': 'jump'}
Bob's updated instruction: walk
All keys: ['d14f436d-0f34-4f19-a42c-dc9cef28d590', '1395146c-d078-4733-adea-9d1e236e3175']
All records: [{'name': 'Alice', 'instruction': 'run'}, {'name': 'Bob', 'instruction': 'walk'}]
After deleting Alice, all keys: ['1395146c-d078-4733-adea-9d1e236e3175']


In [21]:
import uuid
from collections import defaultdict
from copy import deepcopy

class NestedDB:
    def __init__(self):
        self.records = {}  # _id:str -> record:dict
        self.reverse_index = defaultdict(lambda: defaultdict(set))  # field -> value -> set of _ids

    def _flatten_dict(self, d, parent_key=''):
        items = {}
        for k, v in d.items():
            new_key = f"{parent_key}.{k}" if parent_key else k
            if isinstance(v, dict):
                items.update(self._flatten_dict(v, new_key))
            else:
                items[new_key] = v
        return items

    def _index_record(self, record):
        flat_record = self._flatten_dict(record)
        for key, val in flat_record.items():
            if key != '_id':
                self.reverse_index[key][val].add(record['_id'])

    def _deindex_record(self, record):
        flat_record = self._flatten_dict(record)
        for key, val in flat_record.items():
            if key != '_id':
                self.reverse_index[key][val].discard(record['_id'])

    def create(self, record_data):
        rec = deepcopy(record_data)
        rec['_id'] = str(uuid.uuid4())
        self.records[rec['_id']] = rec
        self._index_record(rec)
        return rec['_id']

    def read(self, _id):
        rec = self.records.get(_id)
        return deepcopy(rec) if rec else None

    def update(self, _id, update_data):
        if _id not in self.records:
            raise KeyError(f"Record with _id {_id} not found")
        old_rec = self.records[_id]
        self._deindex_record(old_rec)
        old_rec.update(update_data)
        self.records[_id] = old_rec
        self._index_record(old_rec)

    def delete(self, _id):
        rec = self.records.pop(_id, None)
        if not rec:
            return False
        self._deindex_record(rec)
        return True

    def _match_criteria(self, record, criteria):
        flat_record = self._flatten_dict(record)
        for key, val in criteria.items():
            if flat_record.get(key) != val:
                return False
        return True

    def find(self, criteria):
        flat_criteria = criteria
        candidate_sets = []
        for ck, cv in flat_criteria.items():
            ids = self.reverse_index.get(ck, {}).get(cv)
            if ids:
                candidate_sets.append(ids)
            else:
                return []
        candidate_ids = set.intersection(*candidate_sets) if candidate_sets else set(self.records.keys())
        results = [deepcopy(self.records[_id]) for _id in candidate_ids if self._match_criteria(self.records[_id], criteria)]
        return results

    def get_last_record(self, criteria):
        recs = self.find(criteria)
        return recs[-1] if recs else None

    def dump(self):
        return deepcopy(list(self.records.values()))

# Example usage
if __name__ == "__main__":
    db = NestedDB()

    rec1 = db.create({'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 0x0f, 'data': 0x1E}, 'comments': ''})
    rec2 = db.create({'Instruction': 'Force', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.5e-3}, 'comments': ''})
    rec2 = db.create({'Instruction': 'Measure', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.5e-3}, 'comments': ''})
    rec3 = db.create({'Instruction': 'I2C', 'unit': 'R', 'data': {'Reg': 0x0f, 'data': 0x2F}, 'comments': 'test'})

    print("Read rec1:", db.read(rec1))

    # Find using flattened keys for nested data fields
    results = db.find({'Instruction': 'Measure', 'data.P+': 'IODATA0'})
    print(f"Found {len(results)} record(s) matching criteria 'Force' with data.P+ == 'IODATA0':")
    for r in results:
        print(r)

    last = db.get_last_record({'Instruction': 'I2C', 'data.Reg': 0x0f})
    print("Last record matching 'I2C' with data.Reg==0x0f:", last)

    db.update(rec1, {'comments': 'Updated comment', 'data': {'Reg': 0x0f, 'data': 0x1F}})
    print("Updated rec1:", db.read(rec1))

    db.delete(rec2)
    print("All records after deleting rec2:")
    for r in db.dump():
        print(r)


Read rec1: {'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': '', '_id': 'ef44bcdb-0a63-4fa1-8808-01ef4fde72df'}
Found 1 record(s) matching criteria 'Force' with data.P+ == 'IODATA0':
{'Instruction': 'Measure', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.0005}, 'comments': '', '_id': '82555f23-6ea3-4373-ac9d-9b726d4d988d'}
Last record matching 'I2C' with data.Reg==0x0f: {'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': '', '_id': 'ef44bcdb-0a63-4fa1-8808-01ef4fde72df'}
Updated rec1: {'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 31}, 'comments': 'Updated comment', '_id': 'ef44bcdb-0a63-4fa1-8808-01ef4fde72df'}
All records after deleting rec2:
{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 31}, 'comments': 'Updated comment', '_id': 'ef44bcdb-0a63-4fa1-8808-01ef4fde72df'}
{'Instruction': 'Force', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.0005}, 'comments'

In [22]:
from tinydb import TinyDB, Query
from tinydb.storages import MemoryStorage
import uuid

# Initialize an in-memory TinyDB (use filename instead of MemoryStorage for file persistence)
db = TinyDB(storage=MemoryStorage)
Record = Query()

# ---------- Create ----------
def insert_record(data):
    """Add a record with an automatic unique _id field"""
    rec = dict(data)  # shallow copy
    rec['_id'] = str(uuid.uuid4())
    return db.insert(rec)

# ---------- Read ----------
def get_record_by_id(doc_id):
    """Get a record by TinyDB document id"""
    return db.get(doc_id=doc_id)

def find_records(criteria):
    """
    Find records matching criteria dict.
    Supports nested keys via dot notation.
    Example: {'Instruction': 'Force', 'data.P+': 'IODATA0'}
    """
    # Build TinyDB query dynamically
    query = None
    for key, value in criteria.items():
        # Support nested key querying using where().test or direct attribute access
        keys = key.split('.')
        condition = build_nested_condition(keys, value)
        query = condition if query is None else (query & condition)
    return db.search(query) if query else db.all()

def build_nested_condition(keys, value):
    """Build tinydb query for nested keys"""
    if len(keys) == 1:
        return (Record[keys[0]] == value)
    else:
        # recursively build query
        return (Record[keys[0]].test(lambda d: isinstance(d, dict) and get_nested_value(d, keys[1:]) == value))

def get_nested_value(d, keys):
    for k in keys:
        d = d.get(k, None)
        if d is None:
            return None
    return d

# ---------- Update ----------
def update_record(doc_id, update_vals):
    """Update fields of a record with given doc_id"""
    db.update(update_vals, doc_ids=[doc_id])

# ---------- Delete ----------
def delete_record(doc_id):
    """Delete record by doc_id"""
    db.remove(doc_ids=[doc_id])

# ---------- Dump ----------
def dump_records():
    """Return all records as list"""
    return db.all()

# ---------- Example Usage ----------

if __name__ == "__main__":
    # Insert sample data
    id1 = insert_record({'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 0x0f, 'data': 0x1E}, 'comments': ''})
    id2 = insert_record({'Instruction': 'Force', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.5e-3}, 'comments': ''})
    id3 = insert_record({'Instruction': 'I2C', 'unit': 'R', 'data': {'Reg': 0x0f, 'data': 0x2F}, 'comments': 'test'})

    print("Get record by doc_id:")
    print(get_record_by_id(id1))

    print("\nFind records by nested criteria:")
    found = find_records({'Instruction': 'Force', 'data.P+': 'IODATA0'})
    for rec in found:
        print(rec)

    print("\nUpdate record:")
    update_record(id1, {'comments': 'Updated comment'})
    print(get_record_by_id(id1))

    print("\nDelete record:")
    delete_record(id2)
    print("Records after deletion:", dump_records())


Get record by doc_id:
{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': '', '_id': '0fc7a61a-2a9f-4cb3-b811-ae301163ef87'}

Find records by nested criteria:
{'Instruction': 'Force', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.0005}, 'comments': '', '_id': 'ab289714-1034-486a-980f-0e4eb34d450e'}

Update record:
{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': 'Updated comment', '_id': '0fc7a61a-2a9f-4cb3-b811-ae301163ef87'}

Delete record:
Records after deletion: [{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': 'Updated comment', '_id': '0fc7a61a-2a9f-4cb3-b811-ae301163ef87'}, {'Instruction': 'I2C', 'unit': 'R', 'data': {'Reg': 15, 'data': 47}, 'comments': 'test', '_id': 'ac926382-3809-4702-b4b1-b61b7aad12ce'}]


In [None]:
import uuid
import time
from collections import defaultdict
from copy import deepcopy

class NestedDB:
    def __init__(self):
        self.records = {}
        self.reverse_index = defaultdict(lambda: defaultdict(set))

    def _flatten_dict(self, d, parent_key=''):
        items = {}
        for k, v in d.items():
            new_key = f"{parent_key}.{k}" if parent_key else k
            if isinstance(v, dict):
                items.update(self._flatten_dict(v, new_key))
            else:
                items[new_key] = v
        return items

    def _index_record(self, record):
        flat_record = self._flatten_dict(record)
        for key, val in flat_record.items():
            if key != '_id':
                self.reverse_index[key][val].add(record['_id'])

    def _deindex_record(self, record):
        flat_record = self._flatten_dict(record)
        for key, val in flat_record.items():
            if key != '_id':
                self.reverse_index[key][val].discard(record['_id'])

    def create(self, record_data):
        rec = deepcopy(record_data)
        rec['_id'] = str(uuid.uuid4())
        rec['_created'] = time.time()
        self.records[rec['_id']] = rec
        self._index_record(rec)
        return rec['_id']

    def read(self, _id):
        rec = self.records.get(_id)
        return deepcopy(rec) if rec else None

    def update(self, _id, update_data):
        if _id not in self.records:
            raise KeyError(f"Record with _id {_id} not found")
        old_rec = self.records[_id]
        self._deindex_record(old_rec)
        old_rec.update(update_data)
        self.records[_id] = old_rec
        self._index_record(old_rec)

    def delete(self, _id):
        rec = self.records.pop(_id, None)
        if not rec:
            return False
        self._deindex_record(rec)
        return True

    def _match_criteria(self, record, criteria):
        flat_record = self._flatten_dict(record)
        for key, val in criteria.items():
            if flat_record.get(key) != val:
                return False
        return True

    def find(self, criteria):
        candidate_sets = []
        for ck, cv in criteria.items():
            ids = self.reverse_index.get(ck, {}).get(cv)
            if ids:
                candidate_sets.append(ids)
            else:
                return []
        candidate_ids = set.intersection(*candidate_sets) if candidate_sets else set(self.records.keys())
        return [deepcopy(self.records[_id]) for _id in candidate_ids if self._match_criteria(self.records[_id], criteria)]

    def find_last_record(self, criteria):
        matched = self.find(criteria)
        if not matched:
            return None
        return max(matched, key=lambda r: r.get('_created', 0))

    def dump(self):
        return deepcopy(list(self.records.values()))

    def count(self, criteria=None):
        if criteria:
            return len(self.find(criteria))
        return len(self.records)
    
    def delete_by_criteria(self, criteria):
        records_to_delete = self.find(criteria)
        count = 0
        for rec in records_to_delete:
            if self.delete(rec['_id']):
                count += 1
        return count
    
    def find_modify_update(self, criteria, modify_func):
        """
        Finds all records matching criteria, applies modify_func(record) to each,
        updates and returns updated records list.
        """
        records = self.find(criteria)
        updated = []
        for rec in records:
            modified = deepcopy(rec)
            modified = modify_func(modified)
            if modified and modified != rec:
                self.update(rec['_id'], modified)
                updated.append(self.read(rec['_id']))
        return updated
    
    def clear(self):
        self.records.clear()
        self.reverse_index.clear()


# Example usage demonstrating all APIs
if __name__ == "__main__":
    db = NestedDB()

    # Create
    id1 = db.create({'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 0x0f, 'data': 0x1E}, 'comments': ''})
    id2 = db.create({'Instruction': 'Force', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.5e-3}, 'comments': ''})
    
    print("All records after creation:")
    print(db.dump())

    # Read
    print("\nRead record id1:")
    print(db.read(id1))

    # Find
    criteria = {'Instruction': 'I2C', 'data.Reg': 0x0f}
    found = db.find(criteria)
    print("\nFind records matching criteria", criteria)
    for item in found:
        print(item)

    # Find last record
    last = db.find_last_record(criteria)
    print("\nLast record matching criteria:")
    print(last)

    # Update
    print("\nUpdate record id1 - add comment and change data:")
    db.update(id1, {'comments': 'Updated comment', 'data': {'Reg': 0x0f, 'data': 0x2F}})
    print(db.read(id1))

    # Find-modify-update in one step
    print("\nFind-modify-update each 'Force' record by appending '[MODIFIED]' to comments:")
    def modify_func(rec):
        rec['comments'] = rec.get('comments', '') + ' [MODIFIED]'
        return rec
    updated_list = db.find_modify_update({'Instruction': 'Force'}, modify_func)
    for rec in updated_list:
        print(rec)

    # Count
    print("\nCount records with Instruction='I2C':", db.count({'Instruction': 'I2C'}))

    # Delete by id
    print("\nDelete record id2:")
    db.delete(id2)
    print(db.dump())

    # Delete by criteria
    print("\nDelete remaining 'I2C' records by criteria:")
    deleted_count = db.delete_by_criteria({'Instruction': 'I2C'})
    print("Deleted count:", deleted_count)
    print(db.dump())


All records after creation:
[{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': '', '_id': 'c97c4bd2-44b8-4615-a653-83f1aa0fa9b3', '_created': 1762701165.390533}, {'Instruction': 'Force', 'unit': 'V', 'data': {'P+': 'IODATA0', 'P-': 'GND', 'value': 0.0005}, 'comments': '', '_id': 'b6b65d51-f492-4f89-a872-cb8a6bc56cbe', '_created': 1762701165.3905737}]

Read record id1:
{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': '', '_id': 'c97c4bd2-44b8-4615-a653-83f1aa0fa9b3', '_created': 1762701165.390533}

Find records matching criteria {'Instruction': 'I2C', 'data.Reg': 15}
{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': '', '_id': 'c97c4bd2-44b8-4615-a653-83f1aa0fa9b3', '_created': 1762701165.390533}

Last record matching criteria:
{'Instruction': 'I2C', 'unit': 'W', 'data': {'Reg': 15, 'data': 30}, 'comments': '', '_id': 'c97c4bd2-44b8-4615-a653-83f1aa0fa9b3', '_created': 1762701165.390533}

Updat

In [1]:
from dfttools import *
from dfttools import g
otp_burnt_flag = I2C_READ(device_address=0x38, field_info={'fieldname': 'otp_burnt', 'length': 1, 'registers': [{'REG': '0xB7', 'POS': 7, 'RegisterName': 'OTP FIELDS 7', 'RegisterLength': 8, 'Name': 'otp_burnt', 'Mask': '0x80', 'Length': 1, 'FieldMSB': 7, 'FieldLSB': 7, 'Attribute': 'NNNNNNNN', 'Default': '0xC0', 'User': '00000000', 'Clocking': 'FRO', 'Reset': 'C', 'PageName': 'PAG1 -  TEST'}]}, expected_value=0x1)
print(f'otp_burnt_flag {hex(otp_burnt_flag)}')
I2C_WRITE(device_address=0x38, field_info={'fieldname': 'otp_burnt', 'length': 1, 'registers': [{'REG': '0xB7', 'POS': 7, 'RegisterName': 'OTP FIELDS 7', 'RegisterLength': 8, 'Name': 'otp_burnt', 'Mask': '0x80', 'Length': 1, 'FieldMSB': 7, 'FieldLSB': 7, 'Attribute': 'NNNNNNNN', 'Default': '0xC0', 'User': '00000000', 'Clocking': 'FRO', 'Reset': 'C', 'PageName': 'PAG1 -  TEST'}]}, write_value=0x1)
vsns_value =   I2C_READ("0x38", field_info={'fieldname': 'tst_data_dwa', 'length': 9, 'registers': [{'REG': '0x11', 'POS': 0, 'RegisterName': 'DAC test 1', 'RegisterLength': 8, 'Name': 'tst_data_dwa[8]', 'Mask': '0x1', 'Length': 1, 'FieldMSB': 8, 'FieldLSB': 8, 'Attribute': 'N000000N', 'Default': '0x00', 'User': '00000000', 'Clocking': 'REF', 'Reset': 'C', 'PageName': 'PAG1'}, {'REG': '0x12', 'POS': 0, 'RegisterName': 'DAC test 2', 'RegisterLength': 8, 'Name': 'tst_data_dwa[7:0]', 'Mask': '0xFF', 'Length': 8, 'FieldMSB': 7, 'FieldLSB': 0, 'Attribute': 'NNNNNNNN', 'Default': '0x00', 'User': '00000000', 'Clocking': 'REF', 'Reset': 'C', 'PageName': 'PAG1'}]}, expected_value=0x100)
print(f'vsns_value {hex(vsns_value)}')
vsns_value =   I2C_WRITE("0x38", field_info={'fieldname': 'tst_data_dwa', 'length': 9, 'registers': [{'REG': '0x11', 'POS': 0, 'RegisterName': 'DAC test 1', 'RegisterLength': 8, 'Name': 'tst_data_dwa[8]', 'Mask': '0x1', 'Length': 1, 'FieldMSB': 8, 'FieldLSB': 8, 'Attribute': 'N000000N', 'Default': '0x00', 'User': '00000000', 'Clocking': 'REF', 'Reset': 'C', 'PageName': 'PAG1'}, {'REG': '0x12', 'POS': 0, 'RegisterName': 'DAC test 2', 'RegisterLength': 8, 'Name': 'tst_data_dwa[7:0]', 'Mask': '0xFF', 'Length': 8, 'FieldMSB': 7, 'FieldLSB': 0, 'Attribute': 'NNNNNNNN', 'Default': '0x00', 'User': '00000000', 'Clocking': 'REF', 'Reset': 'C', 'PageName': 'PAG1'}]}, write_value=0x100)

otp_burnt_flag 0x1
vsns_value 0x100


In [4]:
(1 & ((1 << 1) - 1))

1