-
Notifications
You must be signed in to change notification settings - Fork 136
/
server.go
82 lines (71 loc) · 2.61 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package server
import (
"context"
"github.com/gogo/protobuf/types"
log "github.com/sirupsen/logrus"
"github.com/armadaproject/armada/internal/jobservice/configuration"
"github.com/armadaproject/armada/internal/jobservice/events"
"github.com/armadaproject/armada/internal/jobservice/repository"
js "github.com/armadaproject/armada/pkg/api/jobservice"
)
// JobServiceServer implements the GetJobStatus and Health handlers. It holds
// the service configuration, the job repository, and the channel on which
// newly subscribed job sets are announced.
type JobServiceServer struct {
	// jobServiceConfig is the configuration passed to NewJobService; Health reads its ApiConnection.
	jobServiceConfig *configuration.JobServiceConfiguration
	// jobRepository is the store GetJobStatus uses for subscriptions and job status lookups.
	jobRepository repository.SQLJobService
	// newSubChan receives one tuple for each job set that GetJobStatus newly subscribes.
	newSubChan chan *repository.SubscribedTuple
}
// NewJobService builds a JobServiceServer around the given configuration and
// repository, and allocates the buffered channel used to announce new job-set
// subscriptions.
func NewJobService(config *configuration.JobServiceConfiguration, sqlService repository.SQLJobService) *JobServiceServer {
	// TODO: What's a reasonable buffer length?
	subscriptions := make(chan *repository.SubscribedTuple, 1000)

	server := new(JobServiceServer)
	server.jobServiceConfig = config
	server.jobRepository = sqlService
	server.newSubChan = subscriptions
	return server
}
// GetNewSubscriptionChannel hands out the server's new-subscription channel as
// a receive-only channel, so callers can consume tuples but cannot send on it.
func (s *JobServiceServer) GetNewSubscriptionChannel() <-chan *repository.SubscribedTuple {
	var subscriptions <-chan *repository.SubscribedTuple = s.newSubChan
	return subscriptions
}
// GetJobStatus returns the stored status of the job identified by opts.JobId.
//
// Before the lookup it makes sure the job set named by opts.Queue and
// opts.JobSetId is subscribed: an unsubscribed job set is subscribed in the
// repository and announced on the new-subscription channel, while an already
// subscribed one has its repository record updated via UpdateJobSetDb.
// Failures in that bookkeeping are logged and do not fail the request; only an
// error from the status lookup itself is returned to the caller.
func (s *JobServiceServer) GetJobStatus(ctx context.Context, opts *js.JobServiceRequest) (*js.JobServiceResponse, error) {
	requestFields := log.Fields{
		"job_id":     opts.JobId,
		"job_set_id": opts.JobSetId,
		"queue":      opts.Queue,
	}

	jobSetExists, fromMessageId, err := s.jobRepository.IsJobSetSubscribed(ctx, opts.Queue, opts.JobSetId)
	if err != nil {
		// Best effort: the request carries on with whatever values the
		// repository returned alongside the error.
		log.WithFields(requestFields).Errorf("error checking if job is subscribed: %v", err)
	}

	if !jobSetExists {
		if errsubscribe := s.jobRepository.SubscribeJobSet(ctx, opts.Queue, opts.JobSetId, fromMessageId); errsubscribe != nil {
			// Report the subscription error itself, not the result of the
			// earlier IsJobSetSubscribed call.
			log.WithFields(requestFields).Errorf("unable to subscribe job set: %v", errsubscribe)
		} else {
			// Announce the new subscription only after the repository accepted
			// it. This send blocks while the channel's buffer is full.
			s.newSubChan <- &repository.SubscribedTuple{
				JobSetKey: repository.JobSetKey{
					Queue:    opts.Queue,
					JobSetId: opts.JobSetId,
				},
				FromMessageId: fromMessageId,
			}
		}
		log.Infof("Subscribing to queue %s jobset %s messageId %s", opts.Queue, opts.JobSetId, fromMessageId)
	} else if err := s.jobRepository.UpdateJobSetDb(ctx, opts.Queue, opts.JobSetId, fromMessageId); err != nil {
		log.WithFields(requestFields).Warn(err)
	}

	response, err := s.jobRepository.GetJobStatus(ctx, opts.JobId)
	if err != nil {
		log.WithFields(requestFields).Error(err)
		return nil, err
	}
	return response, nil
}
// Health reports SERVING when the event client built from the configured
// ApiConnection answers its own Health call. If that call fails, the error is
// logged and returned with a nil response.
//
// NOTE(review): a new event client is created on every call and is not
// released here; confirm whether events.NewEventClient holds resources that
// need closing.
func (s *JobServiceServer) Health(ctx context.Context, _ *types.Empty) (*js.HealthCheckResponse, error) {
	eventClient := events.NewEventClient(&s.jobServiceConfig.ApiConnection)

	// Pass the request context so the caller's cancellation and deadline
	// reach the downstream probe.
	if _, err := eventClient.Health(ctx, &types.Empty{}); err != nil {
		log.Errorf("health check failed for events with %s", err)
		return nil, err
	}
	return &js.HealthCheckResponse{Status: js.HealthCheckResponse_SERVING}, nil
}