/
progressive_collapse_forwarder.go
198 lines (178 loc) · 6.07 KB
/
progressive_collapse_forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/**
* Copyright 2018 Comcast Cable Communications Management, LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package engines
import (
"errors"
"io"
"net/http"
"sync"
"sync/atomic"
)
// Sentinel errors exposed by the progressive collapse forwarder.
var (
	// ErrReadIndexTooLarge is reported by IndexRead when the requested chunk
	// index lies beyond the number of chunk slots held by the forwarder.
	ErrReadIndexTooLarge = errors.New("Read index too large")
)
// TODO: timeouts still need to be dealt with.
// IndexReader is the signature of a function that reads the data stored at
// position index into b and reports a length n together with any error.
type IndexReader func(index uint64, b []byte) (n int, err error)
// ProgressiveCollapseForwarder caches everything written to it through the
// io.Writer-style Write method and makes that data available to any number of
// readers. A reader asks for the chunk at index i; the forwarder either hands
// the chunk back immediately or blocks until it becomes available.
//
// The per-method notes below describe the implementation in this file.
type ProgressiveCollapseForwarder interface {
	// AddClient streams the cached chunks to w, following the live edge once
	// caught up, and returns the error that ended the stream (io.EOF in the
	// nominal case).
	AddClient(w io.Writer) error
	// Write stores b as the next chunk and wakes blocked readers.
	Write(b []byte) (int, error)
	// Close marks the origin read as finished and wakes everything waiting on it.
	Close()
	// IndexRead copies the chunk at index into b, blocking if that chunk has
	// not been written yet.
	IndexRead(index uint64, b []byte) (int, error)
	// WaitServerComplete blocks until the origin read has finished.
	WaitServerComplete()
	// WaitAllComplete blocks until every AddClient call has returned.
	WaitAllComplete()
	// GetBody returns the bytes written so far, or an error if Close has not
	// been called yet.
	GetBody() ([]byte, error)
	// GetResp returns the response the forwarder was created with.
	GetResp() *http.Response
}
// progressiveCollapseForwarder is the concrete ProgressiveCollapseForwarder.
//
// rIndex is deliberately the FIRST field: it is a uint64 accessed through
// sync/atomic, and on 32-bit platforms only the first word of an allocated
// struct is guaranteed to be 64-bit aligned. Do not move other fields in
// front of it.
type progressiveCollapseForwarder struct {
	// rIndex is the number of chunks published so far; it is read and
	// incremented atomically and readers may only touch data[i] for i < rIndex.
	rIndex uint64
	// resp is the response handed back by GetResp.
	resp *http.Response
	// dataIndex is the number of bytes of dataStore already filled by Write.
	dataIndex uint64
	// data holds one slice per written chunk, each referencing a region of dataStore.
	data [][]byte
	// dataLen is the number of chunk slots in data.
	dataLen uint64
	// dataStore is the contiguous backing store for every chunk.
	dataStore []byte
	// dataStoreLen is the size of dataStore in bytes.
	dataStoreLen uint64
	// readCond is broadcast by Write and Close and waited on by IndexRead.
	readCond *sync.Cond
	// serverReadDone is non-zero once Close has been called; accessed atomically.
	serverReadDone int32
	// clientWaitgroup tracks AddClient calls that are still running.
	clientWaitgroup *sync.WaitGroup
	// serverWaitCond is broadcast by Close and waited on by WaitServerComplete.
	serverWaitCond *sync.Cond
}
// NewPCF returns a new instance of a ProgressiveCollapseForwarder.
//
// resp is stored and later returned by GetResp. contentLength sizes the
// backing byte store and, together with HTTPBlockSize, the number of chunk
// slots: ((contentLength/HTTPBlockSize)*2)+1.
//
// A negative contentLength is treated as 0 instead of panicking in make;
// callers that pass http.Response.ContentLength through will see -1 there when
// the length is unknown. The resulting forwarder has an empty store, so every
// non-empty Write is rejected.
func NewPCF(resp *http.Response, contentLength int64) ProgressiveCollapseForwarder {
	if contentLength < 0 {
		contentLength = 0
	}

	// dataStore is one contiguous block of memory; the slices in refs point
	// into it. Thread safety comes from the atomic read index: the writer only
	// fills memory past it and readers never go beyond it, which keeps the two
	// sides in separate areas of the store.
	dataStore := make([]byte, contentLength)
	refs := make([][]byte, ((contentLength/HTTPBlockSize)*2)+1)

	return &progressiveCollapseForwarder{
		resp:            resp,
		rIndex:          0,
		dataIndex:       0,
		data:            refs,
		dataLen:         uint64(len(refs)),
		dataStore:       dataStore,
		dataStoreLen:    uint64(contentLength),
		readCond:        sync.NewCond(&sync.Mutex{}),
		serverReadDone:  0,
		clientWaitgroup: &sync.WaitGroup{},
		serverWaitCond:  sync.NewCond(&sync.Mutex{}),
	}
}
// AddClient adds an io.Writer client to the ProgressiveCollapseForwarder.
// The client receives every cached chunk in order and then follows the live
// edge, blocking inside IndexRead until more data is written.
//
// It returns the error that ended the stream: the error from IndexRead
// (io.EOF in the nominal case) or the first error returned by w.Write.
// The call is tracked in clientWaitgroup for its whole duration so that
// WaitAllComplete can wait for it.
func (pfc *progressiveCollapseForwarder) AddClient(w io.Writer) error {
	pfc.clientWaitgroup.Add(1)
	defer pfc.clientWaitgroup.Done()

	var readIndex uint64
	buf := make([]byte, HTTPBlockSize)
	for {
		n, err := pfc.IndexRead(readIndex, buf)

		// IndexRead reports the full chunk length even when buf was too small
		// to hold it. In that case buf only contains a truncated prefix, so
		// grow it and read the same chunk again rather than forwarding
		// partial or repeated data.
		for err == nil && n > len(buf) {
			buf = make([]byte, n)
			n, err = pfc.IndexRead(readIndex, buf)
		}

		if n > 0 {
			// Stop as soon as the client can no longer be written to; there is
			// no point in reading further chunks for it.
			if _, werr := w.Write(buf[:n]); werr != nil {
				return werr
			}
			readIndex++
		}
		if err != nil {
			// Nominal case is io.EOF.
			return err
		}
	}
}
// WaitServerComplete blocks until the object has been retrieved from the
// origin server, i.e. until serverReadDone becomes non-zero. The full payload
// is needed before it can be handed to the actual cache.
//
// The flag is checked in a loop while holding serverWaitCond.L, which is the
// usage sync.Cond requires: checking outside the lock and then calling Wait
// once can sleep forever if the completion signal lands between the check and
// the Wait.
//
// NOTE(review): the missed-wakeup window is only closed completely when the
// code that sets serverReadDone broadcasts while holding serverWaitCond.L;
// verify against Close.
func (pfc *progressiveCollapseForwarder) WaitServerComplete() {
	pfc.serverWaitCond.L.Lock()
	defer pfc.serverWaitCond.L.Unlock()

	for atomic.LoadInt32(&pfc.serverReadDone) == 0 {
		pfc.serverWaitCond.Wait()
	}
}
// WaitAllComplete blocks until every AddClient call tracked by the client
// wait group has returned, so that no client goroutine is abandoned.
func (pfc *progressiveCollapseForwarder) WaitAllComplete() {
	clients := pfc.clientWaitgroup
	clients.Wait()
}
// GetBody returns the bytes written into the PCF so far. It fails with an
// error while the server read has not been marked complete.
func (pfc *progressiveCollapseForwarder) GetBody() ([]byte, error) {
	if done := atomic.LoadInt32(&pfc.serverReadDone); done == 0 {
		return nil, errors.New("Server request not completed")
	}

	written := pfc.dataIndex
	return pfc.dataStore[:written], nil
}
// GetResp hands back the *http.Response stored in the forwarder.
func (pfc *progressiveCollapseForwarder) GetResp() *http.Response {
	resp := pfc.resp
	return resp
}
// Write copies b into the ProgressiveCollapseForwarder's data store, records a
// reference to that region as the next chunk, and publishes it by incrementing
// the read index.
//
// It returns io.ErrShortWrite, without storing anything, when b does not fit
// in the remaining store or when every chunk slot is already used. A
// zero-length b is accepted but not stored: an empty chunk would carry no data
// and readers cannot tell it apart from "nothing read".
//
// NOTE(review): dataIndex is updated without synchronization, so this looks
// like it expects a single writer goroutine; confirm against callers.
func (pfc *progressiveCollapseForwarder) Write(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}

	n := atomic.LoadUint64(&pfc.rIndex)
	end := pfc.dataIndex + uint64(len(b))
	// n must be strictly below dataLen: pfc.data has exactly dataLen slots.
	if end > pfc.dataStoreLen || n >= pfc.dataLen {
		return 0, io.ErrShortWrite
	}

	pfc.data[n] = pfc.dataStore[pfc.dataIndex:end]
	copy(pfc.data[n], b)
	pfc.dataIndex = end

	// Publish the chunk and signal while holding the lock, so a reader that
	// has just checked rIndex under the same lock cannot miss the wake-up.
	pfc.readCond.L.Lock()
	atomic.AddUint64(&pfc.rIndex, 1)
	pfc.readCond.Broadcast()
	pfc.readCond.L.Unlock()

	return len(b), nil
}

// Close signals everything waiting on the server response body that it is
// complete. This should be triggered by the client io.EOF.
//
// The done flag is set first; each condition variable is then broadcast while
// its lock is held, so a waiter that checked the flag under that lock is either
// already parked (and gets woken) or will see the flag when it acquires the
// lock. Calling Close more than once is harmless.
func (pfc *progressiveCollapseForwarder) Close() {
	atomic.StoreInt32(&pfc.serverReadDone, 1)

	pfc.serverWaitCond.L.Lock()
	pfc.serverWaitCond.Broadcast()
	pfc.serverWaitCond.L.Unlock()

	pfc.readCond.L.Lock()
	pfc.readCond.Broadcast()
	pfc.readCond.L.Unlock()
}

// IndexRead copies the chunk stored at index into b. If that chunk has already
// been written it returns immediately; otherwise it blocks until the chunk is
// written or the forwarder is closed.
//
// The returned count is the full length of the chunk, which can be larger than
// len(b); in that case only the first len(b) bytes were copied.
//
// It returns ErrReadIndexTooLarge when index is beyond the chunk slots, and
// io.EOF when the forwarder has been closed and no chunk exists at index.
func (pfc *progressiveCollapseForwarder) IndexRead(index uint64, b []byte) (int, error) {
	if index >= atomic.LoadUint64(&pfc.rIndex) {
		if index > pfc.dataLen {
			return 0, ErrReadIndexTooLarge
		}

		// Re-check the condition under the lock and keep waiting until it
		// really holds. Write increments rIndex under this same lock, so no
		// chunk can be published between the rIndex check and the done check
		// below, and a wake-up caused by Close is never mistaken for data.
		pfc.readCond.L.Lock()
		for index >= atomic.LoadUint64(&pfc.rIndex) {
			if atomic.LoadInt32(&pfc.serverReadDone) != 0 {
				pfc.readCond.L.Unlock()
				return 0, io.EOF
			}
			pfc.readCond.Wait()
		}
		pfc.readCond.L.Unlock()
	}

	// index < rIndex <= dataLen here, so the slot exists and is published.
	chunk := pfc.data[index]
	copy(b, chunk)
	return len(chunk), nil
}