Skip to content

Commit

Permalink
tweaks for snakemake work with flux operator executor (#117)
Browse files Browse the repository at this point in the history
* tweaks for snakemake work with flux operator executor
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Mar 15, 2023
1 parent 737b577 commit f565d1f
Show file tree
Hide file tree
Showing 21 changed files with 142 additions and 238 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/test-python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
matrix:
# This is a matrix of test commands and containers, MiniKube is assumed to be running
test: [["python ./sdk/python/v1alpha1/examples/port-forward.py", "ghcr.io/flux-framework/flux-restful-api:latest"],
["python ./sdk/python/v1alpha1/examples/state-basic-job-completion-minicluster.py", "ghcr.io/flux-framework/flux-restful-api:latest"],
["python ./sdk/python/v1alpha1/examples/state-pending-jobs-minicluster.py", "ghcr.io/flux-framework/flux-restful-api:latest"],
["python ./sdk/python/v1alpha1/examples/interactive-submit.py", "ghcr.io/flux-framework/flux-restful-api:latest"],
["pytest -xs ./tests/python/test_*.py", "ghcr.io/flux-framework/flux-restful-api:latest"]]

Expand Down Expand Up @@ -65,8 +65,15 @@ jobs:
minikube ssh docker pull ${container}
make
make install
make docker-build
printf "Loading Flux Operator latest image...\n"
minikube image load ghcr.io/flux-framework/flux-operator:latest
kubectl apply -f ./examples/dist/flux-operator.yaml
- name: Test ${{ matrix.test[0] }}
env:
command: ${{ matrix.test[0] }}
run: /bin/bash ./script/test-python.sh ${command}
run: |
make clean
echo "Running command: ${command}"
${command}
3 changes: 3 additions & 0 deletions api/v1alpha1/minicluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,9 @@ type LifeCycle struct {

// +optional
PostStartExec string `json:"postStartExec"`

// +optional
PreStopExec string `json:"preStopExec"`
}

type FluxUser struct {
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@
"postStartExec": {
"type": "string",
"default": ""
},
"preStopExec": {
"type": "string",
"default": ""
}
}
},
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/crd/bases/flux-framework.org_miniclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ spec:
properties:
postStartExec:
type: string
preStopExec:
type: string
type: object
name:
description: Container name is only required for non flux runners
Expand Down
34 changes: 4 additions & 30 deletions controllers/flux/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ SPDX-License-Identifier: Apache-2.0
package controllers

import (
"fmt"
"path/filepath"
"strings"

corev1 "k8s.io/api/core/v1"

api "flux-framework/flux-operator/api/v1alpha1"
Expand All @@ -30,43 +26,21 @@ func (r *MiniClusterReconciler) createContainerLifecycle(

// Manual lifecycles from the user before container start
if container.LifeCycle.PostStartExec != "" {
r.log.Info("🌀 MiniCluster", "LifeCycle.PostStartExec", container.LifeCycle.PostStartExec)
lifecycle.PostStart = &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{container.LifeCycle.PostStartExec},
},
}
}

// If this logic needs to be shared can be moved external to the function
fluxuser := "flux"
if container.FluxUser.Name != "" {
fluxuser = container.FluxUser.Name
}

// Assemble the command
asSudo := "sudo -E PYTHONPATH=$PYTHONPATH -E PATH=$PATH"
asFlux := fmt.Sprintf("sudo -u %s -E PYTHONPATH=$PYTHONPATH -E PATH=$PATH -E HOME=/home/%s", fluxuser, fluxuser)
if container.Commands.RunFluxAsRoot {
asFlux = asSudo + fmt.Sprintf("-E HOME=/home/%s", fluxuser)
}

// If we have an archive path, we will need to save there
// Note that copy FROM archive TO container happens in wait.sh
if cluster.Spec.Archive.Path != "" {

dirname := filepath.Dir(cluster.Spec.Archive.Path)
preStop := []string{
"/bin/bash", "-c",
fmt.Sprintf("mkdir -p %s && %s flux proxy local:///var/run/flux/local flux dump %s",
dirname, asFlux, cluster.Spec.Archive.Path),
}
r.log.Info("🌀 MiniCluster", "LifeCycle.PreStopExec", strings.Join(preStop, " "))
if container.LifeCycle.PreStopExec != "" {
r.log.Info("🌀 MiniCluster", "LifeCycle.PreStopExec", container.LifeCycle.PreStopExec)
lifecycle.PreStop = &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: preStop,
Command: []string{container.LifeCycle.PreStopExec},
},
}

}
return &lifecycle
}
2 changes: 2 additions & 0 deletions controllers/flux/templates/wait.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ brokerOptions="-Scron.directory=/etc/flux/system/cron.d \
-Slog-stderr-mode=local"

# if we are given an archive to use, load first, not required to exist
# Note that we ask the user to dump in interactive mode - I am not
# sure that doing it with a hook ensures the dump will be successful.
{{if .Archive.Path }}
if [[ -e "{{ .Archive.Path}}" ]]; then
{{ if not .Logging.Quiet }}printf "🧊️ Found existing archive at {{ .Archive.Path}} loading into state directory\nBefore:\n"{{ end }}
Expand Down
11 changes: 11 additions & 0 deletions docs/getting_started/custom-resource-definition.md
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,17 @@ we provide this argument on the level of the container. To enable this, set this
diagnostics: false
```

#### lifeCycle

You can define postStartExec or preStopExec hooks.

```yaml
lifeCycle:
postStartExec: ...
preStopExec: ...
```


### volumes

Volumes that are defined on the level of the container must be defined at the top level of the MiniCluster.
Expand Down
24 changes: 19 additions & 5 deletions docs/tutorials/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ of difficulty:
These small tutorials will walk through examples of each. The most likely use cases for doing
this will be using the Flux Operator Python SDK (since we need to create multiple clusters)
in a reasonable way) but for the purposes of explanation, minicluster.yaml files are provided
as well.
as well. One important note is that since we cannot control the timing of a pod termination,
while we can have Flux automatically load a saved archive, for the process to wait for
jobs to finish and then dump the archive anew, we rely on issuing a command to the MiniCluster
(done by a script or workflow tool). This can likely be improved upon.

## Saving Pending Jobs

Expand Down Expand Up @@ -50,6 +53,9 @@ flux queue stop

# This should wait for running jobs to finish
flux queue idle

# And then do the dump!
flux dump /state/archive.tar.gz
```

And this means we will stop and wait for jobs to finish, and then this state is loaded
Expand Down Expand Up @@ -118,7 +124,7 @@ Note that interactive mode is set to true - this will start a broker to keep run
Since we are defining the archive path to `/state/archive.tar.gz`, this means that before Flux is started,
we will load an archive from that path given that it exists with `flux resource reload`. This is done directly
in the entrypoint. To have better control of the reverse sequence - saving the final state to that same location,
we will run `flux dump` to that same archive via a Pre stop lifecycle hook. Note this is a simple approach
we will run `flux dump` to that same archive as an interactive command. Note this is a simple approach
that assumes we are OK replacing a previous state with a new one - for more complex workflows (where
possibly we need to maintain an original state) we likely will need to do something differently. For
the time being, let's create this first minicluster to submit jobs to, and the plan will be
Expand Down Expand Up @@ -237,7 +243,6 @@ $ kubectl apply -f examples/state/basic-job-completion/minicluster.yaml
```

At this point you can proceed to either [Interactive Submit](#interactive-submit) or [Programmatic Submit](#programmatic-submit).
We also have this example demonstrated [entirely in Python](https://github.com/flux-framework/flux-operator/tree/main/sdk/python/v1alpha1/examples/state-basic-job-completion-minicluster.py) using the Flux Operator Python SDK.

### Interactive Submit

Expand Down Expand Up @@ -307,13 +312,22 @@ When you are done, you can see all the jobs:
$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux jobs -a
```

Give the jobs a little bit of time to run, and after that, outside of the shell (if you didn't already exit) let's delete the Minicluster.
Then you can stop the queue, wait for jobs to finish, and request the dump. Note that we do this
as an interactive command and not automatically because (for large dumps 💩️) we cannot ensure that the time
will be given for completion.

```bash
$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux queue stop
$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux queue idle
$ kubectl exec -it -n flux-operator flux-sample-0-mbv54 -- sudo -u flux flux proxy local:///var/run/flux/local flux queue dump /state/archive.tar.gz
```

After that, outside of the shell (if you didn't already exit) let's delete the Minicluster.

```bash
$ kubectl delete -f examples/state/basic-job-completion/minicluster.yaml
```

Since we have the lifecycle hook in place, it's going to take slightly longer to be terminated than if we didn't.
At this point, it should be the case that the same flux state directory is dumped to the archive path we requested,
which is located at `/tmp/data/archive.tar.gz` in the MiniKube vm (`/tmp/data` is bound to `/state` and the
archive inside the container is asked to be saved to `/state/archive.tar.gz`).
Expand Down
2 changes: 2 additions & 0 deletions examples/dist/flux-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ spec:
properties:
postStartExec:
type: string
preStopExec:
type: string
type: object
name:
description: Container name is only required for non flux runners
Expand Down
6 changes: 5 additions & 1 deletion examples/state/basic-job-completion/example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ sleep 10
printf "Jobs finished...\n"
kubectl exec -it -n flux-operator ${broker_pod} -- sudo -u flux flux proxy local:///var/run/flux/local flux jobs -a

kubectl exec -it -n flux-operator ${broker_pod} -- sudo -u flux flux proxy local:///var/run/flux/local flux queue stop
kubectl exec -it -n flux-operator ${broker_pod} -- sudo -u flux flux proxy local:///var/run/flux/local flux queue idle
kubectl exec -it -n flux-operator ${broker_pod} -- sudo -u flux flux proxy local:///var/run/flux/local flux dump /state/archive.tar.gz

printf "\n🥱️ Wait a minute to be sure we have saved...\n"
sleep 60
sleep 30

printf "\n🧊️ Current state directory at /var/lib/flux...\n"
kubectl exec -it -n flux-operator ${broker_pod} -- ls -l /var/lib/flux
Expand Down
2 changes: 1 addition & 1 deletion hack/python-sdk/template/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
if __name__ == "__main__":
setup(
name="fluxoperator",
version="0.0.16",
version="0.0.17",
author="Vanessasaurus",
author_email="vsoch@users.noreply.github.com",
maintainer="Vanessasaurus",
Expand Down
25 changes: 0 additions & 25 deletions script/test-python.sh

This file was deleted.

1 change: 1 addition & 0 deletions sdk/python/v1alpha1/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and **Merged pull requests**. Critical items to know are:
The versions coincide with releases on pip. Only major versions will be released as tags on Github.

## [0.0.x](https://github.com/flux-framework/flux-operator/tree/main/sdk/python/v2alpha1) (0.0.x)
- Tweaks to client and pod resources (bugfix) for snakemake work (0.0.17)
- Support for flux start / broker / submit commands->prefix (0.0.16)
- Support for MiniClusterArchive (0.0.15)
- Support for MiniClusterExistingVolume (0.0.14)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/v1alpha1/docs/LifeCycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**post_start_exec** | **str** | | [optional] [default to '']
**pre_stop_exec** | **str** | | [optional] [default to '']

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down

0 comments on commit f565d1f

Please sign in to comment.