-
Notifications
You must be signed in to change notification settings - Fork 0
/
origin.go
107 lines (92 loc) · 3.09 KB
/
origin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package origin
import (
"fmt"
"github.com/bocchi-the-cache/hitori/pkg/utils"
"time"
"github.com/valyala/fasthttp"
"github.com/bocchi-the-cache/hitori/pkg/cache"
"github.com/bocchi-the-cache/hitori/pkg/config"
"github.com/bocchi-the-cache/hitori/pkg/logger"
)
// DefaultOrigin is the package-level Origin instance. It is nil until Init
// assigns it.
var DefaultOrigin *Origin
// Init builds an Origin from mapCfg backed by cache.DefaultCache, publishes it
// as DefaultOrigin, and logs the mapping configuration it was given.
func Init(mapCfg *config.Mapping) {
	origin := NewOrigin(mapCfg, cache.DefaultCache)
	DefaultOrigin = origin

	logger.Info("origin init successfully", mapCfg)
}
// NewOrigin returns an Origin that uses a fresh fasthttp.Client for upstream
// calls, ca as its cache, and the mapping produced by buildOriginMapping from
// mapCfg.
func NewOrigin(mapCfg *config.Mapping, ca cache.Cache) *Origin {
	client := &fasthttp.Client{}
	mapping := buildOriginMapping(mapCfg)

	o := new(Origin)
	o.c = client
	o.cache = ca
	o.mp = mapping
	return o
}
// Origin holds what ServeProxyHTTP needs to forward a request upstream: an
// HTTP client, a cache, and the domain/origin mapping.
type Origin struct {
// c is the fasthttp client used to execute requests against the selected origin node.
c *fasthttp.Client
// cache receives the header and body of responses that are judged cacheable.
cache cache.Cache
// mp is the mapping built by buildOriginMapping; ServeProxyHTTP reads its DomainMap and OriginMap.
mp *Mapping
}
// ServeProxyHTTP forwards the incoming request to an origin node and relays
// the origin's response to the client, offering it to the cache on the way.
//
// Steps:
//  1. Look up the request host in the domain map, then that domain's origin
//     in the origin map.
//  2. Pick a node of that origin with SelectRandomNode.
//  3. Copy the incoming request, retarget it at the node and execute it.
//  4. Stamp the response with X-Cache-Timestamp, pass it to ProduceCache when
//     isResponseOKToCache accepts it, and copy header and body into ctx.
//
// Lookup and node-selection failures are reported with status 500 and a
// failed origin round trip with status 503, all via utils.SetCtxErrorWithLog.
//
// TODO: high performance tuning with `fasthttp`
func (o *Origin) ServeProxyHTTP(ctx *fasthttp.RequestCtx) {
	u := ctx.URI()
	host := string(u.Host())

	d, ok := o.mp.DomainMap[host]
	if !ok {
		utils.SetCtxErrorWithLog(ctx, fmt.Errorf("proxy domain not found: %v", host), fasthttp.StatusInternalServerError)
		return
	}

	ori, ok := o.mp.OriginMap[d.Origins]
	if !ok {
		utils.SetCtxErrorWithLog(ctx, fmt.Errorf("origin name not found: %v", string(d.Origins)), fasthttp.StatusInternalServerError)
		return
	}

	node, err := SelectRandomNode(ori)
	if err != nil {
		utils.SetCtxErrorWithLog(ctx, err, fasthttp.StatusInternalServerError)
		return
	}
	logger.Debugf("origin select, origin node: %v, uri: %v", ori.OriginName+node, u)

	// TODO: using small slice to fetch origin, streaming to client and cache
	// The request comes from fasthttp's pool; return it on every exit path.
	oriRequest := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(oriRequest)
	ctx.Request.CopyTo(oriRequest)

	// NOTE(review): only the path is forwarded, so the query string is dropped
	// on the way to the origin while ProduceCache keys on the full URI —
	// confirm this is intended.
	oriRequest.SetRequestURI(fmt.Sprintf("%s://%s%s", ori.Protocol, node, string(u.Path())))

	// Prefer the origin's configured host; otherwise keep the client's host.
	// NOTE(review): Request.SetHost appears to rewrite the URI host, which is
	// also what the fasthttp client dials — verify that the node selected
	// above is really the one contacted.
	if ori.OriginHost != "" {
		oriRequest.SetHost(ori.OriginHost)
	} else {
		oriRequest.SetHost(host)
	}

	// Pooled as well. Header and body are copied out of resp below, so it is
	// safe to hand it back once this function returns.
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)

	if err := o.c.Do(oriRequest, resp); err != nil {
		utils.SetCtxErrorWithLog(ctx, err, fasthttp.StatusServiceUnavailable)
		return
	}
	logger.Debugf("do origin finished, header: %v, uri: %v", string(resp.Header.Header()), u)

	resp.Header.Set("X-Cache-Timestamp", fmt.Sprintf("%v", time.Now()))

	// copy response header and body to cache
	// TODO: too much mem copy
	if isResponseOKToCache(resp) {
		// The cache may keep the slices it receives, so give it a private,
		// non-pooled copy instead of memory owned by resp, which is recycled
		// when this function returns. The copy is only made when needed.
		respToCache := &fasthttp.Response{}
		resp.CopyTo(respToCache)
		ProduceCache(ctx, o.cache, respToCache.Header.Header(), respToCache.Body())
	}

	// copy response header and body to client
	resp.Header.CopyTo(&ctx.Response.Header)
	ctx.Response.SetBody(resp.Body())
}
// ProduceCache stores a response in ca under two keys derived from the
// request URI of ctx: "<uri>_body" holds body and "<uri>_header" holds header.
//
// The body is written first; the header is written only after the body was
// stored successfully. A failure of either write is logged and stops the
// function; nothing is returned to the caller.
//
// TODO: process header/body in pkg `cache`
func ProduceCache(ctx *fasthttp.RequestCtx, ca cache.Cache, header []byte, body []byte) {
	// Render the URI once; it is the base of both keys and of the log lines.
	uri := ctx.Request.URI().String()

	if err := ca.Set(uri+"_body", body); err != nil {
		logger.Error("cache body set error, uri", err)
		return
	}
	logger.Debugf("cache body set, uri: %v", uri)

	// set header cache after body cache finished
	if err := ca.Set(uri+"_header", header); err != nil {
		logger.Error("cache header set error, uri", err)
		// Stop here so the success message below is not logged for a failed write.
		return
	}
	logger.Debugf("cache header set, uri: %v", uri)
}