-
-
Notifications
You must be signed in to change notification settings - Fork 155
/
data.go
89 lines (79 loc) · 2.05 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package data
import (
"context"
orderv1 "github.com/go-kratos/beer-shop/api/order/service/v1"
"github.com/go-kratos/beer-shop/app/courier/job/internal/conf"
"github.com/Shopify/sarama"
consul "github.com/go-kratos/kratos/contrib/registry/consul/v2"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/google/wire"
consulAPI "github.com/hashicorp/consul/api"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
// init mysql driver
_ "github.com/go-sql-driver/mysql"
)
// ProviderSet is data providers.
var ProviderSet = wire.NewSet(
NewData,
NewKafkaConsumer,
NewCourierRepo,
NewDiscovery,
NewOrderServiceClient,
)
// Data .
type Data struct {
kc sarama.Consumer
oc orderv1.OrderClient
log *log.Helper
}
// NewData .
func NewData(consumer sarama.Consumer, logger log.Logger, oc orderv1.OrderClient,
) (*Data, func(), error) {
log := log.NewHelper(log.With(logger, "module", "courier-job/data"))
d := &Data{
kc: consumer,
oc: oc,
log: log,
}
return d, func() {
d.kc.Close()
}, nil
}
func NewKafkaConsumer(conf *conf.Data) sarama.Consumer {
c := sarama.NewConfig()
p, err := sarama.NewConsumer(conf.Kafka.Addrs, c)
if err != nil {
panic(err)
}
return p
}
func NewDiscovery(conf *conf.Registry) registry.Discovery {
c := consulAPI.DefaultConfig()
c.Address = conf.Consul.Address
c.Scheme = conf.Consul.Scheme
cli, err := consulAPI.NewClient(c)
if err != nil {
panic(err)
}
r := consul.New(cli, consul.WithHealthCheck(false))
return r
}
func NewOrderServiceClient(r registry.Discovery, tp *tracesdk.TracerProvider) orderv1.OrderClient {
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///beer.order.service"),
grpc.WithDiscovery(r),
grpc.WithMiddleware(
tracing.Client(tracing.WithTracerProvider(tp)),
recovery.Recovery(),
),
)
if err != nil {
panic(err)
}
return orderv1.NewOrderClient(conn)
}