/
slurm_apptainer.go
127 lines (107 loc) · 2.94 KB
/
slurm_apptainer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package batflow
import (
"context"
"fmt"
"path"
"strings"
"golang.org/x/crypto/ssh"
"k8s.io/apimachinery/pkg/util/rand"
)
const (
	// DeviceNvidiaGPUKey is the Resource.Devices key the job builders
	// check to request NVIDIA GPUs for a container. The value mirrors
	// the Kubernetes device-plugin resource name.
	DeviceNvidiaGPUKey = "nvidia.com/gpu"
)
// Container describes a single containerized workload to be run as a
// Slurm batch job under Apptainer.
type Container struct {
	// ID of the container specified as a DNS_LABEL. Each Container must
	// have a unique name. When left empty, a random 16-character ID is
	// assigned at job-submit time.
	ID string `json:"id"`
	// Image container image name.
	Image Image `json:"image"`
	// Command to execute.
	Command []string `json:"command"`
	// Ports is the list of ports to expose from the container.
	Ports []uint16 `json:"ports"`
	// Resource describes compute resources required by the container.
	Resource Resource `json:"resource"`
	// Status of the container. NOTE(review): the set of valid status
	// values is not visible in this file — confirm against callers.
	Status string `json:"status"`
}
// Image container image name.
type Image struct {
	// URI of the image; it is passed verbatim to the generated
	// `apptainer exec` command line.
	URI string `json:"uri"`
}
// Resource compute resources required by container.
type Resource struct {
	// CPU requirements.
	CPU struct {
		Cores uint `json:"cores"`
	}
	// Memory requirements.
	Memory struct {
		// Size is a human-readable memory size string.
		// NOTE(review): expected format (e.g. "4G") is not visible
		// here — confirm against callers.
		Size string `json:"size"`
	}
	// Devices maps a device key (e.g. DeviceNvidiaGPUKey) to the
	// requested device count.
	Devices map[string]uint `json:"devices"`
}
// SlurmApptainerActivities runs container workloads on a Slurm cluster,
// submitting sbatch jobs over SSH that execute the containers with
// Apptainer.
type SlurmApptainerActivities struct {
	// sshClient is the established SSH connection to the remote host
	// used to submit Slurm jobs.
	sshClient *ssh.Client
}
// NewSlurmApptainerActivites returns a SlurmApptainerActivities that
// submits work over the given SSH client.
func NewSlurmApptainerActivites(client *ssh.Client) *SlurmApptainerActivities {
	activities := &SlurmApptainerActivities{
		sshClient: client,
	}
	return activities
}
// Start submits the given container as a Slurm batch job on the remote
// host, to be run under Apptainer. It returns once the submit command
// has completed; it does not wait for the job itself.
//
// Side effect: an empty container.ID is assigned a random value by the
// submit-command builder.
//
// NOTE(review): ctx is currently unused — the SSH session does not
// observe cancellation; confirm whether callers rely on ctx deadlines.
func (a *SlurmApptainerActivities) Start(ctx context.Context, container *Container) (err error) {
	sess, err := a.sshClient.NewSession()
	if err != nil {
		// Wrap with %w so callers can inspect the cause via errors.Is/As.
		return fmt.Errorf("ssh new session: %w", err)
	}
	// Each remote exec needs its own session; release it when done.
	defer sess.Close()
	if err := sess.Run(buildSlurmJobSubmitCommand(container)); err != nil {
		return fmt.Errorf("submit job: %w", err)
	}
	return nil
}
// List all running and named container instances.
//
// NOTE(review): not yet implemented — it currently ignores names and
// always returns nil. The original used a naked return of a named,
// never-assigned error; the explicit nil makes the stub behavior clear.
func (a *SlurmApptainerActivities) List(ctx context.Context, names []string) error {
	return nil
}
// buildSlurmJobSubmitCommand renders a shell command that, when run on
// the remote host, creates a per-job work directory, writes a Slurm
// batch script for the container into it, and submits it with sbatch.
//
// Side effect: when container.ID is empty, a random 16-character ID is
// assigned on the caller's Container.
//
// NOTE(review): container.ID is interpolated unquoted into the shell
// text — safe for the generated random IDs, but verify callers never
// pass attacker-controlled IDs.
func buildSlurmJobSubmitCommand(container *Container) string {
	if container.ID == "" {
		container.ID = rand.String(16)
	}
	// path (not filepath) is deliberate: the result is for the remote
	// POSIX shell, not the local OS. `~` is expanded by that shell.
	workDir := path.Join("~/.batflow/jobs", container.ID)
	// Outer heredoc (CMD) is the command fed to sh; the inner heredoc
	// (SCRIPT) writes job.sh, which embeds the #SBATCH options and the
	// apptainer invocation built by the sibling helpers.
	return `sh <<- 'CMD'
mkdir -p ` + workDir + ` || exit $?
cd ` + workDir + ` || exit $?
# Generate job script.
cat <<- 'SCRIPT' > job.sh || exit $?
#!/usr/bin/env bash
` + buildSlurmJobOptions(container) + `
[[ -f ~/.batflow/env ]] && source ~/.batflow/env || true
` + buildSlurmJobStepCommand(container) + `
SCRIPT
# Submit job script.
sbatch job.sh
CMD`
}
// buildSlurmJobOptions renders the #SBATCH directive lines for the
// container's batch script, one directive per line.
func buildSlurmJobOptions(container *Container) string {
	var directives []string
	add := func(opt string) {
		directives = append(directives, "#SBATCH "+opt)
	}
	// The job is named after the container's ID, quoted for safety.
	add(fmt.Sprintf("--job-name=%q", container.ID))
	if gpus, ok := container.Resource.Devices[DeviceNvidiaGPUKey]; ok {
		add(fmt.Sprintf("--gpus=%d", gpus))
	}
	return strings.Join(directives, "\n")
}
// buildSlurmJobStepCommand renders the apptainer invocation that runs
// the container's image and command from inside the batch script.
func buildSlurmJobStepCommand(container *Container) string {
	var cmd strings.Builder
	cmd.WriteString("apptainer exec --compat --fakeroot")
	// Enable NVIDIA GPU passthrough only when GPUs were requested.
	if _, wantsGPU := container.Resource.Devices[DeviceNvidiaGPUKey]; wantsGPU {
		cmd.WriteString(" --nv")
	}
	cmd.WriteString(" ")
	cmd.WriteString(container.Image.URI)
	for _, arg := range container.Command {
		cmd.WriteString(" ")
		cmd.WriteString(arg)
	}
	return cmd.String()
}