Skip to content

Commit

Permalink
fix: Handle feature import processing during import (#3305)
Browse files Browse the repository at this point in the history
  • Loading branch information
zachaysan committed Jan 18, 2024
1 parent 03c8406 commit 28459c5
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 0 deletions.
8 changes: 8 additions & 0 deletions api/features/import_export/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ class FeatureImportUploadSerializer(serializers.Serializer):
strategy = serializers.ChoiceField(choices=[SKIP, OVERWRITE_DESTRUCTIVE])

def save(self, environment_id: int) -> FeatureImport:
if FeatureImport.objects.filter(
environment_id=environment_id,
status=PROCESSING,
).exists():
raise ValidationError(
"Can't import features, since already processing a feature import."
)

uploaded_file = self.validated_data["file"]
strategy = self.validated_data["strategy"]
file_content = uploaded_file.read().decode("utf-8")
Expand Down
25 changes: 25 additions & 0 deletions api/features/import_export/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,31 @@ def clear_stale_feature_imports_and_exports() -> None:
FeatureImport.objects.filter(created_at__lt=two_weeks_ago).delete()


@register_recurring_task(
    run_every=timedelta(minutes=10),
)
def retire_stalled_feature_imports_and_exports() -> None:
    """Retire feature imports/exports stuck in PROCESSING to FAILED.

    A FeatureExport or FeatureImport still in PROCESSING ten minutes after
    creation is assumed to have stalled (e.g. its worker task died), so it
    is marked FAILED. Runs every 10 minutes as a recurring task.
    """
    ten_minutes_ago = timezone.now() - timedelta(minutes=10)

    # A queryset .update() issues a single UPDATE per model instead of
    # loading every stalled row and calling bulk_update(). Neither
    # .update() nor bulk_update() fires model save signals, so observable
    # behavior is unchanged.
    FeatureExport.objects.filter(
        created_at__lt=ten_minutes_ago,
        status=PROCESSING,
    ).update(status=FAILED)

    FeatureImport.objects.filter(
        created_at__lt=ten_minutes_ago,
        status=PROCESSING,
    ).update(status=FAILED)


def _export_features_for_environment(
feature_export: FeatureExport, tag_ids: Optional[list[int]]
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from environments.models import Environment
from features.feature_types import MULTIVARIATE, STANDARD
from features.import_export.constants import (
FAILED,
OVERWRITE_DESTRUCTIVE,
PROCESSING,
SKIP,
Expand All @@ -25,6 +26,7 @@
clear_stale_feature_imports_and_exports,
export_features_for_environment,
import_features_for_environment,
retire_stalled_feature_imports_and_exports,
)
from features.models import Feature, FeatureSegment, FeatureState
from features.multivariate.models import MultivariateFeatureOption
Expand Down Expand Up @@ -72,6 +74,50 @@ def test_clear_stale_feature_imports_and_exports(
kept_feature_export.refresh_from_db()


def test_retire_stalled_feature_imports_and_exports(
    db: None, environment: Environment, freezer: FrozenDateTimeFactory
):
    # Given
    # Two records created 12 minutes in the past (beyond the 10-minute
    # stall threshold) and two created "now" (still within it).
    present = timezone.now()

    freezer.move_to(present - timedelta(minutes=12))
    stalled_export = FeatureExport.objects.create(
        data="{}",
        environment=environment,
        status=PROCESSING,
    )
    stalled_import = FeatureImport.objects.create(
        data="{}",
        environment=environment,
        status=PROCESSING,
    )

    freezer.move_to(present)
    fresh_export = FeatureExport.objects.create(
        data="{}",
        environment=environment,
        status=PROCESSING,
    )
    fresh_import = FeatureImport.objects.create(
        data="{}",
        environment=environment,
        status=PROCESSING,
    )

    # When
    retire_stalled_feature_imports_and_exports()

    # Then
    # Only records past the threshold are moved to FAILED.
    for record in (stalled_import, stalled_export, fresh_import, fresh_export):
        record.refresh_from_db()

    assert stalled_import.status == FAILED
    assert stalled_export.status == FAILED
    assert fresh_import.status == PROCESSING
    assert fresh_export.status == PROCESSING


def test_export_and_import_features_for_environment_with_skip(
db: None,
environment: Environment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,40 @@ def test_feature_import(
assert feature_import.strategy == OVERWRITE_DESTRUCTIVE


def test_feature_import_already_processing(
    admin_client: APIClient,
    environment: Environment,
) -> None:
    # Given
    assert FeatureImport.objects.count() == 0

    # An import already in flight for this environment should block
    # any new upload.
    FeatureImport.objects.create(
        environment=environment,
        strategy=OVERWRITE_DESTRUCTIVE,
        status=PROCESSING,
        data="{}",
    )

    upload = SimpleUploadedFile("test.23.json", b"[]")
    url = reverse(
        "api-v1:features:feature-import",
        args=[environment.id],
    )

    # When
    response = admin_client.post(
        url,
        data={"file": upload, "strategy": OVERWRITE_DESTRUCTIVE},
        format="multipart",
    )

    # Then
    assert response.status_code == 400
    assert response.json() == [
        "Can't import features, since already processing a feature import."
    ]


def test_feature_import_unauthorized(
staff_client: APIClient,
environment: Environment,
Expand Down

3 comments on commit 28459c5

@vercel
Copy link

@vercel vercel bot commented on 28459c5 Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on 28459c5 Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

docs – ./docs

docs.flagsmith.com
docs-flagsmith.vercel.app
docs-git-main-flagsmith.vercel.app
docs.bullet-train.io

@vercel
Copy link

@vercel vercel bot commented on 28459c5 Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.