252 changes: 252 additions & 0 deletions src/ansys/fluent/core/data_model_cache.py
@@ -0,0 +1,252 @@
"""Module to manage datamodel cache."""


from collections import defaultdict
from typing import Any, Dict, List, Union

from ansys.api.fluent.v0.variant_pb2 import Variant

StateType = Union[
bool,
int,
float,
str,
List[bool],
List[int],
List[float],
List[str],
List["StateType"],
Dict[str, "StateType"],
]


class DataModelCache:
"""Class to manage datamodel cache."""

class Empty:
"""Class representing unassigned cached state."""

@staticmethod
def is_unassigned(state: Any) -> bool:
"""Check whether a cached state is unassigned.

Parameters
----------
state : Any
state

Returns
-------
bool
whether a cached state is unassigned
"""
return state is DataModelCache.Empty

rules_str_to_cache = defaultdict(dict)
rules_str_to_config = {}

@staticmethod
def get_config(rules: str, name: str) -> Any:
"""Get datamodel cache configuration value.

Parameters
----------
rules : str
datamodel rules
name : str
configuration name

Returns
-------
Any
configuration value
"""
return DataModelCache.rules_str_to_config.get(rules, {}).get(name, False)

@staticmethod
def set_config(rules: str, name: str, value: Any):
"""Set datamodel cache configuration value.

Parameters
----------
rules : str
datamodel rules
name : str
configuration name
value : Any
configuration value
"""
if rules not in DataModelCache.rules_str_to_config:
DataModelCache.rules_str_to_config[rules] = {}
DataModelCache.rules_str_to_config[rules][name] = value

@staticmethod
def _update_cache_from_variant_state(
rules: str, source: Dict[str, StateType], key: str, state: Variant, updaterFn
):
if state.HasField("bool_state"):
updaterFn(source, key, state.bool_state)
elif state.HasField("int64_state"):
updaterFn(source, key, state.int64_state)
elif state.HasField("double_state"):
updaterFn(source, key, state.double_state)
elif state.HasField("string_state"):
updaterFn(source, key, state.string_state)
elif state.HasField("bool_vector_state"):
updaterFn(source, key, state.bool_vector_state.item)
elif state.HasField("int64_vector_state"):
updaterFn(source, key, state.int64_vector_state.item)
elif state.HasField("double_vector_state"):
updaterFn(source, key, state.double_vector_state.item)
elif state.HasField("string_vector_state"):
updaterFn(source, key, state.string_vector_state.item)
elif state.HasField("variant_vector_state"):
updaterFn(source, key, [])
for item in state.variant_vector_state.item:
DataModelCache._update_cache_from_variant_state(
rules, source, key, item, lambda d, k, v: d[k].append(v)
)
elif state.HasField("variant_map_state"):
internal_names_as_keys = DataModelCache.get_config(
rules, "internal_names_as_keys"
)
if ":" in key:
type_, iname = key.split(":", maxsplit=1)
for k1, v1 in source.items():
if (internal_names_as_keys and k1 == key) or (
(not internal_names_as_keys)
and isinstance(v1, dict)
and v1.get("__iname__") == iname
):
key = k1
break
else: # new named object
if internal_names_as_keys:
source[key] = {}
else:
name = state.variant_map_state.item["_name_"].string_state
key = f"{type_}:{name}"
source[key] = {"__iname__": iname}
else:
if key not in source:
source[key] = {}
source = source[key]
for k, v in state.variant_map_state.item.items():
DataModelCache._update_cache_from_variant_state(
rules, source, k, v, dict.__setitem__
)

@staticmethod
def update_cache(rules: str, state: Variant, deleted_paths: List[str]):
"""Update datamodel cache from streamed state.

Parameters
----------
rules : str
datamodel rules
state : Variant
streamed state
deleted_paths : List[str]
list of deleted paths
"""
cache = DataModelCache.rules_str_to_cache[rules]
internal_names_as_keys = DataModelCache.get_config(
rules, "internal_names_as_keys"
)
for deleted_path in deleted_paths:
comps = [x for x in deleted_path.split("/") if x]
sub_cache = cache
for i, comp in enumerate(comps):
if ":" in comp:
_, iname = comp.split(":", maxsplit=1)
key_to_del = None
for k, v in sub_cache.items():
if (internal_names_as_keys and k == comp) or (
(not internal_names_as_keys)
and isinstance(v, dict)
and v.get("__iname__") == iname
):
if i == len(comps) - 1:
key_to_del = k
else:
sub_cache = v
break
else:
break
if key_to_del:
del sub_cache[key_to_del]
else:
if comp in sub_cache:
sub_cache = sub_cache[comp]
else:
break
for k, v in state.variant_map_state.item.items():
DataModelCache._update_cache_from_variant_state(
rules, cache, k, v, dict.__setitem__
)

@staticmethod
def _dm_path_comp(comp):
return ":".join(comp if comp[1] else comp[0])

@staticmethod
def _dm_path_comp_list(obj):
return [DataModelCache._dm_path_comp(comp) for comp in obj.path]

@staticmethod
def get_state(rules: str, obj: object) -> Any:
"""Retrieve state from datamodel cache

Parameters
----------
rules : str
datamodel rules
obj : object
datamodel object

Returns
-------
        Any
            cached state, or DataModelCache.Empty if nothing is cached for the object
"""
cache = DataModelCache.rules_str_to_cache[rules]
if not len(cache):
return DataModelCache.Empty
path_components = DataModelCache._dm_path_comp_list(obj)
for path_component in path_components:
cache = cache.get(path_component, None)
if not cache:
return DataModelCache.Empty
return cache

@staticmethod
def _set_state_at_path(cache, path, value):
if len(path) == 0:
return
path_component = path[0]
if len(path) == 1:
cache[path_component] = value
else:
next_cache = cache.get(path_component, None)
if not next_cache:
next_cache = cache[path_component] = dict()
DataModelCache._set_state_at_path(next_cache, path[1:], value)

@staticmethod
def set_state(rules: str, obj: object, value: Any):
"""Set datamodel cache state

Parameters
----------
rules : str
datamodel rules
obj : object
datamodel object
value : Any
state
"""
DataModelCache._set_state_at_path(
DataModelCache.rules_str_to_cache[rules],
DataModelCache._dm_path_comp_list(obj),
value,
)
4 changes: 4 additions & 0 deletions src/ansys/fluent/core/fluent_connection.py
@@ -27,6 +27,8 @@
from ansys.fluent.core.streaming_services.datamodel_event_streaming import (
DatamodelEvents,
)

# from ansys.fluent.core.streaming_services.datamodel_streaming import DatamodelStream
from ansys.fluent.core.streaming_services.events_streaming import EventsManager
from ansys.fluent.core.streaming_services.monitor_streaming import MonitorsManager
from ansys.fluent.core.streaming_services.transcript_streaming import Transcript
@@ -212,6 +214,8 @@ def __init__(
self.datamodel_service_se = DatamodelService_SE(self._channel, self._metadata)
self.datamodel_events = DatamodelEvents(self.datamodel_service_se)
self.datamodel_events.start()
# self.datamodel_stream = DatamodelStream(self.datamodel_service_se)
# self.datamodel_stream.start()

self._reduction_service = ReductionService(self._channel, self._metadata)
self.reduction = Reduction(self._reduction_service)
12 changes: 11 additions & 1 deletion src/ansys/fluent/core/services/datamodel_se.py
@@ -10,6 +10,7 @@
from ansys.api.fluent.v0 import datamodel_se_pb2 as DataModelProtoModule
from ansys.api.fluent.v0 import datamodel_se_pb2_grpc as DataModelGrpcModule
from ansys.api.fluent.v0.variant_pb2 import Variant
from ansys.fluent.core.data_model_cache import DataModelCache
from ansys.fluent.core.services.error_handler import catch_grpc_error
from ansys.fluent.core.services.interceptors import BatchInterceptor, TracingInterceptor
from ansys.fluent.core.services.streaming import StreamingService
@@ -308,14 +309,20 @@ def __init__(self, service: DatamodelService, rules: str, path: Path = None):

docstring = None

def get_state(self) -> Any:
def get_remote_state(self) -> Any:
"""Get state of the current object."""
request = DataModelProtoModule.GetStateRequest()
request.rules = self.rules
request.path = convert_path_to_se_path(self.path)
response = self.service.get_state(request)
return _convert_variant_to_value(response.state)

    def get_state(self) -> Any:
        """Get state of the current object, preferring the datamodel cache."""
        state = DataModelCache.get_state(self.rules, self)
if DataModelCache.is_unassigned(state):
state = self.get_remote_state()
return state

getState = get_state

def set_state(self, state: Any = None, **kwargs) -> None:
@@ -1243,7 +1250,10 @@ def __getattr__(self, attr):
class DataModelType(Enum):
"""An enumeration over datamodel types."""

# Really???

# Tuple: Name, Solver object type, Meshing flag, Launcher options
# Really???
TEXT = (["String", "ListString", "String List"], PyTextualCommandArgumentsSubItem)
NUMBER = (
["Real", "Int", "ListReal", "Real List", "Integer"],
31 changes: 31 additions & 0 deletions src/ansys/fluent/core/session_pure_meshing.py
@@ -4,9 +4,15 @@
"""


import functools

from ansys.api.fluent.v0 import datamodel_se_pb2
from ansys.fluent.core.data_model_cache import DataModelCache
from ansys.fluent.core.fluent_connection import FluentConnection
from ansys.fluent.core.services.streaming import StreamingService
from ansys.fluent.core.session import BaseSession
from ansys.fluent.core.session_base_meshing import BaseMeshing
from ansys.fluent.core.streaming_services.datamodel_streaming import DatamodelStream
from ansys.fluent.core.utils.data_transfer import transfer_case


@@ -17,9 +23,34 @@ class PureMeshing(BaseSession):
exposed here. No ``switch_to_solver`` method is available
in this mode."""

use_cache = True
rules = ["workflow", "meshing", "PartManagement", "PMFileManagement"]
for r in rules:
DataModelCache.set_config(r, "internal_names_as_keys", True)

def __init__(self, fluent_connection: FluentConnection):
super(PureMeshing, self).__init__(fluent_connection=fluent_connection)
self._base_meshing = BaseMeshing(self.execute_tui, fluent_connection)
datamodel_service_se = fluent_connection.datamodel_service_se
self.datamodel_streams = {}
if self.use_cache:
for rules in self.__class__.rules:
request = datamodel_se_pb2.DataModelRequest()
request.rules = rules
request.diffstate = (
datamodel_se_pb2.DIFFSTATE_NOCOMMANDS
) # DIFFSTATE_FULL?
streaming = StreamingService(
stub=datamodel_service_se._stub,
request=request,
metadata=datamodel_service_se._metadata,
)
stream = DatamodelStream(streaming)
stream.register_callback(
functools.partial(DataModelCache.update_cache, rules=rules)
)
self.datamodel_streams[rules] = stream
stream.start()

@property
def tui(self):
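The callback wiring above relies on functools.partial binding rules as a keyword argument, while the streaming thread later supplies state and deleted_paths, also by keyword (see the datamodel_streaming.py diff below). A self-contained sketch of that call pattern, using a stand-in for DataModelCache.update_cache:

import functools

def update_cache(rules, state, deleted_paths):
    # Stand-in with the same parameters as DataModelCache.update_cache.
    print(rules, state, deleted_paths)

# rules is pre-bound per stream, as in PureMeshing.__init__ above.
callback = functools.partial(update_cache, rules="workflow")

# The streaming thread invokes the callback with keyword arguments only, so the
# call completes as update_cache(rules="workflow", state=..., deleted_paths=...).
callback(state={"TaskObject:TaskObject1": {"_name_": "Import Geometry"}}, deleted_paths=[])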
13 changes: 2 additions & 11 deletions src/ansys/fluent/core/streaming_services/datamodel_streaming.py
@@ -1,5 +1,4 @@
from ansys.api.fluent.v0 import datamodel_se_pb2
from ansys.fluent.core.services.datamodel_se import _convert_variant_to_value
from ansys.fluent.core.streaming_services.streaming import StreamingService


@@ -19,19 +18,11 @@ def _process_streaming(self, started_evt):
while True:
try:
response: datamodel_se_pb2.DataModelResponse = next(responses)
print(response)
with self._lock:
self._streaming = True
for _, cb_list in self._service_callbacks.items():
state = (
_convert_variant_to_value(response.state)
if hasattr(response, "state")
else None
)
state = response.state if hasattr(response, "state") else None
deleted_paths = getattr(response, "deletedpaths", None)
events = getattr(response, "events", None)
cb_list[0](
state=state, deleted_paths=deleted_paths, events=events
)
cb_list[0](state=state, deleted_paths=deleted_paths)
except StopIteration:
break
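With the _convert_variant_to_value call removed, the registered callback now receives the raw Variant message, and DataModelCache.update_cache reads it directly via HasField. A hand-built payload, not part of the diff and assuming the "meshing" rules configured in PureMeshing above, shows how such a message is folded into the cache:

from ansys.api.fluent.v0.variant_pb2 import Variant

from ansys.fluent.core.data_model_cache import DataModelCache

# Build a small streamed state by hand; the field names follow variant_pb2 as
# used in data_model_cache.py above.
state = Variant()
task = state.variant_map_state.item["TaskObject:TaskObject1"]
task.variant_map_state.item["_name_"].string_state = "Import Geometry"
task.variant_map_state.item["Arguments"].variant_map_state.item[
    "FileName"
].string_state = "box.scdoc"

DataModelCache.set_config("meshing", "internal_names_as_keys", True)
DataModelCache.update_cache("meshing", state, deleted_paths=[])

# Expected cache contents (key order may vary):
# {"TaskObject:TaskObject1": {"_name_": "Import Geometry",
#                             "Arguments": {"FileName": "box.scdoc"}}}
print(DataModelCache.rules_str_to_cache["meshing"])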