Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ DATABASE_NAME=baserow
# BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS=
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES=
# BASEROW_EXTRA_ALLOWED_HOSTS=
# ADDITIONAL_APPS=
# ADDITIONAL_MODULES=
Expand Down
3 changes: 3 additions & 0 deletions backend/src/baserow/config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,9 @@ def __setitem__(self, key, value):
# Number of most recent history entries to keep; read from the environment
# and falls back to 200 when the variable is not set.
AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES = int(
    os.environ.get("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES", 200)
)
# Interval, in minutes, for the automation history cleanup; read from the
# environment and falls back to 60 when the variable is not set.
AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES = int(
    os.environ.get("BASEROW_AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES", 60)
)

TRASH_PAGE_SIZE_LIMIT = 200  # How many trash entries can be requested at once.

Expand Down
64 changes: 32 additions & 32 deletions backend/src/baserow/contrib/automation/workflows/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.contrib.auth.models import AbstractUser
from django.core.files.storage import Storage
from django.db import transaction
from django.db.models import Q, QuerySet
from django.db.models import OuterRef, Q, QuerySet, Subquery
from django.utils import timezone

from celery.canvas import Signature, chain
Expand Down Expand Up @@ -854,40 +854,42 @@ def toggle_test_run(
# except if we are updating the trigger sample data by itself
self.async_start_workflow(workflow)

def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None:
def clear_old_history(self) -> None:
"""
Clear any old history entries related to the workflow.
Clears any old history entries across all workflows.

It will delete any history entries that are older than MAX_HISTORY_DAYS and only
keep the most recent MAX_HISTORY_ENTRIES entries.

TODO: refactor this once https://github.com/baserow/baserow/pull/5166
is merged in.
It will delete any history entries that are older than MAX_HISTORY_DAYS
and only keep the most recent MAX_HISTORY_ENTRIES entries.
"""

# Delete all history entries older than max days
oldest_history_date = timezone.now() - timedelta(
days=settings.AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS
)
original_workflow.workflow_histories.exclude(
AutomationWorkflowHistory.objects.exclude(
status=HistoryStatusChoices.STARTED
).filter(started_on__lt=oldest_history_date).delete()

history_ids_to_keep = list(
original_workflow.workflow_histories.order_by("-started_on").values_list(
"id", flat=True
)[: settings.AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES]
# Delete all history entries older than max entries
max_entries = settings.AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES
cutoff_date = Subquery(
AutomationWorkflowHistory.objects.filter(
original_workflow_id=OuterRef("original_workflow_id")
)
.order_by("-started_on")
# A Subquery must return a single value, so we return
# the started_on of the oldest history entry from the
# latest max_entries entries.
.values("started_on")[max_entries - 1 : max_entries]
)
original_workflow.workflow_histories.exclude(
AutomationWorkflowHistory.objects.exclude(
status=HistoryStatusChoices.STARTED
).exclude(id__in=history_ids_to_keep).delete()
).filter(started_on__lt=cutoff_date).delete()

# Clean up published automations that no longer have any history entries
active_published = self.get_published_workflow(
original_workflow, with_cache=False
)
empty_published = (
Automation.objects.filter(
published_from=original_workflow,
published_from__isnull=False,
)
.exclude(workflows__cloned_workflow_histories__isnull=False)
# _ensure_published_for_run() is called to potentially create a
Expand All @@ -898,14 +900,19 @@ def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None:
created_on__gte=timezone.now() - timedelta(seconds=5),
)
)
if active_published:
empty_published = empty_published.exclude(id=active_published.automation_id)

# Exclude any automation that is currently the active published workflow
active_published_ids = list(
AutomationWorkflow.objects.filter(
state=WorkflowState.LIVE,
).values_list("automation_id", flat=True)
)
if active_published_ids:
empty_published = empty_published.exclude(id__in=active_published_ids)

empty_published.delete()

def _mark_failure_for_timed_out_history(
self, original_workflow: AutomationWorkflow
) -> None:
def mark_failure_for_timed_out_history(self) -> None:
"""
If a history entry is still not finished after a certain duration, this execution
is marked as failed.
Expand All @@ -919,7 +926,7 @@ def _mark_failure_for_timed_out_history(
error = "This workflow took too long and was timed out."

workflow_history_ids = list(
original_workflow.workflow_histories.filter(
AutomationWorkflowHistory.objects.filter(
status=HistoryStatusChoices.STARTED,
started_on__lt=max_history_date,
).values_list("id", flat=True)
Expand Down Expand Up @@ -1107,13 +1114,6 @@ def before_run(self, workflow: AutomationWorkflow) -> None:
# another execution
self.reset_workflow_temporary_states(original_workflow)

# If we have history entries that are too old it probably means something
# went wrong with Celery so we mark these entries as failed.
self._mark_failure_for_timed_out_history(original_workflow)

# We remove old history entries to avoid storing too many entries.
self._clear_old_history(original_workflow)

self._check_too_many_errors(workflow)

self._check_is_rate_limited(workflow)
Expand Down
21 changes: 21 additions & 0 deletions backend/src/baserow/contrib/automation/workflows/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from datetime import timedelta
from typing import Optional

from django.conf import settings
from django.utils import timezone

from celery.canvas import Signature
Expand Down Expand Up @@ -74,3 +76,22 @@ def handle_workflow_dispatch_done(
sender=None,
workflow_history=history,
)


@app.task(queue="automation_workflow")
def automation_periodic_cleanup():
    """
    Periodic housekeeping task for the automation workflow history.

    Calls the handler to mark timed out history entries as failed and then
    to clear the old history entries.
    """

    # Imported inside the task body rather than at module level
    # (presumably to avoid a circular import; confirm before moving it).
    from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler

    workflow_handler = AutomationWorkflowHandler()

    # The timeout pass runs before the cleanup pass; keep this order.
    workflow_handler.mark_failure_for_timed_out_history()
    workflow_handler.clear_old_history()


@app.on_after_finalize.connect
def setup_periodic_automation_tasks(sender, **kwargs):
    """
    Schedule the periodic automation cleanup task.

    :param sender: The object emitting the signal, on which the periodic task
        is registered (presumably the Celery app; verify against the caller).
    :param kwargs: Additional signal arguments, unused.
    """

    # The interval is configurable through the
    # AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES setting.
    cleanup_interval = timedelta(
        minutes=settings.AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES
    )
    sender.add_periodic_task(cleanup_interval, automation_periodic_cleanup.s())
Original file line number Diff line number Diff line change
Expand Up @@ -1403,80 +1403,6 @@ def test_toggle_simulate_mode_on_immediate(
mock_async_start_workflow.assert_called_once()


@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=7)
@pytest.mark.django_db
def test_clear_old_history_deletes_history_older_than_max_days(data_fixture):
    """Entries older than the max days setting are removed, newer ones stay."""

    workflow = data_fixture.create_automation_workflow()

    def create_successful_history(frozen_at):
        # Creates a finished history entry as if it was created at `frozen_at`.
        with freeze_time(frozen_at):
            return data_fixture.create_automation_workflow_history(
                workflow=workflow,
                status=HistoryStatusChoices.SUCCESS,
            )

    old_history = create_successful_history("2025-02-01 12:00:00")
    recent_history = create_successful_history("2025-02-02 12:00:00")

    # Eight days after old_history was created, which exceeds the 7 day limit,
    # while recent_history is exactly seven days old and must be kept.
    with freeze_time("2025-02-09 12:00:00"):
        AutomationWorkflowHandler()._clear_old_history(workflow)

    assert not workflow.workflow_histories.filter(id=old_history.id).exists()
    assert workflow.workflow_histories.filter(id=recent_history.id).exists()


@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=3)
@pytest.mark.django_db
def test_clear_old_history_keeps_only_max_entries(data_fixture):
    """Only the three most recent entries survive when max entries is 3."""

    workflow = data_fixture.create_automation_workflow()

    # One entry per day on five consecutive days, oldest first, ending on the
    # day the cleanup runs. The previous cumulative `day += i` produced the days
    # 10, 11, 13, 16 and 20, leaving one entry dated after the cleanup itself.
    first_day = 12
    histories = []
    for offset in range(5):
        with freeze_time(f"2025-02-{first_day + offset} 12:00:00"):
            histories.append(
                data_fixture.create_automation_workflow_history(
                    workflow=workflow,
                    status=HistoryStatusChoices.SUCCESS,
                )
            )

    with freeze_time("2025-02-16 12:00:00"):
        AutomationWorkflowHandler()._clear_old_history(workflow)

    assert workflow.workflow_histories.count() == 3

    # The two oldest should be deleted
    for history in histories[:2]:
        assert not workflow.workflow_histories.filter(id=history.id).exists()

    # The three newest should be kept
    for history in histories[2:]:
        assert workflow.workflow_histories.filter(id=history.id).exists()


@override_settings(
    AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=3,
    AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=1,
)
@pytest.mark.django_db
def test_clear_old_history_keeps_entries(data_fixture):
    """A single, one day old entry is within both limits and is not deleted."""

    workflow = data_fixture.create_automation_workflow()

    with freeze_time("2025-02-01 12:00:00"):
        kept_history = data_fixture.create_automation_workflow_history(
            workflow=workflow
        )

    with freeze_time("2025-02-02 12:00:00"):
        AutomationWorkflowHandler()._clear_old_history(workflow)

    # One entry, one day old: below max entries (1) and max days (3).
    assert workflow.workflow_histories.filter(id=kept_history.id).exists()


@pytest.mark.django_db
@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run")
@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task")
Expand Down Expand Up @@ -1755,28 +1681,23 @@ def test_async_start_workflow_unexpected_error_creates_history(

@override_settings(AUTOMATION_WORKFLOW_TIMEOUT_HOURS=1)
@pytest.mark.django_db
def test_before_run_marks_timed_out_started_history_as_failed(data_fixture):
original_workflow = data_fixture.create_automation_workflow()
published_workflow = data_fixture.create_automation_workflow(
state=WorkflowState.LIVE
)
published_workflow.automation.published_from = original_workflow
published_workflow.automation.save()
def test_mark_failure_for_timed_out_history(data_fixture):
workflow = data_fixture.create_automation_workflow()

with freeze_time("2026-03-10 10:00:00"):
with freeze_time("2026-04-16 12:00:00"):
timed_out_history = data_fixture.create_automation_workflow_history(
workflow=original_workflow,
workflow=workflow,
status=HistoryStatusChoices.STARTED,
)
node_history = AutomationNodeHistory.objects.create(
workflow_history=timed_out_history,
node=original_workflow.get_trigger(),
node=workflow.get_trigger(),
started_on=timed_out_history.started_on,
status=HistoryStatusChoices.STARTED,
)

with freeze_time("2026-03-10 12:00:00"):
AutomationWorkflowHandler().before_run(published_workflow)
with freeze_time("2026-04-16 13:00:01"):
AutomationWorkflowHandler().mark_failure_for_timed_out_history()

error_message = "This workflow took too long and was timed out."

Expand Down Expand Up @@ -1817,60 +1738,6 @@ def test_async_start_workflow_unknown_exception(
)


@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=2)
@pytest.mark.django_db
def test_clear_old_history_excludes_started_workflows_max_entries(data_fixture):
    """An entry that is still STARTED is kept even when it exceeds max entries."""

    workflow = data_fixture.create_automation_workflow()

    # The oldest of the three entries is the one that has not finished yet.
    with freeze_time("2026-03-10 12:00:00"):
        started_history = data_fixture.create_automation_workflow_history(
            workflow=workflow, status=HistoryStatusChoices.STARTED
        )

    # Followed by two finished entries, one hour apart.
    for frozen_at in ("2026-03-10 13:00:00", "2026-03-10 14:00:00"):
        with freeze_time(frozen_at):
            data_fixture.create_automation_workflow_history(
                workflow=workflow, status=HistoryStatusChoices.SUCCESS
            )

    # With max entries set to 2 the oldest entry would normally be deleted,
    # but it is kept because its status is STARTED.
    with freeze_time("2026-03-10 15:00:00"):
        AutomationWorkflowHandler()._clear_old_history(workflow)

    assert workflow.workflow_histories.filter(id=started_history.id).exists()
    assert workflow.workflow_histories.count() == 3


@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=1)
@pytest.mark.django_db
def test_clear_old_history_excludes_started_workflows_max_days(data_fixture):
    """An entry that is still STARTED is kept even when it exceeds max days."""

    workflow = data_fixture.create_automation_workflow()

    with freeze_time("2026-03-10 12:00:00"):
        unfinished_history = data_fixture.create_automation_workflow_history(
            workflow=workflow, status=HistoryStatusChoices.STARTED
        )

    with freeze_time("2026-03-11 12:00:00"):
        finished_history = data_fixture.create_automation_workflow_history(
            workflow=workflow, status=HistoryStatusChoices.SUCCESS
        )

    # Both entries are older than one day at this point, but the unfinished
    # one must not be deleted because it hasn't finished yet.
    with freeze_time("2026-03-13 12:00:00"):
        AutomationWorkflowHandler()._clear_old_history(workflow)

    remaining = workflow.workflow_histories
    assert remaining.filter(id=unfinished_history.id).exists()
    assert not remaining.filter(id=finished_history.id).exists()


@pytest.mark.django_db
def test_ensure_published_for_run_creates_new_clone(data_fixture):
workflow = data_fixture.create_automation_workflow()
Expand Down Expand Up @@ -2006,14 +1873,14 @@ def test_clear_old_history_deletes_orphaned_automations(data_fixture):

# 12 hours later but within 1 day, so history survives
with freeze_time("2026-04-21 00:00:00"):
handler._clear_old_history(workflow)
handler.clear_old_history()

assert Automation.objects.filter(id=clone_automation_id).exists()

# 2 days later, so history should have been deleted, and the cloned
# automation should be pruned as well.
with freeze_time("2026-04-22 12:00:00"):
handler._clear_old_history(workflow)
handler.clear_old_history()

assert not Automation.objects.filter(id=clone_automation_id).exists()

Expand All @@ -2033,7 +1900,7 @@ def test_clear_old_history_keeps_live_published_automation_when_newer_test_clone
assert test_clone_workflow.automation_id > published_workflow.automation_id

with freeze_time("2026-04-27 12:01:00"):
handler._clear_old_history(workflow)
handler.clear_old_history()

assert Automation.objects.filter(id=published_workflow.automation_id).exists()
assert not Automation.objects.filter(id=test_clone_workflow.automation_id).exists()
Expand Down
Loading
Loading