Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for null arguments in Namespaces Rest API #7247

Merged
merged 2 commits into from
Jun 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;

Expand Down Expand Up @@ -397,7 +396,7 @@ protected void validateBrokerName(String broker) throws MalformedURLException {
if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, "Invalid broker url %s", broker);
checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
String host = parts[0];
int port = Integer.parseInt(parts[1]);

Expand Down Expand Up @@ -844,4 +843,16 @@ protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Thr
asyncResponse.resume(new RestException(throwable));
}
}

protected void checkNotNull(Object o, String errorMessage) {
if (o == null) {
throw new RestException(Status.BAD_REQUEST, errorMessage);
}
}

protected void checkArgument(boolean b, String errorMessage) {
if (!b) {
throw new RestException(Status.BAD_REQUEST, errorMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
Expand Down Expand Up @@ -110,6 +108,7 @@ public abstract class NamespacesBase extends AdminResource {
private static final long MAX_BUNDLES = ((long) 1) << 32;

protected List<String> internalGetTenantNamespaces(String tenant) {
checkNotNull(tenant, "Tenant should not be null");
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);

try {
Expand Down Expand Up @@ -380,6 +379,8 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori

protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
checkNotNull(role, "Role should not be null");
checkNotNull(actions, "Actions should not be null");

try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
Expand Down Expand Up @@ -411,6 +412,8 @@ protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> a

protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(roles, "Roles should not be null");

try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
Expand Down Expand Up @@ -442,6 +445,7 @@ protected void internalGrantPermissionOnSubscription(String subscription, Set<St
protected void internalRevokePermissionsOnNamespace(String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
checkNotNull(role, "Role should not be null");

try {
Stat nodeStat = new Stat();
Expand Down Expand Up @@ -472,6 +476,8 @@ protected void internalRevokePermissionsOnNamespace(String role) {
protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
checkNotNull(subscriptionName, "SubscriptionName should not be null");
checkNotNull(role, "Role should not be null");

AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
Expand All @@ -497,6 +503,7 @@ protected Set<String> internalGetNamespaceReplicationClusters() {
protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
checkNotNull(clusterIds, "ClusterIds should not be null");

Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
if (!namespaceName.isGlobal()) {
Expand Down Expand Up @@ -1021,6 +1028,7 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
@SuppressWarnings("deprecation")
public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
validateSuperUserAccess();
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);

Policies policies = getNamespacePolicies(namespaceName);
Expand Down Expand Up @@ -1069,6 +1077,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat
@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
validateSuperUserAccess();
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);

Policies policies = getNamespacePolicies(namespaceName);
Expand Down Expand Up @@ -1581,6 +1590,7 @@ protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolea
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

Expand All @@ -1602,6 +1612,7 @@ protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean a
protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(subscription, "Subscription should not be null");

final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
Expand Down Expand Up @@ -1645,6 +1656,8 @@ protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncR
protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

Expand All @@ -1666,6 +1679,7 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri
protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
checkNotNull(subscription, "Subscription should not be null");

final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
Expand Down Expand Up @@ -1708,6 +1722,8 @@ protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String
@SuppressWarnings("deprecation")
protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

Expand Down Expand Up @@ -1845,6 +1861,7 @@ protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliver

protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
validatePoliciesReadOnlyAccess();

log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);
Expand Down Expand Up @@ -1922,6 +1939,9 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {
protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
String tenant) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
checkNotNull(cluster, "Cluster should not be null");
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
checkNotNull(tenant, "Tenant should not be null");

log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);

Expand Down Expand Up @@ -1952,24 +1972,21 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
}

private void validatePersistencePolicies(PersistencePolicies persistence) {
try {
checkNotNull(persistence);
final ServiceConfiguration config = pulsar().getConfiguration();
checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
"Bookkeeper-Ensemble must be <= %s", config.getManagedLedgerMaxEnsembleSize());
checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
"Bookkeeper-WriteQuorum must be <= %s", config.getManagedLedgerMaxWriteQuorum());
checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
"Bookkeeper-AckQuorum must be <= %s", config.getManagedLedgerMaxAckQuorum());
checkArgument(
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
"Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
persistence.getBookkeeperAckQuorum());
} catch (NullPointerException | IllegalArgumentException e) {
throw new RestException(Status.PRECONDITION_FAILED, e.getMessage());
}
checkNotNull(persistence, "persistence policies should not be null");
final ServiceConfiguration config = pulsar().getConfiguration();
checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
"Bookkeeper-Ensemble must be <= " + config.getManagedLedgerMaxEnsembleSize());
checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
"Bookkeeper-WriteQuorum must be <= " + config.getManagedLedgerMaxWriteQuorum());
checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
"Bookkeeper-AckQuorum must be <= " + config.getManagedLedgerMaxAckQuorum());
checkArgument(
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
persistence.getBookkeeperAckQuorum()));

}

protected RetentionPolicies internalGetRetention() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import static org.apache.pulsar.common.util.Codec.decode;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -1404,11 +1404,11 @@ protected void internalSkipMessages(String subName, int numMessages, boolean aut
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
Preconditions.checkNotNull(repl);
repl.skipMessages(numMessages).get();
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
Preconditions.checkNotNull(sub);
sub.skipMessages(numMessages).get();
}
log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, topicName, subName);
Expand Down Expand Up @@ -1798,7 +1798,7 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
Preconditions.checkNotNull(sub);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
Expand Down Expand Up @@ -1907,7 +1907,7 @@ private void verifyReadOperation(boolean authoritative) {
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
checkNotNull(entry);
Preconditions.checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

Expand Down Expand Up @@ -2151,11 +2151,11 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
Preconditions.checkNotNull(repl);
repl.expireMessages(expireTimeInSeconds);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
Preconditions.checkNotNull(sub);
sub.expireMessages(expireTimeInSeconds);
}
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName,
Expand Down Expand Up @@ -2374,7 +2374,7 @@ private Topic getOrCreateTopic(TopicName topicName) {
private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
try {
Subscription sub = topic.getSubscription(subName);
return checkNotNull(sub);
return Preconditions.checkNotNull(sub);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
}
Expand All @@ -2387,7 +2387,7 @@ private PersistentReplicator getReplicatorReference(String replName, PersistentT
try {
String remoteCluster = PersistentReplicator.getRemoteCluster(replName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
return checkNotNull(repl);
return Preconditions.checkNotNull(repl);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Replicator not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,19 +301,19 @@ public void testSetPersistencepolicies() throws Exception {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 4, 3, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 4, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(6, 3, 1, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
assertEquals(e.getStatusCode(), 400);
}

// make sure policies has not been changed
Expand Down