In [7]:
atomJson={
    # Each schema must define its own name; the name is used as the key in the
    # schemaRegistry, through which schemas referenced from other schemas are found.
    '_schema':'atom',
    'identity':{
        # dtype is mandatory for "data" items
        # see https://docs.h5py.org/en/stable/faq.html for list of datatypes supported by h5py/numpy
        'element':{'dtype':'a2'}, # 2 characters ascii
        # inherent property: values are stored in the schema and looked up on demand
        # (the value under 'key' selects the entry of the 'lookup' table)
        'atomicNumber':{
            'dtype':'l', # long int
            'key':'identity.element', # lookup key
            'lookup':{'H':1,'C':6,'N':7,'Na':11,'Cl':17,'Fe':26} # lookup table
        },
        # for units, see here https://docs.astropy.org/en/stable/units/index.html
        # every unit has its base name plus aliases, so Angstrom is the same as AA and so on
        # whatever astropy.units.Unit('...') parses can be used here
        # we additionally import "e" for elementary charge (which is technically a constant, not unit)
        # data are stored and returned in the unit declared
        'atomicMass':{'dtype':'f','key':'identity.element','unit':'Dalton',
            'lookup':{'H':1.0079,'C':12.0107,'N':14.0067,'Na':22.9897,'Cl':35.453,'Fe':55.845},
        },
    },
    'properties':{
        'physical':{
            'partialCharge':{
                'neutral': {'dtype':'d','unit':'e'},
                'anion': {'dtype':'d','unit':'e'},
                'cation': {'dtype':'d','unit':'e'},
            },
            # SI unit of polarizability is C m^2 / V = A^2 s^4 kg^-1, where 'A' is ampere
            # ('AA' is Angstrom in astropy, which would give a dimensionally different quantity)
            'polarizability':{
                'neutral': {'dtype':'d','unit':'A^2 s^4 kg^-1'},
                'anion': {'dtype':'d','unit':'A^2 s^4 kg^-1'},
                'cation': {'dtype':'d','unit':'A^2 s^4 kg^-1'},
            }
        },
        'topology':{
            'parent':{'dtype':'l'},
            # dtype 'a' denotes a variable-length string
            'type':{'dtype':'a','shape':'variable'},
            'name':{'dtype':'a','shape':'variable'},
            # shape is dimension for array variables, thus position and velocity are 3-vectors
            # the number of shape elements determines the rank, thus e.g. [3,3] would be
            # rank-2 tensor
            'position':{'dtype':'d','shape':[3],'unit':'AA'},
            'velocity':{'dtype':'d','shape':[3],'unit':'AA/ps'},
            # variable-length array of long ints (not functional yet!)
            'structure':{'dtype':'l','shape':'variable'},
        }
    }
}



moleculeJson={
    # schema name, used as the key in the schemaRegistry
    '_schema':'molecule',
    'identity':{
        'chemicalName':{'dtype':'a','shape':'variable'},
        'molecularWeight':{'dtype':'d','unit':'Dalton'},
    },
    'properties':{
        'electrical':{
            'HOMO':{'dtype':'d','unit':'eV'},
            'LUMO':{'dtype':'d','unit':'eV'},
            'siteEnergy':{
                'orbital':{'dtype':'d','unit':'eV'},
                'electrostatic':{'dtype':'d','unit':'eV'},
                'polarization':{'dtype':'d','unit':'eV'},
            },
            'transferIntegrals':{'dtype':'d','shape':'variable'},
            'reorganizationEnergyInternal':{
                'anion':{'dtype':'d','unit':'eV'},
                'cation':{'dtype':'d','unit':'eV'}
            },
        },
        'physical':{
            # rank-2 (3x3) polarizability tensors
            # SI unit of polarizability is C m^2 / V = A^2 s^4 kg^-1, where 'A' is ampere
            # ('AA' is Angstrom in astropy, which would give a dimensionally different quantity)
            'polarizability':{
                'neutral':{'dtype':'d','shape':[3,3],'unit':'A^2 s^4 kg^-1'},
                'anion':{'dtype':'d','shape':[3,3],'unit':'A^2 s^4 kg^-1'},
                'cation':{'dtype':'d','shape':[3,3],'unit':'A^2 s^4 kg^-1'},
            }
        },
        'chemical':{},
    },
    'topology':{
        'parent':{'dtype':'l','unit':'none'},
        'centerOfMass':{'dtype':'d','shape':[3],'unit':'AA'},
        'symmetryAxis':{'dtype':'d','shape':[3],'unit':'AA'},
        'structureNeighbors':{'dtype':'l','shape':'variable'},
    },
    'implementation':{
        'forceFieldType':{'dtype':'a','shape':'variable'},
    },
    # schema reference: '{ROW}' is replaced by the row index of the molecule; the part
    # before the last '/' is the HDF5 group, the part after it is the dataset name;
    # 'schema' names the registered schema describing that dataset
    'atoms':{'path':'molecule_{ROW}/atoms','schema':'atom'}
}

grainJson={
    # schema name, used as the key in the schemaRegistry
    '_schema':'grain',
    'identity':{
        'material':{'dtype':'a','shape':'variable'}
    },
    'properties':{
        # spelled 'electrical' to agree with the molecule schema
        # (the generated accessor is therefore getElectrical)
        'electrical':{
            'freeElectrons':{'dtype':'l','unit':'none'},
            'freeHoles':{'dtype':'l','unit':'none'},
        },
        'physical':{
            'reorganizationEnergyExternal':{'dtype':'d','unit':'eV'}
        },
        'chemical':{},
    },
    'topology':{
        'parent':{'dtype':'l'},
        'cellSize':{'dtype':'d','shape':[3],'unit':'m'},
    },
    'implementation':{
        'boundaryCondition':{'dtype':'a'}
    },
    # schema reference: '{ROW}' is replaced by the row index of the grain; the part
    # before the last '/' is the HDF5 group, the part after it is the dataset name
    'molecules':{'path':'grain_{ROW}/molecules','schema':'molecule'}
}


In [18]:

from dataclasses import dataclass
import dataclasses
import types
import astropy.units
import astropy.units as u
# Register two extra unit names so that schema 'unit' strings can use them:
#   'e'    - elementary charge (an astropy constant, wrapped as a unit here)
#   'none' - explicit dimensionless marker
# astropy.constants is imported explicitly rather than relying on it being
# loaded as a side effect of importing astropy.units.
import astropy.constants
astropy.units.add_enabled_units([
    astropy.units.def_unit('e',astropy.constants.si.e),
    astropy.units.def_unit('none',astropy.units.dimensionless_unscaled)
])
import numpy as np
import h5py
from pprint import pprint
from typing import *


def cookSchema(desc,prefix='',schemaName=''):
    '''
    Transform dictionary-structured data schema into context access types.
    The access types are created using the "type" builtin and only stored
    in closures of the functions returning them. The top-level context type is
    returned from this function to the user.

    get/set methods (and others) are not created on the fly but are instead
    put into those context types. This is substantially more efficient than
    hijacking __getattr__ and __setattr__.

    Closures in Python are somewhat unintuitive, since e.g. a loop does not
    create a new scope (thus a variable reference would later have the value
    from the last iteration step). Therefore local variables are captured via
    local function defaults, which makes some of the code less readable.

    Every entry of *desc* is classified by the keys it contains:

    * ``lookup`` (with ``key`` and ``dtype``): computed attribute, not stored; getter only;
    * ``dtype``: stored field of the compound datatype; getter and setter;
    * ``path`` (with ``schema``): reference to a dataset described by another schema;
    * anything else: nested group, cooked recursively.

    :param desc: schema (or schema fragment) dictionary
    :param prefix: dot-separated path of *desc* within the top-level schema; empty at top level
    :param schemaName: name of the top-level schema; read from ``desc['_schema']`` when empty
    :return: the context type when called at top level (empty *prefix*), otherwise
        a CookedSchemaFragment holding the nested context type, dtypes and defaults
    :raises ValueError: unrecognized special (underscore) key or malformed schema reference path
    :raises TypeError: an entry of *desc* is not a dictionary
    '''

    @dataclass
    class CookedSchemaFragment:
        'Internal data used when cookSchema is called recursively'
        dtypes: list   # accumulates (field name, dtype) pairs for the compound datatype
        defaults: dict # default values keyed by fully-qualified field name
        T: Any=None    # nested context type

    def dtypeUnitDefault(v):
        'Parse dictionary *v* (part of the schema) and return (dtype,unit,default) tuple'
        shape=v['shape'] if 'shape' in v else ()
        if isinstance(shape,list): shape=tuple(shape)
        unit=astropy.units.Unit(v['unit']) if 'unit' in v else None
        dtype=v['dtype']
        default=None
        if dtype=='a':
            # 'a' denotes a utf-8 string; any declared shape is ignored
            dtype=h5py.string_dtype(encoding='utf-8')
            shape=None
        elif shape=='variable':
            dtype=h5py.vlen_dtype(np.dtype(dtype))
            shape=None
        else:
            dtype=np.dtype((dtype,shape))
            # NOTE(review): numpy spells this attribute `subdtype`; with `subtype` the
            # hasattr() test is presumably always False, so array-shaped fields get kind 'V'
            # and no default (cf. velocity printing zeros rather than nan). Left as is,
            # since enabling it changes what allocate() writes -- confirm intent.
            kind=(dtype.kind if not hasattr(dtype,'subtype') or dtype.subtype is None else dtype.subtype[0].kind)
            if kind=='f': default=np.nan
            elif kind in 'iu': default=0
        return dtype,unit,default

    def capitalize(k):
        'Turn the first letter into uppercase'
        return k[0].upper()+k[1:]

    def _T_assertDataset(T,msg=''):
        'Check that the backing dataset is present/open, opening it lazily if it exists. Raises RuntimeError otherwise.'
        if T.ctx.dataset is None:
            if T.ctx.h5name in T.ctx.h5group: T.ctx.dataset=T.ctx.h5group[T.ctx.h5name]
            else: raise RuntimeError(f'Dataset not yet initialized, use _allocate first{" ("+msg+")" if msg else ""}.')

    # top-level only
    if not schemaName: schemaName=desc['_schema']

    ret=CookedSchemaFragment(dtypes=[],defaults={})

    meth={} # accumulate attribute access methods

    for key,val in desc.items():
        # fully-qualified name: for messages and compound field name in h5py
        fq=(f"{prefix}.{key}" if prefix else key)
        # special keys start with underscore, so far only _schema is used
        if key.startswith('_'):
            if key=='_schema': continue
            else: raise ValueError(f"Unrecognized special key '{key}' in prefix '{prefix}'.")
        if not isinstance(val,dict):
            raise TypeError(f"{fq}: value is not a dictionary.")
        # attribute defined via lookup, not stored
        if 'lookup' in val:
            dtype,unit,default=dtypeUnitDefault(val)
            lKey,lDict=val['key'],val['lookup']
            if isinstance(lKey,bytes): lKey=lKey.decode('utf8')
            # bind local values via default args (closure)
            def inherentGetter(self,*,fq=fq,dtype=dtype,unit=unit,lKey=lKey,lDict=lDict):
                _T_assertDataset(self,f"when looking up '{fq}' based on '{lKey}'.")
                def _lookup(row):
                    k=self.ctx.dataset[lKey,row]
                    if isinstance(k,bytes): k=k.decode('utf8')
                    try: val=np.array(lDict[k],dtype=dtype)[()] # [()] unpacks rank-0 scalar
                    except KeyError: raise KeyError(f"{fq}: key '{k}' ({lKey}) not found in the lookup table with keys {list(lDict.keys())}") from None
                    return val
                # fake broadcasting: without a row, look up every row of the dataset
                if self.row is None: val=np.array([_lookup(r) for r in range(self.ctx.dataset.shape[0])])
                else: val=_lookup(self.row)
                if unit: return astropy.units.Quantity(value=val,unit=unit)
                else: return val
            meth['get'+capitalize(key)]=inherentGetter
        # normal data attribute
        elif 'dtype' in val:
            dtype,unit,default=dtypeUnitDefault(val)
            ret.dtypes+=[(fq,dtype)] # add to the compound type
            # NOTE(review): truthiness test, so a default of 0 is never recorded (only nan is);
            # integer fields thus rely on whatever the freshly created dataset contains.
            if default: ret.defaults[fq]=default # add to the defaults
            def getter(self,*,fq=fq,unit=unit):
                _T_assertDataset(self,f"when getting the value of '{fq}'")
                if self.row is not None: value=self.ctx.dataset[fq,self.row]
                else: value=self.ctx.dataset[fq]
                if unit is None: return value
                return astropy.units.Quantity(value=value,unit=unit)
            def setter(self,val,*,fq=fq,unit=unit):
                _T_assertDataset(self,f"when setting the value of '{fq}'")
                # convert to the declared unit and store the bare number
                if unit: val=(astropy.units.Quantity(val).to(unit)).value
                if self.row is not None: self.ctx.dataset[self.row,fq]=val
                else: self.ctx.dataset[fq]=val
            meth['get'+capitalize(key)]=getter
            meth['set'+capitalize(key)]=setter
        # reference to a dataset described by another schema
        elif 'path' in val:
            path,schema=val['path'],val['schema']
            for s in ('{ROW}','/'):
                if s not in path:
                    raise ValueError(f"'{fq}': schema ref path '{path}' does not contain '{s}'.")
            def subschemaGetter(self,row=None,*,fq=fq,path=path,schema=schema):
                if self.row is None: raise AttributeError(f"'{fq}': row index not set, unable to follow schema ref.")
                # open the backing dataset lazily, like the other accessors do
                _T_assertDataset(self,f"when following schema ref '{fq}'")
                self.ctx.dataset[self.row] # catch invalid row index, data unused
                path=path.replace('{ROW}',str(self.row))
                grpPath,name=path.rsplit('/',1)
                subgrp=self.ctx.h5group.require_group(grpPath)
                SchemaT=self.ctx.schemaRegistry[schema]
                return SchemaT(RootContext(h5group=subgrp,h5name=name,schemaRegistry=self.ctx.schemaRegistry),row=row)
            meth['get'+capitalize(key)]=subschemaGetter
        else:
            # recurse
            cooked=cookSchema(val,prefix=fq,schemaName=schemaName)
            ret.dtypes+=cooked.dtypes
            ret.defaults.update(cooked.defaults)
            meth['get'+capitalize(key)]=lambda self, T=cooked.T: T(self)

    # define common methods for the context type
    def T_init(self,other,row=None):
        'Context constructor; copies h5 context and row from *other*, optionally sets *row* as well'
        if isinstance(other,RootContext): self.ctx,self.row=other,row
        else:
            # report the row which is already set (not the one being requested)
            if other.row is not None and row is not None: raise IndexError(f'Context already indexed, with row={other.row}.')
            self.ctx,self.row=other.ctx,(other.row if row is None else row)
    def T_str(self):
        'Context string representation'
        return F"<{self.__class__.__name__}, row={self.row}, ctx={self.ctx}>"
    def T_getitem(self,row):
        'Indexing access; checks index validity and returns new context with the row set'
        _T_assertDataset(self,msg=f'when trying to index row {row}')
        size=self.ctx.dataset.shape[0]
        # explicit check: indexing the dataset would raise ValueError but the iteration protocol needs IndexError
        if row<0 or row>=size:
            raise IndexError(f"{self.__class__.__name__}: row index {row} out of range 0…{size}.")
        return self.__class__(self,row=row)
    def T_len(self):
        'Return sequence length; raises IndexError when the context is already bound to a row'
        _T_assertDataset(self,msg=f'querying dataset length')
        if self.row is not None: raise IndexError('Row index already set, not behaving as sequence.')
        return self.ctx.dataset.shape[0]
    def T_allocate(self,size):
        'Allocate the backing dataset with *size* rows and set the recorded default values.'
        if self.ctx.dataset: raise RuntimeError(f'Dataset already exists (shape {self.ctx.dataset.shape}), re-allocation not supported.')
        self.ctx.dataset=self.ctx.h5group.create_dataset(self.ctx.h5name,shape=(size,),dtype=ret.dtypes,compression='gzip')
        # defaults broadcast to all rows
        for name,value in ret.defaults.items(): self.ctx.dataset[name]=value
        # TODO: store schema JSON into dataset attributes

    meth['__init__']=T_init
    meth['__str__']=meth['__repr__']=T_str
    meth['__getitem__']=T_getitem
    meth['__len__']=T_len
    meth['row']=None
    meth['ctx']=None

    # those are defined only for the "root" context
    if not prefix:
        meth['allocate']=T_allocate
        ret.dtypes=np.dtype(ret.dtypes)

    # create the context type with all its methods and attributes
    # the () are base classes; put mupif.MupifObject there if necessary
    ret.T=type('Ctx_'+schemaName+'_'+prefix.replace('.','_'),(),meth)

    if not prefix:
        ret.T.name=schemaName # schema knows its own name, for convenience of creating schema registry
        return ret.T
    return ret

@dataclass
class RootContext:
    'Location of one schema-backed dataset inside an HDF5 file, shared by all contexts derived from it.'
    # group object which contains (or will contain) the dataset
    h5group: Any
    # name of the dataset within h5group
    h5name: str
    # maps schema name to the cooked context type; used to follow schema references
    schemaRegistry: dict
    # backing dataset; stays None until it is allocated or opened on first access
    dataset: Any=dataclasses.field(default=None)
    
    
# Cook every schema and register the resulting context type under its schema name.
# (dict comprehension: no helper name leaks into the notebook namespace)
schemaRegistry={ctxType.name:ctxType for ctxType in map(cookSchema,(atomJson,moleculeJson,grainJson))}

# Smoke test: allocate flat atom and molecule datasets and exercise the generated accessors.
with h5py.File('/tmp/test-aa1.h5','w') as h5:
    #mols=Hdf5DataProxy(group=h5['/'],name='molTest',schemaName='molecule',schemaRegistry=schemaRegistry)
    atomSchema=schemaRegistry['atom']
    atom=atomSchema(RootContext(h5group=h5['/'],h5name='atom',schemaRegistry=schemaRegistry))
    atom.allocate(10)
    # no row selected: getters return the whole column, setters broadcast to all rows
    print(atom.getProperties().getPhysical().getPartialCharge().getAnion())
    print(atom.getIdentity().getElement())
    atom.getIdentity().setElement('H')
    print(atom.getIdentity().getElement())
    # row selected via indexing: single-row access
    print(atom[0].getProperties().getTopology().getVelocity())
    # print(np.array(atom.ctx.dataset))
    # lookup-based attribute, derived from the element of each row
    print(atom.getIdentity().getAtomicMass())


    molSchema=schemaRegistry['molecule']
    mols=molSchema(RootContext(h5group=h5['/'],h5name='mol',schemaRegistry=schemaRegistry))
    mols.allocate(10)
    print(mols.ctx.dataset)
    # mol[50].getAtoms().getProperties()
    # iteration goes through __getitem__ until IndexError is raised
    for mol in mols:
        print(mol.getProperties().getElectrical().getHOMO())
        # print(mol.getAtoms().getIdentity().getElement())
        # print(mol.getAtoms()[0])

        # test h5py
def test_grain2(path='/tmp/test-grain2.h5',seed=None):
    '''
    Build a nested grain -> molecule -> atom hierarchy in an HDF5 file and report
    how many atoms were created per second.

    :param path: HDF5 file to (over)write
    :param seed: seed for the random sizes and elements; None (the default) gives
        a different hierarchy on every run, an integer makes the run reproducible
    '''
    import time, random
    rng=random.Random(seed)
    # perf_counter is monotonic and high-resolution, hence suited to measuring elapsed time
    t0=time.perf_counter()
    atomCounter=0
    with h5py.File(path,'w') as h5:
        grp=h5.require_group('grains')
        grains=schemaRegistry['grain'](RootContext(h5group=grp,h5name='grains',schemaRegistry=schemaRegistry))
        grains.allocate(size=5)
        print(f"There is {len(grains)} grains.")
        for ig,g in enumerate(grains):
            g.getMolecules().allocate(size=rng.randint(5,50))
            print(f"Grain #{ig} has {len(g.getMolecules())} molecules")
            for m in g.getMolecules():
                # values are converted to the unit declared in the schema when stored
                m.getIdentity().setMolecularWeight(rng.randint(1,10)*u.yg)
                m.getAtoms().allocate(size=rng.randint(30,60))
                for a in m.getAtoms():
                    a.getIdentity().setElement(rng.choice(['H','N','Cl','Na','Fe']))
                    a.getProperties().getTopology().setPosition((1,2,3)*u.nm)
                    a.getProperties().getTopology().setVelocity((24,5,77)*u.m/u.s)
                    # not yet, see https://stackoverflow.com/q/67192725/761090
                    # a.getProperties().getTopology().setStructure([1,2,3])
                    # np.array([random.randint(1,20) for i in range(random.randint(5,20))],dtype='l')
                    atomCounter+=1
    # measured after the file is closed, so the timing includes closing the file
    t1=time.perf_counter()
    print(f'{atomCounter} atoms created in {t1-t0:g} sec ({atomCounter/(t1-t0):g}/sec).')

test_grain2()

[nan nan nan nan nan nan nan nan nan nan] e
[b'' b'' b'' b'' b'' b'' b'' b'' b'' b'']
[b'H' b'H' b'H' b'H' b'H' b'H' b'H' b'H' b'H' b'H']
[0. 0. 0.] Angstrom / ps
[1.0079 1.0079 1.0079 1.0079 1.0079 1.0079 1.0079 1.0079 1.0079 1.0079] u
<HDF5 dataset "mol": shape (10,), type "|V384">
nan eV
nan eV
nan eV
nan eV
nan eV
nan eV
nan eV
nan eV
nan eV
nan eV
There is 5 grains.
Grain #0 has 49 molecules
Grain #1 has 47 molecules
Grain #2 has 19 molecules
Grain #3 has 29 molecules
Grain #4 has 12 molecules
6999 atoms created in 6.74871 sec (1037.09/sec).


In [5]:
## Writing variable-length items into structured (compound) data.
## This is currently broken for the HDF5 proxies above; the cell records which
## h5py assignment forms were tried and which one actually stores the value.
import h5py
import numpy as np

with h5py.File('/tmp/test-vla.h5','w') as h5:
    # compound type with a single field 'a' holding a variable-length int32 array
    vlenInt32=h5py.vlen_dtype(np.dtype('int32'))
    dt=np.dtype([('a',vlenInt32)])
    dset=h5.create_dataset('test',shape=(5,),dtype=dt)

    # forms which run but do not write the value back
    # (presumably they modify an in-memory copy returned by the first indexing)
    dset['a'][2]=[1,2,3]
    dset[2]['a']=[1,2,3]

    # forms which raise (kept for reference):
    #dset['a',2]=[1,2,3]  # Cannot change data-type for object array
    #dset[2,'a']=[1,2,3]  # Cannot change data-type for object array
    #tmp=dset['a']; tmp[2]=[1,2,3]; dset['a']=tmp # Cannot change data-type for object array
    #tmp=dset[2]; tmp['a']=[1,2,3]; dset[2]=tmp # 'list' object has no attribute 'dtype'

    # writing the whole row works
    dset[2]=(np.array([1,2,3]),)
    print(np.array(dset))



[(array([], dtype=int32),) (array([], dtype=int32),)
 (array([1, 2, 3], dtype=int32),) (array([], dtype=int32),)
 (array([], dtype=int32),)]
