Commit 0561f72

Merge 2973e75 into 72a5973
2 parents: 72a5973 + 2973e75
QiangCai committed Apr 24, 2019

Showing 19 changed files with 1,039 additions and 190 deletions.
@@ -478,6 +478,11 @@ private CarbonCommonConstants() {
*/
public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";

/**
* column-level property: marks a measure that has been changed to a dimension
*/
public static final String COLUMN_DRIFT = "column_drift";

//////////////////////////////////////////////////////////////////////////////////////////
// Data loading parameter start here
//////////////////////////////////////////////////////////////////////////////////////////
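Elsewhere in this commit (see the CarbonTable and TableInfo hunks below), this constant is used as a key into a column's properties map. A minimal hedged sketch of that check, with the helper name being illustrative:

import java.util.Map;

// Illustrative helper: a column has drifted if its properties map carries
// the COLUMN_DRIFT key, i.e. a measure was converted to a dimension.
static boolean isDriftedColumn(Map<String, String> columnProperties) {
  return columnProperties != null
      && columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null;
}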
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datamap;

import java.io.Serializable;

import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

/**
* the filter used for DataMap pruning
*/
public class DataMapFilter implements Serializable {

private CarbonTable table;

private Expression expression;

private FilterResolverIntf resolver;

public DataMapFilter(CarbonTable table, Expression expression) {
this.table = table;
this.expression = expression;
resolve();
}

public DataMapFilter(FilterResolverIntf resolver) {
this.resolver = resolver;
}

private void resolve() {
if (expression != null) {
table.processFilterExpression(expression, null, null);
resolver = CarbonTable.resolveFilter(expression, table.getAbsoluteTableIdentifier());
}
}

public Expression getExpression() {
return expression;
}

public void setExpression(Expression expression) {
this.expression = expression;
}

public FilterResolverIntf getResolver() {
return resolver;
}

public void setResolver(FilterResolverIntf resolver) {
this.resolver = resolver;
}

public boolean isEmpty() {
return resolver == null;
}

public boolean isResolvedOnSegment(SegmentProperties segmentProperties) {
if (expression == null || table == null) {
return true;
}
if (!table.isTransactionalTable()) {
return false;
}
if (table.hasColumnDrift() && RestructureUtil
.hasColumnDriftOnSegment(table, segmentProperties)) {
return false;
}
return true;
}
}
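For orientation, a hedged sketch of how a caller picks between the two pruning paths this class enables, mirroring pruneWithFilter below; the standalone method is illustrative, but it uses only signatures that appear in this commit:

// Illustrative: prune one segment, reusing the table-level resolver only
// when it is valid for this segment's schema.
private List<Blocklet> pruneSegment(DataMap dataMap, DataMapFilter filter,
    SegmentProperties segmentProperties, List<PartitionSpec> partitions,
    CarbonTable table) throws IOException {
  if (filter.isResolvedOnSegment(segmentProperties)) {
    // fast path: the pre-resolved filter tree matches this segment's schema
    return dataMap.prune(filter.getResolver(), segmentProperties, partitions);
  }
  // slow path: column drift (or a non-transactional table) means the raw
  // expression must be re-resolved against this segment
  return dataMap.prune(filter.getExpression(), segmentProperties, partitions, table);
}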
@@ -47,7 +47,6 @@
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.events.Event;
@@ -100,38 +99,6 @@ public BlockletDetailsFetcher getBlockletDetailsFetcher() {
return blockletDetailsFetcher;
}


/**
* Pass the valid segments and prune the datamap using filter expression
*
* @param segments
* @param filterExp
* @return
*/
public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp,
List<PartitionSpec> partitions) throws IOException {
List<ExtendedBlocklet> blocklets = new ArrayList<>();
SegmentProperties segmentProperties;
Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
for (Segment segment : segments) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
// if filter is not passed then return all the blocklets
if (filterExp == null) {
pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
} else {
segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(dataMap
.prune(filterExp, segmentProperties, partitions, table));
}
}
blocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
segment));
}
return blocklets;
}

public CarbonTable getTable() {
return table;
}
@@ -140,10 +107,10 @@ public CarbonTable getTable() {
* Pass the valid segments and prune the datamap using filter expression
*
* @param segments
* @param filterExp
* @param filter
* @return
*/
public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolverIntf filterExp,
public List<ExtendedBlocklet> prune(List<Segment> segments, final DataMapFilter filter,
final List<PartitionSpec> partitions) throws IOException {
final List<ExtendedBlocklet> blocklets = new ArrayList<>();
final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
@@ -164,15 +131,15 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolver
// As 0.1 million files block pruning can take only 1 second.
// Doing multi-thread for smaller values is not recommended as
// driver should have minimum threads opened to support multiple concurrent queries.
if (filterExp == null) {
if (filter.isEmpty()) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
return pruneWithFilter(segments, filter, partitions, blocklets, dataMaps);
}
// handle by multi-thread
List<ExtendedBlocklet> extendedBlocklets =
pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles);
List<ExtendedBlocklet> extendedBlocklets = pruneMultiThread(
segments, filter, partitions, blocklets, dataMaps, totalFiles);
return extendedBlocklets;
}

@@ -187,14 +154,22 @@ private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
return blocklets;
}

private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
FilterResolverIntf filterExp, List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> dataMaps) throws IOException {
private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments, DataMapFilter filter,
List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
Map<Segment, List<DataMap>> dataMaps) throws IOException {
for (Segment segment : segments) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
if (filter.isResolvedOnSegment(segmentProperties)) {
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(
dataMap.prune(filter.getResolver(), segmentProperties, partitions));
}
} else {
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(
dataMap.prune(filter.getExpression(), segmentProperties, partitions, table));
}
}
blocklets.addAll(
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
@@ -204,7 +179,7 @@ private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
}

private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
final DataMapFilter filter, final List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
int totalFiles) {
/*
@@ -295,14 +270,24 @@ private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
Segment segment = segmentDataMapGroup.getSegment();
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(filterExp,
segmentProperties,
partitions);
pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher
.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
if (filter.isResolvedOnSegment(segmentProperties)) {
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
filter.getResolver(), segmentProperties, partitions);
pruneBlocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
}
} else {
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
filter.getExpression(), segmentProperties, partitions, table);
pruneBlocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
}
}
synchronized (prunedBlockletMap) {
List<ExtendedBlocklet> pruneBlockletsExisting =
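The surrounding (collapsed) method fans pruning work out to a thread pool: each worker prunes one segment's group of datamaps and merges its results into a shared map under a lock, which is what the synchronized block above protects. A self-contained sketch of that pattern, with all names illustrative:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PruneMergeSketch {
  public static void main(String[] args) throws Exception {
    // shared result map, keyed by segment id (stand-in for prunedBlockletMap)
    final Map<String, List<String>> prunedBySegment = new HashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<>();
    for (final String segment : Arrays.asList("0", "1", "2")) {
      futures.add(pool.submit(() -> {
        // stand-in for dataMapList.get(i).prune(...) on this worker
        List<String> pruned = Arrays.asList("blocklet-of-segment-" + segment);
        // merge under the same lock discipline as the real code
        synchronized (prunedBySegment) {
          prunedBySegment.computeIfAbsent(segment, k -> new ArrayList<>())
              .addAll(pruned);
        }
      }));
    }
    for (Future<?> f : futures) {
      f.get(); // propagate any worker failure
    }
    pool.shutdown();
    System.out.println(prunedBySegment);
  }
}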
@@ -22,6 +22,7 @@
import java.util.UUID;

import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
@@ -50,7 +51,7 @@ public DataMapExprWrapperImpl(TableDataMap dataMap, FilterResolverIntf expressio
@Override
public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
throws IOException {
return dataMap.prune(segments, expression, partitionsToPrune);
return dataMap.prune(segments, new DataMapFilter(expression), partitionsToPrune);
}

public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
@@ -120,6 +120,11 @@ public class CarbonTable implements Serializable {
*/
private List<CarbonMeasure> allMeasures;

/**
* dimensions that carry the column drift property (measures changed to dimensions)
*/
private List<CarbonDimension> columnDrift;

/**
* table bucket map.
*/
@@ -189,6 +194,7 @@ private CarbonTable() {
this.tablePartitionMap = new HashMap<>();
this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.columnDrift = new ArrayList<CarbonDimension>();
}

/**
@@ -898,6 +904,12 @@ private void fillVisibleDimensions(String tableName) {
for (CarbonDimension dimension : allDimensions) {
if (!dimension.isInvisible()) {
visibleDimensions.add(dimension);
Map<String, String> columnProperties = dimension.getColumnProperties();
if (columnProperties != null) {
if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) {
columnDrift.add(dimension);
}
}
}
}
tableDimensionsMap.put(tableName, visibleDimensions);
@@ -912,6 +924,14 @@ public List<CarbonMeasure> getAllMeasures() {
return allMeasures;
}

public List<CarbonDimension> getColumnDrift() {
return columnDrift;
}

public boolean hasColumnDrift() {
return tableInfo.hasColumnDrift();
}

/**
* This method will return all the visible measures
*
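Together with RestructureUtil.hasColumnDriftOnSegment (used by DataMapFilter.isResolvedOnSegment above), these accessors drive the per-segment decision. A hedged, simplified sketch of that decision (non-transactional-table handling omitted; the helper name is illustrative):

// Illustrative: a table-level resolver can be reused for a segment unless
// the table has drifted columns and hasColumnDriftOnSegment reports drift
// for that segment's schema.
static boolean canReuseTableResolver(CarbonTable table,
    SegmentProperties segmentProperties) {
  return !table.hasColumnDrift()
      || !RestructureUtil.hasColumnDriftOnSegment(table, segmentProperties);
}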
@@ -91,6 +91,8 @@ public class TableInfo implements Serializable, Writable {
*/
private boolean isTransactionalTable = true;

private boolean hasColumnDrift = false;

// this identifier is a lazy field which will be created when it is used first time
private AbsoluteTableIdentifier identifier;

@@ -122,6 +124,7 @@ public void setFactTable(TableSchema factTable) {
this.factTable = factTable;
updateParentRelationIdentifier();
updateIsSchemaModified();
updateHasColumnDrift();
}

private void updateIsSchemaModified() {
@@ -276,6 +279,7 @@ public void write(DataOutput out) throws IOException {
out.writeLong(lastUpdatedTime);
out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
out.writeBoolean(isTransactionalTable);
out.writeBoolean(hasColumnDrift);
boolean isChildSchemaExists =
null != dataMapSchemaList && dataMapSchemaList.size() > 0;
out.writeBoolean(isChildSchemaExists);
@@ -305,6 +309,7 @@ public void write(DataOutput out) throws IOException {
this.lastUpdatedTime = in.readLong();
this.tablePath = in.readUTF();
this.isTransactionalTable = in.readBoolean();
this.hasColumnDrift = in.readBoolean();
boolean isChildSchemaExists = in.readBoolean();
this.dataMapSchemaList = new ArrayList<>();
if (isChildSchemaExists) {
@@ -371,4 +376,22 @@ public boolean isSchemaModified() {
return isSchemaModified;
}

private void updateHasColumnDrift() {
this.hasColumnDrift = false;
for (ColumnSchema columnSchema : factTable.getListOfColumns()) {
if (columnSchema.isDimensionColumn() && !columnSchema.isInvisible()) {
Map<String, String> columnProperties = columnSchema.getColumnProperties();
if (columnProperties != null) {
if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) {
this.hasColumnDrift = true;
break;
}
}
}
}
}

public boolean hasColumnDrift() {
return hasColumnDrift;
}
}
