Skip to content

Commit 8fb6b29

Browse files
oneby-wangoneby-wang
authored andcommitted
[fix][admin] Refactor bookie affinity group sync operations to async in rest api (apache#25050)
Co-authored-by: oneby-wang <onebywang@qq.com> (cherry picked from commit 4e5364f) (cherry picked from commit cd70356)
1 parent 6f040ae commit 8fb6b29

File tree

6 files changed

+180
-63
lines changed

6 files changed

+180
-63
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public void setLocalPoliciesWithCreate(NamespaceName ns, Function<Optional<Local
5555
setWithCreate(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), createFunction);
5656
}
5757

58+
public CompletableFuture<Void> setLocalPoliciesWithCreateAsync(NamespaceName ns, Function<Optional<LocalPolicies>,
59+
LocalPolicies> createFunction) {
60+
return setWithCreateAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), createFunction);
61+
}
62+
5863
public CompletableFuture<Void> createLocalPoliciesAsync(NamespaceName ns, LocalPolicies policies) {
5964
return getCache().create(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), policies);
6065
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,49 +1013,32 @@ protected CompletableFuture<Void> internalUnloadNamespaceAsync() {
10131013
});
10141014
}
10151015

1016-
1017-
protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffinityGroup) {
1018-
validateSuperUserAccess();
1019-
log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup,
1020-
this.namespaceName);
1021-
1022-
if (namespaceName.isGlobal()) {
1023-
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
1024-
validateGlobalNamespaceOwnership(namespaceName);
1025-
} else {
1026-
validateClusterOwnership(namespaceName.getCluster());
1027-
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
1028-
}
1029-
1030-
try {
1031-
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> {
1032-
LocalPolicies localPolicies = oldPolicies.map(
1033-
policies -> new LocalPolicies(policies.bundles,
1034-
bookieAffinityGroup,
1035-
policies.namespaceAntiAffinityGroup,
1036-
policies.migrated))
1037-
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), bookieAffinityGroup, null));
1038-
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
1039-
namespaceName, localPolicies);
1040-
return localPolicies;
1041-
});
1042-
} catch (NotFoundException e) {
1043-
log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
1044-
namespaceName);
1045-
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
1046-
} catch (RestException re) {
1047-
throw re;
1048-
} catch (Exception e) {
1049-
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
1050-
e);
1051-
throw new RestException(e);
1052-
}
1016+
protected CompletableFuture<Void> internalSetBookieAffinityGroupAsync(BookieAffinityGroupData bookieAffinityGroup) {
1017+
return validateSuperUserAccessAsync().thenCompose(__ -> {
1018+
log.info("[{}] Setting bookie affinity group {} for namespace {}", clientAppId(), bookieAffinityGroup,
1019+
this.namespaceName);
1020+
if (namespaceName.isGlobal()) {
1021+
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
1022+
return validateGlobalNamespaceOwnershipAsync(namespaceName);
1023+
} else {
1024+
return validateClusterOwnershipAsync(namespaceName.getCluster()).thenCompose(
1025+
unused -> validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster()));
1026+
}
1027+
}).thenCompose(__ -> getDefaultBundleDataAsync().thenCompose(
1028+
defaultBundleData -> getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName, oldPolicies ->
1029+
oldPolicies.map(policies -> new LocalPolicies(policies.bundles, bookieAffinityGroup,
1030+
policies.namespaceAntiAffinityGroup, policies.migrated))
1031+
.orElseGet(() -> new LocalPolicies(defaultBundleData, bookieAffinityGroup, null)))))
1032+
.thenAccept(__ -> log.info(
1033+
"[{}] Successfully updated bookie affinity group: namespace={}, bookieAffinityGroup={}", clientAppId(),
1034+
namespaceName, bookieAffinityGroup));
10531035
}
10541036

1055-
protected void internalDeleteBookieAffinityGroup() {
1056-
internalSetBookieAffinityGroup(null);
1037+
protected CompletableFuture<Void> internalDeleteBookieAffinityGroupAsync() {
1038+
return internalSetBookieAffinityGroupAsync(null);
10571039
}
10581040

1041+
@Deprecated
10591042
protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
10601043
validateSuperUserAccess();
10611044

@@ -1084,6 +1067,21 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
10841067
}
10851068
}
10861069

1070+
protected CompletableFuture<BookieAffinityGroupData> internalGetBookieAffinityGroupAsync() {
1071+
return validateSuperUserAccessAsync().thenCompose(__ -> {
1072+
if (namespaceName.isGlobal()) {
1073+
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
1074+
return validateGlobalNamespaceOwnershipAsync(namespaceName);
1075+
} else {
1076+
return validateClusterOwnershipAsync(namespaceName.getCluster()).thenCompose(
1077+
unused -> validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster()));
1078+
}
1079+
}).thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName))
1080+
.thenApply(policies -> policies.orElseThrow(
1081+
() -> new RestException(Status.NOT_FOUND, "Namespace local-policies does not exist"))
1082+
.bookieAffinityGroup);
1083+
}
1084+
10871085
private CompletableFuture<Void> validateLeaderBrokerAsync() {
10881086
if (this.isLeaderBroker()) {
10891087
return CompletableFuture.completedFuture(null);
@@ -2988,6 +2986,7 @@ protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsyn
29882986
}
29892987

29902988
// TODO remove this sync method after async refactor
2989+
@Deprecated
29912990
private BundlesData getDefaultBundleData() {
29922991
try {
29932992
return getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,10 +1249,18 @@ public void setPersistence(@Suspended final AsyncResponse asyncResponse, @PathPa
12491249
@ApiResponse(code = 403, message = "Don't have admin permission"),
12501250
@ApiResponse(code = 404, message = "Namespace does not exist"),
12511251
@ApiResponse(code = 409, message = "Concurrent modification") })
1252-
public void setBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
1253-
@PathParam("namespace") String namespace, BookieAffinityGroupData bookieAffinityGroup) {
1252+
public void setBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
1253+
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
1254+
BookieAffinityGroupData bookieAffinityGroup) {
12541255
validateNamespaceName(property, cluster, namespace);
1255-
internalSetBookieAffinityGroup(bookieAffinityGroup);
1256+
internalSetBookieAffinityGroupAsync(bookieAffinityGroup)
1257+
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
1258+
.exceptionally(ex -> {
1259+
log.error("[{}] Failed to set bookie affinity group for namespace {}", clientAppId(),
1260+
namespaceName, ex);
1261+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1262+
return null;
1263+
});
12561264
}
12571265

12581266
@GET
@@ -1263,10 +1271,17 @@ public void setBookieAffinityGroup(@PathParam("property") String property, @Path
12631271
@ApiResponse(code = 403, message = "Don't have admin permission"),
12641272
@ApiResponse(code = 404, message = "Namespace does not exist"),
12651273
@ApiResponse(code = 409, message = "Concurrent modification") })
1266-
public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") String property,
1267-
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
1274+
public void getBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
1275+
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
12681276
validateNamespaceName(property, cluster, namespace);
1269-
return internalGetBookieAffinityGroup();
1277+
internalGetBookieAffinityGroupAsync()
1278+
.thenAccept(asyncResponse::resume)
1279+
.exceptionally(ex -> {
1280+
log.error("[{}] Failed to get bookie affinity group for namespace {}", clientAppId(),
1281+
namespaceName, ex);
1282+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1283+
return null;
1284+
});
12701285
}
12711286

12721287
@DELETE
@@ -1277,10 +1292,18 @@ public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") Str
12771292
@ApiResponse(code = 403, message = "Don't have admin permission"),
12781293
@ApiResponse(code = 404, message = "Namespace does not exist"),
12791294
@ApiResponse(code = 409, message = "Concurrent modification") })
1280-
public void deleteBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
1281-
@PathParam("namespace") String namespace) {
1295+
public void deleteBookieAffinityGroup(@Suspended AsyncResponse asyncResponse,
1296+
@PathParam("property") String property, @PathParam("cluster") String cluster,
1297+
@PathParam("namespace") String namespace) {
12821298
validateNamespaceName(property, cluster, namespace);
1283-
internalDeleteBookieAffinityGroup();
1299+
internalDeleteBookieAffinityGroupAsync()
1300+
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
1301+
.exceptionally(ex -> {
1302+
log.error("[{}] Failed to delete bookie affinity group for namespace {}", clientAppId(),
1303+
namespaceName, ex);
1304+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1305+
return null;
1306+
});
12841307
}
12851308

12861309
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,11 +1421,19 @@ public void deletePersistence(@Suspended final AsyncResponse asyncResponse, @Pat
14211421
@ApiResponse(code = 403, message = "Don't have admin permission"),
14221422
@ApiResponse(code = 404, message = "Namespace does not exist"),
14231423
@ApiResponse(code = 409, message = "Concurrent modification")})
1424-
public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
1424+
public void setBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
1425+
@PathParam("namespace") String namespace,
14251426
@ApiParam(value = "Bookie affinity group for the specified namespace")
1426-
BookieAffinityGroupData bookieAffinityGroup) {
1427+
BookieAffinityGroupData bookieAffinityGroup) {
14271428
validateNamespaceName(tenant, namespace);
1428-
internalSetBookieAffinityGroup(bookieAffinityGroup);
1429+
internalSetBookieAffinityGroupAsync(bookieAffinityGroup)
1430+
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
1431+
.exceptionally(ex -> {
1432+
log.error("[{}] Failed to set bookie affinity group for namespace {}", clientAppId(),
1433+
namespaceName, ex);
1434+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1435+
return null;
1436+
});
14291437
}
14301438

14311439
@GET
@@ -1437,10 +1445,17 @@ public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathPara
14371445
@ApiResponse(code = 403, message = "Don't have admin permission"),
14381446
@ApiResponse(code = 404, message = "Namespace does not exist"),
14391447
@ApiResponse(code = 409, message = "Concurrent modification") })
1440-
public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") String property,
1441-
@PathParam("namespace") String namespace) {
1448+
public void getBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
1449+
@PathParam("namespace") String namespace) {
14421450
validateNamespaceName(property, namespace);
1443-
return internalGetBookieAffinityGroup();
1451+
internalGetBookieAffinityGroupAsync()
1452+
.thenAccept(asyncResponse::resume)
1453+
.exceptionally(ex -> {
1454+
log.error("[{}] Failed to get bookie affinity group for namespace {}", clientAppId(),
1455+
namespaceName, ex);
1456+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1457+
return null;
1458+
});
14441459
}
14451460

14461461
@DELETE
@@ -1451,10 +1466,18 @@ public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") Str
14511466
@ApiResponse(code = 403, message = "Don't have admin permission"),
14521467
@ApiResponse(code = 404, message = "Namespace does not exist"),
14531468
@ApiResponse(code = 409, message = "Concurrent modification") })
1454-
public void deleteBookieAffinityGroup(@PathParam("property") String property,
1455-
@PathParam("namespace") String namespace) {
1469+
public void deleteBookieAffinityGroup(@Suspended AsyncResponse asyncResponse,
1470+
@PathParam("property") String property,
1471+
@PathParam("namespace") String namespace) {
14561472
validateNamespaceName(property, namespace);
1457-
internalDeleteBookieAffinityGroup();
1473+
internalDeleteBookieAffinityGroupAsync()
1474+
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
1475+
.exceptionally(ex -> {
1476+
log.error("[{}] Failed to delete bookie affinity group for namespace {}", clientAppId(),
1477+
namespaceName, ex);
1478+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1479+
return null;
1480+
});
14581481
}
14591482

14601483
@GET

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2399,4 +2399,38 @@ public void testDeleteNamespaceUsesRemoteClusterTlsConfig() throws Exception {
23992399
"Expected TLS service error, got: " + message);
24002400
}
24012401
}
2402+
2403+
@Test
2404+
public void testSetAndDeleteBookieAffinityGroup() throws Exception {
2405+
// 1. create namespace with empty policies
2406+
String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
2407+
asyncRequests(
2408+
response -> namespaces.createNamespace(response, testTenant, testLocalCluster, setBookieAffinityGroupNs,
2409+
(Policies) null));
2410+
2411+
// 2.set bookie affinity group
2412+
String primaryAffinityGroup = "primary-affinity-group";
2413+
String secondaryAffinityGroup = "secondary-affinity-group";
2414+
BookieAffinityGroupData bookieAffinityGroupDataReq =
2415+
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
2416+
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
2417+
asyncRequests(response -> namespaces.setBookieAffinityGroup(response, testTenant, testLocalCluster,
2418+
setBookieAffinityGroupNs, bookieAffinityGroupDataReq));
2419+
2420+
// 3.assert namespace bookie affinity group
2421+
BookieAffinityGroupData bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests(
2422+
response -> namespaces.getBookieAffinityGroup(response, testTenant, testLocalCluster,
2423+
setBookieAffinityGroupNs));
2424+
assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
2425+
2426+
// 4.delete bookie affinity group
2427+
asyncRequests(response -> namespaces.deleteBookieAffinityGroup(response, testTenant, testLocalCluster,
2428+
setBookieAffinityGroupNs));
2429+
2430+
// 5.assert namespace bookie affinity group
2431+
bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests(
2432+
response -> namespaces.getBookieAffinityGroup(response, testTenant, testLocalCluster,
2433+
setBookieAffinityGroupNs));
2434+
assertNull(bookieAffinityGroupDataResp);
2435+
}
24022436
}

0 commit comments

Comments
 (0)