Skip to content

Commit

Permalink
K8s executor filter down to dagster events (#7000)
Browse files Browse the repository at this point in the history
<!--- Hello Dagster contributor! It's great to have you with us! -->
<!-- Make sure to read https://docs.dagster.io/community/contributing -->

## Summary
<!-- Describe your changes here, include the motivation/context, test coverage, -->
<!-- the type of change i.e. breaking change, new feature, or bug fix -->
<!-- and related GitHub issue or screenshots (if applicable). -->




## Test Plan
<!--- Please describe the tests you have added and your testing environment (if applicable). -->




## Checklist
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask in our Slack. -->

- [ ] My change requires a change to the documentation and I have updated the documentation accordingly.
- [ ] I have added tests to cover my changes.
  • Loading branch information
johannkm committed Mar 14, 2022
1 parent 6e7cd25 commit b60c070
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
import pendulum

from dagster import check
from dagster.core.events import DagsterEvent, EngineEventData, MetadataEntry, log_step_event
from dagster.core.events import (
DagsterEvent,
DagsterEventType,
EngineEventData,
MetadataEntry,
log_step_event,
)
from dagster.core.execution.context.system import PlanOrchestrationContext
from dagster.core.execution.plan.plan import ExecutionPlan
from dagster.core.execution.plan.step import ExecutionStep
Expand Down Expand Up @@ -42,9 +48,11 @@ def retries(self):
return self._retries

def _pop_events(self, instance, run_id) -> List[DagsterEvent]:
events = instance.logs_after(run_id, self._event_cursor)
events = instance.logs_after(run_id, self._event_cursor, of_type=set(DagsterEventType))
self._event_cursor += len(events)
return [event.dagster_event for event in events if event.is_dagster_event]
dagster_events = [event.dagster_event for event in events]
check.invariant(None not in dagster_events, "Query should not return a non dagster event")
return dagster_events

def _get_step_handler_context(
self, plan_context, steps, active_execution
Expand Down

0 comments on commit b60c070

Please sign in to comment.