/
job.go
150 lines (129 loc) · 4.77 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runnerlib
import (
"context"
"fmt"
"io"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
"github.com/golang/protobuf/proto"
)
// JobOptions capture the various options for submitting jobs
// to universal runners. The values are forwarded to the job service as part
// of the pipeline options built when a job is prepared.
type JobOptions struct {
// Name is the job name. It is used both as the application name in the
// pipeline options and as the job name of the prepare request.
Name string
// Experiments are additional experiments. The "beam_fn_api" experiment is
// always added on top of these when a job is prepared.
Experiments []string
// TODO(herohde) 3/17/2018: add further parametrization as needed
// Worker is the worker binary override.
// NOTE(review): not read by any function visible in this file — confirm
// where it is consumed.
Worker string
// RetainDocker is an option to pass to the runner.
RetainDocker bool
// Parallelism is forwarded unchanged into the pipeline options sent to the
// runner. NOTE(review): presumably the requested default parallelism; how
// the runner treats zero or negative values is not visible here — confirm.
Parallelism int
}
// Prepare prepares a job on the given job service. It returns the preparation
// id, the artifact staging endpoint URL, and the staging session token if
// successful.
//
// The pipeline options sent to the service are the globally registered beam
// pipeline options combined with the settings in opt. The "beam_fn_api"
// experiment is always added to opt.Experiments, and opt itself is never
// modified. A nil opt is treated as an empty JobOptions.
func Prepare(ctx context.Context, client jobpb.JobServiceClient, p *pipepb.Pipeline, opt *JobOptions) (id, endpoint, stagingToken string, err error) {
	if opt == nil {
		opt = &JobOptions{}
	}
	hooks.SerializeHooksToOptions()

	// Build a fresh slice. Appending directly to opt.Experiments would write
	// into the caller's backing array whenever it has spare capacity. That
	// silently mutates (or aliases) the caller's slice across calls.
	experiments := make([]string, 0, len(opt.Experiments)+1)
	experiments = append(experiments, opt.Experiments...)
	experiments = append(experiments, "beam_fn_api")

	raw := runtime.RawOptionsWrapper{
		Options:      beam.PipelineOptions.Export(),
		AppName:      opt.Name,
		Experiments:  experiments,
		RetainDocker: opt.RetainDocker,
		Parallelism:  opt.Parallelism,
	}

	options, err := provision.OptionsToProto(raw)
	if err != nil {
		return "", "", "", errors.WithContext(err, "producing pipeline options")
	}

	req := &jobpb.PrepareJobRequest{
		Pipeline:        p,
		PipelineOptions: options,
		JobName:         opt.Name,
	}
	resp, err := client.Prepare(ctx, req)
	if err != nil {
		// Any failure of the Prepare RPC lands here, not only connection
		// errors, so the message describes the operation that failed.
		return "", "", "", errors.Wrap(err, "failed to prepare job")
	}
	return resp.GetPreparationId(), resp.GetArtifactStagingEndpoint().GetUrl(), resp.GetStagingSessionToken(), nil
}
// Submit asks the job service to run a previously prepared job. The job is
// identified by its preparation id (id) and artifact retrieval token (token).
// On success it returns the job id assigned by the service. On failure the
// RPC error is wrapped and returned together with an empty id.
func Submit(ctx context.Context, client jobpb.JobServiceClient, id, token string) (string, error) {
	resp, err := client.Run(ctx, &jobpb.RunJobRequest{
		PreparationId:  id,
		RetrievalToken: token,
	})
	if err != nil {
		return "", errors.Wrap(err, "failed to submit job")
	}
	return resp.GetJobId(), nil
}
// WaitForCompletion follows the message stream of the given job until the job
// reaches a state this function treats as final, or the stream ends. Every
// state update is logged at info level. Every job message is forwarded to the
// logger at a severity derived from its importance.
//
// It returns nil when the stream ends with io.EOF or the job reports DONE or
// CANCELLED. It returns an error when the stream cannot be opened, Recv
// fails, the job reports FAILED, or an update carries neither a state nor a
// message.
func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID string) error {
	stream, err := client.GetMessageStream(ctx, &jobpb.JobMessagesRequest{JobId: jobID})
	if err != nil {
		return errors.Wrap(err, "failed to get job stream")
	}

	for {
		update, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return nil
		case recvErr != nil:
			return recvErr
		}

		if state := update.GetStateResponse(); state != nil {
			log.Infof(ctx, "Job state: %v", state.GetState().String())
			switch state.GetState() {
			case jobpb.JobState_FAILED:
				return errors.Errorf("job %v failed", jobID)
			case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
				return nil
			}
			// Any other state is treated as non-terminal: keep listening.
			continue
		}

		message := update.GetMessageResponse()
		if message == nil {
			return errors.Errorf("unexpected job update: %v", proto.MarshalTextString(update))
		}
		text := fmt.Sprintf("%v (%v): %v", message.GetTime(), message.GetMessageId(), message.GetMessageText())
		// Kept directly in this function so the calldepth of 1 refers to the
		// same stack frame as before.
		log.Output(ctx, messageSeverity(message.GetImportance()), 1, text)
	}
}
// messageSeverity maps a job message importance to the log severity used to
// emit it:
//   - DEBUG and DETAILED map to debug.
//   - BASIC maps to info.
//   - WARNING maps to warn.
//   - ERROR maps to error.
//
// Any other value, including an unspecified importance, yields
// log.SevUnspecified.
func messageSeverity(importance jobpb.JobMessage_MessageImportance) log.Severity {
	var severity log.Severity = log.SevUnspecified
	switch importance {
	case jobpb.JobMessage_JOB_MESSAGE_DEBUG, jobpb.JobMessage_JOB_MESSAGE_DETAILED:
		severity = log.SevDebug
	case jobpb.JobMessage_JOB_MESSAGE_BASIC:
		severity = log.SevInfo
	case jobpb.JobMessage_JOB_MESSAGE_WARNING:
		severity = log.SevWarn
	case jobpb.JobMessage_JOB_MESSAGE_ERROR:
		severity = log.SevError
	}
	return severity
}