This repository has been archived by the owner on Feb 6, 2024. It is now read-only.
/
forward.go
64 lines (56 loc) · 1.99 KB
/
forward.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
package grpc
import (
"context"
"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/service"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// getForwardedCeresmetaClient returns a ceresmeta client bound to the address
// that requests should be forwarded to.
//
// When getForwardedAddr yields an empty address (the current node is the
// leader), there is nothing to forward to and the function returns (nil, nil),
// so callers must check the returned client for nil before using it.
func (s *Service) getForwardedCeresmetaClient(ctx context.Context) (metaservicepb.CeresmetaRpcServiceClient, error) {
	addr, _, err := s.getForwardedAddr(ctx)
	if err != nil {
		return nil, errors.WithMessage(err, "get forwarded ceresmeta client")
	}

	// An empty address means no forwarding target exists.
	if addr == "" {
		return nil, nil
	}

	client, err := s.getCeresmetaClient(ctx, addr)
	if err != nil {
		return nil, errors.WithMessagef(err, "get forwarded ceresmeta client, addr:%s", addr)
	}
	return client, nil
}
// getCeresmetaClient builds a ceresmeta RPC service client on top of the gRPC
// connection that getForwardedGrpcClient provides for addr. Any failure to
// obtain the connection is returned wrapped with the target address.
func (s *Service) getCeresmetaClient(ctx context.Context, addr string) (metaservicepb.CeresmetaRpcServiceClient, error) {
	conn, err := s.getForwardedGrpcClient(ctx, addr)
	if err != nil {
		return nil, errors.WithMessagef(err, "get ceresmeta client, addr:%s", addr)
	}

	rpcClient := metaservicepb.NewCeresmetaRpcServiceClient(conn)
	return rpcClient, nil
}
// getForwardedGrpcClient returns the gRPC connection for forwardedAddr,
// reusing the one cached in s.conns when present and creating (then caching)
// a new one via service.GetClientConn otherwise.
//
// The new connection is published with LoadOrStore so that, when several
// callers race to create a connection for the same address, exactly one
// connection ends up cached and every caller receives that same connection.
// The connections created by the losers of the race are closed instead of
// being silently overwritten in the cache and leaked.
func (s *Service) getForwardedGrpcClient(ctx context.Context, forwardedAddr string) (*grpc.ClientConn, error) {
	// Fast path: a connection for this address is already cached.
	if cached, ok := s.conns.Load(forwardedAddr); ok {
		return cached.(*grpc.ClientConn), nil
	}

	log.Info("try to create ceresmeta client", zap.String("addr", forwardedAddr))
	cc, err := service.GetClientConn(ctx, forwardedAddr)
	if err != nil {
		return nil, err
	}

	// Another goroutine may have cached a connection while this one was being
	// created; keep whichever was stored first.
	actual, loaded := s.conns.LoadOrStore(forwardedAddr, cc)
	if loaded {
		// Best-effort close: this connection was never handed out to anyone,
		// so a failure to close it cannot affect any caller.
		_ = cc.Close()
	}
	return actual.(*grpc.ClientConn), nil
}
// getForwardedAddr queries s.h.GetLeader and reports where requests should be
// forwarded.
//
// It returns the leader endpoint and false when the leader is not local, or
// an empty address and true when the leader is local. A failure of GetLeader
// is returned wrapped, together with an empty address and false.
func (s *Service) getForwardedAddr(ctx context.Context) (string, bool, error) {
	leader, err := s.h.GetLeader(ctx)
	if err != nil {
		return "", false, errors.WithMessage(err, "get forwarded addr")
	}

	if !leader.IsLocal {
		return leader.LeaderEndpoint, false, nil
	}
	// The leader is local: there is no address to forward to.
	return "", true, nil
}