Merged
103 changes: 56 additions & 47 deletions comet/broker.py
@@ -34,7 +34,7 @@
 
 from . import __version__
 from .manager import Manager, TIMESTAMP_FORMAT
-from .exception import CometError
+from .exception import CometError, DatasetNotFoundError, StateNotFoundError
 
 
 REQUESTED_STATE_TIMEOUT = 35
@@ -269,8 +269,9 @@ async def register_state(request):
 
     # Lock states and check if the received state is already known.
     async with lock_states:
-        state = await get_state(hash, wait=False)
-        if state is None:
+        try:
+            await get_state(hash, wait=False)
+        except StateNotFoundError:
             # we don't know this state, did we request it already?
             # After REQUEST_STATE_TIMEOUT we request it again.
             request_time = await redis.execute("hget", "requested_states", hash)
@@ -326,8 +327,16 @@ async def send_state(request):
 
     # Lock states and check if we know this state already.
     async with lock_states:
-        found = await get_state(hash, wait=False)
-        if found is not None:
+        try:
+            found = await get_state(hash, wait=False)
+        except StateNotFoundError:
+            await redis.execute("hset", "states", hash, json.dumps(state))
+            reply["result"] = "success"
+            archive_state = True
+
+            # Notify anything waiting for this state to arrive
+            signal_created(hash, "state", lock_states, waiting_states)
+        else:
             # if we know it already, does it differ?
             if found != state:
                 reply["result"] = (
@@ -338,13 +347,6 @@ async def send_state(request):
                 logger.warning("send-state: {}".format(reply["result"]))
             else:
                 reply["result"] = "success"
-        else:
-            await redis.execute("hset", "states", hash, json.dumps(state))
-            reply["result"] = "success"
-            archive_state = True
-
-            # Notify anything waiting for this state to arrive
-            signal_created(hash, "state", lock_states, waiting_states)
 
     # Remove it from the set of requested states (if it's in there.)
     try:
@@ -409,8 +411,19 @@ async def register_dataset(request):
 
     # Lack datasets and check if dataset already known.
     async with lock_datasets:
-        found = await get_dataset(hash, wait=False)
-        if found is not None:
+        try:
+            found = await get_dataset(hash, wait=False)
+        except DatasetNotFoundError:
+            if dataset_valid and root is not None:
+                # save the dataset
+                await redis.execute("hset", "datasets", hash, json.dumps(ds))
+
+                reply["result"] = "success"
+                archive_ds = True
+
+                # Notify anything waiting for this dataset to arrive
+                signal_created(hash, "dataset", lock_datasets, waiting_datasets)
Contributor:
What should happen if you get DatasetNotFoundError but you don't satisfy the if clause above? I appreciate this isn't covered in the original code either.

Contributor (Author):
result gets set to "dataset invalid", this branch gets skipped and the function returns. I agree it's not too readable.

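A minimal sketch of how that fall-through could be made explicit (a hypothetical refactor, not part of this PR; all names are taken from the diff above):

        try:
            found = await get_dataset(hash, wait=False)
        except DatasetNotFoundError:
            if dataset_valid and root is not None:
                # save the dataset
                await redis.execute("hset", "datasets", hash, json.dumps(ds))
                reply["result"] = "success"
                archive_ds = True
                # Notify anything waiting for this dataset to arrive
                signal_created(hash, "dataset", lock_datasets, waiting_datasets)
            else:
                # Make the skipped branch visible: reply["result"] already
                # holds "dataset invalid", so nothing is stored and the
                # function returns that reply unchanged.
                logger.debug("register-dataset: not storing invalid dataset.")
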
+        else:
             # if we know it already, does it differ?
             if found != ds:
                 reply["result"] = (
@@ -420,15 +433,6 @@ async def register_dataset(request):
                 logger.warning("register-dataset: {}".format(reply["result"]))
             else:
                 reply["result"] = "success"
-        elif dataset_valid and root is not None:
-            # save the dataset
-            await redis.execute("hset", "datasets", hash, json.dumps(ds))
-
-            reply["result"] = "success"
-            archive_ds = True
-
-            # Notify anything waiting for this dataset to arrive
-            signal_created(hash, "dataset", lock_datasets, waiting_datasets)
 
     if archive_ds:
         await asyncio.shield(archive("dataset", request.json))
@@ -453,10 +457,10 @@ async def register_dataset(request):
 async def find_root(hash, ds):
     """Return the dataset Id of the root of this dataset."""
     while not ds["is_root"]:
-        hash = ds["base_dset"]
         try:
+            hash = ds["base_dset"]
             ds = await get_dataset(hash)
-        except TimeoutError as err:
+        except DatasetNotFoundError as err:
             logger.error("find_root: dataset {} not found: {}".format(hash, err))
             return None
     return hash
@@ -472,15 +476,15 @@ async def check_dataset(ds):
     logger.debug("check_dataset: Checking dataset: {}".format(ds))
     try:
         await get_state(ds["state"])
-    except TimeoutError as err:
+    except StateNotFoundError as err:
         logger.debug("check_dataset: State of dataset {} unknown: {}".format(ds, err))
         return False
     if ds["is_root"]:
         logger.debug("check_dataset: dataset {} OK".format(ds))
         return True
     try:
         await get_dataset(ds["base_dset"])
-    except TimeoutError as err:
+    except DatasetNotFoundError as err:
         logger.debug(
             "check_dataset: Base dataset of dataset {} unknown: {}".format(ds, err)
         )
@@ -510,9 +514,10 @@ async def request_state(request):
     logger.debug("request-state: waiting for state ID {}".format(id))
     try:
         reply["state"] = await get_state(id)
-    except TimeoutError as err:
-        reply["result"] = "state ID {} unknown to broker.".format(id)
-        logger.info("request-state: State {} unknown to broker: {}".format(id, err))
+    except StateNotFoundError as err:
+        msg = "request-state: State {} unknown to broker: {}".format(id, err)
+        reply["result"] = msg
+        logger.info(msg)
         return response.json(reply)
     logger.debug("request-state: found state ID {}".format(id))
 
@@ -629,7 +634,7 @@ async def wait_for_x(id, name, lock, redis_hash, event_dict):
     )
 
 
-@alru_cache(maxsize=10000)
+@alru_cache(maxsize=10000, cache_exceptions=False)
 async def get_dataset(ds_id, wait=True):
     """
     Get a dataset by ID from redis (LRU cached).
@@ -644,25 +649,28 @@ async def get_dataset(ds_id, wait=True):
     Returns
     -------
     Dataset
-        The dataset from cache or redis. `None` if the dataset doesn't exist and wait was `False`.
+        The dataset from cache or redis.
 
     Raises
     ------
-    TimeoutError
-        If waiting for the dataset timed out.
+    DatasetNotFoundError
+        If the dataset doesn't exist or waiting for the dataset timed out.
 
     """
     if wait:
         # Check if existing and wait if not
         found = await wait_for_dset(ds_id)
         if not found:
-            raise TimeoutError("Dataset {} not found: Timeout.".format(ds_id))
+            raise DatasetNotFoundError("Dataset {} not found: Timeout.".format(ds_id))
Contributor:
Is there any reason you would still need to differentiate between a time out and the dataset actually not being found?

Contributor (Author):
Maybe for debugging.

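If that distinction is ever needed for debugging, a hypothetical follow-up (not part of this PR) could keep a single except clause for callers while separating the two cases in logs, e.g. with a subclass:

class DatasetNotFoundTimeout(DatasetNotFoundError):
    """The dataset was not found before the wait for it timed out."""

    pass

The timeout path in get_dataset would then raise DatasetNotFoundTimeout, the redis miss would keep raising DatasetNotFoundError, and existing `except DatasetNotFoundError` handlers would catch both.
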

     # Get it from redis
     ds = await redis.execute("hget", "datasets", ds_id)
-    return json.loads(ds) if ds is not None else None
+    if ds is None:
+        raise DatasetNotFoundError("Dataset {} unknown to broker.".format(ds_id))
+    return json.loads(ds)
 
 
-@alru_cache(maxsize=1000)
+@alru_cache(maxsize=1000, cache_exceptions=False)
 async def get_state(state_id, wait=True):
     """
     Get a state by ID from redis (LRU cached).
@@ -677,22 +685,24 @@ async def get_state(state_id, wait=True):
     Returns
     -------
     State
-        The state from cache or redis. `None` if the state doesn't exist and wait was `False`.
+        The state from cache or redis.
 
     Raises
     ------
-    TimeoutError
-        If waiting for the state timed out.
+    StateNotFoundError
+        If the state doesn't exist or waiting for the state timed out.
     """
     if wait:
         # Check if existing and wait if not
         found = await wait_for_state(state_id)
         if not found:
-            raise TimeoutError("State {} not found: Timeout.".format(state_id))
+            raise StateNotFoundError("State {} not found: Timeout.".format(state_id))
 
     # Get it from redis
     state = await redis.execute("hget", "states", state_id)
-    return json.loads(state) if state is not None else None
+    if state is None:
+        raise StateNotFoundError("State {} unknown to broker.".format(state_id))
+    return json.loads(state)
 
 
 @app.route("/update-datasets", methods=["POST"])
@@ -721,11 +731,10 @@ async def update_datasets(request):
     while True:
         try:
             ds = await get_dataset(ds_id)
-        except TimeoutError:
-            reply[
-                "result"
-            ] = "update-datasets: Dataset ID {} unknown to broker.".format(ds_id)
-            logger.info("update-datasets: Dataset ID {} unknown.".format(ds_id))
+        except DatasetNotFoundError as err:
+            msg = "update-datasets: {}.".format(err)
+            reply["result"] = msg
+            logger.info(msg)
             return response.json(reply)
         reply["datasets"][ds_id] = ds
         if ds["is_root"]:
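The other half of the fix is the cache_exceptions=False flag on the two alru_cache decorators: without it, a DatasetNotFoundError raised during a lookup would be cached and replayed even after the dataset arrives. A minimal sketch of the difference, assuming the pre-1.0 async_lru API used in this diff (later releases dropped the flag):

import asyncio

from async_lru import alru_cache

attempts = 0


@alru_cache(maxsize=8, cache_exceptions=False)
async def flaky_lookup(key):
    # Fails on the first call and succeeds afterwards, like a dataset that
    # is registered with the broker shortly after it was first requested.
    global attempts
    attempts += 1
    if attempts == 1:
        raise KeyError("{} not found".format(key))
    return {"id": key}


async def main():
    try:
        await flaky_lookup("ds1")
    except KeyError:
        pass
    # With cache_exceptions=True the cached KeyError would be re-raised here;
    # with cache_exceptions=False the coroutine runs again and succeeds.
    print(await flaky_lookup("ds1"))


asyncio.run(main())
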
2 changes: 1 addition & 1 deletion comet/dataset.py
@@ -15,7 +15,7 @@ class Dataset:
         Type of the dataset state of this dataset.
     dataset_id : str
         (optional) ID (hash) of this dataset. If not supplied, it's generated internally.
-    base_dataset_id : str
+    base_dataset_id : str or None
         ID of the base dataset or `None` if this is a root dataset (default `None`).
     is_root : bool
         `True`, if this is a root dataset (default `False`).
12 changes: 12 additions & 0 deletions comet/exception.py
@@ -17,3 +17,15 @@ class BrokerError(CometError):
     """There was an error registering states or datasets with the broker."""
 
     pass
+
+
+class DatasetNotFoundError(CometError):
+    """The requested dataset was not found."""
+
+    pass
+
+
+class StateNotFoundError(CometError):
+    """The requested state was not found."""
+
+    pass
2 changes: 1 addition & 1 deletion tests/kotetest.sh
@@ -5,7 +5,7 @@ set -e
 
 git clone https://github.com/kotekan/kotekan.git --branch develop --single-branch
 cd kotekan/build
-cmake -DBOOST_TESTS=ON ..
+cmake -DWITH_TESTS=ON ..
 make dataset_broker_producer dataset_broker_producer2 dataset_broker_consumer
 
 more /proc/cpuinfo | grep flags
33 changes: 33 additions & 0 deletions tests/test_datasets.py
@@ -11,6 +11,8 @@
 
 from comet import Manager, BrokerError, State, Dataset
 from comet.hash import hash_dictionary
+from comet.manager import REGISTER_DATASET
 
 import chimedb.dataset
 import chimedb.core
 
@@ -357,3 +359,34 @@ def test_register_start_dataset_automated(manager_and_dataset, broker):
     assert config_state.data == CONFIG
     assert start_state.data["version"] == version
     assert datetime.strptime(start_state.data["time"], "%Y-%m-%d-%H:%M:%S.%f") == now
+
+
+def test_lru_cache(broker, manager_low_timeout):
+    """Test the dataset cache doesn't cache unknown datasets as None."""
+
+    ds_id = "doesntexist"
+
+    # Request an unknown dataset
+    with pytest.raises(BrokerError):
+        manager_low_timeout.get_dataset(ds_id)
+    state = manager_low_timeout.register_state(data={"foo": "bar"}, state_type="test")
+    assert state is not None
+    assert isinstance(state, State)
+
+    # Now register it (manually, to set the ID)
+    ds = Dataset(
+        state_id=state.id,
+        state_type="test_lru_cache",
+        dataset_id=ds_id,
+        base_dataset_id=None,
+        is_root=True,
+    )
+    request = {"hash": ds_id, "ds": ds.to_dict()}
+    result = manager_low_timeout._send(REGISTER_DATASET, request)
+    assert result is not None
+    assert result == {"result": "success"}
+
+    # Request again, it should exist now.
+    same_ds = manager_low_timeout.get_dataset(ds_id)
+    assert same_ds is not None
+    assert same_ds.to_dict() == ds.to_dict()