Skip to content

Commit 22dacc4

Browse files
committed
Persistence implementation for pagination in some requests
Following up on #1555 * Refactor pagination code to delineate API-level page tokens and internal "pointers to data" * Requests deal with the "previous" token, user-provided page size (optional) and the previous request's page size. * Concentrate the logic of combining page size requests and previous tokens in PageTokenUtil * PageToken subclasses are no longer necessary. EntityIdPaging handles pagination over ordered result sets with static helper methods.
1 parent af1643c commit 22dacc4

File tree

38 files changed

+922
-542
lines changed

38 files changed

+922
-542
lines changed

integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ public List<Namespace> listNamespaces(String catalog, Namespace parent) {
101101
}
102102
}
103103

104+
public ListNamespacesResponse listNamespaces(
105+
String catalog, Namespace parent, String pageToken, String pageSize) {
106+
Map<String, String> queryParams = new HashMap<>();
107+
if (!parent.isEmpty()) {
108+
// TODO change this for Iceberg 1.7.2:
109+
// queryParams.put("parent", RESTUtil.encodeNamespace(parent));
110+
queryParams.put("parent", Joiner.on('\u001f').join(parent.levels()));
111+
}
112+
queryParams.put("pageToken", pageToken);
113+
queryParams.put("pageSize", pageSize);
114+
try (Response response =
115+
request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) {
116+
assertThat(response.getStatus()).isEqualTo(OK.getStatusCode());
117+
ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class);
118+
return res;
119+
}
120+
}
121+
104122
public List<Namespace> listAllNamespacesChildFirst(String catalog) {
105123
List<Namespace> result = new ArrayList<>();
106124
for (int idx = -1; idx < result.size(); idx++) {
@@ -142,6 +160,20 @@ public List<TableIdentifier> listTables(String catalog, Namespace namespace) {
142160
}
143161
}
144162

163+
public ListTablesResponse listTables(
164+
String catalog, Namespace namespace, String pageToken, String pageSize) {
165+
String ns = RESTUtil.encodeNamespace(namespace);
166+
Map<String, String> queryParams = new HashMap<>();
167+
queryParams.put("pageToken", pageToken);
168+
queryParams.put("pageSize", pageSize);
169+
try (Response res =
170+
request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams)
171+
.get()) {
172+
assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
173+
return res.readEntity(ListTablesResponse.class);
174+
}
175+
}
176+
145177
public void dropTable(String catalog, TableIdentifier id) {
146178
String ns = RESTUtil.encodeNamespace(id.namespace());
147179
try (Response res =

integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.apache.iceberg.rest.RESTUtil;
6868
import org.apache.iceberg.rest.requests.CreateTableRequest;
6969
import org.apache.iceberg.rest.responses.ErrorResponse;
70+
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
71+
import org.apache.iceberg.rest.responses.ListTablesResponse;
7072
import org.apache.iceberg.types.Types;
7173
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
7274
import org.apache.polaris.core.admin.model.Catalog;
@@ -164,7 +166,8 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests<RESTCatalog>
164166

165167
private static final String[] DEFAULT_CATALOG_PROPERTIES = {
166168
"polaris.config.allow.unstructured.table.location", "true",
167-
"polaris.config.allow.external.table.location", "true"
169+
"polaris.config.allow.external.table.location", "true",
170+
"polaris.config.list-pagination-enabled", "true"
168171
};
169172

170173
@Retention(RetentionPolicy.RUNTIME)
@@ -1558,4 +1561,72 @@ public void testUpdateTableWithReservedProperty() {
15581561
.hasMessageContaining("reserved prefix");
15591562
genericTableApi.purge(currentCatalogName, namespace);
15601563
}
1564+
1565+
@Test
1566+
public void testPaginatedListNamespaces() {
1567+
String prefix = "testPaginatedListNamespaces";
1568+
for (int i = 0; i < 20; i++) {
1569+
Namespace namespace = Namespace.of(prefix + i);
1570+
restCatalog.createNamespace(namespace);
1571+
}
1572+
1573+
try {
1574+
Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty()))
1575+
.hasSize(20);
1576+
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
1577+
int total = 0;
1578+
String pageToken = null;
1579+
do {
1580+
ListNamespacesResponse response =
1581+
catalogApi.listNamespaces(
1582+
currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize));
1583+
Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize);
1584+
total += response.namespaces().size();
1585+
pageToken = response.nextPageToken();
1586+
} while (pageToken != null);
1587+
Assertions.assertThat(total)
1588+
.as("Total paginated results for pageSize = " + pageSize)
1589+
.isEqualTo(20);
1590+
}
1591+
} finally {
1592+
for (int i = 0; i < 20; i++) {
1593+
Namespace namespace = Namespace.of(prefix + i);
1594+
restCatalog.dropNamespace(namespace);
1595+
}
1596+
}
1597+
}
1598+
1599+
@Test
1600+
public void testPaginatedListTables() {
1601+
String prefix = "testPaginatedListTables";
1602+
Namespace namespace = Namespace.of(prefix);
1603+
restCatalog.createNamespace(namespace);
1604+
for (int i = 0; i < 20; i++) {
1605+
restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA);
1606+
}
1607+
1608+
try {
1609+
Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20);
1610+
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
1611+
int total = 0;
1612+
String pageToken = null;
1613+
do {
1614+
ListTablesResponse response =
1615+
catalogApi.listTables(
1616+
currentCatalogName, namespace, pageToken, String.valueOf(pageSize));
1617+
Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize);
1618+
total += response.identifiers().size();
1619+
pageToken = response.nextPageToken();
1620+
} while (pageToken != null);
1621+
Assertions.assertThat(total)
1622+
.as("Total paginated results for pageSize = " + pageSize)
1623+
.isEqualTo(20);
1624+
}
1625+
} finally {
1626+
for (int i = 0; i < 20; i++) {
1627+
restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i));
1628+
}
1629+
restCatalog.dropNamespace(namespace);
1630+
}
1631+
}
15611632
}

persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
5454
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5555
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
56-
import org.apache.polaris.core.persistence.pagination.HasPageSize;
56+
import org.apache.polaris.core.persistence.pagination.EntityIdPaging;
5757
import org.apache.polaris.core.persistence.pagination.Page;
5858
import org.apache.polaris.core.persistence.pagination.PageToken;
5959
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
@@ -468,6 +468,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
468468
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
469469
@Nonnull Function<PolarisBaseEntity, T> transformer,
470470
@Nonnull PageToken pageToken) {
471+
471472
// full range scan under the parent for that type
472473
Stream<PolarisBaseEntity> data =
473474
this.store
@@ -477,11 +478,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
477478
.map(ModelEntity::toEntity)
478479
.filter(entityFilter);
479480

480-
if (pageToken instanceof HasPageSize hasPageSize) {
481-
data = data.limit(hasPageSize.getPageSize());
482-
}
483-
484-
return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
481+
return Page.mapped(pageToken, data, transformer, EntityIdPaging::encodedDataReference);
485482
}
486483

487484
/** {@inheritDoc} */

persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.polaris.core.entity.PolarisEntityType;
3636
import org.apache.polaris.core.entity.PolarisGrantRecord;
3737
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
38+
import org.apache.polaris.core.persistence.pagination.EntityIdPaging;
3839
import org.apache.polaris.core.persistence.pagination.PageToken;
3940
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
4041
import org.apache.polaris.core.policy.PolicyEntity;
@@ -294,14 +295,25 @@ List<ModelEntity> lookupFullEntitiesActive(
294295

295296
// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
296297
String hql =
297-
"SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
298+
"SELECT m from ModelEntity m where"
299+
+ " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode and m.id > :tokenId";
300+
301+
long tokenId = -1;
302+
if (pageToken.paginationRequested()) {
303+
hql += " order by m.id asc";
304+
305+
if (pageToken.hasDataReference()) {
306+
tokenId = EntityIdPaging.entityIdBoundary(pageToken);
307+
}
308+
}
298309

299310
TypedQuery<ModelEntity> query =
300311
session
301312
.createQuery(hql, ModelEntity.class)
302313
.setParameter("catalogId", catalogId)
303314
.setParameter("parentId", parentId)
304-
.setParameter("typeCode", entityType.getCode());
315+
.setParameter("typeCode", entityType.getCode())
316+
.setParameter("tokenId", tokenId);
305317

306318
return query.getResultList();
307319
}

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.polaris.persistence.relational.jdbc;
2020

21+
import static org.apache.polaris.core.persistence.pagination.EntityIdPaging.entityIdBoundary;
2122
import static org.apache.polaris.persistence.relational.jdbc.QueryGenerator.PreparedQuery;
2223

2324
import com.google.common.base.Preconditions;
@@ -31,6 +32,7 @@
3132
import java.util.LinkedHashMap;
3233
import java.util.List;
3334
import java.util.Map;
35+
import java.util.concurrent.atomic.AtomicReference;
3436
import java.util.function.Function;
3537
import java.util.function.Predicate;
3638
import java.util.stream.Collectors;
@@ -50,7 +52,7 @@
5052
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
5153
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5254
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
53-
import org.apache.polaris.core.persistence.pagination.HasPageSize;
55+
import org.apache.polaris.core.persistence.pagination.EntityIdPaging;
5456
import org.apache.polaris.core.persistence.pagination.Page;
5557
import org.apache.polaris.core.persistence.pagination.PageToken;
5658
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -449,7 +451,7 @@ public <T> Page<T> listEntities(
449451
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
450452
@Nonnull Function<PolarisBaseEntity, T> transformer,
451453
@Nonnull PageToken pageToken) {
452-
Map<String, Object> params =
454+
Map<String, Object> whereEquals =
453455
Map.of(
454456
"catalog_id",
455457
catalogId,
@@ -459,29 +461,37 @@ public <T> Page<T> listEntities(
459461
entityType.getCode(),
460462
"realm_id",
461463
realmId);
464+
Map<String, Object> whereGreater = Map.of();
462465

463466
// Limit can't be pushed down, due to client side filtering
464467
// absence of transaction.
468+
String orderByColumnName = null;
469+
if (pageToken.paginationRequested()) {
470+
orderByColumnName = ModelEntity.ID_COLUMN;
471+
if (pageToken.hasDataReference()) {
472+
long boundary = entityIdBoundary(pageToken);
473+
whereGreater = Map.of(ModelEntity.ID_COLUMN, boundary);
474+
}
475+
}
476+
465477
try {
466478
PreparedQuery query =
467479
QueryGenerator.generateSelectQuery(
468-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params);
469-
List<PolarisBaseEntity> results = new ArrayList<>();
480+
ModelEntity.ALL_COLUMNS,
481+
ModelEntity.TABLE_NAME,
482+
whereEquals,
483+
whereGreater,
484+
orderByColumnName);
485+
AtomicReference<Page<T>> results = new AtomicReference<>();
470486
datasourceOperations.executeSelectOverStream(
471487
query,
472488
new ModelEntity(),
473489
stream -> {
474490
var data = stream.filter(entityFilter);
475-
if (pageToken instanceof HasPageSize hasPageSize) {
476-
data = data.limit(hasPageSize.getPageSize());
477-
}
478-
data.forEach(results::add);
491+
results.set(
492+
Page.mapped(pageToken, data, transformer, EntityIdPaging::encodedDataReference));
479493
});
480-
List<T> resultsOrEmpty =
481-
results == null
482-
? Collections.emptyList()
483-
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
484-
return Page.fromItems(resultsOrEmpty);
494+
return results.get();
485495
} catch (SQLException e) {
486496
throw new RuntimeException(
487497
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import jakarta.annotation.Nonnull;
23+
import jakarta.annotation.Nullable;
2324
import java.util.*;
2425
import java.util.stream.Collectors;
2526
import org.apache.polaris.core.entity.PolarisEntityCore;
@@ -52,8 +53,27 @@ public static PreparedQuery generateSelectQuery(
5253
@Nonnull List<String> projections,
5354
@Nonnull String tableName,
5455
@Nonnull Map<String, Object> whereClause) {
55-
QueryFragment where = generateWhereClause(new HashSet<>(projections), whereClause);
56-
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql());
56+
return generateSelectQuery(projections, tableName, whereClause, Map.of(), null);
57+
}
58+
59+
/**
60+
* Generates a SELECT query with projection and filtering.
61+
*
62+
* @param projections List of columns to retrieve.
63+
* @param tableName Target table name.
64+
* @param whereEquals Column-value pairs used in WHERE filtering.
65+
* @return A parameterized SELECT query.
66+
* @throws IllegalArgumentException if any whereClause column isn't in projections.
67+
*/
68+
public static PreparedQuery generateSelectQuery(
69+
@Nonnull List<String> projections,
70+
@Nonnull String tableName,
71+
@Nonnull Map<String, Object> whereEquals,
72+
@Nonnull Map<String, Object> whereGreater,
73+
@Nullable String orderByColumn) {
74+
QueryFragment where =
75+
generateWhereClause(new HashSet<>(projections), whereEquals, whereGreater);
76+
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql(), orderByColumn);
5777
return new PreparedQuery(query.sql(), where.parameters());
5878
}
5979

@@ -101,7 +121,8 @@ public static PreparedQuery generateSelectQueryWithEntityIds(
101121
params.add(realmId);
102122
String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?";
103123
return new PreparedQuery(
104-
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where).sql(), params);
124+
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(),
125+
params);
105126
}
106127

107128
/**
@@ -150,7 +171,7 @@ public static PreparedQuery generateUpdateQuery(
150171
@Nonnull List<Object> values,
151172
@Nonnull Map<String, Object> whereClause) {
152173
List<Object> bindingParams = new ArrayList<>(values);
153-
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause);
174+
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause, Map.of());
154175
String setClause = allColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", "));
155176
String sql =
156177
"UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + setClause + where.sql();
@@ -170,34 +191,49 @@ public static PreparedQuery generateDeleteQuery(
170191
@Nonnull List<String> tableColumns,
171192
@Nonnull String tableName,
172193
@Nonnull Map<String, Object> whereClause) {
173-
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause);
194+
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause, Map.of());
174195
return new PreparedQuery(
175196
"DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters());
176197
}
177198

178199
private static PreparedQuery generateSelectQuery(
179-
@Nonnull List<String> columnNames, @Nonnull String tableName, @Nonnull String filter) {
200+
@Nonnull List<String> columnNames,
201+
@Nonnull String tableName,
202+
@Nonnull String filter,
203+
@Nullable String orderByColumn) {
180204
String sql =
181205
"SELECT "
182206
+ String.join(", ", columnNames)
183207
+ " FROM "
184208
+ getFullyQualifiedTableName(tableName)
185209
+ filter;
210+
if (orderByColumn != null) {
211+
sql += " ORDER BY " + orderByColumn + " ASC";
212+
}
186213
return new PreparedQuery(sql, Collections.emptyList());
187214
}
188215

189216
@VisibleForTesting
190217
static QueryFragment generateWhereClause(
191-
@Nonnull Set<String> tableColumns, @Nonnull Map<String, Object> whereClause) {
218+
@Nonnull Set<String> tableColumns,
219+
@Nonnull Map<String, Object> whereEquals,
220+
@Nonnull Map<String, Object> whereGreater) {
192221
List<String> conditions = new ArrayList<>();
193222
List<Object> parameters = new ArrayList<>();
194-
for (Map.Entry<String, Object> entry : whereClause.entrySet()) {
223+
for (Map.Entry<String, Object> entry : whereEquals.entrySet()) {
195224
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
196225
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
197226
}
198227
conditions.add(entry.getKey() + " = ?");
199228
parameters.add(entry.getValue());
200229
}
230+
for (Map.Entry<String, Object> entry : whereGreater.entrySet()) {
231+
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
232+
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
233+
}
234+
conditions.add(entry.getKey() + " > ?");
235+
parameters.add(entry.getValue());
236+
}
201237
String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
202238
return new QueryFragment(clause, parameters);
203239
}

0 commit comments

Comments
 (0)