forked from gazette/core
/
service.go
55 lines (49 loc) · 1.88 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package consumer
import (
"context"
"github.com/LiveRamp/gazette/v2/pkg/allocator"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/trace"
"google.golang.org/grpc"
)
// Service is the top-level runtime concern of a Gazette Consumer process.
// It drives local shard processing in response to allocator.State,
// powers shard resolution, and is also an implementation of ShardServer.
//
// NewService populates every field of a Service.
type Service struct {
	// Resolver of Service shards. Watch delegates to it in order to serve
	// local assignments.
	Resolver *Resolver
	// Distributed allocator state of the service. NewService hands this same
	// State to the Resolver.
	State *allocator.State
	// Loopback connection which defaults to the local server, but is wired with
	// a protocol.DispatchBalancer. Consumer applications may use Loopback to
	// proxy application-specific RPCs to peer consumer instances, after
	// performing shard resolution.
	Loopback *grpc.ClientConn
	// Journal client for use by consumer applications. NewService also passes
	// it to each Replica it constructs.
	Journals pb.RoutedJournalClient
	// Etcd client for use by consumer applications. It is also the client
	// which Watch supplies to the Resolver.
	Etcd *clientv3.Client
}
// NewService builds the Service of the Application |app|, driven by the given
// allocator.State. The returned Service retains |state|, the journal client
// |rjc|, the loopback connection |lo| and the |etcd| client, and holds a
// Resolver created over |state|. Each call of the factory given to that
// Resolver produces a new Replica of |app|.
func NewService(app Application, state *allocator.State, rjc pb.RoutedJournalClient, lo *grpc.ClientConn, etcd *clientv3.Client) *Service {
	// Replica factory for the Resolver. Note state.KS is read at each
	// invocation of the factory, not once up front.
	var newReplica = func() *Replica {
		return NewReplica(app, state.KS, etcd, rjc)
	}

	var svc = new(Service)
	svc.Resolver = NewResolver(state, newReplica)
	svc.State = state
	svc.Loopback = lo
	svc.Journals = rjc
	svc.Etcd = etcd
	return svc
}
// Watch observes the Service KeySpace and serves the local assignments
// reflected in it, returning once the Context is cancelled or an error
// occurs. All local replicas are shut down before Watch returns, whether
// or not an error is being returned. The work is delegated to the Service's
// Resolver, using the Service's Etcd client.
func (svc *Service) Watch(ctx context.Context) error {
	var resolver, etcd = svc.Resolver, svc.Etcd
	return resolver.watch(ctx, etcd)
}
// addTrace adds a printf-style message to the trace.Trace carried by |ctx|.
// It is a no-op when |ctx| has no Trace attached.
func addTrace(ctx context.Context, format string, args ...interface{}) {
	tr, found := trace.FromContext(ctx)
	if !found {
		// Nothing is tracing this Context; there is nowhere to record to.
		return
	}
	tr.LazyPrintf(format, args...)
}