Skip to content

Commit

Permalink
[#3347] improvement(jdbc-doris): support creating Doris table with pa…
Browse files Browse the repository at this point in the history
…rtition (#3380)

### What changes were proposed in this pull request?

Support creating Doris tables with partitioning (RANGE and LIST partition types).

### Why are the changes needed?

Fix: #3347 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Integration tests (IT).

Co-authored-by: XiaoZ <57973980+xiaozcy@users.noreply.github.com>
Co-authored-by: zhanghan18 <zhanghan18@xiaomi.com>
  • Loading branch information
3 people committed May 14, 2024
1 parent 7a6d045 commit 63893d4
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Strategy;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -35,8 +36,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -73,7 +76,7 @@ protected String generateCreateTableSql(
Map<String, String> properties,
Transform[] partitioning,
Distribution distribution,
com.datastrato.gravitino.rel.indexes.Index[] indexes) {
Index[] indexes) {

validateIncrementCol(columns);
validateDistribution(distribution, columns);
Expand Down Expand Up @@ -107,6 +110,9 @@ protected String generateCreateTableSql(
sqlBuilder.append(" COMMENT \"").append(comment).append("\"");
}

// Add Partition Info
appendPartitionSql(partitioning, columns, sqlBuilder);

// Add distribution info
if (distribution.strategy() == Strategy.HASH) {
sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY HASH(");
Expand All @@ -126,12 +132,6 @@ protected String generateCreateTableSql(
// Add table properties
sqlBuilder.append(NEW_LINE).append(DorisUtils.generatePropertiesSql(properties));

// Add Partition Info
if (partitioning != null && partitioning.length > 0) {
// TODO: Add partitioning support
throw new UnsupportedOperationException("Currently we do not support Partitioning in Doris");
}

// Return the generated SQL statement
String result = sqlBuilder.toString();

Expand Down Expand Up @@ -169,9 +169,7 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[]
}
}

@VisibleForTesting
static void appendIndexesSql(
com.datastrato.gravitino.rel.indexes.Index[] indexes, StringBuilder sqlBuilder) {
private static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {

if (indexes.length == 0) {
return;
Expand All @@ -194,6 +192,57 @@ static void appendIndexesSql(
sqlBuilder.append(",").append(NEW_LINE).append(indexSql);
}

/**
 * Appends the Doris {@code PARTITION BY} clause for the requested partitioning to the CREATE
 * TABLE statement being built.
 *
 * <p>Nothing is appended when {@code partitioning} is null or empty. Exactly one transform is
 * accepted and it must be a {@link Transforms.RangeTransform} or a {@link
 * Transforms.ListTransform}. The clause is emitted with an empty partition list ({@code ()})
 * because pre-assigning partitions at creation time is not supported yet. All validation happens
 * before anything is written, so {@code sqlBuilder} is left untouched when an exception is
 * thrown.
 *
 * @param partitioning the partitioning transforms requested for the table; may be null or empty
 * @param columns the columns of the table, used to validate the partition fields
 * @param sqlBuilder the builder that receives the partition clause
 * @throws IllegalArgumentException if more than one transform is given, the transform type is
 *     neither range nor list, a partition field is nested, or a partition field is not a column
 *     of the table
 */
private static void appendPartitionSql(
    Transform[] partitioning, JdbcColumn[] columns, StringBuilder sqlBuilder) {
  if (ArrayUtils.isEmpty(partitioning)) {
    return;
  }
  Preconditions.checkArgument(
      partitioning.length == 1, "Composite partition type is not supported");

  Set<String> columnNames =
      Arrays.stream(columns).map(JdbcColumn::name).collect(Collectors.toSet());
  Transform partition = partitioning[0];

  if (partition instanceof Transforms.RangeTransform) {
    // TODO support multi-column range partitioning in doris
    String[] fieldName = ((Transforms.RangeTransform) partition).fieldName();
    String partitionColumn = quotePartitionColumn(fieldName, columnNames);

    // TODO we currently do not support pre-assign partition when creating range partitioning
    sqlBuilder
        .append(NEW_LINE)
        .append(" PARTITION BY RANGE(")
        .append(partitionColumn)
        .append(") () ");
  } else if (partition instanceof Transforms.ListTransform) {
    String[][] fieldNames = ((Transforms.ListTransform) partition).fieldNames();
    String partitionColumns =
        Arrays.stream(fieldNames)
            .map(fieldName -> quotePartitionColumn(fieldName, columnNames))
            .collect(Collectors.joining(","));

    // TODO we currently do not support pre-assign partition when creating list partitioning table
    // Start on a new line, consistent with the RANGE clause.
    sqlBuilder
        .append(NEW_LINE)
        .append(" PARTITION BY LIST(")
        .append(partitionColumns)
        .append(") () ");
  } else {
    throw new IllegalArgumentException("Unsupported partition type of Doris");
  }
}

/**
 * Validates a single partition field reference and returns its back-quoted column name.
 *
 * @param fieldName the field reference; must have exactly one element (no nested fields)
 * @param columnNames the names of the columns defined on the table
 * @return the column name wrapped in back quotes
 * @throws IllegalArgumentException if the field is nested or is not one of the table columns
 */
private static String quotePartitionColumn(String[] fieldName, Set<String> columnNames) {
  Preconditions.checkArgument(
      fieldName.length == 1, "Doris partition does not support nested field");
  Preconditions.checkArgument(
      columnNames.contains(fieldName[0]), "The partition field must be one of the columns");
  return BACK_QUOTE + fieldName[0] + BACK_QUOTE;
}

@Override
protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException {
return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.datastrato.gravitino.rel.types.Type;
Expand Down Expand Up @@ -411,4 +413,72 @@ public void testCreateNotSupportTypeTable() {
"Couldn't convert Gravitino type %s to Doris type", type.simpleString())));
}
}

/**
 * Creates one range-partitioned and one list-partitioned Doris table, loads each back, and checks
 * the loaded table against the name, comment and columns it was created with.
 */
@Test
public void testCreateTableWithPartition() {
  final String comment = "partition_table_comment";

  JdbcColumn.Builder intColumnBuilder = JdbcColumn.builder();
  JdbcColumn intColumn =
      intColumnBuilder
          .withName("col_1")
          .withType(Types.IntegerType.get())
          .withNullable(false)
          .build();
  JdbcColumn booleanColumn =
      JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
  JdbcColumn doubleColumn =
      JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
  JdbcColumn.Builder dateColumnBuilder = JdbcColumn.builder();
  JdbcColumn dateColumn =
      dateColumnBuilder
          .withName("col_4")
          .withType(Types.DateType.get())
          .withNullable(false)
          .build();

  List<JdbcColumn> tableColumns =
      Arrays.asList(intColumn, booleanColumn, doubleColumn, dateColumn);
  Distribution hashOnFirstColumn =
      Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
  Index[] noIndexes = new Index[0];

  // Range partitioning on the date column.
  String rangeTableName = GravitinoITUtils.genRandomName("range_partition_table");
  Transform rangeTransform = Transforms.range(new String[] {dateColumn.name()});
  createAndVerifyPartitionedTable(
      rangeTableName,
      new Transform[] {rangeTransform},
      tableColumns,
      comment,
      hashOnFirstColumn,
      noIndexes);

  // List partitioning on the integer and date columns.
  String listTableName = GravitinoITUtils.genRandomName("list_partition_table");
  Transform listTransform =
      Transforms.list(new String[] {intColumn.name()}, new String[] {dateColumn.name()});
  createAndVerifyPartitionedTable(
      listTableName,
      new Transform[] {listTransform},
      tableColumns,
      comment,
      hashOnFirstColumn,
      noIndexes);
}

/**
 * Creates a table with the given partitioning, loads it back and asserts that the loaded table
 * matches the supplied name, comment and columns.
 */
private void createAndVerifyPartitionedTable(
    String tableName,
    Transform[] partitioning,
    List<JdbcColumn> tableColumns,
    String comment,
    Distribution distribution,
    Index[] indexes) {
  TABLE_OPERATIONS.create(
      databaseName,
      tableName,
      tableColumns.toArray(new JdbcColumn[0]),
      comment,
      createProperties(),
      partitioning,
      distribution,
      indexes);

  JdbcTable loaded = TABLE_OPERATIONS.load(databaseName, tableName);
  assertionsTableInfo(tableName, comment, tableColumns, Collections.emptyMap(), null, loaded);
}
}

0 comments on commit 63893d4

Please sign in to comment.