forked from filecoin-project/go-legs
/
head.go
148 lines (130 loc) · 3.71 KB
/
head.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package head
import (
"context"
"fmt"
"io"
"net"
"net/http"
"path"
"sync"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
gostream "github.com/libp2p/go-libp2p-gostream"
"github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
// closeTimeout bounds how long Publisher.Close waits for the HTTP server to
// shut down.
const closeTimeout = 30 * time.Second
// log is the package-level logger, created with the name "go-legs/head".
var log = logging.Logger("go-legs/head")
// Publisher serves the current head (root) CID over HTTP. It implements
// http.Handler via ServeHTTP and is installed as the handler of its own
// server by NewPublisher.
type Publisher struct {
// rl guards root.
rl sync.RWMutex
// root is the current head CID; cid.Undef means no head has been set.
root cid.Cid
// server is the HTTP server started by Serve and stopped by Close.
server *http.Server
}
// NewPublisher returns a Publisher whose HTTP server uses the Publisher
// itself as its request handler. No head is set until UpdateRoot is called.
func NewPublisher() *Publisher {
	pub := new(Publisher)
	pub.server = &http.Server{Handler: pub}
	return pub
}
// deriveProtocolID builds the libp2p protocol ID used for head requests on
// the given topic by joining "/legs/head", the topic, and the version
// "0.0.1" with path.Join (which also cleans the resulting path).
func deriveProtocolID(topic string) protocol.ID {
	joined := path.Join("/legs/head", topic, "0.0.1")
	return protocol.ID(joined)
}
// Serve listens on host for streams using the protocol ID derived from topic
// and serves head requests on them with the Publisher's HTTP server.
//
// It returns the listen error if the listener cannot be created; otherwise it
// returns whatever http.Server.Serve returns once serving ends.
func (p *Publisher) Serve(host host.Host, topic string) error {
	pid := deriveProtocolID(topic)
	l, err := gostream.Listen(host, pid)
	if err != nil {
		// Include the cause so the log entry is useful on its own.
		log.Errorw("Failed to listen to gostream with protocol", "host", host.ID(), "protocolID", pid, "err", err)
		return err
	}
	log.Infow("Serving gostream", "host", host.ID(), "protocolID", pid)
	return p.server.Serve(l)
}
// QueryRootCid asks peerID for its current head CID on the given topic.
//
// It issues an HTTP GET for "/head" over a stream dialed through host with
// the protocol ID derived from topic, retrying with the legacy protocol ID if
// that dial fails. The request is bound to ctx, so cancelling ctx (or hitting
// its deadline) aborts connecting, dialing and the request itself.
//
// It returns cid.Undef and a nil error when the peer replies with an empty
// body (no head set). It returns cid.Undef and a non-nil error when the
// request fails, the peer replies with a non-200 status, the body cannot be
// read, or the body is not a valid CID.
func QueryRootCid(ctx context.Context, host host.Host, topic string, peerID peer.ID) (cid.Cid, error) {
	transport := &http.Transport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			addrInfo := peer.AddrInfo{
				ID: peerID,
			}
			err := host.Connect(ctx, addrInfo)
			if err != nil {
				return nil, err
			}
			conn, err := gostream.Dial(ctx, host, peerID, deriveProtocolID(topic))
			if err != nil {
				// If protocol ID is wrong, then try the old "double-slashed" protocol ID.
				// TODO: remove this code when all providers have upgraded.
				// if !errors.Is(err, multistream.ErrNotSupported) {
				// 	return nil, err
				// }
				oldProtoID := protocol.ID("/legs/head/" + topic + "/0.0.1")
				conn, err = gostream.Dial(ctx, host, peerID, oldProtoID)
				if err != nil {
					return nil, err
				}
				log.Infow("Peer head CID server uses old protocol ID", "peer", peerID, "proto", oldProtoID)
			}
			return conn, err
		},
	}
	// The transport is private to this call, so nothing will ever reuse its
	// pooled connection. Close it on return instead of leaving the stream
	// open in the idle pool. Deferred first so it runs after the body is
	// closed.
	defer transport.CloseIdleConnections()
	client := http.Client{Transport: transport}

	// The httpclient expects there to be a host here. `.invalid` is a reserved
	// TLD for this purpose. See
	// https://datatracker.ietf.org/doc/html/rfc2606#section-2
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unused.invalid/head", nil)
	if err != nil {
		return cid.Undef, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return cid.Undef, err
	}
	defer resp.Body.Close()

	// A rejected request carries an error body, not a CID; report the status
	// rather than a misleading decode failure.
	if resp.StatusCode != http.StatusOK {
		return cid.Undef, fmt.Errorf("head request to peer %s returned status %s", peerID, resp.Status)
	}

	cidStr, err := io.ReadAll(resp.Body)
	if err != nil {
		return cid.Undef, fmt.Errorf("cannot fully read response body: %w", err)
	}
	if len(cidStr) == 0 {
		log.Debug("No head is set; returning cid.Undef")
		return cid.Undef, nil
	}

	cs := string(cidStr)
	decode, err := cid.Decode(cs)
	if err != nil {
		return cid.Undef, fmt.Errorf("failed to decode CID %s: %w", cs, err)
	}

	log.Debugw("Sucessfully queried latest head", "head", decode)
	return decode, nil
}
// ServeHTTP implements http.Handler. Requests whose final path element is
// not "head" are rejected with 404. Otherwise the response body is the string
// form of the current root CID, or empty when no root has been set.
func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	base := path.Base(r.URL.Path)
	if base != "head" {
		log.Debug("Only head is supported; rejecting request with different base path")
		http.Error(w, "", http.StatusNotFound)
		return
	}

	// Snapshot the root under the read lock and release it before writing to
	// the client, so a slow peer cannot hold up UpdateRoot.
	p.rl.RLock()
	root := p.root
	p.rl.RUnlock()

	var out []byte
	if root != cid.Undef {
		currentHead := root.String()
		// Structured logging: Debug does not interpret format verbs.
		log.Debugw("Found current head", "head", currentHead)
		out = []byte(currentHead)
	} else {
		log.Debug("No head is set; responding with empty")
	}

	_, err := w.Write(out)
	if err != nil {
		log.Errorw("Failed to write response", "err", err)
	}
}
// UpdateRoot stores c as the head CID returned by subsequent head requests.
// The context argument is ignored and the returned error is always nil.
func (p *Publisher) UpdateRoot(_ context.Context, c cid.Cid) error {
	p.rl.Lock()
	p.root = c
	p.rl.Unlock()
	return nil
}
// Close shuts down the Publisher's HTTP server, waiting at most closeTimeout
// for the shutdown to complete, and returns the result of the shutdown.
func (p *Publisher) Close() error {
	shutdownCtx, stop := context.WithTimeout(context.Background(), closeTimeout)
	defer stop()
	return p.server.Shutdown(shutdownCtx)
}