Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a5197b4
feat: redesign Run History + row counts per model (OR-1477)
wicky-zipstack Apr 23, 2026
599c6a4
feat: per-adapter row count capture with insert/update/delete breakdown
wicky-zipstack Apr 23, 2026
8dff116
fix: exclude ephemeral models from counts, avatar trigger, color fixes
wicky-zipstack Apr 23, 2026
35e1353
fix: initialize row count variables before try block (Trino, Snowflak…
wicky-zipstack Apr 24, 2026
9aaad8e
chore: cleanup unused imports, fix JSONField search, logging level
wicky-zipstack Apr 24, 2026
7e5ecc4
feat: job switcher bar in Run History + View job config opens drawer
wicky-zipstack Apr 24, 2026
e719d6d
fix: reduce job switcher select + arrow buttons from large to middle
wicky-zipstack Apr 24, 2026
b053ed0
fix: address PR review — sparkline order, periodic_task 500, N+1 quer…
wicky-zipstack Apr 24, 2026
9cde74f
fix: prefer adapter cursor.rowcount over total table size for increme…
wicky-zipstack Apr 24, 2026
c8799f5
fix: do not expose exception details in run_stats API response
wicky-zipstack Apr 24, 2026
0f2df1a
fix: guard against cursor.rowcount returning -1 across all adapters
wicky-zipstack Apr 25, 2026
6240104
fix: resolve ESLint and Prettier formatting errors in Runhistory.jsx
wicky-zipstack Apr 25, 2026
a525d56
fix: Prettier formatting for RunHistory.css
wicky-zipstack Apr 25, 2026
147e654
fix: address Tahier's review — serializer fields, kwargs guard, run_n…
wicky-zipstack Apr 25, 2026
25470e6
fix: cron label, per-model field names, duration parser
wicky-zipstack Apr 27, 2026
923d615
fix: zero rows now shows 0 instead of — (null)
wicky-zipstack Apr 28, 2026
6d9ee7c
fix: Databricks cursor safety, Postgres fallback return, search debounce
wicky-zipstack Apr 28, 2026
7ae287c
fix: add missing useEffect dependencies (pageSize, fetchHistory, fetc…
wicky-zipstack Apr 28, 2026
85ee5d9
fix: add loading spinner to Retry button in run history table
wicky-zipstack Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions backend/backend/core/scheduler/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@

logger = logging.getLogger(__name__)


def _sum_or_none(results, attr):
"""Sum an attribute across results. Return None only if ALL values are None,
return 0 if the sum is legitimately zero (not all missing)."""
values = [getattr(r, attr, None) for r in results]
if all(v is None for v in values):
return None
return sum(v or 0 for v in values)

# Default max duration before a job is considered stuck (1 hour)
DEFAULT_STUCK_JOB_THRESHOLD_SECONDS = 3600

Expand Down Expand Up @@ -361,12 +370,22 @@ def _clean_name(raw):
"status": r.status,
"end_status": r.end_status,
"sequence": r.sequence_num,
"rows_affected": getattr(r, "rows_affected", None),
"rows_inserted": getattr(r, "rows_inserted", None),
"rows_updated": getattr(r, "rows_updated", None),
"rows_deleted": getattr(r, "rows_deleted", None),
"type": getattr(r, "materialization", "") or "",
"duration_ms": getattr(r, "duration_ms", None),
}
for r in user_results
],
"total": len(user_results),
"passed": sum(1 for r in user_results if r.end_status == "OK"),
"failed": sum(1 for r in user_results if r.end_status == "FAIL"),
"rows_processed": _sum_or_none(user_results, "rows_affected"),
"rows_added": _sum_or_none(user_results, "rows_inserted"),
"rows_modified": _sum_or_none(user_results, "rows_updated"),
"rows_deleted": _sum_or_none(user_results, "rows_deleted"),
}
except Exception:
_clear_base_result()
Expand Down
96 changes: 92 additions & 4 deletions backend/backend/core/scheduler/serializer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,105 @@
from django.contrib.auth import get_user_model
from rest_framework import serializers

from backend.core.scheduler.models import TaskRunHistory

User = get_user_model()


class TaskRunHistorySerializer(serializers.ModelSerializer):
duration = serializers.SerializerMethodField()
duration_ms = serializers.SerializerMethodField()
run_number = serializers.SerializerMethodField()
triggered_by = serializers.SerializerMethodField()
model_count = serializers.SerializerMethodField()
failed_models = serializers.SerializerMethodField()
skipped_count = serializers.SerializerMethodField()

class Meta:
model = TaskRunHistory
fields = "__all__" # Include all fields or specify fields like ['id', 'start_time', 'end_time', 'status']
fields = [
"id", "task_id", "status", "start_time", "end_time",
"trigger", "scope", "error_message", "result", "retry_num",
"user_task_detail",
"duration", "duration_ms", "run_number", "triggered_by",
"model_count", "failed_models", "skipped_count",
]

def _get_user_cache(self):
Comment on lines 17 to +28
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 kwargs dropped from serializer fields — model-scope run names always blank

The old serializer used fields = "__all__", which included kwargs. The new explicit list omits it, so r.kwargs is undefined in every API response. The frontend's Scope column reads r.kwargs?.models_override || [], which will always be []. For any run with scope === "model", the name(s) of the targeted models are never shown — the cell renders an empty string instead.

Add "kwargs" back to the fields list:

Suggested change
class Meta:
model = TaskRunHistory
fields = "__all__" # Include all fields or specify fields like ['id', 'start_time', 'end_time', 'status']
fields = [
"id", "task_id", "status", "start_time", "end_time",
"trigger", "scope", "error_message", "result", "retry_num",
"user_task_detail",
"duration", "duration_ms", "run_number", "triggered_by",
"model_count", "failed_models", "skipped_count",
]
def _get_user_cache(self):
class Meta:
model = TaskRunHistory
fields = [
"id", "task_id", "status", "start_time", "end_time",
"trigger", "scope", "error_message", "result", "retry_num",
"user_task_detail", "kwargs",
"duration", "duration_ms", "run_number", "triggered_by",
"model_count", "failed_models", "skipped_count",
]
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/backend/core/scheduler/serializer.py
Line: 17-28

Comment:
**`kwargs` dropped from serializer fields — model-scope run names always blank**

The old serializer used `fields = "__all__"`, which included `kwargs`. The new explicit list omits it, so `r.kwargs` is `undefined` in every API response. The frontend's Scope column reads `r.kwargs?.models_override || []`, which will always be `[]`. For any run with `scope === "model"`, the name(s) of the targeted models are never shown — the cell renders an empty string instead.

Add `"kwargs"` back to the fields list:

```suggestion
    class Meta:
        model = TaskRunHistory
        fields = [
            "id", "task_id", "status", "start_time", "end_time",
            "trigger", "scope", "error_message", "result", "retry_num",
            "user_task_detail", "kwargs",
            "duration", "duration_ms", "run_number", "triggered_by",
            "model_count", "failed_models", "skipped_count",
        ]
```

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code

"""Batch-load users for all runs in one query, cached per serializer instance."""
if not hasattr(self, "_user_cache"):
user_ids = set()
for obj in self.instance if hasattr(self.instance, '__iter__') else [self.instance]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — obj.kwargs.get(...) will AttributeError on non-dict values

TaskRunHistory.kwargs is a JSONField, so it can hold None, a dict, a list, a string, or a number. The `obj.kwargs and obj.kwargs.get(...)` short-circuit handles None, but any non-dict truthy value (e.g. a legacy row that stored kwargs as a JSON string) raises AttributeError, which the view's outer except Exception turns into a 500.

if obj and isinstance(obj.kwargs, dict) and obj.kwargs.get("user_id"):
    user_ids.add(obj.kwargs["user_id"])

Same guard belongs in get_triggered_by below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 147e654 — added isinstance(obj.kwargs, dict) guard in both _get_user_cache and get_triggered_by. Non-dict truthy values (strings, lists) now safely return None.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 147e654 — added isinstance(obj.kwargs, dict) guard in both _get_user_cache and get_triggered_by. Non-dict truthy values now safely return None.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 147e654 — added isinstance(obj.kwargs, dict) guard in both _get_user_cache and get_triggered_by.

if obj and isinstance(obj.kwargs, dict) and obj.kwargs.get("user_id"):
user_ids.add(obj.kwargs["user_id"])
if user_ids:
self._user_cache = {
str(u.id): u for u in User.objects.filter(id__in=user_ids)
}
else:
self._user_cache = {}
return self._user_cache

def get_duration(self, obj):
    """Human-readable duration string for a run.

    Formats end_time - start_time as "NNNms" under one second, "N.Ns"
    under one minute, otherwise "Nm NNs". Returns None when either
    timestamp is missing (run still in flight or never started).
    """
    if obj.start_time and obj.end_time:
        delta = obj.end_time - obj.start_time
        total_ms = int(delta.total_seconds() * 1000)
        if total_ms < 1000:
            return f"{total_ms}ms"
        elif total_ms < 60000:
            return f"{total_ms / 1000:.1f}s"
        else:
            minutes = total_ms // 60000
            seconds = (total_ms % 60000) / 1000
            return f"{minutes}m {seconds:.0f}s"
    return None

def get_duration_ms(self, obj):
    """Duration in milliseconds (integer) for sorting/comparison; None if incomplete."""
    if not (obj.start_time and obj.end_time):
        return None
    elapsed = obj.end_time - obj.start_time
    return int(elapsed.total_seconds() * 1000)

def get_run_number(self, obj):
    """Sequential run number precomputed by the view (total - offset - idx).

    Falls back to 0 when the view did not supply a mapping in context.
    """
    return self.context.get("run_numbers", {}).get(obj.id, 0)

def get_triggered_by(self, obj):
    """Resolve the triggering user from kwargs via the batch-loaded cache.

    kwargs is free-form JSON, so anything that is not a dict (legacy rows
    may hold strings/lists) yields None rather than raising.
    """
    run_kwargs = obj.kwargs
    if not isinstance(run_kwargs, dict):
        return None
    user_id = run_kwargs.get("user_id")
    if not user_id:
        return None
    user = self._get_user_cache().get(str(user_id))
    if not user:
        # ID recorded but user no longer exists (deleted account).
        return {"id": str(user_id), "username": "Unknown user"}
    display_name = user.get_full_name() or user.username or user.email
    return {"id": str(user.id), "username": display_name}

Comment thread
wicky-zipstack marked this conversation as resolved.
def get_model_count(self, obj):
    """Total model count recorded in the run's result payload (0 if absent)."""
    result = obj.result
    if not isinstance(result, dict) or not result:
        return 0
    return result.get("total", 0)

def get_failed_models(self, obj):
    """Names of models that failed in this run.

    Reads the ``models`` list from the free-form JSON ``result`` payload.
    A model counts as failed when ``end_status == "FAIL"`` or the legacy
    ``status == "failure"``. Entries that are not dicts or lack a ``name``
    are skipped instead of raising (older writers produced both shapes),
    so a malformed payload can no longer 500 the whole history response.
    """
    if not (obj.result and isinstance(obj.result, dict)):
        return []
    failed = []
    for m in obj.result.get("models", []):
        if not isinstance(m, dict):
            continue
        if m.get("end_status") == "FAIL" or m.get("status") == "failure":
            name = m.get("name")
            if name is not None:
                failed.append(name)
    return failed

def get_skipped_count(self, obj):
    """Count of skipped models (total - passed - failed), floored at 0."""
    result = obj.result if isinstance(obj.result, dict) else None
    if not result:
        return 0
    remaining = result.get("total", 0) - result.get("passed", 0) - result.get("failed", 0)
    # Floor at zero: inconsistent payloads must not produce a negative count.
    return remaining if remaining > 0 else 0
2 changes: 2 additions & 0 deletions backend/backend/core/scheduler/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
delete_periodic_task,
update_periodic_task,
task_run_history,
run_stats,
trigger_task_once,
trigger_task_once_for_model,
list_deploy_candidates,
Expand All @@ -30,6 +31,7 @@
name="get_periodic_task",
),
path("/run-history/<int:user_task_id>", task_run_history, name="task_run_history"),
path("/run-stats/<int:user_task_id>", run_stats, name="run_stats"),
path(
"/trigger-periodic-task/<int:user_task_id>",
trigger_task_once,
Expand Down
139 changes: 135 additions & 4 deletions backend/backend/core/scheduler/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import timedelta

from django.utils import timezone
from django.utils.dateparse import parse_datetime
from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
Expand Down Expand Up @@ -583,6 +584,112 @@ def delete_periodic_task(request, project_id, task_id):
)


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def run_stats(request, project_id, user_task_id):
"""Get aggregated run statistics for a job — stats cards data."""
try:
query = {"id": user_task_id}
if _is_valid_project_id(project_id):
query["project__project_uuid"] = project_id
Comment thread
wicky-zipstack marked this conversation as resolved.
task = UserTaskDetails.objects.get(**query)
runs = TaskRunHistory.objects.filter(user_task_detail=task)

now = timezone.now()
last_7d = now - timedelta(days=7)
last_24h = now - timedelta(hours=24)
prev_24h_start = now - timedelta(hours=48)

# Success rate (7 days) — only count completed runs in denominator
runs_7d = runs.filter(start_time__gte=last_7d)
completed_7d = runs_7d.filter(status__in=["SUCCESS", "FAILURE"])
total_7d = completed_7d.count()
success_7d = completed_7d.filter(status="SUCCESS").count()
success_rate = round((success_7d / total_7d * 100), 1) if total_7d > 0 else None

# Average duration (successful runs, 7 days)
successful_runs_7d = runs_7d.filter(status="SUCCESS", start_time__isnull=False, end_time__isnull=False)
avg_duration_ms = None
if successful_runs_7d.exists():
durations = [(r.end_time - r.start_time).total_seconds() * 1000 for r in successful_runs_7d]
avg_duration_ms = int(sum(durations) / len(durations))

# Failures (24h) + comparison with previous 24h
failures_24h = runs.filter(start_time__gte=last_24h, status="FAILURE").count()
failures_prev_24h = runs.filter(
start_time__gte=prev_24h_start, start_time__lt=last_24h, status="FAILURE"
).count()

# Last successful run
last_success = runs.filter(status="SUCCESS").order_by("-end_time").first()
last_success_time = last_success.end_time if last_success else None

# Expected duration (avg of last 5 successful runs)
recent_successes = runs.filter(
status="SUCCESS", start_time__isnull=False, end_time__isnull=False
).order_by("-end_time")[:5]
expected_duration_ms = None
if recent_successes.exists():
durations = [(r.end_time - r.start_time).total_seconds() * 1000 for r in recent_successes]
expected_duration_ms = int(sum(durations) / len(durations))

# Duration trend (last 10 completed runs for sparkline)
recent_runs = list(runs.filter(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must Fix: run_stats materializes all successful runs into Python to compute averages

This loads every successful run in 7 days as Python objects just to compute an average duration:

durations = [(r.end_time - r.start_time).total_seconds() * 1000 for r in successful_runs_7d]
avg_duration_ms = int(sum(durations) / len(durations))

For a job running every 5 minutes, that's ~2000 rows into memory. Same pattern for expected_duration_ms (last 5) and duration_trend (last 10).

Fix — use Django aggregation for avg_duration:

from django.db.models import Avg, F, ExpressionWrapper, DurationField

avg_result = successful_runs_7d.annotate(
    dur=ExpressionWrapper(F('end_time') - F('start_time'), output_field=DurationField())
).aggregate(avg=Avg('dur'))
avg_duration_ms = int(avg_result['avg'].total_seconds() * 1000) if avg_result['avg'] else None

Single query, no Python materialization. The duration_trend (last 10) and expected_duration_ms (last 5) are fine — small N, not worth optimizing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid. Same feedback as Tahier — will refactor to Django ORM Avg(ExpressionWrapper(F('end_time') - F('start_time'))) in a follow-up optimization pass. Current scale (most jobs <1k runs) works fine.

start_time__isnull=False, end_time__isnull=False
).order_by("-end_time")[:10])
recent_runs.reverse() # chronological order for sparkline
duration_trend = [
int((r.end_time - r.start_time).total_seconds() * 1000) for r in recent_runs
]

# Schedule info
schedule_type = None
schedule_label = None
periodic = None
try:
periodic = task.periodic_task
if periodic:
if periodic.crontab:
schedule_type = "cron"
c = periodic.crontab
schedule_label = f"{c.minute} {c.hour} {c.day_of_month} {c.month_of_year} {c.day_of_week}"
elif periodic.interval:
Comment thread
greptile-apps[bot] marked this conversation as resolved.
schedule_type = "interval"
schedule_label = f"Every {periodic.interval.every} {periodic.interval.period}"
except Exception:
periodic = None

return Response({
"success": True,
"data": {
"success_rate_7d": success_rate,
"success_count_7d": success_7d,
"total_count_7d": total_7d,
"avg_duration_ms": avg_duration_ms,
"failures_24h": failures_24h,
"failures_prev_24h": failures_prev_24h,
"failures_change": failures_24h - failures_prev_24h,
"last_successful_run": last_success_time,
"expected_duration_ms": expected_duration_ms,
"duration_trend": duration_trend,
"total_runs": runs.count(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — total_runs does an unbounded COUNT(*) for an unused field

"total_runs": runs.count(),

For a job that's been running every 5 minutes for a year, this is a 100k+ row COUNT on every page load. A grep of the frontend confirms stats.total_runs is never read anywhere — the field is dead weight. Either remove it, or scope it to the same 7d window the rest of the stats use.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is used by the FE for run_number computation (total - offset - idx). Can't remove it. For high-volume jobs this could be optimized with a cached count, but for current scale it's acceptable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is needed — the FE uses it for run_number computation (total - offset - idx). Can't remove it without breaking run numbering.

"job_name": task.task_name,
"environment": {
"name": task.environment.environment_name if task.environment else None,
"type": task.environment.deployment_type if task.environment else None,
},
"schedule_type": schedule_type,
"schedule_label": schedule_label,
"schedule_enabled": periodic.enabled if periodic else False,
},
Comment thread
greptile-apps[bot] marked this conversation as resolved.
}, status=status.HTTP_200_OK)
except UserTaskDetails.DoesNotExist:
return Response({"error": "Task not found"}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Error getting run stats: {e}", exc_info=True)
return Response({"error": "Internal server error"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def task_run_history(request, project_id, user_task_id):
Expand All @@ -600,18 +707,40 @@ def task_run_history(request, project_id, user_task_id):
trigger_filter = request.GET.get("trigger")
scope_filter = request.GET.get("scope")
status_filter = request.GET.get("status")
date_from = request.GET.get("date_from")
date_to = request.GET.get("date_to")
search = request.GET.get("search")

if trigger_filter:
runs = runs.filter(trigger=trigger_filter)
if scope_filter:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 — scope_filter is dead code from this UI

scope_filter = request.GET.get("scope")
...
if scope_filter:
    runs = runs.filter(scope=scope_filter)

The redesigned filter bar dropped the Scope select, so the frontend no longer sends scope. The backend still accepts the param — fine for back-compat, but it's now dead from this UI's perspective. Either remove it or document why it's still here (e.g., third-party API consumers).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept intentionally for backward compatibility — the BE still supports the scope filter if any other client sends it. Harmless dead path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept for backward compatibility — harmless dead path, BE still supports it if other clients send it.

runs = runs.filter(scope=scope_filter)
if status_filter:
runs = runs.filter(status=status_filter)
if date_from:
dt = parse_datetime(date_from)
if dt:
runs = runs.filter(start_time__gte=dt)
if date_to:
dt = parse_datetime(date_to)
if dt:
runs = runs.filter(start_time__lte=dt)
Comment thread
greptile-apps[bot] marked this conversation as resolved.
if search:
runs = runs.filter(error_message__icontains=search)

runs = runs.order_by("-start_time")
total = runs.count()

offset = (page - 1) * limit
serializer = TaskRunHistorySerializer(runs[offset : offset + limit], many=True)
page_qs = runs[offset : offset + limit]
# Compute run numbers from total and offset — no extra query needed
run_numbers = {
run.id: total - offset - idx
for idx, run in enumerate(page_qs)
}
Comment on lines 732 to +740
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 run_number is wrong whenever any filter is active

total = runs.count() is evaluated after all filters (status, trigger, date range, search) are applied, so it reflects the filtered record count. The formula total - offset - idx therefore assigns numbers relative to the filtered result set — not the job's absolute chronological position.

For example, if a job has 100 runs and the user filters by status=FAILURE to see only 5 failures, those runs will display as #5, #4, #3, #2, #1 instead of their real run numbers (e.g., #97, #84, #71…).

A correct approach is to derive the absolute run number using the unfiltered total:

unfiltered_total = TaskRunHistory.objects.filter(user_task_detail=task).count()
run_numbers = {
    run.id: unfiltered_total - offset - idx
    for idx, run in enumerate(page_qs)
}

Note: this is still only exact when no filters are active and runs are ordered by start_time descending. For filtered views the numbers will still be approximated unless a window function is used.

Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/backend/core/scheduler/views.py
Line: 732-740

Comment:
**`run_number` is wrong whenever any filter is active**

`total = runs.count()` is evaluated after all filters (status, trigger, date range, search) are applied, so it reflects the filtered record count. The formula `total - offset - idx` therefore assigns numbers relative to the filtered result set — not the job's absolute chronological position.

For example, if a job has 100 runs and the user filters by `status=FAILURE` to see only 5 failures, those runs will display as #5, #4, #3, #2, #1 instead of their real run numbers (e.g., #97, #84, #71…).

A correct approach is to derive the absolute run number using the unfiltered total:
```python
unfiltered_total = TaskRunHistory.objects.filter(user_task_detail=task).count()
run_numbers = {
    run.id: unfiltered_total - offset - idx
    for idx, run in enumerate(page_qs)
}
```
Note: this is still only exact when no filters are active and runs are ordered by `start_time` descending. For filtered views the numbers will still be approximated unless a window function is used.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code

serializer = TaskRunHistorySerializer(
page_qs, many=True, context={"run_numbers": run_numbers}
)

return Response(
{
Expand All @@ -620,6 +749,7 @@ def task_run_history(request, project_id, user_task_id):
"page_items": {
"id": task.id,
"job_name": task.task_name,
"project_id": str(task.project.project_uuid) if task.project else None,
"env_type": task.environment.deployment_type
if task.environment
else None,
Expand Down Expand Up @@ -705,9 +835,10 @@ def trigger_task_once(request, project_id, user_task_id):
synchronous (in-process) execution so local dev works without Redis.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must Fix: _is_valid_project_id bypass weakens authorization

When project_id is _all, the query drops the project filter entirely:

query = {"id": user_task_id}
if _is_valid_project_id(project_id):
    query["project__project_uuid"] = project_id
task = UserTaskDetails.objects.get(**query)

This means any authenticated user can trigger any job by ID regardless of project ownership. If user A doesn't have access to project B's jobs, they can still trigger them by knowing the task ID and passing _all as project_id.

Fix: When project_id is _all, filter by the user's accessible projects:

if _is_valid_project_id(project_id):
    query["project__project_uuid"] = project_id
else:
    # _all: restrict to projects the user has access to
    user_projects = ProjectDetails.objects.filter(
        organization_id=request.user.organization_id
    ).values_list('project_uuid', flat=True)
    query["project__project_uuid__in"] = user_projects

Or at minimum, verify the user has access to the task's project after fetching it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already mitigated — UserTaskDetails uses DefaultOrganizationManagerMixin which auto-filters by organization=get_organization() at the manager level. The _all shortcut only skips project-level filter (intended for cross-project job switcher view). Cross-org access is not possible.

"""
try:
task = UserTaskDetails.objects.get(
id=user_task_id, project__project_uuid=project_id
)
query = {"id": user_task_id}
if _is_valid_project_id(project_id):
query["project__project_uuid"] = project_id
task = UserTaskDetails.objects.get(**query)
except UserTaskDetails.DoesNotExist:
return Response(
{"error": "Task not found"}, status=status.HTTP_404_NOT_FOUND
Expand Down
4 changes: 2 additions & 2 deletions backend/visitran/adapters/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ def db_scd(self) -> BaseSCD:
def db_reader(self) -> BaseDBReader:
return self._db_reader

def run_model(self, visitran_model: VisitranModel):
    """Load and execute a single model, returning execute()'s result.

    Returns whatever ``self.db_model.execute()`` produces — presumably the
    per-model row-count payload introduced in this change (TODO confirm
    against BaseModel.execute) — so callers can surface counts per model.
    """
    self.load_model(model=visitran_model)
    fire_event(MaterializationType(materialization=str(visitran_model.materialization)))
    return self.db_model.execute()

def run_seeds(self, schema: str, abs_path: str) -> None:
seed_obj = self.load_seed(schema, abs_path)
Expand Down
12 changes: 3 additions & 9 deletions backend/visitran/adapters/bigquery/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,9 @@ def merge_into_table(
target_table_name: str,
select_statement: Table,
primary_key: Union[str, list[str]] = None,
) -> None:
) -> dict:
"""Efficient upsert using DELETE + INSERT for BigQuery.

This approach is more efficient than MERGE for BigQuery because:
1. BigQuery is optimized for bulk operations
2. DELETE + INSERT performs better than UPDATE operations
3. Works better with BigQuery's partitioning strategy

Args:
primary_key: Can be a single column name (str) or list of column names for composite keys
Returns dict with rows_affected.
"""
try:
fire_event(
Expand Down Expand Up @@ -378,6 +371,7 @@ def merge_into_table(
raise Exception(
f"BigQuery incremental upsert failed for {schema_name}.{target_table_name}: {str(e)}"
) from e
return {"rows_affected": None} # BigQuery: fallback to get_table_row_count in BaseModel



Expand Down
Loading
Loading