/
service.go
65 lines (58 loc) · 1.6 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package resolver
import (
"context"
"errors"
"fmt"
"github.com/awakari/client-sdk-go/api/grpc/auth"
"github.com/awakari/client-sdk-go/api/grpc/limits"
"github.com/awakari/client-sdk-go/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type Service interface {
OpenWriter(ctx context.Context, userId string) (w model.Writer[*pb.CloudEvent], err error)
}
type service struct {
client ServiceClient
}
var ErrUnavailable = errors.New("unavailable")
var ErrInternal = errors.New("internal failure")
func NewService(client ServiceClient) Service {
return service{
client: client,
}
}
func (svc service) OpenWriter(ctx context.Context, userId string) (w model.Writer[*pb.CloudEvent], err error) {
ctx = auth.SetOutgoingAuthInfo(ctx, userId)
var stream Service_SubmitMessagesClient
stream, err = svc.client.SubmitMessages(ctx)
err = decodeError(err)
if err == nil {
w = newStreamWriter(stream)
}
return
}
func decodeError(src error) (dst error) {
switch {
case src == nil:
default:
s, isGrpcErr := status.FromError(src)
switch {
case !isGrpcErr:
dst = src
case s.Code() == codes.OK:
case s.Code() == codes.ResourceExhausted:
dst = fmt.Errorf("%w: %s", limits.ErrReached, src)
case s.Code() == codes.Unauthenticated:
dst = fmt.Errorf("%w: %s", auth.ErrAuth, src)
case s.Code() == codes.Unavailable:
dst = fmt.Errorf("%w: %s", ErrUnavailable, src)
case status.Code(src) == codes.DeadlineExceeded:
dst = context.DeadlineExceeded
default:
dst = fmt.Errorf("%w: %s", ErrInternal, src)
}
}
return
}