Skip to content

Commit

Permalink
JobTemplater interface for getting JobTemplate of submitted job
Browse files Browse the repository at this point in the history
  • Loading branch information
dgruber committed Apr 8, 2023
1 parent 600ab00 commit 54d7dcd
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 2 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ _StageOutFiles_ creates a bucket if it does not exist before the job is submitte
If that failes then the job submission call fails. Currently only _gs://_ is evaluated
in the StageOutFiles map.

````go
StageOutFiles: map[string]string{
"gs://outputbucket": "/tmp/joboutput",
},
````

## Examples

See examples directory.
24 changes: 24 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gcpbatchtracker
import (
"context"
"fmt"
"io/ioutil"
"strings"

"cloud.google.com/go/storage"
Expand Down Expand Up @@ -57,3 +58,26 @@ func MountBucket(jobRequest *batchpb.CreateJobRequest, execPosition int, destina
}
return jobRequest
}

// ReadFromBucket reads the content of an object from a bucket.
// This is a convenience function to read files, like output files
// from a bucket.
func ReadFromBucket(source string, file string) ([]byte, error) {
if !strings.HasPrefix(source, "gs://") {
return nil, fmt.Errorf("source %s is not a GCS bucket", source)
}
bucket := strings.Split(strings.TrimPrefix(source, "gs://"), "/")[0]
storageClient, err := GetStorageClient()
if err != nil {
return nil, fmt.Errorf("could not create storage client: %v", err)
}
bucketHandle := storageClient.Bucket(bucket)
obj := bucketHandle.Object(file)
reader, err := obj.NewReader(context.Background())
if err != nil {
return nil, fmt.Errorf("could not read object %s from bucket %s: %v",
file, bucket, err)
}
defer reader.Close()
return ioutil.ReadAll(reader)
}
2 changes: 2 additions & 0 deletions examples/drmaa2job/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
jobsession.db
drmaa2job
13 changes: 12 additions & 1 deletion examples/drmaa2job/drmaa2job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func main() {
sm, err := drmaa2os.NewGoogleBatchSessionManager(
gcpbatchtracker.GoogleBatchTrackerParams{
GoogleProjectID: "yourgoogleprojectid",
GoogleProjectID: "googleprojectid",
Region: "us-central1",
},
"jobsession.db",
Expand Down Expand Up @@ -69,4 +69,15 @@ echo "hello from prolog"
panic(err)
}
fmt.Printf("Job %s terminated\n", termintedJob.GetID())

// Get job template
jt, err := job.GetJobTemplate()
if err != nil {
panic(err)
}
templateJSON, err := jt.MarshalJSON()
if err != nil {
panic(err)
}
fmt.Printf("Job template: %s\n", string(templateJSON))
}
76 changes: 75 additions & 1 deletion jobtemplate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gcpbatchtracker

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"math/rand"
Expand All @@ -10,6 +13,7 @@ import (
"time"

"github.com/dgruber/drmaa2interface"
"github.com/mitchellh/copystructure"
batchpb "google.golang.org/genproto/googleapis/cloud/batch/v1"
"google.golang.org/protobuf/types/known/durationpb"
)
Expand All @@ -20,6 +24,8 @@ const (
// job categories (otherwise it is a container image)
JobCategoryScriptPath = "$scriptpath$" // treats RemoteCommand as path to script and ignores args
JobCategoryScript = "$script$" // treats RemoteCommand as script and ignores args
// Env variable name container job template
EnvJobTemplate = "DRMAA2_JOB_TEMPLATE"
)

// https://cloud.google.com/go/docs/reference/cloud.google.com/go/batch/latest/apiv1#example-usage
Expand Down Expand Up @@ -58,6 +64,22 @@ echo 'Prolog'
barries = false
}

// set job template as environment variable, so that
// we can access it later; unfortunately, we cannot
// store it as a label as labels are limited to 63
// characters.
env, err := JobTemplateToEnv(jt)

if jt.JobEnvironment == nil {
jt.JobEnvironment = make(map[string]string)
}
jobEnvironment, err := copystructure.Copy(jt.JobEnvironment)
if err != nil {
return jobRequest,
fmt.Errorf("failed to copy job environment: %s", err)
}
jobEnvironment.(map[string]string)[EnvJobTemplate] = env

jobRequest.Job = &batchpb.Job{
Priority: int64(jt.Priority),
TaskGroups: []*batchpb.TaskGroup{
Expand All @@ -71,7 +93,9 @@ echo 'Prolog'
// what is with containers?
PermissiveSsh: true,
TaskSpec: &batchpb.TaskSpec{
Environments: jt.JobEnvironment,
Environment: &batchpb.Environment{
Variables: jobEnvironment.(map[string]string),
},
ComputeResource: &batchpb.ComputeResource{
CpuMilli: defaultCPUMilli,
BootDiskMib: defaultBootDiskMib,
Expand Down Expand Up @@ -455,3 +479,53 @@ func ValidateJobTemplate(jt drmaa2interface.JobTemplate) (drmaa2interface.JobTem
}
return jt, nil
}

// Implements JobTemplater interface

func (t *GCPBatchTracker) JobTemplate(jobID string) (drmaa2interface.JobTemplate, error) {
// get job template from env variables
job, err := t.client.GetJob(context.Background(), &batchpb.GetJobRequest{
Name: jobID,
})
if err != nil {
return drmaa2interface.JobTemplate{},
fmt.Errorf("could not get job %s: %v", jobID, err)
}

for _, group := range job.GetTaskGroups() {
for _, envs := range group.GetTaskEnvironments() {
value, exists := envs.GetVariables()[EnvJobTemplate]
if exists {
jt, err := GetJobTemplateFromBase64(value)
if err != nil {
continue
}
return jt, nil
}
}
}

return drmaa2interface.JobTemplate{},
fmt.Errorf("could not find job template in env variables")
}

func JobTemplateToEnv(jt drmaa2interface.JobTemplate) (string, error) {
jtBytes, err := json.Marshal(jt)
if err != nil {
return "", fmt.Errorf("could not marshal job template: %v", err)
}
return base64.StdEncoding.EncodeToString(jtBytes), nil
}

func GetJobTemplateFromBase64(base64encondedJT string) (drmaa2interface.JobTemplate, error) {
jt := drmaa2interface.JobTemplate{}
decodedJT, err := base64.StdEncoding.DecodeString(base64encondedJT)
if err != nil {
return jt, fmt.Errorf("could not decode job template: %v", err)
}
err = json.Unmarshal(decodedJT, &jt)
if err != nil {
return jt, fmt.Errorf("could not unmarshal job template: %v", err)
}
return jt, nil
}
36 changes: 36 additions & 0 deletions jobtemplate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,42 @@ var _ = Describe("Jobtemplate", func() {

})

Describe("JobTemplateToEnv", func() {

It("should encode a JobTemplate to a base64 string", func() {
jt := drmaa2interface.JobTemplate{
RemoteCommand: "test",
Args: []string{"arg1", "arg2"},
}
encodedJT, err := JobTemplateToEnv(jt)
Expect(err).NotTo(HaveOccurred())
Expect(encodedJT).ToNot(BeEmpty())
})

})

Describe("GetJobTemplateFromEnv", func() {

It("should decode a base64 string to a JobTemplate", func() {
jt := drmaa2interface.JobTemplate{
RemoteCommand: "test",
Args: []string{"arg1", "arg2"},
}
encodedJT, err := JobTemplateToEnv(jt)
Expect(err).NotTo(HaveOccurred())

decodedJT, err := GetJobTemplateFromBase64(encodedJT)
Expect(err).NotTo(HaveOccurred())
Expect(decodedJT).To(Equal(jt))
})

It("should return an error for an invalid base64 string", func() {
_, err := GetJobTemplateFromBase64("invalid base64 string")
Expect(err).To(HaveOccurred())
})

})

Context("Regressions", func() {

It("should not generate the same job id if non is provided", func() {
Expand Down

0 comments on commit 54d7dcd

Please sign in to comment.