/
cs_proxy_unix.go
168 lines (152 loc) · 4 KB
/
cs_proxy_unix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
//go:build !windows
// +build !windows
/*
* Copyright (c) 2023. Nydus Developers. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
package converter
import (
	"archive/tar"
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"strconv"
	"strings"
	"sync"

	"github.com/containerd/containerd/content"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)
// contentStoreProxy pairs the HTTP server started by setupContentStoreProxy
// with the unix socket path that server listens on.
type contentStoreProxy struct {
// socketPath is the unix socket the server is bound to; close removes it.
socketPath string
// server is the HTTP server whose handler comes from contentProxyHandler.
server *http.Server
}
// setupContentStoreProxy starts an HTTP server on a fresh unix socket under
// workDir that serves ra through contentProxyHandler.
//
// A unique socket path is obtained by creating a temporary file and removing
// it again so that net.Listen can bind the same name. The server runs in a
// background goroutine; call close on the returned proxy to stop it.
//
// It returns an error when the temporary file cannot be created, closed or
// removed, or when listening on the socket fails.
func setupContentStoreProxy(workDir string, ra content.ReaderAt) (*contentStoreProxy, error) {
	sockFile, err := os.CreateTemp(workDir, "nydus-cs-proxy-*.sock")
	if err != nil {
		return nil, errors.Wrap(err, "create unix socket file")
	}
	socketPath := sockFile.Name()

	// Only the unique name is needed. Close the descriptor immediately so it
	// is not leaked for the lifetime of the process.
	if err := sockFile.Close(); err != nil {
		_ = os.Remove(socketPath) // best-effort cleanup of the placeholder file
		return nil, errors.Wrap(err, "close temporary unix socket file")
	}
	// The placeholder has to be gone before the socket is bound to its path.
	if err := os.Remove(socketPath); err != nil {
		return nil, errors.Wrap(err, "remove temporary unix socket file")
	}

	listener, err := net.Listen("unix", socketPath)
	if err != nil {
		return nil, errors.Wrap(err, "listen unix socket when setup content store proxy")
	}

	server := &http.Server{
		Handler: contentProxyHandler(ra),
	}

	go func() {
		// http.ErrServerClosed is the expected result of a Shutdown call.
		if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
			logrus.WithError(err).Warn("serve content store proxy")
		}
	}()

	return &contentStoreProxy{
		socketPath: socketPath,
		server:     server,
	}, nil
}
// close shuts the proxy's HTTP server down and, once that attempt has
// finished, removes the unix socket file regardless of the outcome.
// A shutdown failure is returned wrapped; a failure to remove the socket
// file is ignored.
func (p *contentStoreProxy) close() error {
	defer func() {
		// Best-effort cleanup: the result of the removal is not reported.
		_ = os.Remove(p.socketPath)
	}()

	shutdownErr := p.server.Shutdown(context.Background())
	if shutdownErr == nil {
		return nil
	}
	return errors.Wrap(shutdownErr, "shutdown content store proxy")
}
// parseRangeHeader parses the byte-range spec of an HTTP Range header (with
// the "bytes=" unit prefix already stripped) for a resource of totalLen bytes.
//
// Accepted forms:
//   - "start-end": inclusive range; an end past the last byte is clamped.
//   - "start-" or "start": everything from start to the end of the resource.
//
// It returns the offset of the first byte and the number of bytes wanted.
// An error is returned when a bound is not a decimal integer, when start is
// outside [0, totalLen), or when end is smaller than start. Suffix ranges
// ("-N") and multi-range specs are not supported and are rejected.
func parseRangeHeader(rangeStr string, totalLen int64) (start, wantedLen int64, err error) {
	// Split on the first dash only, so that trailing garbage such as "1-2-3"
	// ends up in the end bound and fails to parse instead of being ignored.
	bounds := strings.SplitN(rangeStr, "-", 2)

	start, err = strconv.ParseInt(bounds[0], 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parse range header: %w", err)
	}
	if start < 0 || start >= totalLen {
		return 0, 0, fmt.Errorf("invalid range header: %s", rangeStr)
	}

	// Without an explicit end bound the range runs to the last byte.
	lastByte := totalLen - 1
	end := lastByte
	if len(bounds) == 2 && bounds[1] != "" {
		end, err = strconv.ParseInt(bounds[1], 10, 64)
		if err != nil {
			return 0, 0, fmt.Errorf("parse range header: %w", err)
		}
		if end < start {
			return 0, 0, fmt.Errorf("invalid range header: %s", rangeStr)
		}
		// Clamping also keeps end-start+1 from overflowing for huge ends.
		if end > lastByte {
			end = lastByte
		}
	}

	return start, end - start + 1, nil
}
// contentProxyHandler returns an HTTP handler that serves the blob held in ra
// to range-reading clients.
//
// The blob is located with seekFile(ra, EntryBlob, ...), which hands back a
// sequential io.Reader. The handler remembers the reader's position so that
// consecutive forward reads continue where the previous one stopped; a read
// that goes backwards re-seeks from the start of the blob.
//
// Supported requests:
//   - HEAD: replies with the total blob size in Content-Length.
//   - GET with a "Range: bytes=..." header: replies 206 with the requested
//     bytes; a malformed or unsatisfiable range yields 400, a failure to
//     position the reader yields 500.
//   - any other method: 405.
//
// The total size is the size recorded in the tar header reported by seekFile,
// or ra.Size() when seekFile did not report one.
func contentProxyHandler(ra content.ReaderAt) http.Handler {
	var (
		// mu guards dataReader and curPos: the HTTP server runs the handler
		// concurrently for different connections, but the reader is one
		// shared sequential stream.
		mu         sync.Mutex
		dataReader io.Reader
		curPos     int64

		tarHeader *tar.Header
		totalLen  int64
	)

	// resetReader re-seeks to the start of the blob. dataReader is cleared
	// first so that a failed seek can never leave a stale reader paired with
	// a position of zero.
	resetReader := func() error {
		dataReader = nil
		curPos = 0
		_, err := seekFile(ra, EntryBlob, func(reader io.Reader, hdr *tar.Header) error {
			dataReader, tarHeader = reader, hdr
			return nil
		})
		if dataReader != nil {
			return nil
		}
		if err != nil {
			return fmt.Errorf("seek blob data: %w", err)
		}
		return fmt.Errorf("seek blob data: no reader provided")
	}

	// A failure here is not fatal for HEAD requests; GET requests retry the
	// seek and report the error to the client.
	_ = resetReader()
	if tarHeader != nil {
		totalLen = tarHeader.Size
	} else {
		totalLen = ra.Size()
	}

	// writeError sends status with the error text as body. A failed write
	// means the client is gone, so there is nobody left to report it to.
	writeError := func(w http.ResponseWriter, status int, err error) {
		w.WriteHeader(status)
		_, _ = w.Write([]byte(err.Error()))
	}

	handler := func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodHead:
			w.Header().Set("Content-Length", strconv.FormatInt(totalLen, 10))
			w.Header().Set("Content-Type", "application/octet-stream")

		case http.MethodGet:
			start, wantedLen, err := parseRangeHeader(strings.TrimPrefix(r.Header.Get("Range"), "bytes="), totalLen)
			if err != nil {
				writeError(w, http.StatusBadRequest, err)
				return
			}
			// Never promise more bytes than the blob has left after start.
			if remaining := totalLen - start; wantedLen > remaining {
				wantedLen = remaining
			}

			mu.Lock()
			defer mu.Unlock()

			// The reader only moves forward: start over when it is missing
			// or already past the requested offset.
			if dataReader == nil || start < curPos {
				if err := resetReader(); err != nil {
					writeError(w, http.StatusInternalServerError, err)
					return
				}
			}
			if start > curPos {
				skipped, err := io.CopyN(io.Discard, dataReader, start-curPos)
				curPos += skipped
				if err != nil {
					// The reader state is unknown now; force a fresh seek
					// on the next request.
					dataReader = nil
					writeError(w, http.StatusInternalServerError, err)
					return
				}
			}

			// Headers must be set before the first body byte is written,
			// otherwise they are not sent.
			w.Header().Set("Content-Length", strconv.FormatInt(wantedLen, 10))
			w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+wantedLen-1, totalLen))
			w.Header().Set("Content-Type", "application/octet-stream")
			w.WriteHeader(http.StatusPartialContent)

			written, err := io.CopyN(w, dataReader, wantedLen)
			if err != nil {
				// The status line is already out, so it cannot be changed.
				// The body is shorter than the declared Content-Length, and
				// the reader may have advanced further than what reached
				// the client, so it is dropped and re-sought next time.
				dataReader = nil
				return
			}
			curPos += written

		default:
			w.Header().Set("Allow", "GET, HEAD")
			w.WriteHeader(http.StatusMethodNotAllowed)
		}
	}

	return http.HandlerFunc(handler)
}