Skip to content

Commit

Permalink
[ProjectSummaries] Add distinct_scheduled_jobs_pending_count, distinct_scheduled_pipelines_pending_count (mlrun#5535)
Browse files Browse the repository at this point in the history
  • Loading branch information
roei3000b committed May 15, 2024
1 parent 652c12d commit 3e18472
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 39 deletions.
4 changes: 3 additions & 1 deletion mlrun/common/schemas/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ class ProjectSummary(pydantic.BaseModel):
runs_completed_recent_count: int
runs_failed_recent_count: int
runs_running_count: int
schedules_count: int
distinct_schedules_count: int
distinct_scheduled_jobs_pending_count: int
distinct_scheduled_pipelines_pending_count: int
pipelines_completed_recent_count: typing.Optional[int] = None
pipelines_failed_recent_count: typing.Optional[int] = None
pipelines_running_count: typing.Optional[int] = None
Expand Down
43 changes: 27 additions & 16 deletions server/api/crud/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import fastapi.concurrency
import humanfriendly
import mlrun_pipelines
import sqlalchemy.orm

import mlrun.common.schemas
Expand Down Expand Up @@ -295,6 +294,8 @@ async def generate_projects_summaries(
(
project_to_files_count,
project_to_schedule_count,
project_to_schedule_pending_jobs_count,
project_to_schedule_pending_workflows_count,
project_to_feature_set_count,
project_to_models_count,
project_to_recent_completed_runs_count,
Expand All @@ -310,7 +311,7 @@ async def generate_projects_summaries(
mlrun.common.schemas.ProjectSummary(
name=project,
files_count=project_to_files_count.get(project, 0),
schedules_count=project_to_schedule_count.get(project, 0),
distinct_schedules_count=project_to_schedule_count.get(project, 0),
feature_sets_count=project_to_feature_set_count.get(project, 0),
models_count=project_to_models_count.get(project, 0),
runs_completed_recent_count=project_to_recent_completed_runs_count.get(
Expand All @@ -320,7 +321,7 @@ async def generate_projects_summaries(
project, 0
),
runs_running_count=project_to_running_runs_count.get(project, 0),
# project_.*_pipelines_count is a defaultdict so it will return None if using dict.get()
# the following are defaultdict so it will return None if using dict.get()
# and the key wasn't set yet, so we need to use the [] operator to get the default value of the dict
pipelines_completed_recent_count=project_to_recent_completed_pipelines_count[
project
Expand All @@ -329,10 +330,24 @@ async def generate_projects_summaries(
project
],
pipelines_running_count=project_to_running_pipelines_count[project],
distinct_scheduled_jobs_pending_count=project_to_schedule_pending_jobs_count[
project
],
distinct_scheduled_pipelines_pending_count=project_to_schedule_pending_workflows_count[
project
],
)
)
return project_summaries

@staticmethod
def _failed_statuses():
    """Run statuses that count as failures when tallying recent pipelines."""
    statuses = mlrun.run.RunStatuses
    return [statuses.failed, statuses.error, statuses.canceled]

async def _get_project_resources_counters(
self,
) -> tuple[
Expand All @@ -343,6 +358,8 @@ async def _get_project_resources_counters(
dict[str, int],
dict[str, int],
dict[str, int],
dict[str, int],
dict[str, int],
dict[str, typing.Union[int, None]],
dict[str, typing.Union[int, None]],
dict[str, typing.Union[int, None]],
Expand All @@ -364,6 +381,8 @@ async def _get_project_resources_counters(
(
project_to_files_count,
project_to_schedule_count,
project_to_schedule_pending_jobs_count,
project_to_schedule_pending_workflows_count,
project_to_feature_set_count,
project_to_models_count,
project_to_recent_completed_runs_count,
Expand All @@ -378,6 +397,8 @@ async def _get_project_resources_counters(
self._cache["project_resources_counters"]["result"] = (
project_to_files_count,
project_to_schedule_count,
project_to_schedule_pending_jobs_count,
project_to_schedule_pending_workflows_count,
project_to_feature_set_count,
project_to_models_count,
project_to_recent_completed_runs_count,
Expand Down Expand Up @@ -455,10 +476,7 @@ async def _calculate_pipelines_counters(
project_to_recent_completed_pipelines_count[
pipeline["project"]
] += 1
elif (
pipeline["status"]
in mlrun.run.RunStatuses.failed_statuses()
):
elif pipeline["status"] in self._failed_statuses():
project_to_recent_failed_pipelines_count[
pipeline["project"]
] += 1
Expand All @@ -473,16 +491,9 @@ async def _calculate_pipelines_counters(
)
# this function should return project_to_recent_completed_pipelines_count,
# project_to_recent_failed_pipelines_count, project_to_running_pipelines_count,
# in case of exception we want to return 3 * defaultdict of None because this function
# in case of exception we want to return 3 * defaultdict because this function
# returns 3 values
return [collections.defaultdict(lambda: None)] * 3

for pipeline in pipelines:
if (
pipeline["status"]
not in mlrun_pipelines.common.models.RunStatuses.stable_statuses()
):
project_to_running_pipelines_count[pipeline["project"]] += 1
return [collections.defaultdict(lambda: 0)] * 3

return (
project_to_recent_completed_pipelines_count,
Expand Down
2 changes: 2 additions & 0 deletions server/api/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ async def get_project_resources_counters(
dict[str, int],
dict[str, int],
dict[str, int],
dict[str, int],
dict[str, int],
]:
pass

Expand Down
41 changes: 38 additions & 3 deletions server/api/db/sqldb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,8 @@ async def get_project_resources_counters(
dict[str, int],
dict[str, int],
dict[str, int],
dict[str, int],
dict[str, int],
]:
results = await asyncio.gather(
fastapi.concurrency.run_in_threadpool(
Expand All @@ -2261,7 +2263,11 @@ async def get_project_resources_counters(
)
(
project_to_files_count,
project_to_schedule_count,
(
project_to_schedule_count,
project_to_schedule_pending_jobs_count,
project_to_schedule_pending_workflows_count,
),
project_to_feature_set_count,
project_to_models_count,
(
Expand All @@ -2273,6 +2279,8 @@ async def get_project_resources_counters(
return (
project_to_files_count,
project_to_schedule_count,
project_to_schedule_pending_jobs_count,
project_to_schedule_pending_workflows_count,
project_to_feature_set_count,
project_to_models_count,
project_to_recent_completed_runs_count,
Expand All @@ -2291,7 +2299,9 @@ def _calculate_functions_counters(self, session) -> dict[str, int]:
}
return project_to_function_count

def _calculate_schedules_counters(
    self, session
) -> tuple[dict[str, int], dict[str, int], dict[str, int]]:
    """Count schedules per project.

    Returns three mappings keyed by project name:
      1. distinct schedule count (all schedules),
      2. distinct scheduled jobs pending in the next 24 hours,
      3. distinct scheduled workflows (pipelines) pending in the next 24 hours.

    The two "pending" mappings are ``defaultdict(int)``, so missing projects
    must be read with the ``[]`` operator (``dict.get`` would return the
    factory default of 0 anyway, but callers rely on ``[]`` — see the summary
    builder).
    """
    schedules_count_per_project = (
        session.query(Schedule.project, func.count(distinct(Schedule.name)))
        .group_by(Schedule.project)
        .all()
    )
    project_to_schedule_count = {
        result[0]: result[1] for result in schedules_count_per_project
    }

    # "Pending" means the schedule's next run falls inside the coming 24h
    # window. Capture "now" once so both bounds use the same instant.
    now = datetime.now(timezone.utc)
    next_day = now + timedelta(hours=24)

    schedules_pending_count_per_project = (
        session.query(Schedule.project, Schedule.name, Schedule.Label)
        .join(Schedule.Label, Schedule.Label.parent == Schedule.id)
        .filter(Schedule.next_run_time < next_day)
        .filter(Schedule.next_run_time >= now)
        .filter(Schedule.Label.name.in_(["workflow", "kind"]))
        .all()
    )

    project_to_schedule_pending_jobs_count = collections.defaultdict(int)
    project_to_schedule_pending_workflows_count = collections.defaultdict(int)

    for project, _, label in schedules_pending_count_per_project:
        label_dict = label.to_dict()
        # A label named "workflow" marks a scheduled pipeline; otherwise a
        # "kind" label whose value is "job" marks a scheduled job.
        if label_dict["name"] == "workflow":
            project_to_schedule_pending_workflows_count[project] += 1
        elif label_dict["value"] == "job":
            project_to_schedule_pending_jobs_count[project] += 1

    return (
        project_to_schedule_count,
        project_to_schedule_pending_jobs_count,
        project_to_schedule_pending_workflows_count,
    )

def _calculate_feature_sets_counters(self, session) -> dict[str, int]:
feature_sets_count_per_project = (
Expand Down
97 changes: 81 additions & 16 deletions tests/api/api/test_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,14 @@ def test_list_and_get_project_summaries(
)

# create schedules for the project
schedules_count = 3
_create_schedules(

(
schedules_count,
distinct_scheduled_jobs_pending_count,
distinct_scheduled_pipelines_pending_count,
) = _create_schedules(
client,
project_name,
schedules_count,
)

# mock pipelines for the project
Expand All @@ -395,7 +398,7 @@ def test_list_and_get_project_summaries(
)
for index, project_summary in enumerate(project_summaries_output.project_summaries):
if project_summary.name == empty_project_name:
_assert_project_summary(project_summary, 0, 0, 0, 0, 0, 0, 0, 0)
_assert_project_summary(project_summary, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
elif project_summary.name == project_name:
_assert_project_summary(
project_summary,
Expand All @@ -406,6 +409,8 @@ def test_list_and_get_project_summaries(
recent_failed_runs_count + recent_aborted_runs_count,
running_runs_count,
schedules_count,
distinct_scheduled_jobs_pending_count,
distinct_scheduled_pipelines_pending_count,
running_pipelines_count,
)
else:
Expand All @@ -423,6 +428,8 @@ def test_list_and_get_project_summaries(
recent_failed_runs_count + recent_aborted_runs_count,
running_runs_count,
schedules_count,
distinct_scheduled_jobs_pending_count,
distinct_scheduled_pipelines_pending_count,
running_pipelines_count,
)

Expand Down Expand Up @@ -465,6 +472,8 @@ def test_list_project_summaries_different_installation_modes(
0,
0,
0,
0,
0,
)

# Enterprise installation configuration pre 3.4.0
Expand All @@ -488,6 +497,8 @@ def test_list_project_summaries_different_installation_modes(
0,
0,
0,
0,
0,
)

# Kubernetes installation configuration (mlrun-kit)
Expand All @@ -511,6 +522,8 @@ def test_list_project_summaries_different_installation_modes(
0,
0,
0,
0,
0,
)

# Docker installation configuration
Expand All @@ -534,6 +547,8 @@ def test_list_project_summaries_different_installation_modes(
0,
0,
0,
0,
0,
)


Expand Down Expand Up @@ -1579,6 +1594,8 @@ def _assert_project_summary(
runs_failed_recent_count: int,
runs_running_count: int,
schedules_count: int,
distinct_scheduled_jobs_pending_count: int,
distinct_scheduled_pipelines_pending_count: int,
pipelines_running_count: int,
):
assert project_summary.files_count == files_count
Expand All @@ -1587,7 +1604,15 @@ def _assert_project_summary(
assert project_summary.runs_completed_recent_count == runs_completed_recent_count
assert project_summary.runs_failed_recent_count == runs_failed_recent_count
assert project_summary.runs_running_count == runs_running_count
assert project_summary.schedules_count == schedules_count
assert project_summary.distinct_schedules_count == schedules_count
assert (
project_summary.distinct_scheduled_jobs_pending_count
== distinct_scheduled_jobs_pending_count
)
assert (
project_summary.distinct_scheduled_pipelines_pending_count
== distinct_scheduled_pipelines_pending_count
)
assert project_summary.pipelines_running_count == pipelines_running_count


Expand Down Expand Up @@ -1688,19 +1713,59 @@ def _create_runs(
assert response.status_code == HTTPStatus.OK.value, response.json()


def _create_schedules(client: TestClient, project_name, schedules_count):
for index in range(schedules_count):
schedule_name = f"schedule-name-{str(uuid4())}"
schedule = mlrun.common.schemas.ScheduleInput(
name=schedule_name,
kind=mlrun.common.schemas.ScheduleKinds.job,
scheduled_object={"metadata": {"name": "something"}},
cron_trigger=mlrun.common.schemas.ScheduleCronTrigger(year=1999),
def _create_schedule(
    client: TestClient,
    project_name,
    cron_trigger: mlrun.common.schemas.ScheduleCronTrigger,
    labels: dict = None,
):
    """Create a single job schedule in the project and assert it was created."""
    payload = mlrun.common.schemas.ScheduleInput(
        name=f"schedule-name-{uuid4()}",
        kind=mlrun.common.schemas.ScheduleKinds.job,
        scheduled_object={"metadata": {"name": "something"}},
        cron_trigger=cron_trigger,
        labels=labels or {},
    )
    response = client.post(
        f"projects/{project_name}/schedules", json=payload.dict()
    )
    assert response.status_code == HTTPStatus.CREATED.value, response.json()


def _create_schedules(client: TestClient, project_name):
    """Create schedules in the project for the summary tests.

    Creates three groups:
      * ``schedules_count`` schedules that never fire again (year=1999),
      * ``distinct_scheduled_jobs_pending_count`` pending job schedules
        (fire within 24h, labeled ``kind=job``),
      * ``distinct_scheduled_pipelines_pending_count`` pending workflow
        schedules (fire within 24h, labeled ``workflow``).

    Returns a tuple of (total schedules created, pending jobs count,
    pending pipelines count).
    """
    schedules_count = 3
    distinct_scheduled_jobs_pending_count = 5
    distinct_scheduled_pipelines_pending_count = 7

    for _ in range(schedules_count):
        _create_schedule(
            client, project_name, mlrun.common.schemas.ScheduleCronTrigger(year=1999)
        )

    for _ in range(distinct_scheduled_jobs_pending_count):
        _create_schedule(
            client,
            project_name,
            mlrun.common.schemas.ScheduleCronTrigger(minute=10),
            {"kind": "job"},
        )

    for _ in range(distinct_scheduled_pipelines_pending_count):
        _create_schedule(
            client,
            project_name,
            mlrun.common.schemas.ScheduleCronTrigger(minute=10),
            {"workflow": "workflow"},
        )

    return (
        schedules_count
        + distinct_scheduled_jobs_pending_count
        + distinct_scheduled_pipelines_pending_count,
        distinct_scheduled_jobs_pending_count,
        distinct_scheduled_pipelines_pending_count,
    )


def _mock_pipelines(project_name):
Expand Down
Loading

0 comments on commit 3e18472

Please sign in to comment.