Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -852,10 +852,23 @@ public int getMaxRows(int batch, int paramSize) {
// SQL Server supports a maximum of 2100 parameters in a request. Adjust the maxRowsInBatch accordingly
int maxAllowedRows = (2100 - paramSize) / paramSize;
return Math.min(batch, maxAllowedRows);
} else if (isPOSTGRES()) {
int maxAllowedRows = (32767 - paramSize) / paramSize;
return Math.min(batch, maxAllowedRows);
}
return batch;
}

/**
 * Computes the batch value to use for {@code Batchable.runBatched}, taking the
 * database's per-statement bind-parameter budget into account.
 *
 * <p>For SQL Server (2100-parameter request limit) and PostgreSQL (32767, the
 * JDBC driver's Short-ranged bind-parameter limit — NOTE(review): confirm this
 * constant matches the driver in use), a floor value is derived from
 * {@code totalQueryParams}; other databases keep the caller's value unchanged.
 *
 * @param batch the configured batch value; {@code <= 0} means batching was not
 *        requested by configuration
 * @param totalQueryParams total number of bind parameters the full, unbatched
 *        statement would carry
 * @return the derived floor when {@code batch <= 0}, otherwise the larger of
 *         {@code batch} and the derived floor
 */
public int getMaxBatch(int batch, int totalQueryParams) {
  // Per-request bind-parameter limit; 0 means "no known limit for this DB".
  int paramLimit = 0;
  if (isSQLSERVER()) {
    paramLimit = 2100;
  } else if (isPOSTGRES()) {
    paramLimit = 32767;
  }
  // Same rounding convention as the rest of this file: (n + limit) / limit.
  int minBatch = paramLimit > 0 ? (totalQueryParams + paramLimit) / paramLimit : batch;
  return batch <= 0 ? minBatch : Math.max(batch, minBatch);
}

// This class implements the Configurable interface for the benefit
// of "plugin" instances created via reflection (see invocation of
// ReflectionUtils.newInstance in method determineDatabaseProduct)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class DirectSqlAggrStats {
private static final Logger LOG = LoggerFactory.getLogger(DirectSqlAggrStats.class);
private final PersistenceManager pm;
private final int batchSize;
private final DatabaseProduct dbType;

@java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD)
@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
Expand All @@ -81,7 +82,7 @@ class DirectSqlAggrStats {

public DirectSqlAggrStats(PersistenceManager pm, Configuration conf, String schema) {
this.pm = pm;
DatabaseProduct dbType = PersistenceManagerProvider.getDatabaseProduct();
this.dbType = PersistenceManagerProvider.getDatabaseProduct();
int configBatchSize =
MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE);
if (configBatchSize == DETECT_BATCHING) {
Expand Down Expand Up @@ -184,34 +185,19 @@ public List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
final double ndvTuner, final boolean enableBitVector,
boolean enableKll) throws MetaException {
final boolean areAllPartsFound = (partsFound == partNames.size());
return Batchable.runBatched(batchSize, colNames, new Batchable<String, ColumnStatisticsObj>() {
int batch = dbType.getMaxBatch(batchSize, colNames.size() + partNames.size() + 4);
return Batchable.runBatched(batch, colNames, new Batchable<String, ColumnStatisticsObj>() {
@Override
public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName,
partNames, inputColNames, engine, areAllPartsFound,
useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll);
}
});
}

/**
* Should be called with the list short enough to not trip up Oracle/etc.
*/
private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(
String catName,
String dbName, String tableName,
List<String> partNames, List<String> colNames, String engine,
boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation,
double ndvTuner, boolean enableBitVector,
boolean enableKll) throws MetaException {
if (enableBitVector || enableKll) {
return aggrStatsUseJava(catName, dbName, tableName, partNames,
colNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation,
ndvTuner, enableBitVector, enableKll);
} else {
return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, engine,
useDensityFunctionForNDVEstimation, ndvTuner);
}
if (enableBitVector || enableKll) {
return aggrStatsUseJava(catName, dbName, tableName, partNames,
inputColNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation,
ndvTuner, enableBitVector, enableKll);
} else {
return aggrStatsUseDB(catName, dbName, tableName, partNames, inputColNames, engine,
useDensityFunctionForNDVEstimation, ndvTuner);
}
}});
Comment on lines +188 to +200
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

columnStatisticsObjForPartitions computes a DB-specific batch size but only applies it to batching over colNames. The SQL built in aggrStatsUseDB uses both inputColNames and the full partNames list as bind parameters (... in (%1$s) ... in (%2$s) plus 4 fixed params). If batchSize is NO_BATCHING (the default for Postgres today), the unbatched partNames can still exceed the PostgreSQL JDBC driver's 32767 bind-parameter limit (the server protocol itself caps at 65535), so this change doesn't fully address the failure mode.

To actually enforce the parameter limit, apply batching to the partNames dimension as well (or derive inputColNames and inputPartNames batch sizes from the same per-statement parameter budget, accounting for the 4 fixed params).

Copilot uses AI. Check for mistakes.
}

private List<ColumnStatisticsObj> aggrStatsUseJava(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ public void addPartitions(List<MPartition> parts, List<List<MPartitionPrivilege>
*/
public List<Partition> alterPartitions(MTable table, List<String> partNames,
List<Partition> newParts, String queryWriteIdList) throws MetaException {
List<Object[]> rows = Batchable.runBatched(batchSize, partNames, new Batchable<String, Object[]>() {
int batch = dbType.getMaxBatch(batchSize, partNames.size() + 3);
List<Object[]> rows = Batchable.runBatched(batch, partNames, new Batchable<String, Object[]>() {
@Override
public List<Object[]> run(List<String> input) throws Exception {
String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
Expand Down Expand Up @@ -717,10 +718,11 @@ public List<Partition> getPartitionsViaPartNames(final String catName, final Str
if (partNames.isEmpty()) {
return Collections.emptyList();
}
return Batchable.runBatched(batchSize, partNames, new Batchable<String, Partition>() {
int batch = dbType.getMaxBatch(batchSize, partNames.size() + 3);
return Batchable.runBatched(batch, partNames, new Batchable<String, Partition>() {
@Override
public List<Partition> run(List<String> input) throws MetaException {
return getPartitionsByNames(catName, dbName, tblName, partNames, false, args);
return getPartitionsByNames(catName, dbName, tblName, input, false, args);
}
});
}
Expand Down Expand Up @@ -756,12 +758,7 @@ public List<Partition> getPartitionsViaSqlPs(Table table, GetPartitionsArgs args
return Collections.emptyList(); // no partitions, bail early.
}
boolean isAcidTable = TxnUtils.isAcidTable(table);
return Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return getPartitionsByPartitionIds(catName, dbName, tblName, input, isAcidTable, args);
}
});
return getPartitionsByPartitionIdsInBatch(catName, dbName, tblName, partitionIds, isAcidTable, args);
}

/**
Expand Down Expand Up @@ -811,11 +808,16 @@ public List<Partition> getPartitionsUsingProjectionAndFilterSpec(Table tbl,
filter.joins, null);
break;
case BY_NAMES:
String partNamesFilter =
"" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(filterSpec.getFilters().size())
+ ")";
partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName, partNamesFilter,
filterSpec.getFilters(), Collections.EMPTY_LIST, null);
int batch = dbType.getMaxBatch(batchSize, filterSpec.getFiltersSize() + 3);
partitionIds = Batchable.runBatched(batch, filterSpec.getFilters(), new Batchable<String, Long>() {
@Override
public List<Long> run(List<String> input) throws Exception {
String partNamesFilter =
"" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
return getPartitionIdsViaSqlFilter(catName, dbName, tblName, partNamesFilter,
input, Collections.EMPTY_LIST, null);
}
});
break;
case BY_VALUES:
// we are going to use the SQL regex pattern in the LIKE clause below. So the default string
Expand Down Expand Up @@ -848,8 +850,9 @@ public List<Partition> getPartitionsUsingProjectionAndFilterSpec(Table tbl,
PartitionProjectionEvaluator projectionEvaluator =
new PartitionProjectionEvaluator(pm, fieldnameToTableName, partitionFields,
convertMapNullsToEmptyStrings, isView, includeParamKeyPattern, excludeParamKeyPattern);
int batch = dbType.getMaxBatch(batchSize, partitionIds.size());
// Get full objects. For Oracle/etc. do it in batches.
return Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
return Batchable.runBatched(batch, partitionIds, new Batchable<Long, Partition>() {
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return projectionEvaluator.getPartitionsUsingProjectionList(input);
Expand Down Expand Up @@ -1010,10 +1013,6 @@ private List<Partition> getPartitionsByNames(String catName, String dbName,
throws MetaException {
// Get most of the fields for the partNames provided.
// Assume db and table names are the same for all partition, as provided in arguments.
String quotedPartNames = partNameList.stream()
.map(DirectSqlUpdatePart::quoteString)
.collect(Collectors.joining(","));

String queryText =
"select " + PARTITIONS + ".\"PART_ID\"," + SDS + ".\"SD_ID\"," + SDS + ".\"CD_ID\","
+ SERDES + ".\"SERDE_ID\"," + PARTITIONS + ".\"CREATE_TIME\"," + PARTITIONS
Expand All @@ -1025,12 +1024,12 @@ private List<Partition> getPartitionsByNames(String catName, String dbName,
+ " left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" "
+ " inner join " + TBLS + " on " + TBLS + ".\"TBL_ID\" = " + PARTITIONS + ".\"TBL_ID\" "
+ " inner join " + DBS + " on " + DBS + ".\"DB_ID\" = " + TBLS + ".\"DB_ID\" "
+ " where \"PART_NAME\" in (" + quotedPartNames + ") "
+ " where \"PART_NAME\" in (" + makeParams(partNameList.size()) + ") "
+ " and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + DBS
+ ".\"CTLG_NAME\" = ? order by \"PART_NAME\" asc";

Object[] params = new Object[]{tblName, dbName, catName};
return getPartitionsByQuery(catName, dbName, tblName, queryText, params, isAcidTable, args);
List<Object> params = new ArrayList<>(partNameList);
params.addAll(List.of(tblName, dbName, catName));
return getPartitionsByQuery(catName, dbName, tblName, queryText, params.toArray(), isAcidTable, args);
}

private List<Partition> getPartitionsByPartitionIdsInBatch(String catName, String dbName,
Expand All @@ -1039,7 +1038,8 @@ private List<Partition> getPartitionsByPartitionIdsInBatch(String catName, Strin
if (partIdList.isEmpty()) {
return Collections.emptyList(); // no partitions, bail early.
}
return Batchable.runBatched(batchSize, partIdList, new Batchable<Long, Partition>() {
int batch = dbType.getMaxBatch(batchSize, partIdList.size());
return Batchable.runBatched(batch, partIdList, new Batchable<Long, Partition>() {
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return getPartitionsByPartitionIds(catName, dbName, tblName, input, isAcidTable, args);
Expand Down Expand Up @@ -1773,7 +1773,8 @@ private long partsFoundForPartitions(
List<Long> allCounts = Batchable.runBatched(batchSize, colNames, new Batchable<String, Long>() {
@Override
public List<Long> run(final List<String> inputColName) throws MetaException {
return Batchable.runBatched(batchSize, partNames, new Batchable<String, Long>() {
int batch = dbType.getMaxBatch(batchSize, inputColName.size() + partNames.size() + 4);
return Batchable.runBatched(batch, partNames, new Batchable<String, Long>() {
@Override
public List<Long> run(List<String> inputPartNames) throws MetaException {
long partsFound = 0;
Expand Down Expand Up @@ -2246,8 +2247,8 @@ public void dropPartitionsViaSqlFilter(final String catName, final String dbName
if (partNames.isEmpty()) {
return;
}

Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() {
int batch = dbType.getMaxBatch(batchSize, partNames.size() + 3);
Batchable.runBatched(batch, partNames, new Batchable<String, Void>() {
@Override
public List<Void> run(List<String> input) throws MetaException {
String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
Expand Down Expand Up @@ -2294,10 +2295,10 @@ public List<String> dropAllPartitionsAndGetLocations(Long tableId, String baseLo
}
}

int batch = batchSize == NO_BATCHING ? 1 : (partIds.size() + batchSize) / batchSize;
int batch = dbType.getMaxBatch(batchSize, partIds.size());
AtomicLong batchIdx = new AtomicLong(1);
AtomicLong timeSpent = new AtomicLong(0);
Batchable.runBatched(batchSize, partIds, new Batchable<Long, Void>() {
Batchable.runBatched(batch, partIds, new Batchable<Long, Void>() {
@Override
public List<Void> run(List<Long> input) throws Exception {
StringBuilder progress = new StringBuilder("Dropping partitions, batch: ");
Expand Down Expand Up @@ -2730,7 +2731,8 @@ public boolean deleteTableColumnStatistics(long tableId, List<String> colNames,

public boolean deletePartitionColumnStats(String catName, String dbName, String tblName,
List<String> partNames, List<String> colNames, String engine) throws MetaException {
Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() {
int batch = dbType.getMaxBatch(batchSize, partNames.size() + 3);
Batchable.runBatched(batch, partNames, new Batchable<String, Void>() {
@Override
public List<Void> run(List<String> input) throws Exception {
String sqlFilter = PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2995,11 +2995,11 @@ private void dropPartitionsViaJdo(String catName, String dbName, String tblName,
}
openTransaction();

int batch = batchSize == NO_BATCHING ? 1 : (partNames.size() + batchSize) / batchSize;
int batch = dbType.getMaxBatch(batchSize, partNames.size() + 3);
AtomicLong batchIdx = new AtomicLong(1);
AtomicLong timeSpent = new AtomicLong(0);
try {
Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() {
Batchable.runBatched(batch, partNames, new Batchable<String, Void>() {
@Override
public List<Void> run(List<String> input) throws MetaException {
StringBuilder progress = new StringBuilder("Dropping partitions, batch: ");
Expand Down Expand Up @@ -4065,7 +4065,8 @@ private List<Partition> getPartitionsViaOrmFilter(String catName, String dbName,
if (partNames.isEmpty()) {
return Collections.emptyList();
}
return Batchable.runBatched(batchSize, partNames, new Batchable<String, Partition>() {
int batch = dbType.getMaxBatch(batchSize, partNames.size() + 3);
return Batchable.runBatched(batch, partNames, new Batchable<String, Partition>() {
@Override
public List<Partition> run(List<String> input) throws MetaException {
Pair<Query, Map<String, String>> queryWithParams =
Expand Down Expand Up @@ -9421,8 +9422,9 @@ private List<MTableColumnStatistics> getMTableColumnStatistics(Table table, List

List<MTableColumnStatistics> result = Collections.emptyList();
try (Query query = pm.newQuery(MTableColumnStatistics.class)) {
result =
Batchable.runBatched(batchSize, colNames, new Batchable<String, MTableColumnStatistics>() {
int batch = dbType.getMaxBatch(batchSize, colNames.size() + 4);
result =
Batchable.runBatched(batch, colNames, new Batchable<String, MTableColumnStatistics>() {
@Override
public List<MTableColumnStatistics> run(List<String> input)
throws MetaException {
Expand Down Expand Up @@ -10058,7 +10060,9 @@ public List<Void> run(List<String> input) throws Exception {
}
};
try {
Batchable.runBatched(batchSize, partNames, b);
int batch = dbType.getMaxBatch(batchSize,
partNames.size() + (colNames != null ? colNames.size() : 0) + (engine != null ? 4 : 3));
Batchable.runBatched(batch, partNames, b);
} finally {
b.closeAllQueries();
}
Expand Down
Loading