-
Notifications
You must be signed in to change notification settings - Fork 704
/
BloomCoarseGrainDataMap.java
394 lines (364 loc) · 17.3 KB
/
BloomCoarseGrainDataMap.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.datamap.bloom;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.core.scan.expression.conditional.InExpression;
import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.converter.FieldConverter;
import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.CarbonBloomFilter;
import org.apache.hadoop.util.bloom.Key;
/**
 * BloomCoarseGrainDataMap is constructed at blocklet level. For each indexed column,
 * a bloom filter is constructed to indicate whether a value belongs to this blocklet.
 * More information of the index file can be found in the corresponding datamap writer.
 *
 * Pruning works by converting the literal filter value (from an EqualTo/In expression)
 * into carbon's internal byte representation and doing a membership test against each
 * blocklet's bloom filter: a negative test guarantees the blocklet can be skipped,
 * a positive test means the blocklet must be scanned (may be a false positive).
 */
@InterfaceAudience.Internal
public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
  // indexed column name -> column metadata; populated in initIndexColumnConverters
  private Map<String, CarbonColumn> name2Col;
  // shared cache of loaded bloom index files, keyed by (index path, column name)
  private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
  // name of the shard directory this datamap instance covers
  private String shardName;
  // path of the bloom index for this shard
  private Path indexPath;
  // shards already selected by the main datamap; used to skip others during prune
  private Set<String> filteredShard;
  private boolean needShardPrune;
  /**
   * This is used to convert literal filter value to internal carbon value
   */
  private Map<String, FieldConverter> name2Converters;
  // reused holder required by FieldConverter.convert; bad-record logging is disabled
  private BadRecordLogHolder badRecordLogHolder;

  /**
   * Initializes the index path, shard name and (when available) the shared bloom cache
   * from the given model.
   *
   * @param dataMapModel model carrying the index file path; if it is a
   *                     {@link BloomDataMapModel} the shared cache is taken from it
   * @throws IOException declared by the interface; not thrown by this implementation
   */
  @Override
  public void init(DataMapModel dataMapModel) throws IOException {
    this.indexPath = FileFactory.getPath(dataMapModel.getFilePath());
    this.shardName = indexPath.getName();
    if (dataMapModel instanceof BloomDataMapModel) {
      BloomDataMapModel model = (BloomDataMapModel) dataMapModel;
      this.cache = model.getCache();
    }
  }

  /**
   * Records the shards already pruned by the main datamap. Shard-level pruning is only
   * applicable when the bloom index files have been merged into a single shard.
   *
   * @param filteredShard shard names that survived main-datamap pruning; may be null
   */
  public void setFilteredShard(Set<String> filteredShard) {
    this.filteredShard = filteredShard;
    // do shard prune when pruning only if bloom index files are merged
    this.needShardPrune = filteredShard != null &&
        shardName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME);
  }

  /**
   * init field converters for index columns
   *
   * Builds one {@link FieldConverter} per indexed column so that query-time literal
   * values can be converted to the same internal representation that was written into
   * the bloom index at load time.
   *
   * @param carbonTable   table this datamap belongs to
   * @param indexedColumn columns covered by the bloom index
   */
  public void initIndexColumnConverters(CarbonTable carbonTable, List<CarbonColumn> indexedColumn) {
    this.name2Col = new HashMap<>(indexedColumn.size());
    for (CarbonColumn col : indexedColumn) {
      this.name2Col.put(col.getColName(), col);
    }
    // dictionary files live under the root (ancestor) table path for pre-agg tables
    String parentTablePath = getAncestorTablePath(carbonTable);
    try {
      this.name2Converters = new HashMap<>(indexedColumn.size());
      AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
          .from(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
      String nullFormat = "\\N";
      Map<Object, Integer>[] localCaches = new Map[indexedColumn.size()];
      for (int i = 0; i < indexedColumn.size(); i++) {
        localCaches[i] = new ConcurrentHashMap<>();
        DataField dataField = new DataField(indexedColumn.get(i));
        // use the session/system-configured date & timestamp formats so literal parsing
        // matches the formats used during data load
        String dateFormat = CarbonProperties.getInstance().getProperty(
            CarbonCommonConstants.CARBON_DATE_FORMAT,
            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
        dataField.setDateFormat(dateFormat);
        String tsFormat = CarbonProperties.getInstance().getProperty(
            CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
        dataField.setTimestampFormat(tsFormat);
        FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
            .createFieldEncoder(dataField, absoluteTableIdentifier, i, nullFormat, null, false,
                localCaches[i], false, parentTablePath);
        this.name2Converters.put(indexedColumn.get(i).getColName(), fieldConverter);
      }
    } catch (IOException e) {
      LOGGER.error(e, "Exception occurs while init index columns");
      throw new RuntimeException(e);
    }
    this.badRecordLogHolder = new BadRecordLogHolder();
    this.badRecordLogHolder.setLogged(false);
  }

  /**
   * recursively find the ancestor's table path. This is used for dictionary scenario
   * where preagg will use the dictionary of the parent table.
   */
  private String getAncestorTablePath(CarbonTable currentTable) {
    if (!currentTable.isChildDataMap()) {
      return currentTable.getTablePath();
    }
    RelationIdentifier parentIdentifier =
        currentTable.getTableInfo().getParentRelationIdentifiers().get(0);
    CarbonTable parentTable = CarbonMetadata.getInstance().getCarbonTable(
        parentIdentifier.getDatabaseName(), parentIdentifier.getTableName());
    return getAncestorTablePath(parentTable);
  }

  /**
   * Prunes blocklets by testing each query value against the cached bloom filters.
   *
   * @param filterExp         filter to prune with; null means "no pruning possible"
   * @param segmentProperties unused by the bloom datamap
   * @param partitions        unused by the bloom datamap
   * @return null when no filter is given; otherwise the (possibly empty) list of
   *         blocklets that may contain matching rows
   * @throws IOException if loading the bloom index from the cache fails
   */
  @Override
  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
      List<PartitionSpec> partitions) throws IOException {
    // use a set since multiple query models may hit the same blocklet
    Set<Blocklet> hitBlocklets = new HashSet<>();
    if (filterExp == null) {
      // null is different from empty here. Empty means after pruning, no blocklet need to scan.
      return null;
    }
    List<BloomQueryModel> bloomQueryModels;
    try {
      bloomQueryModels = createQueryModel(filterExp.getFilterExpression());
    } catch (DictionaryGenerationException | UnsupportedEncodingException e) {
      LOGGER.error(e, "Exception occurs while creating query model");
      throw new RuntimeException(e);
    }
    for (BloomQueryModel bloomQueryModel : bloomQueryModels) {
      LOGGER.debug("prune blocklet for query: " + bloomQueryModel);
      BloomCacheKeyValue.CacheKey cacheKey = new BloomCacheKeyValue.CacheKey(
          this.indexPath.toString(), bloomQueryModel.columnName);
      BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey);
      List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters();
      for (CarbonBloomFilter bloomFilter : bloomIndexList) {
        if (needShardPrune && !filteredShard.contains(bloomFilter.getShardName())) {
          // skip shard which has been pruned in Main datamap
          continue;
        }
        // positive test => blocklet may contain the value (false positives possible);
        // negative test => blocklet definitely does not contain it
        boolean scanRequired = bloomFilter.membershipTest(new Key(bloomQueryModel.filterValue));
        if (scanRequired) {
          LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
              String.valueOf(bloomFilter.getBlockletNo())));
          Blocklet blocklet = new Blocklet(bloomFilter.getShardName(),
              String.valueOf(bloomFilter.getBlockletNo()));
          hitBlocklets.add(blocklet);
        } else {
          LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
              String.valueOf(bloomFilter.getBlockletNo())));
        }
      }
    }
    return new ArrayList<>(hitBlocklets);
  }

  /**
   * Recursively walks the filter expression tree and builds one query model per
   * (indexed column, literal value) pair found in EqualTo / In expressions.
   * Expressions on non-indexed columns are silently ignored; malformed EqualTo/In
   * expressions (e.g. column compared with a non-literal) are rejected.
   *
   * @param expression filter expression to decompose
   * @return query models for every indexed-column equal/in condition in the tree
   * @throws DictionaryGenerationException if literal-to-surrogate conversion fails
   * @throws UnsupportedEncodingException  if literal encoding fails
   */
  private List<BloomQueryModel> createQueryModel(Expression expression)
      throws DictionaryGenerationException, UnsupportedEncodingException {
    List<BloomQueryModel> queryModels = new ArrayList<BloomQueryModel>();
    // bloomdatamap only support equalTo and In operators now
    if (expression instanceof EqualToExpression) {
      Expression left = ((EqualToExpression) expression).getLeft();
      Expression right = ((EqualToExpression) expression).getRight();
      String column;
      // handle both `col = literal` and `literal = col`
      if (left instanceof ColumnExpression && right instanceof LiteralExpression) {
        column = ((ColumnExpression) left).getColumnName();
        if (this.name2Col.containsKey(column)) {
          BloomQueryModel bloomQueryModel =
              buildQueryModelForEqual((ColumnExpression) left, (LiteralExpression) right);
          queryModels.add(bloomQueryModel);
        }
        return queryModels;
      } else if (left instanceof LiteralExpression && right instanceof ColumnExpression) {
        column = ((ColumnExpression) right).getColumnName();
        if (this.name2Col.containsKey(column)) {
          BloomQueryModel bloomQueryModel =
              buildQueryModelForEqual((ColumnExpression) right, (LiteralExpression) left);
          queryModels.add(bloomQueryModel);
        }
        return queryModels;
      } else {
        String errorMsg = "BloomFilter can only support the 'equal' filter like 'Col = PlainValue'";
        LOGGER.warn(errorMsg);
        throw new RuntimeException(errorMsg);
      }
    } else if (expression instanceof InExpression) {
      Expression left = ((InExpression) expression).getLeft();
      Expression right = ((InExpression) expression).getRight();
      String column;
      // handle both `col in (list)` and `(list) in col` orderings
      if (left instanceof ColumnExpression && right instanceof ListExpression) {
        column = ((ColumnExpression) left).getColumnName();
        if (this.name2Col.containsKey(column)) {
          List<BloomQueryModel> models =
              buildQueryModelForIn((ColumnExpression) left, (ListExpression) right);
          queryModels.addAll(models);
        }
        return queryModels;
      } else if (left instanceof ListExpression && right instanceof ColumnExpression) {
        column = ((ColumnExpression) right).getColumnName();
        if (this.name2Col.containsKey(column)) {
          List<BloomQueryModel> models =
              buildQueryModelForIn((ColumnExpression) right, (ListExpression) left);
          queryModels.addAll(models);
        }
        return queryModels;
      } else {
        String errorMsg = "BloomFilter can only support the 'in' filter like 'Col in PlainValue'";
        LOGGER.warn(errorMsg);
        throw new RuntimeException(errorMsg);
      }
    }
    // not an equal/in node: recurse into children (e.g. AND/OR nodes)
    for (Expression child : expression.getChildren()) {
      queryModels.addAll(createQueryModel(child));
    }
    return queryModels;
  }

  /**
   * Builds a query model for an `col = literal` condition. Date/timestamp literals
   * arrive as epoch values and are first rendered back to their default string
   * formats so the field converter can parse them the same way as at load time.
   *
   * @param ce the indexed column side of the equality
   * @param le the literal side of the equality
   * @return query model carrying the internal filter bytes for the bloom test
   * @throws DictionaryGenerationException if literal-to-surrogate conversion fails
   * @throws UnsupportedEncodingException  if literal encoding fails
   */
  private BloomQueryModel buildQueryModelForEqual(ColumnExpression ce,
      LiteralExpression le) throws DictionaryGenerationException, UnsupportedEncodingException {
    String columnName = ce.getColumnName();
    DataType dataType = ce.getDataType();
    Object expressionValue = le.getLiteralExpValue();
    Object literalValue;
    // note that if the datatype is date/timestamp, the expressionValue is long type.
    if (null == expressionValue) {
      literalValue = null;
    } else if (le.getLiteralExpDataType() == DataTypes.DATE) {
      DateFormat format = new SimpleDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
      // the below settings are set statically according to DateDirectDictionaryGenerator
      format.setLenient(false);
      format.setTimeZone(TimeZone.getTimeZone("GMT"));
      // expression value is in microseconds; Date expects milliseconds
      literalValue = format.format(new Date((long) expressionValue / 1000));
    } else if (le.getLiteralExpDataType() == DataTypes.TIMESTAMP) {
      DateFormat format =
          new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
      // the below settings are set statically according to TimeStampDirectDictionaryGenerator
      format.setLenient(false);
      literalValue = format.format(new Date((long) expressionValue / 1000));
    } else {
      literalValue = expressionValue;
    }
    return buildQueryModelInternal(this.name2Col.get(columnName), literalValue, dataType);
  }

  /**
   * for `in` expressions, we use `equal` to handle it.
   * Note that `in` operator needs at least one match not exactly match. since while doing pruning,
   * we collect all the blocklets that will match the querymodel, this will not be a problem.
   */
  private List<BloomQueryModel> buildQueryModelForIn(ColumnExpression ce, ListExpression le)
      throws DictionaryGenerationException, UnsupportedEncodingException {
    List<BloomQueryModel> queryModels = new ArrayList<>();
    for (Expression child : le.getChildren()) {
      queryModels.add(buildQueryModelForEqual(ce, (LiteralExpression) child));
    }
    return queryModels;
  }

  /**
   * Converts the literal value into the same byte representation the datamap writer
   * used when indexing, so the bloom membership test compares like with like:
   * measures use their typed bytes, dictionary columns use the surrogate key bytes,
   * and plain dimensions are already byte arrays after conversion.
   *
   * @param carbonColumn        the indexed column
   * @param filterLiteralValue  literal value from the query, possibly null
   * @param filterValueDataType data type of the filter value (currently unused here)
   * @return query model with the internal filter bytes
   * @throws DictionaryGenerationException if surrogate lookup/generation fails
   * @throws UnsupportedEncodingException  if value encoding fails
   */
  private BloomQueryModel buildQueryModelInternal(CarbonColumn carbonColumn,
      Object filterLiteralValue, DataType filterValueDataType) throws
      DictionaryGenerationException, UnsupportedEncodingException {
    // convert the filter value to string and apply converters on it to get carbon internal value
    String strFilterValue = null;
    if (null != filterLiteralValue) {
      strFilterValue = String.valueOf(filterLiteralValue);
    }
    Object convertedValue = this.name2Converters.get(carbonColumn.getColName()).convert(
        strFilterValue, badRecordLogHolder);
    byte[] internalFilterValue;
    if (carbonColumn.isMeasure()) {
      // for measures, the value is already the type, just convert it to bytes.
      if (convertedValue == null) {
        convertedValue = DataConvertUtil.getNullValueForMeasure(carbonColumn.getDataType(),
            carbonColumn.getColumnSchema().getScale());
      }
      // Carbon stores boolean as byte. Here we convert it for `getValueAsBytes`
      if (carbonColumn.getDataType().equals(DataTypes.BOOLEAN)) {
        convertedValue = BooleanConvert.boolean2Byte((Boolean)convertedValue);
      }
      internalFilterValue = CarbonUtil.getValueAsBytes(carbonColumn.getDataType(), convertedValue);
    } else if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) ||
        carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
      // for dictionary/date columns, convert the surrogate key to bytes
      internalFilterValue = CarbonUtil.getValueAsBytes(DataTypes.INT, convertedValue);
    } else {
      // for non-dictionary dimensions, the converted value is already a byte array
      internalFilterValue = (byte[]) convertedValue;
    }
    if (internalFilterValue.length == 0) {
      // empty value is indexed as the member-default marker; query it the same way
      internalFilterValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
    }
    return new BloomQueryModel(carbonColumn.getColName(), internalFilterValue);
  }

  /**
   * The bloom datamap cannot decide scan necessity from the resolver alone,
   * so it always reports that a scan is required; actual pruning happens in prune().
   */
  @Override
  public boolean isScanRequired(FilterResolverIntf filterExp) {
    return true;
  }

  // nothing to release per-query; the bloom cache is shared and managed elsewhere
  @Override
  public void clear() {
  }

  /**
   * Value object pairing an indexed column name with the internal filter bytes
   * used as the bloom filter key during pruning.
   */
  static class BloomQueryModel {
    private String columnName;
    private byte[] filterValue;

    /**
     * represent a query model that will be applied on bloom index
     *
     * @param columnName bloom index column
     * @param filterValue key for the bloom index,
     *                   this value is converted from user specified filter value in query
     */
    private BloomQueryModel(String columnName, byte[] filterValue) {
      this.columnName = columnName;
      this.filterValue = filterValue;
    }

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder("BloomQueryModel{");
      sb.append("columnName='").append(columnName).append('\'');
      sb.append(", filterValue=").append(Arrays.toString(filterValue));
      sb.append('}');
      return sb.toString();
    }
  }

  // no per-query resources to finalize
  @Override
  public void finish() {
  }
}