Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-2733]should support scalar sub query for partition tables #2492

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.common.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Indicates the version number since which the annotated member or type
 * has been present in the project.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD})
public @interface Since {
  /**
   * @return the version number (e.g. "1.4.1") since which this member
   *         or type has been present
   */
  String value();
}
Expand Up @@ -947,6 +947,8 @@ public final class CarbonCommonConstants {
*/
public static final String DICTIONARY_PATH = "dictionary_path";
public static final String SORT_COLUMNS = "sort_columns";
// file format for the data files
public static final String FORMAT = "format";
public static final String PARTITION_TYPE = "partition_type";
public static final String NUM_PARTITIONS = "num_partitions";
public static final String RANGE_INFO = "range_info";
Expand All @@ -969,6 +971,8 @@ public final class CarbonCommonConstants {
// Flat folder support on table. when it is true all carbondata files store directly under table
// path instead of sub folders.
public static final String FLAT_FOLDER = "flat_folder";
// this will be used in hadoop conf to pass the format type to executor
public static final String CARBON_EXTERNAL_FORMAT_CONF_KEY = "carbon_external_format_type";

/**
* 16 mb size
Expand Down Expand Up @@ -1725,7 +1729,7 @@ public final class CarbonCommonConstants {
public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";

/**
* It's timeout threshold of carbon search query
* It's timeout threshold of carbon search query, in seconds
*/
@CarbonProperty
@InterfaceStability.Unstable
Expand All @@ -1734,7 +1738,7 @@ public final class CarbonCommonConstants {
/**
* Default value is 10 seconds
*/
public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s";
public static final int CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = 10;

/**
* The size of thread pool used for reading files in Work for search mode. By default,
Expand Down
Expand Up @@ -220,6 +220,11 @@ public static boolean createNewFile(
final FsPermission permission) throws IOException {
return getCarbonFile(filePath).createNewFile(filePath, fileType, doAs, permission);
}

/**
 * Deletes the file at the given path, inferring the file system type
 * from the path itself.
 *
 * @param filePath path of the file to delete
 * @return true if the deletion succeeded
 * @throws IOException if the underlying file system operation fails
 */
public static boolean deleteFile(String filePath) throws IOException {
  // Resolve the backing file system type first, then delegate to the
  // two-argument overload.
  FileType fileType = getFileType(filePath);
  return deleteFile(filePath, fileType);
}

/**
 * Deletes the file at the given path using the supplied {@link FileType}
 * to resolve the backing file system implementation.
 *
 * @param filePath path of the file to delete
 * @param fileType backing file system type -- presumably HDFS/local/etc.;
 *                 confirm against FileFactory.FileType
 * @return true if the file was deleted successfully
 * @throws IOException if the underlying file system operation fails
 */
public static boolean deleteFile(String filePath, FileType fileType) throws IOException {
return getCarbonFile(filePath).deleteFile(filePath, fileType);
}
Expand Down
Expand Up @@ -59,12 +59,11 @@ public CarbonTableBuilder tableSchema(TableSchema tableSchema) {
return this;
}

public CarbonTable build() {
public TableInfo buildTableInfo() {
Objects.requireNonNull(tablePath, "tablePath should not be null");
Objects.requireNonNull(tableSchema, "tableSchema should not be null");
Objects.requireNonNull(isTransactionalTable, "Transactional Table should not be null");


TableInfo tableInfo = new TableInfo();
tableInfo.setDatabaseName(databaseName);
tableInfo.setTableUniqueName(databaseName + "_" + tableName);
Expand All @@ -73,6 +72,10 @@ public CarbonTable build() {
tableInfo.setTransactionalTable(isTransactionalTable);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0));
return CarbonTable.buildFromTableInfo(tableInfo);
return tableInfo;
}

/**
 * Builds a {@link CarbonTable} from the state accumulated in this builder.
 *
 * @return the constructed CarbonTable
 */
public CarbonTable build() {
  TableInfo tableInfo = buildTableInfo();
  return CarbonTable.buildFromTableInfo(tableInfo);
}
}
Expand Up @@ -25,11 +25,13 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.common.annotations.Since;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
Expand Down Expand Up @@ -89,6 +91,17 @@ public class TableInfo implements Serializable, Writable {
*
*/
private boolean isTransactionalTable = true;
/**
* The format of the fact table.
* By default it is carbondata, and we also support other format like CSV
*/
@Since("1.4.1")
private String format = "carbondata";
/**
* properties for the format, such as delimiter/header for csv format
*/
@Since("1.4.1")
private Map<String, String> formatProperties;

// this identifier is a lazy field which will be created when it is used first time
private AbsoluteTableIdentifier identifier;
Expand All @@ -104,6 +117,7 @@ public class TableInfo implements Serializable, Writable {

/**
 * Creates an empty TableInfo with collections initialized so callers
 * never observe a null datamap list or format-properties map.
 */
public TableInfo() {
  this.dataMapSchemaList = new ArrayList<>();
  this.formatProperties = new HashMap<>();
  // Tables are transactional unless explicitly disabled.
  this.isTransactionalTable = true;
}

Expand Down Expand Up @@ -196,6 +210,22 @@ public void setTablePath(String tablePath) {
this.tablePath = tablePath;
}

/**
 * Returns the storage format of the fact table.
 * Defaults to "carbondata"; other formats such as csv are also supported.
 */
public String getFormat() {
return format;
}

/**
 * Sets the storage format of the fact table (e.g. "carbondata" or "csv").
 */
public void setFormat(String format) {
this.format = format;
}

/**
 * Returns the properties of the format, such as delimiter/header for csv.
 * NOTE(review): exposes the internal mutable map directly.
 */
public Map<String, String> getFormatProperties() {
return formatProperties;
}

/**
 * Sets the properties of the format, such as delimiter/header for csv.
 */
public void setFormatProperties(Map<String, String> formatProperties) {
this.formatProperties = formatProperties;
}

public List<DataMapSchema> getDataMapSchemaList() {
return dataMapSchemaList;
}
Expand Down Expand Up @@ -291,6 +321,17 @@ public void write(DataOutput out) throws IOException {
}
}
out.writeBoolean(isSchemaModified);

out.writeUTF(format);
boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0;
out.writeBoolean(isFormatPropertiesExists);
if (isFormatPropertiesExists) {
out.writeShort(formatProperties.size());
for (Map.Entry<String, String> entry : formatProperties.entrySet()) {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
}
}
}

@Override public void readFields(DataInput in) throws IOException {
Expand Down Expand Up @@ -327,6 +368,17 @@ public void write(DataOutput out) throws IOException {
}
}
this.isSchemaModified = in.readBoolean();

this.format = in.readUTF();
boolean isFormatPropertiesExists = in.readBoolean();
if (isFormatPropertiesExists) {
short size = in.readShort();
for (int i = 0; i < size; i++) {
String key = in.readUTF();
String value = in.readUTF();
this.formatProperties.put(key, value);
}
}
}

public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
Expand Down
Expand Up @@ -54,12 +54,22 @@ public class TableSchemaBuilder {

private List<ColumnSchema> measures = new LinkedList<>();

private Map<String, String> tableProperties;

private int blockSize;

private int blockletSize;

private String tableName;

/**
 * Sets the table properties to attach to the schema being built.
 *
 * @param tableProperties non-null map of table property name to value
 * @return this builder, for call chaining
 * @throws IllegalArgumentException if tableProperties is null
 */
public TableSchemaBuilder properties(Map<String, String> tableProperties) {
  if (tableProperties == null) {
    // Fixed copy-paste bug: message previously said "blockSize should not
    // be null" (copied from blockSize(int)).
    throw new IllegalArgumentException("tableProperties should not be null");
  }
  this.tableProperties = tableProperties;
  return this;
}

public TableSchemaBuilder blockSize(int blockSize) {
if (blockSize <= 0) {
throw new IllegalArgumentException("blockSize should be greater than 0");
Expand Down Expand Up @@ -97,22 +107,26 @@ public TableSchema build() {
allColumns.addAll(measures);
schema.setListOfColumns(allColumns);

Map<String, String> property = new HashMap<>();
if (tableProperties == null) {
tableProperties = new HashMap<>();
}
if (blockSize > 0) {
property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize));
tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize));
}
if (blockletSize > 0) {
property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize));
tableProperties.put(
CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize));
}
if (property.size() != 0) {
schema.setTableProperties(property);
if (tableProperties.size() != 0) {
schema.setTableProperties(tableProperties);
}

return schema;
}

public void setSortColumns(List<ColumnSchema> sortColumns) {
public TableSchemaBuilder setSortColumns(List<ColumnSchema> sortColumns) {
this.sortColumns = sortColumns;
return this;
}

public ColumnSchema addColumn(StructField field, AtomicInteger valIndex, boolean isSortColumn) {
Expand Down
Expand Up @@ -69,8 +69,14 @@ public String getString() {
public String getStatement() {
StringBuffer value = new StringBuffer();
value.append("(");
boolean start = false;
for (Expression expr : children) {
value.append(expr.getString()).append(";");
if (start) {
value.append(", ");
} else {
start = true;
}
value.append(expr.getStatement());
}
value.append(')');

Expand Down
Expand Up @@ -100,6 +100,11 @@ public String getString() {

/**
 * Returns the SQL-like statement for this not-equal expression:
 * "is not" when this instance represents an IS NOT style comparison,
 * "<>" otherwise.
 */
@Override
public String getStatement() {
  // Removed a dead unconditional "<>" return (diff residue) that made the
  // isNotNull branch unreachable.
  if (isNotNull) {
    return left.getStatement() + " is not " + right.getStatement();
  } else {
    return left.getStatement() + " <> " + right.getStatement();
  }
}
}
Expand Up @@ -59,6 +59,6 @@ public String getString() {

/**
 * Returns the SQL-like statement for this OR expression, built from the
 * children's getStatement() form (not their getString() debug form).
 */
@Override
public String getStatement() {
  // Removed the unreachable old getString()-based return left by the diff;
  // the statement form must recurse via getStatement().
  return "(" + left.getStatement() + " or " + right.getStatement() + ")";
}
}
Expand Up @@ -66,7 +66,7 @@ public String getString() {

/**
 * Returns the SQL-like statement joining the two bounds with "and" --
 * presumably emitted as the bound pair inside a BETWEEN clause (the old
 * form used " between "); TODO confirm against the enclosing expression.
 */
@Override
public String getStatement() {
  // Removed the superseded " between " return left by the diff; the PR's
  // new " and " form is kept.
  return left.getStatement() + " and " + right.getStatement();
}

@Override
Expand Down
Expand Up @@ -69,6 +69,7 @@ public class QueryModel {
* table block information in which query will be executed
*/
private List<TableBlockInfo> tableBlockInfos;

/**
* To handle most of the computation in query engines like spark and hive, carbon should give
* raw detailed records to it.
Expand Down Expand Up @@ -109,11 +110,6 @@ public class QueryModel {
*/
private boolean requiredRowId;

/**
* whether it is FG with search mode
*/
private boolean isFG;

private QueryModel(CarbonTable carbonTable) {
tableBlockInfos = new ArrayList<TableBlockInfo>();
invalidSegmentIds = new ArrayList<>();
Expand Down Expand Up @@ -375,19 +371,12 @@ public void setRequiredRowId(boolean requiredRowId) {
this.requiredRowId = requiredRowId;
}

public boolean isFG() {
return isFG;
}

public void setFG(boolean FG) {
isFG = FG;
}

@Override
public String toString() {
return String.format("scan on table %s.%s, %d projection columns with filter (%s)",
table.getDatabaseName(), table.getTableName(),
projection.getDimensions().size() + projection.getMeasures().size(),
filterExpressionResolverTree.getFilterExpression().toString());
filterExpressionResolverTree == null ? "" :
filterExpressionResolverTree.getFilterExpression().toString());
}
}
Expand Up @@ -18,15 +18,19 @@
package org.apache.carbondata.core.statusmanager;

/**
* The data file format supported in carbondata project
* The data file format supported in carbondata project.
* The fileformat along with its property will be stored in tableinfo
*/
public enum FileFormat {

// carbondata columnar file format, optimized for read
COLUMNAR_V3,

// carbondata row file format, optimized for write
ROW_V1;
ROW_V1,

// external file format, such as parquet/csv
EXTERNAL;

public static FileFormat getByOrdinal(int ordinal) {
if (ordinal < 0 || ordinal >= FileFormat.values().length) {
Expand All @@ -38,6 +42,8 @@ public static FileFormat getByOrdinal(int ordinal) {
return COLUMNAR_V3;
case 1:
return ROW_V1;
case 2:
return EXTERNAL;
}

return COLUMNAR_V3;
Expand Down