Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ public String getSystemPropertiesPath() {
return workDirFilePath("data/datanode/system/schema", IoTDBStartCheck.PROPERTIES_FILE_NAME);
}

public String getDataDir() {
return getNodePath() + File.separator + "data";
}

public String getWalDir() {
return getDataDir() + File.separator + "datanode" + File.separator + "wal";
}

@Override
protected MppJVMConfig initVMConfig() {
return MppJVMConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
*/
package org.apache.iotdb.relational.it.session;

import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
Expand All @@ -43,6 +49,8 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.ArrayList;
Expand All @@ -56,6 +64,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
Expand Down Expand Up @@ -1529,4 +1538,77 @@ public void autoCreateTagColumnTest2()
assertEquals(30, cnt);
}
}

@Test
public void testAttrColumnRemoved()
throws IoTDBConnectionException, StatementExecutionException, IOException {
EnvFactory.getEnv().cleanClusterEnvironment();
EnvFactory.getEnv().getConfig().getCommonConfig().setWalMode("SYNC");
EnvFactory.getEnv().initClusterEnvironment();
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
session.executeNonQueryStatement("create database if not exists db1");
session.executeNonQueryStatement("use db1");
session.executeNonQueryStatement(
"CREATE TABLE remove_attr_col (tag1 string tag, attr1 string attribute, "
+ "m1 double "
+ "field)");

// insert tablet to WAL
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("tag1", TSDataType.STRING));
schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
final List<ColumnCategory> columnTypes =
Arrays.asList(ColumnCategory.TAG, ColumnCategory.ATTRIBUTE, ColumnCategory.FIELD);

long timestamp = 0;
Tablet tablet =
new Tablet(
"remove_attr_col",
IMeasurementSchema.getMeasurementNameList(schemaList),
IMeasurementSchema.getDataTypeList(schemaList),
columnTypes);

for (int rowIndex = 0; rowIndex < 10; rowIndex++) {
tablet.addTimestamp(rowIndex, timestamp);
tablet.addValue("tag1", rowIndex, "tag:1");
tablet.addValue("attr1", rowIndex, "attr:" + timestamp);
tablet.addValue("m1", rowIndex, timestamp * 1.0);
timestamp++;
}
session.insert(tablet);
tablet.reset();

// insert records to WAL
session.executeNonQueryStatement(
"INSERT INTO remove_attr_col (time, tag1, attr1, m1) VALUES (10, 'tag:1', 'attr:10', 10.0)");

// check WAL
for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) {
String walNodeDir = dataNodeWrapper.getWalDir() + File.separator + "0";
File[] walFiles = new File(walNodeDir).listFiles(f -> f.getName().endsWith(".wal"));
if (walFiles != null && walFiles.length > 0) {
File walFile = walFiles[0];
WALEntry entry;
try (WALReader walReader = new WALReader(walFile)) {
entry = walReader.next();
RelationalInsertTabletNode tabletNode = (RelationalInsertTabletNode) entry.getValue();
assertTrue(
Arrays.stream(tabletNode.getColumnCategories())
.noneMatch(c -> c == TsTableColumnCategory.ATTRIBUTE));

entry = walReader.next();
RelationalInsertRowsNode rowsNode = (RelationalInsertRowsNode) entry.getValue();
assertTrue(
Arrays.stream(rowsNode.getInsertRowNodeList().get(0).getColumnCategories())
.noneMatch(c -> c == TsTableColumnCategory.ATTRIBUTE));
return;
}
}
}
} finally {
EnvFactory.getEnv().cleanClusterEnvironment();
EnvFactory.getEnv().initClusterEnvironment();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static void validate(
insertStatement.validateTableSchema(metadata, context);
insertStatement.updateAfterSchemaValidation(context);
insertStatement.validateDeviceSchema(metadata, context);
insertStatement.removeAttributeColumns();
} catch (final QueryProcessException e) {
throw new SemanticException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,47 +163,61 @@ public void adjustIdColumns(
tableSchema = null;
}

public static void processNonExistColumn(
ColumnSchema incoming, InsertBaseStatement innerTreeStatement, int i) {
// the column does not exist and auto-creation is disabled
SemanticException semanticException =
new SemanticException(
"Column " + incoming.getName() + " does not exists or fails to be " + "created",
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD
|| !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
// non-measurement columns cannot be partially inserted
throw semanticException;
} else {
// partial insertion
innerTreeStatement.markFailedMeasurement(i, semanticException);
}
}

public static void processTypeConflictColumn(
ColumnSchema incoming, ColumnSchema real, int i, InsertBaseStatement innerTreeStatement) {
SemanticException semanticException =
new SemanticException(
String.format(
"Incompatible data type of column %s: %s/%s",
incoming.getName(), incoming.getType(), real.getType()),
TSStatusCode.DATA_TYPE_MISMATCH.getStatusCode());
if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD
|| !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
// non-measurement columns cannot be partially inserted
throw semanticException;
} else {
// partial insertion
innerTreeStatement.markFailedMeasurement(i, semanticException);
}
}

public static void validateTableSchema(
ColumnSchema incoming, ColumnSchema real, int i, InsertBaseStatement innerTreeStatement) {
if (real == null) {
// the column does not exist and auto-creation is disabled
SemanticException semanticException =
new SemanticException(
"Column " + incoming.getName() + " does not exists or fails to be " + "created",
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD
|| !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
// non-measurement columns cannot be partially inserted
throw semanticException;
} else {
// partial insertion
innerTreeStatement.markFailedMeasurement(i, semanticException);
return;
}
processNonExistColumn(incoming, innerTreeStatement, i);
return;
}

// check data type
if (incoming.getType() == null || incoming.getColumnCategory() != TsTableColumnCategory.FIELD) {
// sql insertion does not provide type
// the type is inferred and can be inconsistent with the existing one
innerTreeStatement.setDataType(InternalTypeManager.getTSDataType(real.getType()), i);
} else if (!InternalTypeManager.getTSDataType(real.getType())
.isCompatible(InternalTypeManager.getTSDataType(incoming.getType()))
&& !innerTreeStatement.isForceTypeConversion()) {
SemanticException semanticException =
new SemanticException(
String.format(
"Incompatible data type of column %s: %s/%s",
incoming.getName(), incoming.getType(), real.getType()),
TSStatusCode.DATA_TYPE_MISMATCH.getStatusCode());
if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD
|| !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
// non-measurement columns cannot be partially inserted
throw semanticException;
} else {
// partial insertion
innerTreeStatement.markFailedMeasurement(i, semanticException);
return;
}
processTypeConflictColumn(incoming, real, i, innerTreeStatement);
return;
}

// check column category
if (incoming.getColumnCategory() == null) {
// sql insertion does not provide category
innerTreeStatement.setColumnCategory(real.getColumnCategory(), i);
Expand All @@ -214,6 +228,8 @@ public static void validateTableSchema(
incoming.getName(), incoming.getColumnCategory(), real.getColumnCategory()),
TSStatusCode.COLUMN_CATEGORY_MISMATCH.getStatusCode());
}

// construct measurement schema
TSDataType tsDataType = InternalTypeManager.getTSDataType(real.getType());
MeasurementSchema measurementSchema =
new MeasurementSchema(
Expand Down Expand Up @@ -244,4 +260,8 @@ public String getDatabase() {
public void toLowerCase() {
getInnerTreeStatement().toLowerCase();
}

public void removeAttributeColumns() {
getInnerTreeStatement().removeAttributeColumns();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -368,6 +369,55 @@ public List<String> getFailedMessages() {
.collect(Collectors.toList());
}

@TableModel
public void removeAttributeColumns() {
if (columnCategories == null) {
return;
}

List<Integer> columnsToKeep = new ArrayList<>();
for (int i = 0; i < columnCategories.length; i++) {
if (!columnCategories[i].equals(TsTableColumnCategory.ATTRIBUTE)) {
columnsToKeep.add(i);
}
}

if (columnsToKeep.size() == columnCategories.length) {
return;
}

if (failedMeasurementIndex2Info != null) {
failedMeasurementIndex2Info =
failedMeasurementIndex2Info.entrySet().stream()
.collect(Collectors.toMap(e -> columnsToKeep.indexOf(e.getKey()), Entry::getValue));
}

if (measurementSchemas != null) {
measurementSchemas =
columnsToKeep.stream().map(i -> measurementSchemas[i]).toArray(MeasurementSchema[]::new);
}
if (measurements != null) {
measurements = columnsToKeep.stream().map(i -> measurements[i]).toArray(String[]::new);
}
if (dataTypes != null) {
dataTypes = columnsToKeep.stream().map(i -> dataTypes[i]).toArray(TSDataType[]::new);
}
if (columnCategories != null) {
columnCategories =
columnsToKeep.stream()
.map(i -> columnCategories[i])
.toArray(TsTableColumnCategory[]::new);
}

subRemoveAttributeColumns(columnsToKeep);

// to reconstruct indices
idColumnIndices = null;
attrColumnIndices = null;
}

protected abstract void subRemoveAttributeColumns(List<Integer> columnsToKeep);

public static class FailedMeasurementInfo {
protected String measurement;
protected TSDataType dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.tsfile.exception.NotImplementedException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -172,4 +173,14 @@ public Optional<String> getDatabaseName() {
}
return database;
}

@Override
public void removeAttributeColumns() {
subRemoveAttributeColumns(Collections.emptyList());
}

@Override
protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) {
insertTabletStatementList.forEach(InsertBaseStatement::removeAttributeColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -526,4 +526,11 @@ public void swapColumn(int src, int target) {
super.swapColumn(src, target);
CommonUtils.swapArray(values, src, target);
}

@Override
protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) {
if (values != null) {
values = columnsToKeep.stream().map(i -> values[i]).toArray();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,9 @@ public Optional<String> getDatabaseName() {
}
return database;
}

@Override
protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) {
insertRowStatementList.forEach(InsertRowStatement::removeAttributeColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -209,4 +210,14 @@ public Optional<String> getDatabaseName() {
public Statement toRelationalStatement(MPPQueryContext context) {
return new InsertRows(this, context);
}

@Override
public void removeAttributeColumns() {
subRemoveAttributeColumns(Collections.emptyList());
}

@Override
protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) {
insertRowStatementList.forEach(InsertBaseStatement::removeAttributeColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,4 +530,14 @@ public boolean isNull(int row, int col) {
}
return nullBitMaps[col].isMarked(row);
}

@Override
protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) {
if (columns != null) {
columns = columnsToKeep.stream().map(i -> columns[i]).toArray();
}
if (nullBitMaps != null) {
nullBitMaps = columnsToKeep.stream().map(i -> nullBitMaps[i]).toArray(BitMap[]::new);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public long getWALCurrentReadOffset() throws IOException {
* @throws NoSuchElementException when not calling hasNext before.
*/
public WALEntry next() {
if (nextEntry == null) {
if (!hasNext()) {
throw new NoSuchElementException();
}
WALEntry next = nextEntry;
Expand Down
Loading