Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix mv on non-partitioned jdbc table #31177

Merged
merged 3 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,30 +231,22 @@ private static String getHiveFormatStringValue(DateLiteral dateLiteral) {
}

public static List<String> getPartitionNames(Table table) {
if (table.isUnPartitioned()) {
return Lists.newArrayList(table.getName());
}
List<String> partitionNames = null;
if (table.isHiveTable() || table.isHudiTable()) {
HiveMetaStoreTable hmsTable = (HiveMetaStoreTable) table;
if (hmsTable.isUnPartitioned()) {
// return table name if table is unpartitioned
return Lists.newArrayList(hmsTable.getTableName());
}
partitionNames = GlobalStateMgr.getCurrentState().getMetadataMgr()
.listPartitionNames(hmsTable.getCatalogName(), hmsTable.getDbName(), hmsTable.getTableName());
} else if (table.isIcebergTable()) {
IcebergTable icebergTable = (IcebergTable) table;
if (icebergTable.isUnPartitioned()) {
// return table name if table is unpartitioned
return Lists.newArrayList(icebergTable.getRemoteTableName());
}
partitionNames = GlobalStateMgr.getCurrentState().getMetadataMgr().listPartitionNames(
icebergTable.getCatalogName(), icebergTable.getRemoteDbName(), icebergTable.getRemoteTableName());
} else if (table.isJDBCTable()) {
JDBCTable jdbcTable = (JDBCTable) table;
partitionNames = GlobalStateMgr.getCurrentState().getMetadataMgr().listPartitionNames(
jdbcTable.getCatalogName(), jdbcTable.getDbName(), jdbcTable.getJdbcTable());
if (partitionNames.size() == 0) {
return Lists.newArrayList(jdbcTable.getJdbcTable());
}
} else {
Preconditions.checkState(false, "Do not support get partition names and columns for" +
"table type %s", table.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.connector.exception.StarRocksConnectorException;
import org.jetbrains.annotations.NotNull;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -130,7 +131,8 @@ public Type convertColumnType(int dataType, String typeName, int columnSize, int

@Override
public List<String> listPartitionNames(Connection connection, String databaseName, String tableName) {
String partitionNamesQuery = "SELECT PARTITION_DESCRIPTION FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA = ? " +
String partitionNamesQuery =
"SELECT PARTITION_DESCRIPTION as NAME FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA = ? " +
"AND TABLE_NAME = ? AND PARTITION_NAME IS NOT NULL";
try (PreparedStatement ps = connection.prepareStatement(partitionNamesQuery)) {
ps.setString(1, databaseName);
Expand All @@ -139,7 +141,7 @@ public List<String> listPartitionNames(Connection connection, String databaseNam
ImmutableList.Builder<String> list = ImmutableList.builder();
if (null != rs) {
while (rs.next()) {
String[] partitionNames = rs.getString("PARTITION_DESCRIPTION").
String[] partitionNames = rs.getString("NAME").
replace("'", "").split(",");
for (String partitionName : partitionNames) {
list.add(partitionName);
Expand Down Expand Up @@ -180,18 +182,15 @@ public List<String> listPartitionColumns(Connection connection, String databaseN

public List<Partition> getPartitions(Connection connection, Table table) {
JDBCTable jdbcTable = (JDBCTable) table;
String partitionsQuery = "SELECT PARTITION_DESCRIPTION, " +
"IF(UPDATE_TIME IS NULL, CREATE_TIME, UPDATE_TIME) AS MODIFIED_TIME " +
"FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " +
"AND PARTITION_NAME IS NOT NULL";
try (PreparedStatement ps = connection.prepareStatement(partitionsQuery)) {
String query = getPartitionQuery(table);
try (PreparedStatement ps = connection.prepareStatement(query)) {
ps.setString(1, jdbcTable.getDbName());
ps.setString(2, jdbcTable.getJdbcTable());
ResultSet rs = ps.executeQuery();
ImmutableList.Builder<Partition> list = ImmutableList.builder();
if (null != rs) {
while (rs.next()) {
String[] partitionNames = rs.getString("PARTITION_DESCRIPTION").
String[] partitionNames = rs.getString("NAME").
replace("'", "").split(",");
long createTime = rs.getDate("MODIFIED_TIME").getTime();
for (String partitionName : partitionNames) {
Expand All @@ -206,4 +205,16 @@ public List<Partition> getPartitions(Connection connection, Table table) {
throw new StarRocksConnectorException(e.getMessage());
}
}

@NotNull
private static String getPartitionQuery(Table table) {
final String partitionsQuery = "SELECT PARTITION_DESCRIPTION AS NAME, " +
"IF(UPDATE_TIME IS NULL, CREATE_TIME, UPDATE_TIME) AS MODIFIED_TIME " +
"FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " +
"AND PARTITION_NAME IS NOT NULL";
final String nonPartitionQuery = "SELECT TABLE_NAME AS NAME, " +
"IF(UPDATE_TIME IS NULL, CREATE_TIME, UPDATE_TIME) AS MODIFIED_TIME " +
"FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ";
return table.isUnPartitioned() ? nonPartitionQuery : partitionsQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@

package com.starrocks.connector.jdbc;

import com.google.common.collect.Lists;
import com.mockrunner.mock.jdbc.MockResultSet;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.JDBCResource;
import com.starrocks.catalog.JDBCTable;
import com.starrocks.catalog.Type;
import com.starrocks.common.DdlException;
import com.starrocks.connector.PartitionUtil;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
Expand Down Expand Up @@ -57,7 +60,7 @@ public class MysqlSchemaResolverTest {
@Before
public void setUp() throws SQLException {
partitionsResult = new MockResultSet("partitions");
partitionsResult.addColumn("PARTITION_DESCRIPTION", Arrays.asList("'20230810'"));
partitionsResult.addColumn("NAME", Arrays.asList("'20230810'"));
partitionsResult.addColumn("PARTITION_EXPRESSION", Arrays.asList("`d`"));
partitionsResult.addColumn("MODIFIED_TIME", Arrays.asList("2023-08-01"));
properties = new HashMap<>();
Expand Down Expand Up @@ -161,6 +164,18 @@ public void testGetPartitions() {
}
}

@Test
public void testGetPartitions_NonPartitioned() throws DdlException {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
List<Column> columns = Arrays.asList(new Column("d", Type.VARCHAR));
JDBCTable jdbcTable = new JDBCTable(100000, "tbl1", columns, Lists.newArrayList(),
"test", "catalog", properties);
int size = jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size();
Assert.assertEquals(1, size);
List<String> partitionNames = PartitionUtil.getPartitionNames(jdbcTable);
Assert.assertEquals(Arrays.asList("tbl1"), partitionNames);
}

@Test
public void testGetPartitionsRsNull() {
try {
Expand Down