Fix fs bad encryption revision in sync monitor #1730

Merged · 3 commits merged on May 27, 2021
1 change: 1 addition & 0 deletions newsfragments/1730.bugfix.rst
@@ -0,0 +1 @@
Fix possible crash when sync occurs right after a workspace reencryption.
15 changes: 11 additions & 4 deletions parsec/core/sync_monitor.py
@@ -11,6 +11,7 @@
from parsec.core.types import EntryID, WorkspaceRole
from parsec.core.fs import (
FSBackendOfflineError,
FSBadEncryptionRevision,
FSWorkspaceNotFoundError,
FSWorkspaceNoReadAccess,
FSWorkspaceNoWriteAccess,
@@ -229,8 +230,11 @@ async def tick(self) -> float:
# modifications. Hence we can forget about this change given
# it's the role of `self._local_changes` to keep track of local changes.
pass
except FSWorkspaceInMaintenance:
# Not the right time for the sync, retry later
except (FSWorkspaceInMaintenance, FSBadEncryptionRevision):
# Not the right time for the sync, retry later.
# `FSBadEncryptionRevision` occurs if the reencryption is quick enough
# to start and finish before we process the `sharing.reencrypted`
# message, so we end up trying a sync with the old encryption revision.
min_due_time = now + MAINTENANCE_MIN_WAIT
self._remote_changes.add(entry_id)

@@ -257,8 +261,11 @@ async def tick(self) -> float:
# the write access in the future) but pretend it just occurred
# to avoid a busy sync loop until the `read_only` flag is updated.
self._local_changes[entry_id] = LocalChange(now)
except FSWorkspaceInMaintenance:
# Not the right time for the sync, retry later
except (FSWorkspaceInMaintenance, FSBadEncryptionRevision):
# Not the right time for the sync, retry later.
# `FSBadEncryptionRevision` occurs if the reencryption is quick enough
# to start and finish before we process the `sharing.reencrypted`
# message, so we end up trying a sync with the old encryption revision.
Contributor
This fix seems OK, but I'm a bit worried that we can no longer tell the difference between an acceptable race condition (accessing the realm while the new encryption key hasn't been received) and a bug in the reencryption logic. Is there a way to tell those apart and maybe log the latter?

Member Author
That's a good point, however I don't see how we could tell them apart...
We could try to check whether the local encryption revision has changed between this sync and the next one, but it feels like a great way to add more complexity (and so more bugs!) into this code.

The good thing is that we send at most one request every MAINTENANCE_MIN_WAIT (i.e. 30s), so we are not flooding the server.

min_due_time = now + MAINTENANCE_MIN_WAIT
self._local_changes[entry_id] = LocalChange(now)

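For reference, a minimal sketch of the revision-tracking idea discussed above (remembering the encryption revision seen at the last failed sync and logging when it has not moved between two failures). This is not part of the PR, and the class and method names below are hypothetical rather than parsec API.

# Hypothetical sketch only, not part of this PR: distinguish the benign race
# (new key not yet received) from a stuck reencryption by remembering the
# encryption revision observed at the previous failed sync.

import structlog

logger = structlog.get_logger()


class RevisionTracker:
    def __init__(self):
        # entry_id -> encryption revision seen when the sync last failed
        self._last_bad_revision = {}

    def on_bad_encryption_revision(self, entry_id, current_revision):
        previous = self._last_bad_revision.get(entry_id)
        if previous == current_revision:
            # The revision has not moved between two failed syncs: probably
            # not the benign race, so log it for investigation.
            logger.warning(
                "Sync keeps failing with the same encryption revision",
                entry_id=entry_id,
                encryption_revision=current_revision,
            )
        self._last_bad_revision[entry_id] = current_revision

    def on_sync_success(self, entry_id):
        # Drop the bookkeeping once the sync goes through
        self._last_bad_revision.pop(entry_id, None)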
21 changes: 20 additions & 1 deletion tests/conftest.py
@@ -13,6 +13,7 @@
import contextlib
import pendulum
from unittest.mock import patch
import logging
import structlog
import trio
from trio.testing import MockClock
@@ -141,11 +142,29 @@ def _remove_colors(msg):

def _assert_occured(self, log):
__tracebackhide__ = True
assert any([r for r in self.records if log in _remove_colors(r.msg)])
record = next((r for r in self.records if log in _remove_colors(r.msg)), None)
assert record is not None
if not hasattr(self, "asserted_records"):
self.asserted_records = set()
self.asserted_records.add(record)

LogCaptureFixture.assert_occured = _assert_occured


@pytest.fixture(autouse=True)
def no_logs_gte_error(caplog):
yield
# The test should use `caplog.assert_occured` to indicate a log was expected,
# otherwise we consider error logs as *actual* errors.
asserted_records = getattr(caplog, "asserted_records", set())
errors = [
record
for record in caplog.get_records("call")
if record.levelno >= logging.ERROR and record not in asserted_records
]
assert not errors


def patch_pytest_trio():
# Fix while waiting for
# https://github.com/python-trio/pytest-trio/issues/77
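To illustrate how the new autouse fixture above is meant to be used (a sketch, not taken from the PR; it logs through a plain stdlib logger while parsec tests log through structlog): an ERROR-level record emitted during a test now fails that test at teardown unless the test claims the record with `caplog.assert_occured`.

import logging


def test_expected_error_is_claimed(caplog):
    # An ERROR record emitted during the test...
    logging.getLogger("parsec.demo").error("Something went wrong on purpose")
    # ...must be claimed with `assert_occured`; the record is then added to
    # `caplog.asserted_records` and the autouse `no_logs_gte_error` fixture
    # ignores it. An unclaimed ERROR record would fail the test at teardown.
    caplog.assert_occured("Something went wrong on purpose")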
5 changes: 4 additions & 1 deletion tests/core/backend_connection/test_authenticated_conn.py
@@ -104,7 +104,7 @@ async def test_init_with_backend_offline(event_bus, alice):

@pytest.mark.trio
@pytest.mark.parametrize("during_bootstrap", (True, False))
async def test_monitor_crash(running_backend, event_bus, alice, during_bootstrap):
async def test_monitor_crash(caplog, running_backend, event_bus, alice, during_bootstrap):
async def _bad_monitor(*, task_status=trio.TASK_STATUS_IGNORED):
if during_bootstrap:
raise RuntimeError("D'oh !")
@@ -123,6 +123,9 @@ async def _bad_monitor(*, task_status=trio.TASK_STATUS_IGNORED):
{"status": BackendConnStatus.CRASHED, "status_exc": spy.ANY},
)
assert conn.status == BackendConnStatus.CRASHED
caplog.assert_occured(
"[exception] Unhandled exception [parsec.core.backend_connection.authenticated]"
Contributor
This test seems a bit sensitive to the arbitrary number of spaces _______.

Member Author
Yes, totally agree. I thought about improving _remove_colors to also strip the spaces, but it seems a bit complicated to do, so I consider we will handle this in another PR when we hit the issue (typically if updating the structlog version breaks the tests).

)

# Test command not possible
with pytest.raises(BackendNotAvailable) as exc:
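On the whitespace concern raised above, a possible follow-up (hypothetical, not part of this PR) would be to normalize both sides before matching, so the expected log no longer depends on the padding structlog inserts between the event and the logger name:

import re

ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")


def _normalize(msg: str) -> str:
    # Strip ANSI color codes, then collapse any run of whitespace to a single
    # space so column alignment does not matter.
    return re.sub(r"\s+", " ", ANSI_ESCAPE.sub("", msg)).strip()


# `assert_occured` could then compare normalized strings, e.g.:
#     record = next(
#         (r for r in self.records if _normalize(log) in _normalize(r.msg)), None
#     )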
6 changes: 6 additions & 0 deletions tests/core/gui/test_workspaces_reencrypt.py
@@ -292,6 +292,7 @@ def _assert_error():
)
@customize_fixtures(logged_gui_as_admin=True)
async def test_workspace_reencryption_do_one_batch_error(
caplog,
aqtbot,
running_backend,
logged_gui,
@@ -332,6 +333,11 @@ def _assert_error():
assert wk_button.button_reencrypt.isVisible()

await aqtbot.wait_until(_assert_error)
# Unexpected error is logged
if error_type is Exception:
caplog.assert_occured(
"[exception] Uncatched error [parsec.core.gui.trio_thread]"
)


@pytest.mark.gui
7 changes: 6 additions & 1 deletion tests/core/mountpoint/test_file_failures.py
@@ -69,7 +69,7 @@ def test_empty_read_then_reopen(tmpdir, mountpoint_service):
@pytest.mark.mountpoint
@pytest.mark.skipif(sys.platform == "darwin", reason="TODO : crash on macOS")
async def test_remote_error_event(
tmpdir, monkeypatch, running_backend, alice_user_fs, bob_user_fs, monitor
tmpdir, monkeypatch, caplog, running_backend, alice_user_fs, bob_user_fs, monitor
):
wid = await create_shared_workspace("w1", bob_user_fs, alice_user_fs)

@@ -126,6 +126,11 @@ def _crash(*args, **kwargs):
with alice_user_fs.event_bus.listen() as spy:
with pytest.raises(OSError):
os.mkdir(str(trio_w / "dummy"))
if sys.platform == "win32":
expected_log = "[exception] Unhandled exception in winfsp mountpoint [parsec.core.mountpoint.winfsp_operations]"
else:
expected_log = "[exception] Unhandled exception in fuse mountpoint [parsec.core.mountpoint.fuse_operations]"
caplog.assert_occured(expected_log)
spy.assert_event_occured(CoreEvent.MOUNTPOINT_UNHANDLED_ERROR)

await trio.to_thread.run_sync(_testbed_online)
50 changes: 50 additions & 0 deletions tests/core/test_sync_monitor.py
@@ -386,3 +386,53 @@ async def test_sync_monitor_while_changing_roles(
await bob_core.wait_idle_monitors()
info = await bob_workspace.path_info("/this-should-not-fail")
assert not info["need_sync"]


@pytest.mark.trio
async def test_sync_with_concurrent_reencryption(
running_backend, alice_core, bob_user_fs, autojump_clock, monkeypatch
):
# Create a shared workspace
wid = await create_shared_workspace("w", bob_user_fs, alice_core)
alice_workspace = alice_core.user_fs.get_workspace(wid)
bob_workspace = bob_user_fs.get_workspace(wid)

# Alice creates a file, let it sync
await alice_workspace.write_bytes("/test.txt", b"v1")
await alice_core.wait_idle_monitors()
await bob_user_fs.sync()

# Freeze Alice's message processing so she won't process `sharing.reencrypted` messages
allow_message_processing = trio.Event()

async def _mockpoint_sleep():
await allow_message_processing.wait()

monkeypatch.setattr(
"parsec.core.messages_monitor.freeze_messages_monitor_mockpoint", _mockpoint_sleep
)

# Now Bob reencrypts the workspace
reencryption_job = await bob_user_fs.workspace_start_reencryption(wid)
await bob_user_fs.process_last_messages()
total, done = await reencryption_job.do_one_batch()
assert total == done  # Sanity check to make sure the reencryption is finished

# Alice modifies the workspace and tries to do the sync...
await alice_workspace.write_bytes("/test.txt", b"v2")
# The sync monitor will try, and fail, to sync the workspace
await trio.sleep(300) # autojump, so not *really* 300s
assert not alice_core.are_monitors_idle()

# Now let Alice process the `sharing.reencrypted` messages, this should
# allow the sync to go through
allow_message_processing.set()
with trio.fail_after(60): # autojump, so not *really* 60s
await alice_core.wait_idle_monitors()

# Just make sure the sync is done
await bob_workspace.sync()
for workspace in (bob_workspace, alice_workspace):
info = await workspace.path_info("/test.txt")
assert not info["need_sync"]
assert info["base_version"] == 3
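For context on the monkeypatch used in the test above: `freeze_messages_monitor_mockpoint` is a hook awaited by the messages monitor before it processes incoming messages; it is a no-op in production and tests can replace it to pause message handling. A rough sketch of the pattern (the real parsec code may differ in details):

async def freeze_messages_monitor_mockpoint() -> None:
    # No-op in production; tests monkeypatch this module attribute with a
    # coroutine that blocks until they are ready to resume message processing.
    pass


async def _process_incoming_messages(process_last_messages):
    # Simplified for illustration: the monitor awaits the mockpoint before
    # touching the message queue, so a test can freeze it at exactly this point.
    await freeze_messages_monitor_mockpoint()
    await process_last_messages()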
4 changes: 3 additions & 1 deletion tests/test_nursery.py
@@ -15,7 +15,7 @@ async def test_open_service_nursery_exists():


@pytest.mark.trio
async def test_open_service_nursery_multierror_collapse():
async def test_open_service_nursery_multierror_collapse(caplog):
async def _raise(exc):
raise exc

@@ -34,6 +34,8 @@ async def _raise(exc):
nursery.start_soon(_raise, RuntimeError())
await _raise(ZeroDivisionError(1, 2, 3))

caplog.assert_occured("[exception] A MultiError has been detected [parsec.utils]")

exception = ctx.value
assert isinstance(exception, ZeroDivisionError)
assert exception.args == (1, 2, 3)