Wire in min cardinality (#8)
* Wire up minimum gang cardinality

* Wire up minimum gang cardinality

* Wire up minimum gang cardinality

* Wire up minimum gang cardinality

* Bump armada airflow operator to version 0.5.4 (#2961)

* Bump armada airflow operator to version 0.5.4

Signed-off-by: Rich Scott <richscott@sent.com>

* Regenerate Airflow Operator Markdown doc.

Signed-off-by: Rich Scott <richscott@sent.com>

* Fix regenerated Airflow doc error.

Signed-off-by: Rich Scott <richscott@sent.com>

* Pin versions of all modules, especially around docs generation.

Signed-off-by: Rich Scott <richscott@sent.com>

* Regenerate Airflow docs using Python 3.10

Signed-off-by: Rich Scott <richscott@sent.com>

---------

Signed-off-by: Rich Scott <richscott@sent.com>

* Infer failed jobs from job context, tidy up

* Infer failed jobs from job context, tidy up

* Magefile: Clean all Makefile references (#2957)

* tiny naming change

* clean all make references

Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>

---------

Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>

* Infer failed jobs from job context, tidy up

* Revert to previous unpinned airflow version spec. (#2967)

* Revert to previous unpinned airflow version spec.

Signed-off-by: Rich Scott <richscott@sent.com>

* Increment armada-airflow module version.

Signed-off-by: Rich Scott <richscott@sent.com>

---------

Signed-off-by: Rich Scott <richscott@sent.com>

* Only fail gang jobs when the overall gang min cardinality is set. Fix error handling

* Only fail gang jobs when the overall gang min cardinality is set. Fix error handling

* Only fail gang jobs when the overall gang min cardinality is set. Fix error handling

* Update jobdb with any excess gang jobs that failed

* ArmadaContext.Log Improvements (#2965)

* log error

* context log

* context log

* add cycle id

* typo

* lint

* refactor armadacontext to implement a FieldLogger

---------

Co-authored-by: Chris Martin <chris@cmartinit.co.uk>

* Fix-up existing tests before adding new ones

* Add new tests for minimum gang sizes

* Test that excess failed gang jobs are committed to jobdb

* Run `on.push` only for master (#2968)

* Run On Push only for master

Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>

* remove not-workflows

Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>

---------

Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>

* Add test for failed job pulsar messages

* Tidy tests

* WIP: Airflow: fix undefined poll_interval in Deferrable Operator (#2975)

* Airflow: handle poll_interval attr in ArmadaJobCompleteTrigger

Fix incomplete handling of the 'poll_interval' attribute in
ArmadaJobCompleteTrigger, used by the Armada Deferrable Operator for
Airflow.

Signed-off-by: Rich Scott <richscott@sent.com>

* Airflow - add unit test for armada deferrable operator

Run many of the same tests for the deferrable operator as for the
regular operator, plus test serialization. Also, update the interval
signifier in examples. A full test of the deferrable operator that
verifies the trigger handling is still needed.

Signed-off-by: Rich Scott <richscott@sent.com>

---------

Signed-off-by: Rich Scott <richscott@sent.com>

* Release Airflow Operator v0.5.6 (#2979)

Signed-off-by: Rich Scott <richscott@sent.com>

* #2905 - fix indentation (#2971)

Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>

Signed-off-by: Rich Scott <richscott@sent.com>
Signed-off-by: mohamed <mohamedabdelfatah2027@gmail.com>
Co-authored-by: Rich Scott <rich@gr-oss.io>
Co-authored-by: Mohamed Abdelfatah <39927413+Mo-Fatah@users.noreply.github.com>
Co-authored-by: Chris Martin <council_tax@hotmail.com>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Dave Gantenbein <dave@gr-oss.io>
Co-authored-by: Adam McArthur <46480158+Sharpz7@users.noreply.github.com>
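
To make the headline change concrete, here is a minimal sketch of the gang min-cardinality rule the commits above describe, using hypothetical names (`gangJob`, `resolveGang`) rather than Armada's actual scheduler types: a gang fails as a whole only when fewer members scheduled than its minimum cardinality; otherwise the gang proceeds and the unscheduled "excess" members are reported so the caller can mark them failed in the job database.

```go
package main

import "fmt"

// gangJob is a hypothetical stand-in for a scheduler job belonging to a gang.
type gangJob struct {
	id        string
	scheduled bool
}

// resolveGang applies the rule from this commit: with no minimum set, the
// gang is all-or-nothing; with a minimum set, the gang succeeds once at least
// minCardinality members scheduled, and the unscheduled "excess" members are
// returned so they can be failed individually and written back to jobdb.
func resolveGang(members []gangJob, minCardinality int) (gangFails bool, excessFailed []string) {
	scheduled := 0
	for _, m := range members {
		if m.scheduled {
			scheduled++
		}
	}
	if minCardinality <= 0 {
		minCardinality = len(members) // no explicit minimum: require the full gang
	}
	if scheduled < minCardinality {
		return true, nil // too few members scheduled: fail the whole gang
	}
	for _, m := range members {
		if !m.scheduled {
			excessFailed = append(excessFailed, m.id)
		}
	}
	return false, excessFailed
}

func main() {
	gang := []gangJob{{"a", true}, {"b", true}, {"c", false}}
	fails, excess := resolveGang(gang, 2)
	fmt.Println(fails, excess) // false [c]
}
```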
7 people authored and GitHub Enterprise committed Sep 19, 2023
1 parent d616feb commit b22b7ca
Showing 55 changed files with 1,394 additions and 644 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/autoupdate.yml
@@ -15,7 +15,7 @@ jobs:
- uses: docker://chinthakagodawita/autoupdate-action:v1
env:
GITHUB_TOKEN: '${{ secrets.GITHUB_TOKEN }}'
-PR_LABELS: "auto-update"
+PR_LABELS: "auto-update"
MERGE_MSG: "Branch was auto-updated."
RETRY_COUNT: "5"
RETRY_SLEEP: "300"
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -2,10 +2,10 @@ name: CI

on:
  push:
+   branches:
+     - master
    tags:
      - v*
-   branches-ignore:
-     - gh-pages
  pull_request:
    branches-ignore:
      - gh-pages
47 changes: 0 additions & 47 deletions .github/workflows/not-airflow-operator.yml

This file was deleted.

42 changes: 0 additions & 42 deletions .github/workflows/not-python-client.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -214,7 +214,7 @@ jobs:
echo -e "### Git status" >> $GITHUB_STEP_SUMMARY
if [[ "$changed" -gt 0 ]]; then
echo -e "Generated proto files are out of date. Please run 'make proto' and commit the changes." >> $GITHUB_STEP_SUMMARY
echo -e "Generated proto files are out of date. Please run 'mage proto' and commit the changes." >> $GITHUB_STEP_SUMMARY
git status -s -uno >> $GITHUB_STEP_SUMMARY
2 changes: 1 addition & 1 deletion client/python/CONTRIBUTING.md
@@ -26,7 +26,7 @@ workflow for contributing. First time contributors can follow the guide below to
Unlike most python projects, the Armada python client contains a large quantity of generated code. This code must be
generated in order to compile and develop against the client.

-From the root of the repository, run `make python`. This will generate python code needed to build
+From the root of the repository, run `mage buildPython`. This will generate python code needed to build
and use the client. This command needs to be re-run anytime an API change is committed (e.g. a change to a `*.proto`
file).

2 changes: 1 addition & 1 deletion client/python/README.md
@@ -26,5 +26,5 @@ Before beginning, ensure you have:
- Network access to fetch docker images and go dependencies.

To generate all needed code, and install the python client:
-1) From the root of the repository, run `make python`
+1) From the root of the repository, run `mage buildPython`
2) Install the client using `pip install client/python`. It's strongly recommended you do this inside a virtualenv.
4 changes: 2 additions & 2 deletions client/python/docs/README.md
@@ -9,13 +9,13 @@ Usage

Easy way:
- Ensure all protobufs files needed for the client are generated by running
-`make python` from the repository root.
+`mage buildPython` from the repository root.
- `tox -e docs` will create a valid virtual environment and use it to generate
documentation. The generated files will be placed under `build/jekyll/*.md`.

Manual way:
- Ensure all protobufs files needed for the client are generated by running
-`make python` from the repository root.
+`mage buildPython` from the repository root.
- Create a virtual environment containing all the deps listed in `tox.ini`
under `[testenv:docs]`.
- Run `poetry install -v` from inside `client/python` to install the client
4 changes: 2 additions & 2 deletions cmd/scheduler/cmd/prune_database.go
@@ -1,13 +1,13 @@
package cmd

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/database"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
)
@@ -57,7 +57,7 @@ func pruneDatabase(cmd *cobra.Command, _ []string) error {
return errors.WithMessagef(err, "Failed to connect to database")
}

-ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), timeout)
defer cancel()
return schedulerdb.PruneDb(ctx, db, batchSize, expireAfter, clock.RealClock{})
}
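
The swap above from `context.WithTimeout` to `armadacontext.WithTimeout` (and, per the commit message, the refactor of armadacontext to implement a `FieldLogger`) suggests a wrapper of roughly the following shape. This is a hedged sketch of the idea, not necessarily Armada's exact implementation:

```go
package armadacontext

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
)

// Context couples a standard context.Context with a structured logger, so a
// single value can be passed for both cancellation and logging. Embedding
// logrus.FieldLogger is what lets call sites pass ctx itself wherever a
// FieldLogger is expected (e.g. logging.WithStacktrace(ctx, err)).
type Context struct {
	context.Context
	logrus.FieldLogger
}

// Background mirrors context.Background, attaching a default logger.
func Background() *Context {
	return &Context{
		Context:     context.Background(),
		FieldLogger: logrus.StandardLogger(),
	}
}

// WithTimeout mirrors context.WithTimeout while carrying the logger through.
func WithTimeout(parent *Context, timeout time.Duration) (*Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(parent.Context, timeout)
	return &Context{Context: ctx, FieldLogger: parent.FieldLogger}, cancel
}
```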
2 changes: 1 addition & 1 deletion docs/developer/manual-localdev.md
@@ -28,7 +28,7 @@ mage BootstrapTools
# Compile .pb.go files from .proto files
# (only necessary after changing a .proto file).
mage proto
-make dotnet
+mage dotnet

# Build the Docker images containing all Armada components.
# Only the main "bundle" is needed for quickly testing Armada.
30 changes: 27 additions & 3 deletions docs/python_airflow_operator.md
@@ -239,9 +239,27 @@ Reports the result of the job and returns.



#### serialize()
Get a serialized version of this object.


* **Returns**

    A dict of keyword arguments used when instantiating
    this object.


* **Return type**

    dict


#### template_fields: Sequence[str] = ('job_request_items',)

-### _class_ armada.operators.armada_deferrable.ArmadaJobCompleteTrigger(job_id, job_service_channel_args, armada_queue, job_set_id, airflow_task_name)
+### _class_ armada.operators.armada_deferrable.ArmadaJobCompleteTrigger(job_id, job_service_channel_args, armada_queue, job_set_id, airflow_task_name, poll_interval=30)
Bases: `BaseTrigger`

An airflow trigger that monitors the job state of an armada job.
@@ -269,6 +287,9 @@ Triggers when the job is complete.
belongs.


* **poll_interval** (*int*) – How often to poll jobservice to get status.



* **Returns**

@@ -281,7 +302,7 @@ Runs the trigger. Meant to be called by an airflow triggerer process.


#### serialize()
-Returns the information needed to reconstruct this Trigger.
+Return the information needed to reconstruct this Trigger.


* **Returns**
@@ -664,7 +685,7 @@ A terminated event is SUCCEEDED, FAILED or CANCELLED



-### _async_ armada.operators.utils.search_for_job_complete_async(armada_queue, job_set_id, airflow_task_name, job_id, job_service_client, log, time_out_for_failure=7200)
+### _async_ armada.operators.utils.search_for_job_complete_async(armada_queue, job_set_id, airflow_task_name, job_id, job_service_client, log, poll_interval, time_out_for_failure=7200)
Poll JobService cache asynchronously until you get a terminated event.

A terminated event is SUCCEEDED, FAILED or CANCELLED
@@ -689,6 +710,9 @@ A terminated event is SUCCEEDED, FAILED or CANCELLED
It is optional only for testing


* **poll_interval** (*int*) – How often to poll jobservice to get status.


* **time_out_for_failure** (*int*) – The amount of time a job
can be in job_id_not_found
before we decide it was an invalid job
12 changes: 6 additions & 6 deletions internal/armada/server/lease.go
@@ -344,7 +344,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
lastSeen,
)
if err != nil {
-logging.WithStacktrace(ctx.Log, err).Warnf(
+logging.WithStacktrace(ctx, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetClusterId(),
)
continue
@@ -566,7 +566,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
if q.SchedulingContextRepository != nil {
sctx.ClearJobSpecs()
if err := q.SchedulingContextRepository.AddSchedulingContext(sctx); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to store scheduling context")
logging.WithStacktrace(ctx, err).Error("failed to store scheduling context")
}
}

@@ -641,7 +641,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
jobIdsToDelete := util.Map(jobsToDelete, func(job *api.Job) string { return job.Id })
log.Infof("deleting preempted jobs: %v", jobIdsToDelete)
if deletionResult, err := q.jobRepository.DeleteJobs(jobsToDelete); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to delete preempted jobs from Redis")
logging.WithStacktrace(ctx, err).Error("failed to delete preempted jobs from Redis")
} else {
deleteErrorByJobId := armadamaps.MapKeys(deletionResult, func(job *api.Job) string { return job.Id })
for jobId := range preemptedApiJobsById {
@@ -704,7 +704,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
}
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, currentExecutorReport); err != nil {
logging.WithStacktrace(ctx.Log, err).Errorf("failed to update cluster usage")
logging.WithStacktrace(ctx, err).Errorf("failed to update cluster usage")
}

allocatedByQueueAndPriorityClassForPool = q.aggregateAllocationAcrossExecutor(reportsByExecutor, req.Pool)
@@ -728,7 +728,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
v := node.Labels[q.schedulingConfig.Preemption.NodeIdLabel]
@@ -764,7 +764,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
podSpec.NodeName = node.Name
8 changes: 4 additions & 4 deletions internal/armada/server/submit_from_log.go
@@ -125,12 +125,12 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error {
sequence, err := eventutil.UnmarshalEventSequence(ctxWithLogger, msg.Payload())
if err != nil {
srv.ack(ctx, msg)
logging.WithStacktrace(ctxWithLogger.Log, err).Warnf("processing message failed; ignoring")
logging.WithStacktrace(ctxWithLogger, err).Warnf("processing message failed; ignoring")
numErrored++
break
}

ctxWithLogger.Log.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
ctxWithLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
// TODO: Improve retry logic.
srv.ProcessSequence(ctxWithLogger, sequence)
srv.ack(ctx, msg)
@@ -155,11 +155,11 @@ func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *
for i < len(sequence.Events) && time.Since(lastProgress) < timeout {
j, err := srv.ProcessSubSequence(ctx, i, sequence)
if err != nil {
-logging.WithStacktrace(ctx.Log, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
+logging.WithStacktrace(ctx, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
}

if j == i {
-ctx.Log.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")
+ctx.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")

// We should only get here if a transient error occurs.
// Sleep for a bit before retrying.
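
Both files above now pass the context straight into `logging.WithStacktrace` instead of `ctx.Log`. Assuming the `armadacontext.Context` sketched earlier, the helper only needs a `logrus.FieldLogger`, which the context now satisfies directly. A minimal sketch of such a helper follows; it is illustrative, not necessarily Armada's exact code:

```go
package logging

import (
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// stackTracer is the interface github.com/pkg/errors attaches to errors
// created with errors.New / errors.Wrap / errors.WithStack.
type stackTracer interface {
	StackTrace() errors.StackTrace
}

// WithStacktrace returns a logger annotated with err and, when the error
// carries one, its stack trace. Because armadacontext.Context embeds
// logrus.FieldLogger, callers can pass the context itself as the logger.
func WithStacktrace(logger logrus.FieldLogger, err error) logrus.FieldLogger {
	logger = logger.WithError(err)
	if st, ok := err.(stackTracer); ok {
		logger = logger.WithField("stacktrace", st.StackTrace())
	}
	return logger
}
```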