forked from google/certificate-transparency-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fix_and_log.go
132 lines (119 loc) · 4.39 KB
/
fix_and_log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fixchain
import (
"context"
"log"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/google/certificate-transparency-go/client"
"github.com/google/certificate-transparency-go/x509"
)
// FixAndLog contains a Fixer and a Logger, for all your fix-then-log-chain needs!
//
// Chains enter through QueueAllCertsInChain() or QueueChain(). The uint32
// counter fields below are incremented with sync/atomic operations by those
// methods, so they must only be accessed atomically.
type FixAndLog struct {
// Fixer that queued chains are handed to. NewFixAndLog creates it with the
// chains channel below.
fixer *Fixer
// Channel passed to the Fixer at construction time. A goroutine started in
// NewFixAndLog forwards every chain received on it to the Logger; Wait()
// closes it. Presumably the Fixer sends its fixed chains here - confirm
// against the Fixer implementation.
chains chan []*x509.Certificate
// Logger used to check whether a cert has already been posted, to obtain the
// root certs to fix against, and to queue chains for logging.
logger *Logger
// Tracks the goroutine started in NewFixAndLog that forwards chains from the
// chains channel to the Logger.
wg sync.WaitGroup
// Number of whole chains queued - before checking cache & adding chains for intermediate certs.
queued uint32
// Cache of chains that QueueAllCertsInChain() has already been called on,
// keyed by the hash of the chain's deduplicated certs.
done *lockedMap
// Number of whole chains submitted to the FixAndLog using QueueAllCertsInChain() (before adding chains for intermediate certs) that had previously been processed.
alreadyDone uint32
// Number of chains queued total, including chains from intermediate certs.
// Note that in each chain there are len(chain) certs. So when calling QueueAllCertsInChain(chain), len(chain) chains will actually be added to the queue.
chainsQueued uint32
// Number of chains whose leaf cert has already been posted to the log with a valid chain.
alreadyPosted uint32
// Number of chains sent on to the Fixer to begin fixing & logging!
chainsSent uint32
}
// QueueAllCertsInChain queues the given chain for fixing and logging once for
// every distinct cert it contains, each time treating that cert as the one to
// be logged.
//
// A nil chain is ignored. Otherwise the chain is deduplicated and looked up in
// the cache of previously seen chains: a hit only bumps the alreadyDone
// counter. On a miss the chain is recorded in the cache, and every cert that
// the logger does not already report as posted is handed to the fixer together
// with the deduplicated certs and the logger's roots.
func (fl *FixAndLog) QueueAllCertsInChain(chain []*x509.Certificate) {
	if chain == nil {
		return
	}

	atomic.AddUint32(&fl.queued, 1)
	atomic.AddUint32(&fl.chainsQueued, uint32(len(chain)))

	deduped := newDedupedChain(chain)

	// The cache is keyed on the hash of the deduplicated certs.
	key := hashBag(deduped.certs)
	if fl.done.get(key) {
		atomic.AddUint32(&fl.alreadyDone, 1)
		return
	}
	fl.done.set(key, true)

	for _, c := range deduped.certs {
		if !fl.logger.IsPosted(c) {
			fl.fixer.QueueChain(c, deduped.certs, fl.logger.RootCerts())
			atomic.AddUint32(&fl.chainsSent, 1)
		} else {
			// Already in the log: nothing to fix or post for this cert.
			atomic.AddUint32(&fl.alreadyPosted, 1)
		}
	}
}
// QueueChain queues the given chain to be fixed wrt the roots of the logger
// contained in fl, and then logged to the Certificate Transparency log
// represented by the logger. Note: chain is expected to be in the order of
// cert --> root.
//
// A nil or empty chain is ignored. If the logger reports the leaf (chain[0])
// as already posted, only the alreadyPosted counter is incremented; otherwise
// the chain is handed to the fixer and chainsSent is incremented.
func (fl *FixAndLog) QueueChain(chain []*x509.Certificate) {
	// len() covers both nil and empty-but-non-nil slices; the latter would
	// otherwise panic on the chain[0] access below.
	if len(chain) == 0 {
		return
	}

	leaf := chain[0]
	if fl.logger.IsPosted(leaf) {
		atomic.AddUint32(&fl.alreadyPosted, 1)
		return
	}
	fl.fixer.QueueChain(leaf, chain, fl.logger.RootCerts())
	atomic.AddUint32(&fl.chainsSent, 1)
}
// Wait waits for all of the queued chains to complete being fixed and
// logged.
//
// NOTE(review): Wait closes fl.chains, and closing an already-closed channel
// panics, so this looks like it must be called at most once - confirm with
// callers.
func (fl *FixAndLog) Wait() {
// Wait for the fixer first; presumably nothing sends on fl.chains once this
// returns, which is what makes the close below safe - verify against Fixer.
fl.fixer.Wait()
// Closing the channel ends the range loop of the forwarding goroutine started
// in NewFixAndLog.
close(fl.chains)
// Wait for that goroutine to finish handing chains to the logger.
fl.wg.Wait()
// Finally wait for the logger itself.
fl.logger.Wait()
}
// NewFixAndLog creates an object that will asynchronously fix any chains that
// are added to its queue, and then log them to the Certificate Transparency log
// reached through logClient. Any errors encountered along the way are pushed to
// the given errors channel.
//
// fixerWorkerCount, errors, client and logStats are passed to NewFixer;
// ctx, loggerWorkerCount, errors, logClient, limiter and logStats are passed to
// NewLogger. A goroutine is started that forwards every chain received from
// the fixer's output channel to the logger; it exits once Wait() closes that
// channel.
//
// If logStats is true, a summary of the FixAndLog counters is written with
// log.Printf once per second until ctx is done.
func NewFixAndLog(ctx context.Context, fixerWorkerCount int, loggerWorkerCount int, errors chan<- *FixError, client *http.Client, logClient client.AddLogClient, limiter Limiter, logStats bool) *FixAndLog {
	chains := make(chan []*x509.Certificate)
	fl := &FixAndLog{
		fixer:  NewFixer(fixerWorkerCount, chains, errors, client, logStats),
		chains: chains,
		logger: NewLogger(ctx, loggerWorkerCount, errors, logClient, limiter, logStats),
		done:   newLockedMap(),
	}

	// Forward everything that arrives on the chains channel to the logger.
	fl.wg.Add(1)
	go func() {
		defer fl.wg.Done()
		for chain := range chains {
			fl.logger.QueueChain(chain)
		}
	}()

	if logStats {
		// A nil channel blocks forever in select, so a nil ctx keeps the
		// previous behaviour of reporting for the lifetime of the process.
		var done <-chan struct{}
		if ctx != nil {
			done = ctx.Done()
		}
		go func() {
			t := time.NewTicker(time.Second)
			// Release the ticker once reporting stops.
			defer t.Stop()
			for {
				select {
				case <-done:
					return
				case <-t.C:
					// The counters are updated with atomic.AddUint32 from other
					// goroutines, so they must be read atomically as well.
					log.Printf("fix-then-log: %d whole chains queued, %d whole chains already done, %d total chains queued, %d chains don't need posting (cache hits), %d chains sent to fixer",
						atomic.LoadUint32(&fl.queued),
						atomic.LoadUint32(&fl.alreadyDone),
						atomic.LoadUint32(&fl.chainsQueued),
						atomic.LoadUint32(&fl.alreadyPosted),
						atomic.LoadUint32(&fl.chainsSent))
				}
			}
		}()
	}
	return fl
}