@@ -44,9 +44,9 @@ type StreamWriter struct {
4444 db * DB
4545 done func ()
4646 throttle * y.Throttle
47- head valuePointer
4847 maxVersion uint64
4948 writers map [uint32 ]* sortedWriter
49+ closer * y.Closer
5050}
5151
5252// NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -60,6 +60,7 @@ func (db *DB) NewStreamWriter() *StreamWriter {
6060 // concurrent streams being processed.
6161 throttle : y .NewThrottle (16 ),
6262 writers : make (map [uint32 ]* sortedWriter ),
63+ closer : y .NewCloser (0 ),
6364 }
6465}
6566
@@ -74,9 +75,12 @@ func (sw *StreamWriter) Prepare() error {
7475}
7576
7677// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
77- // would use to demux the writes.
78+ // would use to demux the writes. Write is not thread safe and it should NOT be called concurrently.
7879func (sw * StreamWriter ) Write (kvs * pb.KVList ) error {
79- var entries []* Entry
80+ if len (kvs .GetKv ()) == 0 {
81+ return nil
82+ }
83+ streamReqs := make (map [uint32 ]* request )
8084 for _ , kv := range kvs .Kv {
8185 var meta , userMeta byte
8286 if len (kv .Meta ) > 0 {
@@ -98,50 +102,28 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
98102 // If the value can be colocated with the key in LSM tree, we can skip
99103 // writing the value to value log.
100104 e .skipVlog = sw .db .shouldWriteValueToLSM (* e )
101- entries = append (entries , e )
105+ req := streamReqs [kv .StreamId ]
106+ if req == nil {
107+ req = & request {}
108+ streamReqs [kv .StreamId ] = req
109+ }
110+ req .Entries = append (req .Entries , e )
102111 }
103- req := & request {
104- Entries : entries ,
112+ var all []* request
113+ for _ , req := range streamReqs {
114+ all = append (all , req )
105115 }
106- y .AssertTrue (len (kvs .Kv ) == len (req .Entries ))
107- if err := sw .db .vlog .write ([]* request {req }); err != nil {
116+ if err := sw .db .vlog .write (all ); err != nil {
108117 return err
109118 }
110119
111- for i , kv := range kvs .Kv {
112- e := req .Entries [i ]
113- vptr := req .Ptrs [i ]
114- if ! vptr .IsZero () {
115- y .AssertTrue (sw .head .Less (vptr ))
116- sw .head = vptr
117- }
118-
119- writer , ok := sw .writers [kv .StreamId ]
120+ for streamId , req := range streamReqs {
121+ writer , ok := sw .writers [streamId ]
120122 if ! ok {
121- writer = sw .newWriter (kv .StreamId )
122- sw .writers [kv .StreamId ] = writer
123- }
124-
125- var vs y.ValueStruct
126- if e .skipVlog {
127- vs = y.ValueStruct {
128- Value : e .Value ,
129- Meta : e .meta ,
130- UserMeta : e .UserMeta ,
131- ExpiresAt : e .ExpiresAt ,
132- }
133- } else {
134- vbuf := make ([]byte , vptrSize )
135- vs = y.ValueStruct {
136- Value : vptr .Encode (vbuf ),
137- Meta : e .meta | bitValuePointer ,
138- UserMeta : e .UserMeta ,
139- ExpiresAt : e .ExpiresAt ,
140- }
141- }
142- if err := writer .Add (e .Key , vs ); err != nil {
143- return err
123+ writer = sw .newWriter (streamId )
124+ sw .writers [streamId ] = writer
144125 }
126+ writer .reqCh <- req
145127 }
146128 return nil
147129}
@@ -150,15 +132,21 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
150132// updates Oracle with maxVersion found in all entries (if DB is not managed).
151133func (sw * StreamWriter ) Flush () error {
152134 defer sw .done ()
135+
136+ sw .closer .SignalAndWait ()
137+ var maxHead valuePointer
153138 for _ , writer := range sw .writers {
154139 if err := writer .Done (); err != nil {
155140 return err
156141 }
142+ if maxHead .Less (writer .head ) {
143+ maxHead = writer .head
144+ }
157145 }
158146
159147 // Encode and write the value log head into a new table.
160148 data := make ([]byte , vptrSize )
161- sw . head .Encode (data )
149+ maxHead .Encode (data )
162150 headWriter := sw .newWriter (headStreamId )
163151 if err := headWriter .Add (
164152 y .KeyWithTs (head , sw .maxVersion ),
@@ -198,20 +186,74 @@ type sortedWriter struct {
198186 builder * table.Builder
199187 lastKey []byte
200188 streamId uint32
189+ reqCh chan * request
190+ head valuePointer
201191}
202192
203193func (sw * StreamWriter ) newWriter (streamId uint32 ) * sortedWriter {
204- return & sortedWriter {
194+ w := & sortedWriter {
205195 db : sw .db ,
206196 streamId : streamId ,
207197 throttle : sw .throttle ,
208198 builder : table .NewTableBuilder (),
199+ reqCh : make (chan * request , 3 ),
209200 }
201+ sw .closer .AddRunning (1 )
202+ go w .handleRequests (sw .closer )
203+ return w
210204}
211205
212206// ErrUnsortedKey is returned when any out of order key arrives at sortedWriter during call to Add.
213207var ErrUnsortedKey = errors .New ("Keys not in sorted order" )
214208
209+ func (w * sortedWriter ) handleRequests (closer * y.Closer ) {
210+ defer closer .Done ()
211+
212+ process := func (req * request ) {
213+ for i , e := range req .Entries {
214+ vptr := req .Ptrs [i ]
215+ if ! vptr .IsZero () {
216+ y .AssertTrue (w .head .Less (vptr ))
217+ w .head = vptr
218+ }
219+
220+ var vs y.ValueStruct
221+ if e .skipVlog {
222+ vs = y.ValueStruct {
223+ Value : e .Value ,
224+ Meta : e .meta ,
225+ UserMeta : e .UserMeta ,
226+ ExpiresAt : e .ExpiresAt ,
227+ }
228+ } else {
229+ vbuf := make ([]byte , vptrSize )
230+ vs = y.ValueStruct {
231+ Value : vptr .Encode (vbuf ),
232+ Meta : e .meta | bitValuePointer ,
233+ UserMeta : e .UserMeta ,
234+ ExpiresAt : e .ExpiresAt ,
235+ }
236+ }
237+ if err := w .Add (e .Key , vs ); err != nil {
238+ panic (err )
239+ }
240+ }
241+ }
242+
243+ for {
244+ select {
245+ case req := <- w .reqCh :
246+ process (req )
247+ case <- closer .HasBeenClosed ():
248+ close (w .reqCh )
249+ for req := range w .reqCh {
250+ process (req )
251+ }
252+ return
253+ }
254+ }
255+ }
256+
215257// Add adds key and vs to sortedWriter.
216258func (w * sortedWriter ) Add (key []byte , vs y.ValueStruct ) error {
217259 if bytes .Compare (key , w .lastKey ) <= 0 {
0 commit comments