-
Notifications
You must be signed in to change notification settings - Fork 704
/
CompressedMeasureChunkFileBasedReaderV3.java
225 lines (212 loc) · 10.1 KB
/
CompressedMeasureChunkFileBasedReaderV3.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
import org.apache.carbondata.format.Encoding;
import org.apache.commons.lang.ArrayUtils;
/**
 * Measure column V3 reader class which will be used to read and uncompress
 * V3 format data.
 * Data format:
 * <FileHeader>
 * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
 * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
 * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
 * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
 * <File Footer>
 */
public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChunkReaderV2V3Format {

  /**
   * File offset just past the last measure column chunk. The final measure
   * column has no successor offset to subtract, so its length is computed
   * against this end position instead.
   */
  private long measureDataEndOffset;

  public CompressedMeasureChunkFileBasedReaderV3(BlockletInfo blockletInfo, String filePath) {
    super(blockletInfo, filePath);
    this.measureDataEndOffset = blockletInfo.getMeasureOffsets();
  }

  /**
   * Reads the raw (still compressed) bytes of one measure column from the
   * carbon data file and wraps them, together with the page-level metadata
   * ({@link DataChunk3}), into a {@link MeasureRawColumnChunk}.
   *
   * @param fileReader reader used to pull bytes from the carbon data file
   * @param columnIndex index of the measure column to read
   * @return raw column chunk holding the undecoded bytes plus page metadata
   */
  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileReader fileReader,
      int columnIndex) throws IOException {
    long startOffset = measureColumnChunkOffsets.get(columnIndex);
    // Length to read: distance to the next column's start offset, or — for
    // the last column, which has no successor — to the end of the measure
    // data section recorded in the blocklet info.
    boolean isLastColumn = columnIndex == measureColumnChunkOffsets.size() - 1;
    int dataLength = isLastColumn
        ? (int) (measureDataEndOffset - startOffset)
        : (int) (measureColumnChunkOffsets.get(columnIndex + 1) - startOffset);
    ByteBuffer buffer;
    // the file reader is shared, so reads are serialized on it
    synchronized (fileReader) {
      buffer = fileReader.readByteBuffer(filePath, startOffset, dataLength);
    }
    // page-level metadata (page offsets, min/max, row counts) for this column
    DataChunk3 dataChunk =
        CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(columnIndex));
    return getMeasureRawColumnChunk(fileReader, columnIndex, 0, dataLength, buffer, dataChunk);
  }

  /**
   * Builds a {@link MeasureRawColumnChunk} from already-read bytes: copies the
   * per-page min/max values, row counts and page offsets out of the metadata
   * so callers can filter pages without decoding them first.
   */
  MeasureRawColumnChunk getMeasureRawColumnChunk(FileReader fileReader, int columnIndex,
      long offset, int dataLength, ByteBuffer buffer, DataChunk3 dataChunk) {
    MeasureRawColumnChunk rawColumnChunk =
        new MeasureRawColumnChunk(columnIndex, buffer, offset, dataLength, this);
    int pageCount = dataChunk.getPage_length().size();
    byte[][] maxValues = new byte[pageCount][];
    byte[][] minValues = new byte[pageCount][];
    int[] rowCountPerPage = new int[pageCount];
    for (int page = 0; page < pageCount; page++) {
      DataChunk2 pageChunk = dataChunk.getData_chunk_list().get(page);
      // index 0: a measure chunk carries exactly one column's min/max here
      maxValues[page] = pageChunk.getMin_max().getMax_values().get(0).array();
      minValues[page] = pageChunk.getMin_max().getMin_values().get(0).array();
      rowCountPerPage[page] = pageChunk.getNumberOfRowsInpage();
    }
    rawColumnChunk.setDataChunkV3(dataChunk);
    rawColumnChunk.setFileReader(fileReader);
    rawColumnChunk.setPagesCount(pageCount);
    rawColumnChunk.setMaxValues(maxValues);
    rawColumnChunk.setMinValues(minValues);
    rawColumnChunk.setRowCount(rowCountPerPage);
    List<Integer> pageOffsets = dataChunk.page_offset;
    rawColumnChunk.setOffsets(
        ArrayUtils.toPrimitive(pageOffsets.toArray(new Integer[pageOffsets.size()])));
    return rawColumnChunk;
  }

  /**
   * Reads a contiguous group of measure columns with a single file read and
   * slices the buffer into one {@link MeasureRawColumnChunk} per column.
   *
   * @param fileReader reader used to pull bytes from the carbon data file
   * @param startColumnIndex index of the first measure column in the group
   * @param endColumnIndex index of the last measure column in the group (inclusive)
   * @return one raw chunk per column, in column order
   */
  protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileReader,
      int startColumnIndex, int endColumnIndex) throws IOException {
    // one read covering every column in [startColumnIndex, endColumnIndex]
    long groupStartOffset = measureColumnChunkOffsets.get(startColumnIndex);
    int groupLength =
        (int) (measureColumnChunkOffsets.get(endColumnIndex + 1) - groupStartOffset);
    ByteBuffer buffer;
    synchronized (fileReader) {
      buffer = fileReader.readByteBuffer(filePath, groupStartOffset, groupLength);
    }
    MeasureRawColumnChunk[] rawChunks =
        new MeasureRawColumnChunk[endColumnIndex - startColumnIndex + 1];
    // walk the shared buffer, handing each column its slice's starting position
    int bufferPosition = 0;
    for (int column = startColumnIndex; column <= endColumnIndex; column++) {
      int columnLength = (int) (measureColumnChunkOffsets.get(column + 1)
          - measureColumnChunkOffsets.get(column));
      DataChunk3 dataChunk = CarbonUtil
          .readDataChunk3(buffer, bufferPosition, measureColumnChunkLength.get(column));
      rawChunks[column - startColumnIndex] = getMeasureRawColumnChunk(fileReader, column,
          bufferPosition, columnLength, buffer, dataChunk);
      bufferPosition += columnLength;
    }
    return rawChunks;
  }

  /**
   * Decodes one page of a raw measure chunk into an uncompressed
   * {@link ColumnPage}, including its null bitset.
   *
   * @param rawColumnChunk raw measure chunk containing the page
   * @param pageNumber page to decode within the chunk
   * @return decoded column page
   */
  @Override
  public ColumnPage decodeColumnPage(
      MeasureRawColumnChunk rawColumnChunk, int pageNumber)
      throws IOException, MemoryException {
    // blocklet-level metadata for this column
    DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
    // metadata for the requested page
    DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
    this.compressor = CompressorFactory.getInstance().getCompressor(
        CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta()));
    // the buffer may hold several columns: actual page data starts at the
    // chunk's offset + the chunk header length + this page's relative offset
    int offset = (int) rawColumnChunk.getOffSet()
        + measureColumnChunkLength.get(rawColumnChunk.getColumnIndex())
        + dataChunk3.getPage_offset().get(pageNumber);
    ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
    return decodedPage;
  }

  /**
   * Decodes a measure column page using the decoder implied by the page's
   * encoding list, starting from {@code offset} within the raw data.
   */
  protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset)
      throws MemoryException, IOException {
    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
        pageMetadata.getChunk_meta());
    ColumnPageDecoder decoder = encodingFactory.createDecoder(
        pageMetadata.getEncoders(), pageMetadata.getEncoder_meta(), compressorName);
    return decoder.decode(pageData.array(), offset, pageMetadata.data_page_length);
  }
}