Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.iotdb.commons.schema.table.TableNodeStatus;
import org.apache.iotdb.commons.schema.table.TreeViewSchema;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.utils.MetadataUtils;
Expand Down Expand Up @@ -98,7 +97,6 @@
import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
import static org.apache.iotdb.commons.schema.SchemaConstant.ROOT;
import static org.apache.iotdb.commons.schema.SchemaConstant.TABLE_MNODE_TYPE;
import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME;

// Since the ConfigMTree is all stored in memory, thus it is not restricted to manage MNode through
// MTreeStore.
Expand Down Expand Up @@ -803,20 +801,14 @@ public void setTableColumnComment(
throws MetadataException {
final TsTable table = getTable(database, tableName);

final TsTableColumnSchema columnSchema =
!columnName.equals(TIME_COLUMN_NAME) || Objects.isNull(comment)
? table.getColumnSchema(columnName)
: new TimeColumnSchema(TIME_COLUMN_NAME, TSDataType.TIMESTAMP);
final TsTableColumnSchema columnSchema = table.getColumnSchema(columnName);

if (Objects.isNull(columnSchema)) {
throw new ColumnNotExistsException(
PathUtils.unQualifyDatabaseName(database.getFullPath()), tableName, columnName);
}
if (Objects.nonNull(comment)) {
columnSchema.getProps().put(TsTable.COMMENT_KEY, comment);
if (columnName.equals("time")) {
// Replace the original time column
table.addColumnSchema(columnSchema);
}
} else {
columnSchema.getProps().remove(TsTable.COMMENT_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME;

public class TableInsertTabletStatementGenerator extends InsertTabletStatementGenerator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TableInsertTabletStatementGenerator.class);
Expand All @@ -54,13 +52,14 @@ public TableInsertTabletStatementGenerator(
List<TSDataType> inputColumnTypes,
List<TsTableColumnCategory> tsTableColumnCategories,
boolean isAligned,
int rowLimit) {
int rowLimit,
String timeColumnName) {
super(
targetTable,
measurementToDataTypeMap.keySet().toArray(new String[0]),
measurementToDataTypeMap.values().toArray(new TSDataType[0]),
measurementToInputLocationMap.entrySet().stream()
.filter(entry -> !entry.getKey().equalsIgnoreCase(TIME_COLUMN_NAME))
.filter(entry -> !entry.getKey().equalsIgnoreCase(timeColumnName))
.map(Map.Entry::getValue)
.toArray(InputLocation[]::new),
inputColumnTypes.stream().map(TypeFactory::getType).toArray(Type[]::new),
Expand All @@ -69,8 +68,7 @@ public TableInsertTabletStatementGenerator(
this.databaseName = databaseName;
this.writtenCounter = new AtomicLong(0);
this.columnCategories = tsTableColumnCategories.toArray(new TsTableColumnCategory[0]);
this.timeColumnIndex =
measurementToInputLocationMap.get(TIME_COLUMN_NAME).getValueColumnIndex();
this.timeColumnIndex = measurementToInputLocationMap.get(timeColumnName).getValueColumnIndex();
this.reset();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;

import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -67,7 +68,8 @@ public TableIntoOperator(
Map<String, TSDataType> measurementToDataTypeMap,
boolean isAligned,
ExecutorService intoOperationExecutor,
long statementSizePerLine) {
long statementSizePerLine,
ColumnSchema timeColumnOfTargetTable) {
super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine);
this.maxReturnSize = MAX_RETURN_SIZE;
insertTabletStatementGenerator =
Expand All @@ -79,7 +81,8 @@ public TableIntoOperator(
inputColumnTypes,
inputColumnCategories,
isAligned,
maxRowNumberInStatement);
maxRowNumberInStatement,
timeColumnOfTargetTable.getName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,22 @@ private Pair<String, TsTable> parseTable4CreateTableOrView(
table.addProp(TsTable.COMMENT_KEY, node.getComment());
}

// check if the time column has been specified
long timeColumnCount =
node.getElements().stream()
.filter(
columnDefinition ->
columnDefinition.getColumnCategory() == TsTableColumnCategory.TIME)
.count();
if (timeColumnCount > 1) {
throw new SemanticException("A table cannot have more than one time column");
}
if (timeColumnCount == 0) {
// append the time column with default name "time" if user do not specify the time column
table.addColumnSchema(new TimeColumnSchema(TIME_COLUMN_NAME, TSDataType.TIMESTAMP));
}

// TODO: Place the check at statement analyzer
boolean hasTimeColumn = false;
final Set<String> sourceNameSet = new HashSet<>();
boolean hasObject = false;
for (final ColumnDefinition columnDefinition : node.getElements()) {
Expand All @@ -590,15 +604,18 @@ private Pair<String, TsTable> parseTable4CreateTableOrView(
final TSDataType dataType = getDataType(columnDefinition.getType());
hasObject |= dataType == TSDataType.OBJECT;
final String comment = columnDefinition.getComment();
if (checkTimeColumnIdempotent(category, columnName, dataType, comment, table)
&& !hasTimeColumn) {
hasTimeColumn = true;
continue;
}

if (table.getColumnSchema(columnName) != null) {
throw new SemanticException(
String.format("Columns in table shall not share the same name %s.", columnName));
}

// allow the user create time column
if (category == TsTableColumnCategory.TIME) {
validateAndGenerateTimeColumn(columnName, dataType, comment, table);
continue;
}

final TsTableColumnSchema schema =
TableHeaderSchemaValidator.generateColumnSchema(
category,
Expand Down Expand Up @@ -627,6 +644,25 @@ private Pair<String, TsTable> parseTable4CreateTableOrView(
return new Pair<>(database, table);
}

private void validateAndGenerateTimeColumn(
final String columnName,
final TSDataType dataType,
final String comment,
final TsTable table) {

if (dataType == TSDataType.TIMESTAMP) {
final TsTableColumnSchema timeColumnSchema =
new TimeColumnSchema(columnName, TSDataType.TIMESTAMP);
if (Objects.nonNull(comment)) {
timeColumnSchema.getProps().put(TsTable.COMMENT_KEY, comment);
}
table.addColumnSchema(timeColumnSchema);

} else {
throw new SemanticException("The time column's type shall be 'timestamp'.");
}
}

@Override
protected IConfigTask visitAlterColumnDataType(
AlterColumnDataType node, MPPQueryContext context) {
Expand All @@ -647,34 +683,6 @@ protected IConfigTask visitAlterColumnDataType(
node.isView());
}

private boolean checkTimeColumnIdempotent(
final TsTableColumnCategory category,
final String columnName,
final TSDataType dataType,
final String comment,
final TsTable table) {
if (category == TsTableColumnCategory.TIME || columnName.equals(TIME_COLUMN_NAME)) {
if (category == TsTableColumnCategory.TIME
&& columnName.equals(TIME_COLUMN_NAME)
&& dataType == TSDataType.TIMESTAMP) {
if (Objects.nonNull(comment)) {
final TsTableColumnSchema columnSchema =
new TimeColumnSchema(TIME_COLUMN_NAME, TSDataType.TIMESTAMP);
columnSchema.getProps().put(TsTable.COMMENT_KEY, comment);
table.addColumnSchema(columnSchema);
}
return true;
} else if (dataType == TSDataType.TIMESTAMP) {
throw new SemanticException(
"The time column category shall be bounded with column name 'time'.");
} else {
throw new SemanticException("The time column's type shall be 'timestamp'.");
}
}

return false;
}

@Override
protected IConfigTask visitRenameTable(final RenameTable node, final MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
Expand Down Expand Up @@ -3803,13 +3804,18 @@ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
int index = map.get(originInputColumnNames.get(i));
inputColumns.set(index, originColumns.get(i));
}

ColumnSchema timeColumnOfTargetTable = null;
for (int i = 0; i < inputColumns.size(); i++) {
String columnName = inputColumns.get(i).getName();
inputLocationMap.put(columnName, new InputLocation(0, i));

TsTableColumnCategory columnCategory = inputColumns.get(i).getColumnCategory();
if (columnCategory == TIME) {
if (timeColumnOfTargetTable == null) {
timeColumnOfTargetTable = inputColumns.get(i);
} else {
throw new SemanticException("Multiple columns with TIME category found");
}
continue;
}

Expand All @@ -3818,6 +3824,9 @@ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
inputColumnTypes.add(columnType);
inputColumnCategories.add(columnCategory);
}
if (timeColumnOfTargetTable == null) {
throw new SemanticException("Missing TIME category column");
}

long statementSizePerLine =
OperatorGeneratorUtil.calculateStatementSizePerLine(inputColumnTypes);
Expand All @@ -3833,7 +3842,8 @@ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
tsDataTypeMap,
true,
FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
statementSizePerLine);
statementSizePerLine,
timeColumnOfTargetTable);
}

private boolean[] checkStatisticAndScanOrder(
Expand Down Expand Up @@ -3878,6 +3888,22 @@ private boolean[] checkStatisticAndScanOrder(
break;
}

// first and last, the second argument has to be the time column
if (FIRST_AGGREGATION.equals(funcName) || LAST_AGGREGATION.equals(funcName)) {
if (!isTimeColumn(aggregation.getArguments().get(1), timeColumnName)) {
canUseStatistic = false;
break;
}
}

// first_by and last_by, the second argument has to be the time column
if (FIRST_BY_AGGREGATION.equals(funcName) || LAST_BY_AGGREGATION.equals(funcName)) {
if (!isTimeColumn(aggregation.getArguments().get(2), timeColumnName)) {
canUseStatistic = false;
break;
}
}

// only last_by(time, x) or last_by(x,time) can use statistic
if ((LAST_BY_AGGREGATION.equals(funcName) || FIRST_BY_AGGREGATION.equals(funcName))
&& !isTimeColumn(aggregation.getArguments().get(0), timeColumnName)
Expand Down Expand Up @@ -3923,6 +3949,12 @@ private OptimizeType canUseLastCacheOptimize(
return OptimizeType.NOOP;
}

// if the timeColumnName is null, the param of function is just a timestamp column other than
// the time column
if (timeColumnName == null || !checkOrderColumnIsTime(node.getAggregations(), timeColumnName)) {
return OptimizeType.NOOP;
}

if (canUseLastRowOptimize(aggregators)) {
return OptimizeType.LAST_ROW;
}
Expand All @@ -3934,6 +3966,33 @@ private OptimizeType canUseLastCacheOptimize(
return OptimizeType.NOOP;
}

/**
* Checks if the ordering column in aggregations matches the time column. only check for
* FIRST/LAST/FIRST_BY/LAST_BY
*/
private boolean checkOrderColumnIsTime(
Map<Symbol, AggregationNode.Aggregation> aggregations, String timeColumnName) {

for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : aggregations.entrySet()) {
String functionName =
entry.getValue().getResolvedFunction().getSignature().getName().toLowerCase();
List<Expression> arguments = entry.getValue().getArguments();
Expression lastParam = entry.getValue().getArguments().get(arguments.size() - 1);

switch (functionName) {
case FIRST_AGGREGATION:
case LAST_AGGREGATION:
case FIRST_BY_AGGREGATION:
case LAST_BY_AGGREGATION:
if (!((SymbolReference) lastParam).getName().equalsIgnoreCase(timeColumnName)) {
return false;
}
break;
}
}
return true;
}

private boolean canUseLastRowOptimize(List<TableAggregator> aggregators) {
for (TableAggregator aggregator : aggregators) {
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
Expand Down
Loading