-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetcher.go
executable file
·126 lines (111 loc) · 3.41 KB
/
fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package ds
import (
"context"
"fmt"
"time"
dt "github.com/fernandokm/transparencia-de-dominios"
"github.com/fernandokm/transparencia-de-dominios/util"
ct "github.com/google/certificate-transparency-go"
"github.com/google/certificate-transparency-go/client"
"github.com/google/certificate-transparency-go/scanner"
"github.com/google/certificate-transparency-go/x509"
)
// FetchParams bundles the inputs needed by FetchLogForWorker to follow a
// single certificate transparency log.
type FetchParams struct {
	// InitialTreeSize is the log index at which fetching starts.
	InitialTreeSize uint64
	// STHCheckInterval is how long to wait before polling the log again,
	// both when no new entries are available and after a failed iteration.
	STHCheckInterval time.Duration
	// LogID is copied verbatim into every emitted WorkerTransaction.
	LogID [32]byte
	// LogIndex identifies the log in emitted transactions and in the
	// messages printed by the fetcher.
	LogIndex uint64
	// LogClient is the client handed to the scanner to talk to the log.
	LogClient *client.LogClient
	// ReturnOnError makes FetchLogForWorker return the first error it
	// encounters instead of printing it and retrying.
	ReturnOnError bool
	// C receives one WorkerTransaction per successfully processed tree head.
	C chan<- dt.WorkerTransaction
}
// FetchLogForWorker fetches the specified log and passes all entries to the worker.
//
// Fetching starts at params.InitialTreeSize and proceeds in iterations (see
// runFetcherIteration), each of which sends at most one transaction on
// params.C. The function loops until an iteration fails and either
// params.ReturnOnError is set or ctx has been cancelled; it therefore only
// ever returns a non-nil error. Other failures are printed and retried after
// waiting params.STHCheckInterval.
func FetchLogForWorker(ctx context.Context, params FetchParams) error {
	opts := scanner.DefaultFetcherOptions()
	opts.ParallelFetch = 1
	opts.BatchSize = 64
	opts.StartIndex = int64(params.InitialTreeSize)

	// The cancel func is also handed to each iteration, which uses it to
	// abort an in-flight fetch when entry processing fails.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for {
		err := runFetcherIteration(ctx, cancel, params, opts)
		if err == nil {
			continue
		}
		// Once the context is done there is no point in retrying. Checking
		// ctx.Err() directly (rather than comparing err to it) also covers
		// errors that merely wrap the context error.
		if params.ReturnOnError || ctx.Err() != nil {
			return err
		}
		fmt.Printf("Error (log %d): %v\n", params.LogIndex, err)

		// Back off before retrying, but wake up immediately on cancellation
		// instead of sleeping through it.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(params.STHCheckInterval):
		}
	}
}
// runFetcherIteration performs a single poll of the log described by params.
//
// It asks the log for its current tree head and, if entries beyond
// opts.StartIndex are available, fetches them, collects for every normalized
// domain name (SAN DNS names and the subject common name) the indices of the
// certificates that mention it, and sends the resulting transaction on
// params.C. On success opts.StartIndex is advanced to the tree size of the
// processed tree head so the next iteration continues from there.
//
// When there is nothing new, it waits params.STHCheckInterval and returns nil.
// If ctx is cancelled while waiting or while sending, ctx.Err() is returned.
// cancel is invoked when a log entry cannot be parsed, to stop the running
// fetch; the parse error is then returned wrapped.
func runFetcherIteration(ctx context.Context, cancel context.CancelFunc, params FetchParams, opts *scanner.FetcherOptions) error {
	// Clear the end index left over from the previous iteration; the check
	// below relies on Prepare filling it in again (presumably from the new
	// tree head — confirm against the scanner package).
	opts.EndIndex = 0
	f := scanner.NewFetcher(params.LogClient, opts)
	if err := ctx.Err(); err != nil {
		return err
	}

	sth, err := f.Prepare(ctx)
	if err != nil {
		return err
	}

	if opts.EndIndex <= opts.StartIndex {
		// No new entries: wait before the next poll, but do not sleep
		// through a cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(params.STHCheckInterval):
			return nil
		}
	}

	fmt.Printf("Fetcher (log %d): new STH (size=%d)\n", params.LogIndex, sth.TreeSize)
	t := dt.WorkerTransaction{
		LogIndex: params.LogIndex,
		LogID:    params.LogID,
		LogRevision: dt.LogRevision{
			TreeSize: sth.TreeSize,
			RootHash: sth.SHA256RootHash,
		},
		NewCertificatesIndices: make(map[string][]uint64),
	}

	var processErr error
	processFetcherBatch := func(batch scanner.EntryBatch) {
		// Index into the slice rather than ranging by value so that no
		// pointer to a per-iteration copy is taken.
		for i := range batch.Entries {
			leafIndex := int64(i) + batch.Start
			logEntry, err := ct.LogEntryFromLeaf(leafIndex, &batch.Entries[i])
			// An error accompanied by a usable entry is tolerated; only a
			// missing entry aborts the whole iteration.
			if err != nil && logEntry == nil {
				processErr = err
				cancel()
				return
			}

			var cert *x509.Certificate
			switch {
			case logEntry.X509Cert != nil:
				cert = logEntry.X509Cert
			case logEntry.Precert != nil:
				cert = logEntry.Precert.TBSCertificate
			default:
				fmt.Printf("Warning (log %d): ignoring JSON Data (index=%d)\n", params.LogIndex, leafIndex)
				continue
			}

			certIndex := uint64(leafIndex)
			for _, name := range cert.DNSNames {
				domain, err := util.NormalizeDomainName(name)
				if err != nil {
					// Report the offending input name, not the (meaningless)
					// result of the failed normalization.
					fmt.Printf("Warning (log %d): ignoring invalid domain name for certificate at index=%d: %q\n", params.LogIndex, leafIndex, name)
					continue
				}
				t.NewCertificatesIndices[domain] = append(t.NewCertificatesIndices[domain], certIndex)
			}

			// NOTE(review): when the common name also appears among the DNS
			// names, the same index is recorded twice for that domain.
			// Left as is because consumers may depend on it — confirm.
			domain, err := util.NormalizeDomainName(cert.Subject.CommonName)
			if err != nil {
				if len(cert.DNSNames) == 0 {
					fmt.Printf("Warning (log %d): ignoring certificate at index=%d: no valid domain names found\n", params.LogIndex, leafIndex)
				}
				continue
			}
			t.NewCertificatesIndices[domain] = append(t.NewCertificatesIndices[domain], certIndex)
		}
	}

	if err := f.Run(ctx, processFetcherBatch); err != nil {
		return err
	}
	if processErr != nil {
		return fmt.Errorf("error processing data: %w", processErr)
	}
	// If the context was cancelled during the run, the callback may not have
	// seen every entry up to the tree head (Run does not necessarily report
	// this as an error — TODO confirm for the scanner version in use), so do
	// not publish a transaction that claims the full tree size.
	if err := ctx.Err(); err != nil {
		return err
	}

	// Hand the transaction to the worker without blocking forever if the
	// receiver has gone away and the context is cancelled.
	select {
	case params.C <- t:
	case <-ctx.Done():
		return ctx.Err()
	}

	opts.StartIndex = int64(sth.TreeSize)
	return nil
}