/
billstat.go
102 lines (84 loc) · 2.64 KB
/
billstat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package backendpb
import (
"context"
"fmt"
"io"
"net/url"
"github.com/AdguardTeam/AdGuardDNS/internal/agd"
"github.com/AdguardTeam/AdGuardDNS/internal/billstat"
"github.com/AdguardTeam/AdGuardDNS/internal/errcoll"
"github.com/AdguardTeam/golibs/errors"
"google.golang.org/protobuf/types/known/timestamppb"
)
// BillStatConfig is the configuration structure for the business logic backend
// billing statistics uploader.  It is the only argument of [NewBillStat].
type BillStatConfig struct {
// ErrColl is the error collector that is used to collect critical and
// non-critical errors.  NewBillStat stores it in the created uploader as
// is.
ErrColl errcoll.Interface
// Endpoint is the backend API URL. The scheme should be either "grpc" or
// "grpcs".  NewBillStat passes it to newClient to create the GRPC client.
Endpoint *url.URL
}
// NewBillStat returns a billing statistics uploader that talks to the backend
// at c.Endpoint and reports non-critical problems to c.ErrColl.  c must not be
// nil.
//
// If the GRPC client cannot be created, b is nil and err is the error returned
// by newClient, unwrapped.
func NewBillStat(c *BillStatConfig) (b *BillStat, err error) {
	client, err := newClient(c.Endpoint)
	if err != nil {
		// The client constructor's error is descriptive enough on its own, so
		// it is passed through without additional wrapping.
		return nil, err
	}

	return &BillStat{
		errColl: c.ErrColl,
		client:  client,
	}, nil
}
// BillStat is the implementation of the [billstat.Uploader] interface that
// uploads the billing statistics to the business logic backend. It is safe for
// concurrent use.
//
// TODO(a.garipov): Consider uniting with [ProfileStorage] into a single
// backendpb.Client.
type BillStat struct {
// errColl is the error collector used to report problems that do not abort
// an upload, such as null records.
errColl errcoll.Interface
// client is the current GRPC client.
client DNSServiceClient
}
// type check
//
// This blank declaration makes the build fail if *BillStat ever stops
// satisfying [billstat.Uploader].
var _ billstat.Uploader = (*BillStat)(nil)
// Upload implements the [billstat.Uploader] interface for *BillStat.
//
// It does nothing and returns nil when records is empty.  Otherwise it opens a
// client-side stream, sends one message per non-nil record, and then closes
// the stream and waits for the backend's reply.  Nil records are reported to
// the error collector and skipped; they do not abort the upload.
//
// The returned error is non-nil if the stream cannot be opened, if any record
// cannot be sent, or if closing the stream fails with anything other than
// [io.EOF].
func (b *BillStat) Upload(ctx context.Context, records billstat.Records) (err error) {
	if len(records) == 0 {
		return nil
	}

	stream, err := b.client.SaveDevicesBillingStat(ctx)
	if err != nil {
		return fmt.Errorf("opening stream: %w", err)
	}

	for deviceID, record := range records {
		if record == nil {
			reportf(ctx, b.errColl, "device %q: null record", deviceID)

			continue
		}

		sendErr := stream.Send(recordToProtobuf(record, deviceID))
		if sendErr == nil {
			continue
		}

		if errors.Is(sendErr, io.EOF) {
			// In gRPC, an io.EOF from a send only signals that the stream has
			// been terminated by the remote side; the actual status has to be
			// retrieved by receiving from the stream.  Receiving until an
			// error is returned also lets the library release the stream's
			// resources without relying on the caller cancelling ctx.
			_, recvErr := stream.CloseAndRecv()
			if recvErr != nil && !errors.Is(recvErr, io.EOF) {
				sendErr = recvErr
			}
		}

		return fmt.Errorf("uploading device %q record: %w", deviceID, sendErr)
	}

	_, err = stream.CloseAndRecv()
	if err != nil && !errors.Is(err, io.EOF) {
		return fmt.Errorf("finishing stream: %w", err)
	}

	return nil
}
// recordToProtobuf builds the protobuf message describing the billing
// statistics record r of the device with ID devID.  r must not be nil, since
// its fields are read unconditionally.
//
// NOTE(review): Proto, ASN, and Queries are converted to uint32 without range
// checks; confirm against the field types of billstat.Record that no value can
// be truncated.
func recordToProtobuf(r *billstat.Record, devID agd.DeviceID) (s *DeviceBillingStat) {
	s = &DeviceBillingStat{}

	s.LastActivityTime = timestamppb.New(r.Time)
	s.DeviceId = string(devID)
	s.ClientCountry = string(r.Country)
	s.Proto = uint32(r.Proto)
	s.Asn = uint32(r.ASN)
	s.Queries = uint32(r.Queries)

	return s
}