Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@
import java.util.Objects;

public class TableSchema {

// the tableName is not serialized since the TableSchema is always stored in a Map, from whose
// key the tableName can be known
protected String tableName;
protected List<IMeasurementSchema> columnSchemas;
protected List<ColumnType> columnTypes;
protected boolean updatable = false;

// columnName -> pos in columnSchemas;
// columnName -> pos in columnSchemas
private Map<String, Integer> columnPosIndex;
// columnName -> pos in all id columns
private Map<String, Integer> idColumnOrder;

public TableSchema(String tableName) {
this.tableName = tableName;
Expand All @@ -69,6 +72,16 @@ public Map<String, Integer> getColumnPosIndex() {
return columnPosIndex;
}

public Map<String, Integer> getIdColumnOrder() {
if (idColumnOrder == null) {
idColumnOrder = new HashMap<>();
}
return idColumnOrder;
}

/**
* @return i if the given column is the i-th column, -1 if the column is not in the schema
*/
public int findColumnIndex(String columnName) {
return getColumnPosIndex()
.computeIfAbsent(
Expand All @@ -83,6 +96,28 @@ public int findColumnIndex(String columnName) {
});
}

/**
* @return i if the given column is the i-th ID column, -1 if the column is not in the schema or
* not an ID column
*/
public int findIdColumnOrder(String columnName) {
return getIdColumnOrder()
.computeIfAbsent(
columnName,
colName -> {
int columnOrder = 0;
for (int i = 0; i < columnSchemas.size(); i++) {
if (columnSchemas.get(i).getMeasurementId().equals(columnName)
&& columnTypes.get(i) == ColumnType.ID) {
return columnOrder;
} else if (columnTypes.get(i) == ColumnType.ID) {
columnOrder++;
}
}
return -1;
});
}

public IMeasurementSchema findColumnSchema(String columnName) {
final int columnIndex = findColumnIndex(columnName);
return columnIndex >= 0 ? columnSchemas.get(columnIndex) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public SingleDeviceTsBlockReader(
for (String idColumn : task.getColumnMapping().getIdColumns()) {
final List<Integer> columnPosInResult = task.getColumnMapping().getColumnPos(idColumn);
// the first segment in DeviceId is the table name
final int columnPosInId = task.getTableSchema().findColumnIndex(idColumn) + 1;
final int columnPosInId = task.getTableSchema().findIdColumnOrder(idColumn) + 1;
idColumnContextMap.put(idColumn, new IdColumnContext(columnPosInResult, columnPosInId));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,26 @@ public void tabletSerializationTest() throws IOException {
assertEquals(tablet, deserialized);
}

@Test
public void testWriterWithIDOrderUnfixed()
throws IOException, WriteProcessException, ReadProcessException {
TableSchema tableSchema = genMixedTableSchema(0);
testWrite(tableSchema);
}

@Test
public void testWriteOneTable() throws IOException, WriteProcessException, ReadProcessException {
testWrite(testTableSchema);
}

private void testWrite(TableSchema tableSchema)
throws IOException, WriteProcessException, ReadProcessException {
final File testFile = new File(testDir, "testFile");
TsFileWriter writer = new TsFileWriter(testFile);
writer.setGenerateTableSchema(true);
writer.registerTableSchema(testTableSchema);
writer.registerTableSchema(tableSchema);

writer.writeTable(genTablet(testTableSchema, 0, 100));
writer.writeTable(genTablet(tableSchema, 0, 100));
writer.close();

TsFileSequenceReader sequenceReader = new TsFileSequenceReader(testFile.getAbsolutePath());
Expand All @@ -111,18 +123,18 @@ public void testWriteOneTable() throws IOException, WriteProcessException, ReadP
TableQueryOrdering.DEVICE);

final List<String> columns =
testTableSchema.getColumnSchemas().stream()
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getMeasurementId)
.collect(Collectors.toList());
final TsBlockReader reader =
tableQueryExecutor.query(testTableSchema.getTableName(), columns, null, null, null);
tableQueryExecutor.query(tableSchema.getTableName(), columns, null, null, null);
assertTrue(reader.hasNext());
int cnt = 0;
while (reader.hasNext()) {
final TsBlock result = reader.next();
for (int i = 0; i < result.getPositionCount(); i++) {
String col = result.getColumn(0).getObject(i).toString();
for (int j = 1; j < testTableSchema.getColumnSchemas().size(); j++) {
for (int j = 1; j < tableSchema.getColumnSchemas().size(); j++) {
assertEquals(col, result.getColumn(j).getObject(i).toString());
}
}
Expand Down Expand Up @@ -399,4 +411,35 @@ private TableSchema genTableSchema(int tableNum) {
}
return new TableSchema("testTable" + tableNum, measurementSchemas, columnTypes);
}

private TableSchema genMixedTableSchema(int tableNum) {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
List<ColumnType> columnTypes = new ArrayList<>();

int idIndex = 0;
int measurementIndex = 0;

while (idIndex < idSchemaNum || measurementIndex < measurementSchemaNum) {
if (idIndex < idSchemaNum) {
measurementSchemas.add(
new MeasurementSchema(
"id" + idIndex, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED));
columnTypes.add(ColumnType.ID);
idIndex++;
}

if (measurementIndex < measurementSchemaNum) {
measurementSchemas.add(
new MeasurementSchema(
"s" + measurementIndex,
TSDataType.INT64,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED));
columnTypes.add(ColumnType.MEASUREMENT);
measurementIndex++;
}
}

return new TableSchema("testTable" + tableNum, measurementSchemas, columnTypes);
}
}