forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.go
662 lines (579 loc) · 21.5 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package cluster
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/pem"
"sync"
"sync/atomic"
"time"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/tools/protolator"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// ConnByCertMap associates gRPC client connections with certificates.
// The raw certificate bytes, converted to a string, serve as the map key.
type ConnByCertMap map[string]*grpc.ClientConn

// Lookup returns the connection stored under the given certificate.
// The boolean result is false when no connection is mapped to it.
func (cbc ConnByCertMap) Lookup(cert []byte) (*grpc.ClientConn, bool) {
	mapped, found := cbc[string(cert)]
	return mapped, found
}

// Put stores conn under the given certificate, replacing any previous entry.
func (cbc ConnByCertMap) Put(cert []byte, conn *grpc.ClientConn) {
	key := string(cert)
	cbc[key] = conn
}

// Remove drops the entry, if any, stored under the given certificate.
func (cbc ConnByCertMap) Remove(cert []byte) {
	key := string(cert)
	delete(cbc, key)
}

// Size returns the number of connections currently held in the map.
func (cbc ConnByCertMap) Size() int {
	total := len(cbc)
	return total
}
// MemberMapping indexes the Stubs of network members by their numeric ID.
type MemberMapping map[uint64]*Stub

// Put stores the given stub under its own ID, replacing any stub
// that was already stored under that ID.
func (mp MemberMapping) Put(stub *Stub) {
	id := stub.ID
	mp[id] = stub
}

// ByID returns the Stub stored under the given ID, or nil if there is none.
func (mp MemberMapping) ByID(ID uint64) *Stub {
	found := mp[ID]
	return found
}

// LookupByClientCert returns a Stub whose client TLS certificate equals
// the given certificate, or nil if no stored Stub matches.
func (mp MemberMapping) LookupByClientCert(cert []byte) *Stub {
	for _, candidate := range mp {
		if !bytes.Equal(candidate.ClientTLSCert, cert) {
			continue
		}
		return candidate
	}
	return nil
}

// ServerCertificates collects the server TLS certificates of all members
// into a StringSet, each certificate being converted to a string.
func (mp MemberMapping) ServerCertificates() StringSet {
	certs := StringSet{}
	for id := range mp {
		certs[string(mp[id].ServerTLSCert)] = struct{}{}
	}
	return certs
}
// StringSet is an unordered collection of distinct strings.
type StringSet map[string]struct{}

// union inserts every element of set into ss, modifying ss in place.
func (ss StringSet) union(set StringSet) {
	var present struct{}
	for element := range set {
		ss[element] = present
	}
}

// subtract deletes from ss every element that appears in set,
// modifying ss in place. Elements absent from ss are ignored.
func (ss StringSet) subtract(set StringSet) {
	for element := range set {
		delete(ss, element)
	}
}
// PredicateDialer creates gRPC connections that are only established
// if a caller-supplied predicate over the remote certificate is fulfilled.
// The lock guards the embedded ClientConfig: UpdateRootCAs writes to it
// under the write lock, and Dial reads it under the read lock.
type PredicateDialer struct {
	lock sync.RWMutex
	comm.ClientConfig
}

// UpdateRootCAs replaces the server root CAs of the dialer's client
// configuration with serverRootCAs, holding the write lock while doing so.
func (dialer *PredicateDialer) UpdateRootCAs(serverRootCAs [][]byte) {
	dialer.lock.Lock()
	defer dialer.lock.Unlock()
	dialer.SecOpts.ServerRootCAs = serverRootCAs
}
// Dial creates a new gRPC connection to address. verifyFunc is installed as
// the certificate verification callback of the client's security options, so
// the connection can only be established if the remote node's certificate
// chain satisfies it.
func (dialer *PredicateDialer) Dial(address string, verifyFunc RemoteVerifier) (*grpc.ClientConn, error) {
	// Take a private copy of the configuration under the read lock.
	dialer.lock.RLock()
	clientConfig := dialer.ClientConfig.Clone()
	dialer.lock.RUnlock()

	clientConfig.SecOpts.VerifyCertificate = verifyFunc
	grpcClient, err := comm.NewGRPCClient(clientConfig)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// The TLS root CAs may have been updated since the client was created,
	// so they are overwritten from the dialer's current configuration.
	overrideRootCAs := func(tlsConfig *tls.Config) {
		dialer.lock.RLock()
		currentRootCAs := dialer.ClientConfig.Clone().SecOpts.ServerRootCAs
		dialer.lock.RUnlock()

		pool := x509.NewCertPool()
		tlsConfig.RootCAs = pool
		for _, caPEM := range currentRootCAs {
			pool.AppendCertsFromPEM(caPEM)
		}
	}
	return grpcClient.NewConnection(address, "", overrideRootCAs)
}
// DERtoPEM wraps the given DER bytes in a PEM block of type "CERTIFICATE"
// and returns the encoded block as a string.
func DERtoPEM(der []byte) string {
	block := pem.Block{
		Bytes: der,
		Type:  "CERTIFICATE",
	}
	encoded := pem.EncodeToMemory(&block)
	return string(encoded)
}
// StandardDialer embeds a comm.ClientConfig and provides a means
// to connect to an endpoint described by an EndpointCriteria.
type StandardDialer struct {
	comm.ClientConfig
}

// Dial connects to the endpoint of the given EndpointCriteria. It works on a
// clone of the dialer's configuration whose server root CAs are replaced by
// the TLS root CAs of the criteria.
func (dialer *StandardDialer) Dial(endpointCriteria EndpointCriteria) (*grpc.ClientConn, error) {
	clientConfig := dialer.ClientConfig.Clone()
	clientConfig.SecOpts.ServerRootCAs = endpointCriteria.TLSRootCAs

	grpcClient, err := comm.NewGRPCClient(clientConfig)
	if err == nil {
		return grpcClient.NewConnection(endpointCriteria.Endpoint, "")
	}
	return nil, errors.Wrap(err, "failed creating gRPC client")
}
//go:generate mockery -dir . -name BlockVerifier -case underscore -output ./mocks/
// BlockVerifier verifies block signatures.
type BlockVerifier interface {
// VerifyBlockSignature verifies the signatures of a block, which are
// passed as a set of signed data.
// The config argument is optional: when it is non-nil, the block
// verification uses validation rules based on the configuration
// carried in the given ConfigEnvelope.
// When it is nil, the validation rules used are the ones that were
// applied at commit of previous blocks.
// Callers in this file treat a nil error as a successful verification.
VerifyBlockSignature(sd []*common.SignedData, config *common.ConfigEnvelope) error
}
// BlockSequenceVerifier verifies that the given consecutive sequence
// of blocks, received for the given channel, is valid.
// NOTE(review): presumably a nil error denotes a valid sequence — confirm
// against the implementations assigned to this type.
type BlockSequenceVerifier func(blocks []*common.Block, channel string) error
// Dialer creates a gRPC connection to a remote address.
type Dialer interface {
// Dial connects to the remote node described by endpointCriteria, which
// carries the endpoint to dial and the TLS root CAs to use for it.
Dial(endpointCriteria EndpointCriteria) (*grpc.ClientConn, error)
}
// VerifyBlocks checks that blockBuff is a valid consecutive sequence of
// blocks and returns nil if so, or an error describing the first problem.
//
// The check has three stages:
//  1. The hash chain of every block in the buffer is verified.
//  2. Every config block in the buffer has its signature verified, using the
//     configuration of the closest preceding config block in the buffer, or
//     the committed configuration (nil) if there is none.
//  3. The last block's signature is verified, unless it is a config block
//     and was therefore already verified in stage 2.
//
// An empty buffer is rejected.
func VerifyBlocks(blockBuff []*common.Block, signatureVerifier BlockVerifier) error {
	total := len(blockBuff)
	if total == 0 {
		return errors.New("buffer is empty")
	}

	// Stage 1: data hash and previous-hash linkage of each block.
	for idx := 0; idx < total; idx++ {
		if err := VerifyBlockHash(idx, blockBuff); err != nil {
			return err
		}
	}

	var (
		latestConfig        *common.ConfigEnvelope
		endsWithConfigBlock bool
	)

	// Stage 2: config blocks are verified against the configuration that
	// precedes them, and then become the configuration for what follows.
	for _, blk := range blockBuff {
		extracted, err := ConfigFromBlock(blk)
		switch {
		case err == errNotAConfig:
			endsWithConfigBlock = false
			continue
		case err != nil:
			return err
		}

		if err := VerifyBlockSignature(blk, signatureVerifier, latestConfig); err != nil {
			return err
		}
		latestConfig = extracted
		endsWithConfigBlock = true
	}

	// Stage 3: a trailing config block was already verified above.
	if endsWithConfigBlock {
		return nil
	}
	return VerifyBlockSignature(blockBuff[total-1], signatureVerifier, latestConfig)
}
// errNotAConfig is the sentinel error returned by ConfigFromBlock when the
// channel header of the block's first transaction is not of type CONFIG.
// Callers compare against it to tell a regular block from a malformed one.
var errNotAConfig = errors.New("not a config block")
// ConfigFromBlock returns the ConfigEnvelope carried by the first transaction
// of the given block.
//
// Block number 0 is always treated as a config block: its payload data is
// parsed as a ConfigEnvelope without inspecting the channel header. For any
// other block, the channel header type of the first transaction must be
// CONFIG; otherwise errNotAConfig is returned.
//
// Any error other than errNotAConfig means the block is empty, lacks a
// header, or could not be parsed.
func ConfigFromBlock(block *common.Block) (*common.ConfigEnvelope, error) {
	if block == nil || block.Data == nil || len(block.Data.Data) == 0 {
		return nil, errors.New("empty block")
	}
	txn := block.Data.Data[0]
	env, err := utils.GetEnvelopeFromBlock(txn)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	payload, err := utils.GetPayload(env)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	// The block number is read from the header below, so a block that has
	// no header is rejected instead of causing a nil pointer dereference.
	if block.Header == nil {
		return nil, errors.New("missing block header")
	}
	// Only blocks past the first one need their channel header inspected.
	if block.Header.Number != 0 {
		if payload.Header == nil {
			return nil, errors.New("nil header in payload")
		}
		chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			return nil, errors.WithStack(err)
		}
		if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG {
			return nil, errNotAConfig
		}
	}
	configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
	if err != nil {
		return nil, errors.Wrap(err, "invalid config envelope")
	}
	return configEnvelope, nil
}
// VerifyBlockHash verifies the hash chain of the block with the given index
// among the blocks of the given block buffer.
//
// It checks that the hash computed over the block's data equals the data hash
// claimed in its header. If the block is not the first one in the buffer, it
// also checks that its sequence number directly follows that of the previous
// block, and that its previous-hash field equals the hash of the previous
// block's header.
//
// An error is returned if the index is outside the buffer, if the block or
// the previous block is nil or lacks a header, if the block lacks data, or if
// any of the checks above fails.
func VerifyBlockHash(indexInBuffer int, blockBuff []*common.Block) error {
	if indexInBuffer < 0 || indexInBuffer >= len(blockBuff) {
		return errors.Errorf("index %d out of bounds (total %d blocks)", indexInBuffer, len(blockBuff))
	}
	block := blockBuff[indexInBuffer]
	if block == nil {
		return errors.Errorf("block at index %d is nil", indexInBuffer)
	}
	if block.Header == nil {
		return errors.New("missing block header")
	}
	// BlockData.Hash is defined outside this file; a nil Data is rejected
	// here rather than relying on Hash tolerating a nil receiver.
	if block.Data == nil {
		return errors.New("missing block data")
	}
	seq := block.Header.Number
	dataHash := block.Data.Hash()
	// Verify data hash matches the hash in the header
	if !bytes.Equal(dataHash, block.Header.DataHash) {
		computedHash := hex.EncodeToString(dataHash)
		claimedHash := hex.EncodeToString(block.Header.DataHash)
		return errors.Errorf("computed hash of block (%d) (%s) doesn't match claimed hash (%s)",
			seq, computedHash, claimedHash)
	}
	// The first block in the buffer has no predecessor to be linked to.
	if indexInBuffer == 0 {
		return nil
	}
	// We have a previous block in the buffer, ensure current block's previous hash matches the previous one.
	prevBlock := blockBuff[indexInBuffer-1]
	if prevBlock == nil || prevBlock.Header == nil {
		return errors.New("previous block header is nil")
	}
	prevSeq := prevBlock.Header.Number
	if prevSeq+1 != seq {
		return errors.Errorf("sequences %d and %d were received consecutively", prevSeq, seq)
	}
	// Hash the previous header once and reuse it for the error message.
	prevHeaderHash := prevBlock.Header.Hash()
	if !bytes.Equal(block.Header.PreviousHash, prevHeaderHash) {
		claimedPrevHash := hex.EncodeToString(block.Header.PreviousHash)
		actualPrevHash := hex.EncodeToString(prevHeaderHash)
		return errors.Errorf("block [%d]'s hash (%s) mismatches %d's prev block hash (%s)",
			prevSeq, actualPrevHash, seq, claimedPrevHash)
	}
	return nil
}
// SignatureSetFromBlock creates a signature set out of a block.
//
// It reads the signatures stored in the block's metadata and produces one
// SignedData element per signature. The identity of each element is the
// creator found in the signature header, and the signed data is the
// concatenation of the metadata value, the signature header and the bytes
// of the block header.
//
// An error is returned if the block is nil, has no signatures metadata entry,
// carries signatures but no header, or if the metadata or any signature
// header cannot be unmarshaled. A block whose metadata holds no signatures
// yields a nil set and a nil error.
func SignatureSetFromBlock(block *common.Block) ([]*common.SignedData, error) {
	if block == nil {
		return nil, errors.New("nil block")
	}
	if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_SIGNATURES) {
		return nil, errors.New("no metadata in block")
	}
	metadata, err := utils.GetMetadataFromBlock(block, common.BlockMetadataIndex_SIGNATURES)
	if err != nil {
		return nil, errors.Errorf("failed unmarshaling medatata for signatures: %v", err)
	}
	// Without signatures the header is never read, so nothing else is validated.
	if len(metadata.Signatures) == 0 {
		return nil, nil
	}
	// The header is part of what every signature covers; reject a block
	// that lacks one instead of dereferencing a nil pointer.
	if block.Header == nil {
		return nil, errors.New("missing block header")
	}
	// The header bytes are the same for every signature, so compute them once.
	headerBytes := block.Header.Bytes()
	signatureSet := make([]*common.SignedData, 0, len(metadata.Signatures))
	for _, metadataSignature := range metadata.Signatures {
		sigHdr, err := utils.GetSignatureHeader(metadataSignature.SignatureHeader)
		if err != nil {
			return nil, errors.Errorf("failed unmarshaling signature header for block with id %d: %v",
				block.Header.Number, err)
		}
		signatureSet = append(signatureSet,
			&common.SignedData{
				Identity: sigHdr.Creator,
				Data: util.ConcatenateBytes(metadata.Value,
					metadataSignature.SignatureHeader, headerBytes),
				Signature: metadataSignature.Signature,
			},
		)
	}
	return signatureSet, nil
}
// VerifyBlockSignature extracts the signature set of the given block and
// hands it, together with the given config, to the given BlockVerifier.
// It returns the extraction error if the signature set cannot be built,
// and otherwise whatever the verifier returns.
func VerifyBlockSignature(block *common.Block, verifier BlockVerifier, config *common.ConfigEnvelope) error {
	signedData, err := SignatureSetFromBlock(block)
	if err == nil {
		return verifier.VerifyBlockSignature(signedData, config)
	}
	return err
}
// EndpointCriteria defines criteria of how to connect to a remote orderer node:
// the address to dial, and the TLS root CA certificates to use for it.
type EndpointCriteria struct {
Endpoint string // Endpoint of the form host:port
TLSRootCAs [][]byte // PEM encoded TLS root CA certificates
}
// EndpointconfigFromConfigBlock retrieves orderer endpoints, along with the
// TLS CA certificates to use for them, from a config block.
//
// The TLS intermediate and root certificates of every orderer organization
// are gathered from that organization's MSP. If any orderer organization
// declares endpoints of its own, each such endpoint is returned paired with
// the certificates of its organization. Otherwise, the channel-wide orderer
// addresses are returned, each paired with the certificates of all orderer
// organizations combined.
//
// An error is returned for a nil block, if the block's first envelope or the
// channel configuration bundle cannot be extracted, if the MSPs or the orderer
// configuration cannot be obtained, or if an orderer organization has no MSP.
func EndpointconfigFromConfigBlock(block *common.Block) ([]EndpointCriteria, error) {
	if block == nil {
		return nil, errors.New("nil block")
	}

	env, err := utils.ExtractEnvelope(block, 0)
	if err != nil {
		return nil, err
	}

	bundle, err := channelconfig.NewBundleFromEnvelope(env)
	if err != nil {
		return nil, errors.Wrap(err, "failed extracting bundle from envelope")
	}

	mspsByID, err := bundle.MSPManager().GetMSPs()
	if err != nil {
		return nil, errors.Wrap(err, "failed obtaining MSPs from MSPManager")
	}

	ordererConf, found := bundle.OrdererConfig()
	if !found {
		return nil, errors.New("failed obtaining orderer config from bundle")
	}

	var allTLSCerts [][]byte
	certsByMSPID := map[string][][]byte{}
	for _, ordererOrg := range ordererConf.Organizations() {
		// Every orderer org must have a corresponding MSP in the MSP manager.
		orgMSP, known := mspsByID[ordererOrg.MSPID()]
		if !known {
			return nil, errors.Errorf("no MSP found for MSP with ID of %s", ordererOrg.MSPID())
		}

		// Record this org's TLS CA certs under its MSP ID, and add them
		// to the combined list used for the channel-wide addresses.
		var orgCerts [][]byte
		orgCerts = append(orgCerts, orgMSP.GetTLSIntermediateCerts()...)
		orgCerts = append(orgCerts, orgMSP.GetTLSRootCerts()...)
		certsByMSPID[ordererOrg.MSPID()] = orgCerts
		allTLSCerts = append(allTLSCerts, orgCerts...)
	}

	// Endpoints declared by the organizations take precedence.
	if perOrg := perOrgEndpoints(ordererConf, certsByMSPID); len(perOrg) > 0 {
		return perOrg, nil
	}
	return globalEndpointsFromConfig(allTLSCerts, bundle), nil
}
// perOrgEndpoints builds one EndpointCriteria for every endpoint declared by
// an orderer organization, pairing the endpoint with the certificates that
// mspIDsToCerts holds for that organization's MSP ID.
// The result is nil if no organization declares any endpoint.
func perOrgEndpoints(ordererConfig channelconfig.Orderer, mspIDsToCerts map[string][][]byte) []EndpointCriteria {
	var result []EndpointCriteria
	for _, ordererOrg := range ordererConfig.Organizations() {
		for _, addr := range ordererOrg.Endpoints() {
			criteria := EndpointCriteria{
				Endpoint:   addr,
				TLSRootCAs: mspIDsToCerts[ordererOrg.MSPID()],
			}
			result = append(result, criteria)
		}
	}
	return result
}
// globalEndpointsFromConfig builds one EndpointCriteria for every orderer
// address of the bundle's channel configuration. All of them share the same
// aggregatedTLSCerts as their TLS root CAs.
// The result is nil if the channel configuration lists no orderer address.
func globalEndpointsFromConfig(aggregatedTLSCerts [][]byte, bundle *channelconfig.Bundle) []EndpointCriteria {
	addresses := bundle.ChannelConfig().OrdererAddresses()

	var result []EndpointCriteria
	for i := range addresses {
		criteria := EndpointCriteria{
			TLSRootCAs: aggregatedTLSCerts,
			Endpoint:   addresses[i],
		}
		result = append(result, criteria)
	}
	return result
}
//go:generate mockery -dir . -name VerifierFactory -case underscore -output ./mocks/
// VerifierFactory creates BlockVerifiers.
type VerifierFactory interface {
// VerifierFromConfig creates a BlockVerifier for the given channel
// from the given configuration, or returns an error if it cannot.
VerifierFromConfig(configuration *common.ConfigEnvelope, channel string) (BlockVerifier, error)
}
// VerificationRegistry registers verifiers and retrieves them.
// Its methods read and write VerifiersByChannel without any locking of
// their own, and write to it directly, so the map must be non-nil.
type VerificationRegistry struct {
// LoadVerifier is called by RegisterVerifier to obtain the verifier of a
// chain that is not yet registered; a nil result is logged as a failure.
LoadVerifier func(chain string) BlockVerifier
// Logger receives the messages logged by the registry's methods.
Logger *flogging.FabricLogger
// VerifierFactory is used by BlockCommitted to build a verifier
// out of a committed config block.
VerifierFactory VerifierFactory
// VerifiersByChannel maps a channel name to its current verifier.
VerifiersByChannel map[string]BlockVerifier
}
// RegisterVerifier loads the verifier of the given chain via LoadVerifier and
// stores it in the registry, unless a verifier is already stored for that
// chain. A nil verifier returned by LoadVerifier is logged and not stored.
func (vr *VerificationRegistry) RegisterVerifier(chain string) {
	_, registered := vr.VerifiersByChannel[chain]
	if registered {
		vr.Logger.Debugf("No need to register verifier for chain %s", chain)
		return
	}

	loaded := vr.LoadVerifier(chain)
	if loaded != nil {
		vr.VerifiersByChannel[chain] = loaded
		vr.Logger.Infof("Registered verifier for chain %s", chain)
		return
	}
	vr.Logger.Errorf("Failed loading verifier for chain %s", chain)
}
// RetrieveVerifier returns the BlockVerifier stored for the given channel.
// If none is stored, the miss is logged as an error and nil is returned.
func (vr *VerificationRegistry) RetrieveVerifier(channel string) BlockVerifier {
	stored, found := vr.VerifiersByChannel[channel]
	if !found {
		vr.Logger.Errorf("No verifier for channel %s exists", channel)
		return nil
	}
	return stored
}
// BlockCommitted notifies the VerificationRegistry that a block was committed
// on the given channel. If the block is a config block, a verifier is created
// from its configuration and replaces the one stored for the channel.
// Blocks that are not config blocks, blocks that cannot be parsed, and
// failures to create the verifier are only logged.
func (vr *VerificationRegistry) BlockCommitted(block *common.Block, channel string) {
	configEnv, err := ConfigFromBlock(block)
	switch {
	case err == errNotAConfig:
		// A valid block that simply carries no configuration.
		vr.Logger.Debugf("Committed block [%d] for channel %s that is not a config block",
			block.Header.Number, channel)
		return
	case err != nil:
		// The block could not be parsed.
		vr.Logger.Errorf("Failed parsing block of channel %s: %v, content: %s",
			channel, err, BlockToString(block))
		return
	}

	// The block is a config block: derive a verifier from it.
	newVerifier, err := vr.VerifierFactory.VerifierFromConfig(configEnv, channel)
	if err != nil {
		vr.Logger.Errorf("Failed creating a verifier from a config block for channel %s: %v, content: %s",
			channel, err, BlockToString(block))
		return
	}

	vr.VerifiersByChannel[channel] = newVerifier
	vr.Logger.Debugf("Committed config block [%d] for channel %s", block.Header.Number, channel)
}
// BlockToString returns the JSON rendering of the given block, as written by
// protolator.DeepMarshalJSON. The error returned by the marshaling is
// discarded, so the result holds whatever was written before a failure.
func BlockToString(block *common.Block) string {
	var out bytes.Buffer
	protolator.DeepMarshalJSON(&out, block)
	return out.String()
}
// BlockCommitFunc signals a block commit.
// It receives the block and the name of the channel the block belongs to.
type BlockCommitFunc func(block *common.Block, channel string)
// LedgerInterceptor intercepts block commits.
// It embeds a LedgerWriter and overrides its Append method in order
// to invoke a callback for every block that is appended.
type LedgerInterceptor struct {
// Channel is the channel name passed to InterceptBlockCommit.
Channel string
// InterceptBlockCommit is the callback invoked by Append.
InterceptBlockCommit BlockCommitFunc
// LedgerWriter is the underlying ledger the blocks are appended to.
LedgerWriter
}
// Append commits a block into the ledger, and also fires the configured callback.
// The callback is deferred: it runs after the embedded LedgerWriter's Append
// has returned, and it runs whether or not that Append reported an error.
// The returned error is the one reported by the embedded LedgerWriter.
func (interceptor *LedgerInterceptor) Append(block *common.Block) error {
defer interceptor.InterceptBlockCommit(block, interceptor.Channel)
return interceptor.LedgerWriter.Append(block)
}
// BlockVerifierAssembler creates a BlockVerifier out of a config envelope.
type BlockVerifierAssembler struct {
// Logger is handed to every verifier created by VerifierFromConfig.
Logger *flogging.FabricLogger
}
// VerifierFromConfig builds a channel configuration bundle from the given
// configuration and returns a BlockValidationPolicyVerifier for the given
// channel that uses the bundle's policy manager and the assembler's logger.
// An error is returned if the bundle cannot be created.
func (bva *BlockVerifierAssembler) VerifierFromConfig(configuration *common.ConfigEnvelope, channel string) (BlockVerifier, error) {
	bundle, err := channelconfig.NewBundle(channel, configuration.Config)
	if err != nil {
		return nil, errors.Wrap(err, "failed extracting bundle from envelope")
	}

	verifier := &BlockValidationPolicyVerifier{
		Channel:   channel,
		Logger:    bva.Logger,
		PolicyMgr: bundle.PolicyManager(),
	}
	return verifier, nil
}
// BlockValidationPolicyVerifier verifies signatures based on the block validation policy.
type BlockValidationPolicyVerifier struct {
// Logger receives the messages logged during verification.
Logger *flogging.FabricLogger
// Channel is the channel name used when a bundle is built
// from a config envelope passed to VerifyBlockSignature.
Channel string
// PolicyMgr supplies the block validation policy when
// VerifyBlockSignature is called without a config envelope.
PolicyMgr policies.Manager
}
// VerifyBlockSignature evaluates the block validation policy over the signed
// data associated to a block. When envelope is nil, the policy is looked up
// in the verifier's own policy manager. When envelope is non-nil, a bundle is
// built from its configuration and that bundle's policy manager is used.
// An error is returned if the bundle cannot be built, if the block validation
// policy is not found, or if the policy evaluation fails.
func (bv *BlockValidationPolicyVerifier) VerifyBlockSignature(sd []*common.SignedData, envelope *common.ConfigEnvelope) error {
	mgr := bv.PolicyMgr

	// A config envelope overrides the policy manager the verifier holds.
	if envelope != nil {
		bundle, err := channelconfig.NewBundle(bv.Channel, envelope.Config)
		if err != nil {
			var dump bytes.Buffer
			protolator.DeepMarshalJSON(&dump, envelope.Config)
			bv.Logger.Errorf("Failed creating a new bundle for channel %s, Config content is: %s", bv.Channel, dump.String())
			return err
		}
		bv.Logger.Infof("Initializing new PolicyManager for channel %s", bv.Channel)
		mgr = bundle.PolicyManager()
	}

	if policy, found := mgr.GetPolicy(policies.BlockValidation); found {
		return policy.Evaluate(sd)
	}
	return errors.Errorf("policy %s wasn't found", policies.BlockValidation)
}
//go:generate mockery -dir . -name BlockRetriever -case underscore -output ./mocks/
// BlockRetriever retrieves blocks by their sequence number.
type BlockRetriever interface {
// Block returns a block with the given number,
// or nil if such a block doesn't exist.
Block(number uint64) *common.Block
}
// LastConfigBlock returns the last config block relative to the given block.
// The number of that config block is read from the given block's metadata,
// and the block itself is fetched through blockRetriever.
// An error is returned if block or blockRetriever is nil, if the block has no
// last-config metadata entry, if the last config index cannot be read, or if
// the retriever has no block with that number.
func LastConfigBlock(block *common.Block, blockRetriever BlockRetriever) (*common.Block, error) {
	switch {
	case block == nil:
		return nil, errors.New("nil block")
	case blockRetriever == nil:
		return nil, errors.New("nil blockRetriever")
	case block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_LAST_CONFIG):
		return nil, errors.New("no metadata in block")
	}

	configSeq, err := utils.GetLastConfigIndexFromBlock(block)
	if err != nil {
		return nil, err
	}

	if configBlock := blockRetriever.Block(configSeq); configBlock != nil {
		return configBlock, nil
	}
	return nil, errors.Errorf("unable to retrieve last config block [%d]", configSeq)
}
// StreamCountReporter reports the number of streams currently connected
// to this node. The counter is updated atomically, and every update is
// reported through Metrics.
type StreamCountReporter struct {
	Metrics *Metrics
	count   uint32
}

// Increment atomically adds one to the stream count and reports the new value.
func (scr *StreamCountReporter) Increment() {
	scr.Metrics.reportStreamCount(atomic.AddUint32(&scr.count, 1))
}

// Decrement atomically subtracts one from the stream count and reports the
// new value.
func (scr *StreamCountReporter) Decrement() {
	// Adding the all-ones value wraps around, which subtracts one.
	const minusOne = ^uint32(0)
	scr.Metrics.reportStreamCount(atomic.AddUint32(&scr.count, minusOne))
}
// certificateExpirationCheck holds the state needed to warn, at a limited
// rate, about a certificate that is close to its expiration time.
type certificateExpirationCheck struct {
	// minimumExpirationWarningInterval is the least time that must pass
	// between two consecutive warnings.
	minimumExpirationWarningInterval time.Duration
	// expiresAt is the expiration time of the certificate.
	expiresAt time.Time
	// expirationWarningThreshold is the remaining lifetime at or below
	// which warnings are issued.
	expirationWarningThreshold time.Duration
	// lastWarning is the time passed to the check that last issued a warning.
	lastWarning time.Time
	// nodeName and endpoint identify the certificate's owner in the warning.
	nodeName string
	endpoint string
	// alert is invoked with a format string and its arguments to issue a warning.
	alert func(string, ...interface{})
}

// checkExpiration issues a warning through alert when, at currentTime, the
// certificate's remaining lifetime does not exceed the warning threshold and
// at least the minimum warning interval has passed since the last warning.
// On a warning, currentTime is recorded as the time of the last warning.
func (exp *certificateExpirationCheck) checkExpiration(currentTime time.Time, channel string) {
	remaining := exp.expiresAt.Sub(currentTime)
	tooEarly := remaining > exp.expirationWarningThreshold
	warnedRecently := currentTime.Sub(exp.lastWarning) < exp.minimumExpirationWarningInterval
	if tooEarly || warnedRecently {
		return
	}

	exp.alert("Certificate of %s from %s for channel %s expires in less than %v",
		exp.nodeName, exp.endpoint, channel, remaining)
	exp.lastWarning = currentTime
}