@@ -111,10 +111,6 @@ type RawColumnWriter struct {
111
111
validator invariants.Value [* colblk.DataBlockValidator ]
112
112
disableKeyOrderChecks bool
113
113
cpuMeasurer base.CPUMeasurer
114
-
115
- // RawColumnWriter writes data sequentially so each writer can have a
116
- // physical block maker.
117
- physBlockMaker block.PhysicalBlockMaker
118
114
}
119
115
120
116
// Assert that *RawColumnWriter implements RawWriter.
@@ -148,12 +144,11 @@ func newColumnarWriter(
148
144
w .topLevelIndexBlock .Init ()
149
145
w .rangeDelBlock .Init (w .comparer .Equal )
150
146
w .rangeKeyBlock .Init (w .comparer .Equal )
151
- w .physBlockMaker .Init (w .opts .Compression , w .opts .Checksum )
152
147
if ! o .DisableValueBlocks {
153
148
flushGovernor := block .MakeFlushGovernor (o .BlockSize , o .BlockSizeThreshold , o .SizeClassAwareThreshold , o .AllocatorSizeClasses )
154
149
// We use the value block writer in the same goroutine so it's safe to share
155
150
// the physBlockMaker.
156
- w .valueBlock = valblk .NewWriter (flushGovernor , & w .physBlockMaker , func (compressedSize int ) {})
151
+ w .valueBlock = valblk .NewWriter (flushGovernor , & w .layout . physBlockMaker , func (compressedSize int ) {})
157
152
}
158
153
if o .FilterPolicy != base .NoFilterPolicy {
159
154
switch o .FilterType {
@@ -714,7 +709,7 @@ func (w *RawColumnWriter) enqueueDataBlock(
714
709
}
715
710
716
711
// Compress and checksum the data block and send it to the write queue.
717
- pb := w .physBlockMaker .Make (serializedBlock , blockkind .SSTableData , block .NoFlags )
712
+ pb := w .layout . physBlockMaker .Make (serializedBlock , blockkind .SSTableData , block .NoFlags )
718
713
return w .enqueuePhysicalBlock (pb , separator )
719
714
}
720
715
@@ -1035,7 +1030,7 @@ func (w *RawColumnWriter) Close() (err error) {
1035
1030
}
1036
1031
}
1037
1032
1038
- w .props .CompressionStats = w .physBlockMaker .Compressor .Stats ().String ()
1033
+ w .props .CompressionStats = w .layout . physBlockMaker .Compressor .Stats ().String ()
1039
1034
var toWrite []byte
1040
1035
w .props .CompressionOptions = rocksDBCompressionOptions
1041
1036
if w .opts .TableFormat >= TableFormatPebblev7 {
@@ -1069,7 +1064,6 @@ func (w *RawColumnWriter) Close() (err error) {
1069
1064
return err
1070
1065
}
1071
1066
w .meta .Properties = w .props
1072
- w .physBlockMaker .Close ()
1073
1067
// Release any held memory and make any future calls error.
1074
1068
* w = RawColumnWriter {meta : w .meta , err : errWriterClosed }
1075
1069
return nil
@@ -1089,7 +1083,7 @@ func (w *RawColumnWriter) rewriteSuffixes(
1089
1083
return errors .Wrap (err , "reading layout" )
1090
1084
}
1091
1085
// Copy data blocks in parallel, rewriting suffixes as we go.
1092
- blocks , err := rewriteDataBlocksInParallel (r , sstBytes , wo , l .Data , from , to , concurrency , w .physBlockMaker .Compressor .Stats (), func () blockRewriter {
1086
+ blocks , err := rewriteDataBlocksInParallel (r , sstBytes , wo , l .Data , from , to , concurrency , w .layout . physBlockMaker .Compressor .Stats (), func () blockRewriter {
1093
1087
return colblk .NewDataBlockRewriter (wo .KeySchema , w .comparer )
1094
1088
})
1095
1089
if err != nil {
@@ -1247,7 +1241,7 @@ func (w *RawColumnWriter) copyDataBlocks(
1247
1241
// using CopySpan().
1248
1242
func (w * RawColumnWriter ) addDataBlock (b , sep []byte , bhp block.HandleWithProperties ) error {
1249
1243
// Compress and checksum the data block and send it to the write queue.
1250
- pb := w .physBlockMaker .Make (b , blockkind .SSTableData , block .NoFlags )
1244
+ pb := w .layout . physBlockMaker .Make (b , blockkind .SSTableData , block .NoFlags )
1251
1245
if err := w .enqueuePhysicalBlock (pb , sep ); err != nil {
1252
1246
return err
1253
1247
}
0 commit comments