Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ protected void setup() throws Exception {
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);

doReturn(mockZookKeeper).when(persistentTopics).globalZk();
doReturn(mockZookKeeper).when(persistentTopics).localZk();
doReturn(mockZooKeeper).when(persistentTopics).globalZk();
doReturn(mockZooKeeper).when(persistentTopics).localZk();
doReturn(pulsar.getConfigurationCache().propertiesCache()).when(persistentTopics).tenantsCache();
doReturn(pulsar.getConfigurationCache().policiesCache()).when(persistentTopics).policiesCache();
doReturn(false).when(persistentTopics).isRequestHttps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,14 @@ public void testInvalidDynamicConfigContentInZK() throws Exception {
final int newValue = 10;
stopBroker();
// set invalid data into dynamic-config znode so, broker startup fail to deserialize data
mockZookKeeper.setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH, "$".getBytes(), -1);
mockZooKeeper.setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH, "$".getBytes(), -1);
// start broker: it should have set watch even if with failure of deserialization
startBroker();
Assert.assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue);
// update zk with config-value which should fire watch and broker should update the config value
Map<String, String> configMap = Maps.newHashMap();
configMap.put("brokerShutdownTimeoutMs", Integer.toString(newValue));
mockZookKeeper.setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH,
mockZooKeeper.setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMap), -1);
// wait config to be updated
for (int i = 0; i < 5; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setup() throws Exception {

clusters = spy(new Clusters());
clusters.setPulsar(pulsar);
doReturn(mockZookKeeper).when(clusters).globalZk();
doReturn(mockZooKeeper).when(clusters).globalZk();
doReturn(configurationCache.clustersCache()).when(clusters).clustersCache();
doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache();
doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();
Expand All @@ -135,16 +135,16 @@ public void setup() throws Exception {
properties = spy(new Properties());
properties.setServletContext(new MockServletContext());
properties.setPulsar(pulsar);
doReturn(mockZookKeeper).when(properties).globalZk();
doReturn(mockZooKeeper).when(properties).globalZk();
doReturn(configurationCache.propertiesCache()).when(properties).tenantsCache();
doReturn("test").when(properties).clientAppId();
doNothing().when(properties).validateSuperUserAccess();

namespaces = spy(new Namespaces());
namespaces.setServletContext(new MockServletContext());
namespaces.setPulsar(pulsar);
doReturn(mockZookKeeper).when(namespaces).globalZk();
doReturn(mockZookKeeper).when(namespaces).localZk();
doReturn(mockZooKeeper).when(namespaces).globalZk();
doReturn(mockZooKeeper).when(namespaces).localZk();
doReturn(configurationCache.propertiesCache()).when(namespaces).tenantsCache();
doReturn(configurationCache.policiesCache()).when(namespaces).policiesCache();
doReturn("test").when(namespaces).clientAppId();
Expand All @@ -156,8 +156,8 @@ public void setup() throws Exception {
brokers = spy(new Brokers());
brokers.setServletContext(new MockServletContext());
brokers.setPulsar(pulsar);
doReturn(mockZookKeeper).when(brokers).globalZk();
doReturn(mockZookKeeper).when(brokers).localZk();
doReturn(mockZooKeeper).when(brokers).globalZk();
doReturn(mockZooKeeper).when(brokers).localZk();
doReturn(configurationCache.clustersListCache()).when(brokers).clustersListCache();
doReturn("test").when(brokers).clientAppId();
doNothing().when(brokers).validateSuperUserAccess();
Expand All @@ -168,8 +168,8 @@ public void setup() throws Exception {
persistentTopics = spy(new PersistentTopics());
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
doReturn(mockZookKeeper).when(persistentTopics).globalZk();
doReturn(mockZookKeeper).when(persistentTopics).localZk();
doReturn(mockZooKeeper).when(persistentTopics).globalZk();
doReturn(mockZooKeeper).when(persistentTopics).localZk();
doReturn(configurationCache.propertiesCache()).when(persistentTopics).tenantsCache();
doReturn(configurationCache.policiesCache()).when(persistentTopics).policiesCache();
doReturn("test").when(persistentTopics).clientAppId();
Expand All @@ -182,16 +182,16 @@ public void setup() throws Exception {
resourceQuotas = spy(new ResourceQuotas());
resourceQuotas.setServletContext(new MockServletContext());
resourceQuotas.setPulsar(pulsar);
doReturn(mockZookKeeper).when(resourceQuotas).globalZk();
doReturn(mockZookKeeper).when(resourceQuotas).localZk();
doReturn(mockZooKeeper).when(resourceQuotas).globalZk();
doReturn(mockZooKeeper).when(resourceQuotas).localZk();
doReturn(configurationCache.propertiesCache()).when(resourceQuotas).tenantsCache();
doReturn(configurationCache.policiesCache()).when(resourceQuotas).policiesCache();

brokerStats = spy(new BrokerStats());
brokerStats.setServletContext(new MockServletContext());
brokerStats.setPulsar(pulsar);
doReturn(mockZookKeeper).when(brokerStats).globalZk();
doReturn(mockZookKeeper).when(brokerStats).localZk();
doReturn(mockZooKeeper).when(brokerStats).globalZk();
doReturn(mockZooKeeper).when(brokerStats).localZk();
doReturn(configurationCache.propertiesCache()).when(brokerStats).tenantsCache();
doReturn(configurationCache.policiesCache()).when(brokerStats).policiesCache();

Expand All @@ -203,8 +203,8 @@ public void setup() throws Exception {
schemasResource = spy(new SchemasResource(mockClock));
schemasResource.setServletContext(new MockServletContext());
schemasResource.setPulsar(pulsar);
doReturn(mockZookKeeper).when(schemasResource).globalZk();
doReturn(mockZookKeeper).when(schemasResource).localZk();
doReturn(mockZooKeeper).when(schemasResource).globalZk();
doReturn(mockZooKeeper).when(schemasResource).localZk();
doReturn(configurationCache.propertiesCache()).when(schemasResource).tenantsCache();
doReturn(configurationCache.policiesCache()).when(schemasResource).policiesCache();
}
Expand Down Expand Up @@ -318,7 +318,7 @@ void clusters() throws Exception {
}

// Test zk failures
mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
configurationCache.clustersListCache().clear();
try {
clusters.getClusters();
Expand All @@ -327,39 +327,39 @@ void clusters() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
clusters.createCluster("test", new ClusterData("http://broker.messaging.test.example.com"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
clusters.updateCluster("test", new ClusterData("http://broker.messaging.test.example.com"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
clusters.getCluster("test");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failAfter(0, Code.SESSIONEXPIRED);
mockZooKeeper.failAfter(0, Code.SESSIONEXPIRED);
try {
clusters.deleteCluster("use");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failAfter(1, Code.SESSIONEXPIRED);
mockZooKeeper.failAfter(1, Code.SESSIONEXPIRED);
try {
clusters.deleteCluster("use");
fail("should have failed");
Expand Down Expand Up @@ -439,39 +439,39 @@ void properties() throws Exception {
}

// Test zk failures
mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
properties.getTenants();
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
properties.getTenantAdmin("my-tenant");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
properties.updateTenant("my-tenant", newPropertyAdmin);
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
properties.createTenant("test", tenantInfo);
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
properties.deleteTenant("my-tenant");
fail("should have failed");
Expand All @@ -480,7 +480,7 @@ void properties() throws Exception {
}

properties.createTenant("error-property", tenantInfo);
mockZookKeeper.failAfter(2, Code.SESSIONEXPIRED);
mockZooKeeper.failAfter(2, Code.SESSIONEXPIRED);
try {
properties.deleteTenant("error-property");
fail("should have failed");
Expand Down Expand Up @@ -609,7 +609,7 @@ void resourceQuotas() throws Exception {
// create policies
TenantInfo admin = new TenantInfo();
admin.getAllowedClusters().add(cluster);
mockZookKeeper.create(PulsarWebResource.path(POLICIES, property),
mockZooKeeper.create(PulsarWebResource.path(POLICIES, property),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(admin), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

Expand Down Expand Up @@ -668,7 +668,7 @@ void persistentTopics() throws Exception {
// create policies
TenantInfo admin = new TenantInfo();
admin.getAllowedClusters().add(cluster);
ZkUtils.createFullPathOptimistic(mockZookKeeper, PulsarWebResource.path(POLICIES, property, cluster, namespace),
ZkUtils.createFullPathOptimistic(mockZooKeeper, PulsarWebResource.path(POLICIES, property, cluster, namespace),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ public void setup() throws Exception {
namespaces = spy(new Namespaces());
namespaces.setServletContext(new MockServletContext());
namespaces.setPulsar(pulsar);
doReturn(mockZookKeeper).when(namespaces).globalZk();
doReturn(mockZookKeeper).when(namespaces).localZk();
doReturn(mockZooKeeper).when(namespaces).globalZk();
doReturn(mockZooKeeper).when(namespaces).localZk();
doReturn(pulsar.getConfigurationCache().propertiesCache()).when(namespaces).tenantsCache();
doReturn(pulsar.getConfigurationCache().policiesCache()).when(namespaces).policiesCache();
doReturn(false).when(namespaces).isRequestHttps();
Expand Down Expand Up @@ -219,7 +219,7 @@ public void testCreateNamespaces() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
namespaces.createNamespace(this.testTenant, "use", "my-namespace-3", new BundlesData());
fail("should have failed");
Expand Down Expand Up @@ -255,15 +255,15 @@ public void testGetNamespaces() throws Exception {
}

// ZK Errors
mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
namespaces.getTenantNamespaces(this.testTenant);
fail("should have failed");
} catch (RestException e) {
// Ok
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
namespaces.getNamespacesForCluster(this.testTenant, this.testLocalCluster);
fail("should have failed");
Expand Down Expand Up @@ -339,23 +339,23 @@ public void testGrantAndRevokePermissions() throws Exception {

NamespaceName testNs = this.testLocalNamespaces.get(1);

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
namespaces.getPolicies(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
fail("should have failed");
} catch (RestException e) {
// Ok
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
namespaces.getPermissions(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
fail("should have failed");
} catch (RestException e) {
// Ok
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
namespaces.grantPermissionOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
"other-role", EnumSet.of(AuthAction.consume));
Expand All @@ -364,7 +364,7 @@ public void testGrantAndRevokePermissions() throws Exception {
// Ok
}

mockZookKeeper.failNow(Code.BADVERSION);
mockZooKeeper.failNow(Code.BADVERSION);
try {
namespaces.grantPermissionOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
"other-role", EnumSet.of(AuthAction.consume));
Expand All @@ -373,7 +373,7 @@ public void testGrantAndRevokePermissions() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
}

mockZookKeeper.failNow(Code.BADVERSION);
mockZooKeeper.failNow(Code.BADVERSION);
try {
namespaces.revokePermissionsOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
"other-role");
Expand All @@ -382,7 +382,7 @@ public void testGrantAndRevokePermissions() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
try {
namespaces.revokePermissionsOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
"other-role");
Expand Down Expand Up @@ -449,7 +449,7 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception {

// Sometimes watcher event consumes scheduled exception, so set to always fail to ensure exception is
// thrown for api call.
mockZookKeeper.setAlwaysFail(Code.SESSIONEXPIRED);
mockZooKeeper.setAlwaysFail(Code.SESSIONEXPIRED);
pulsar.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES, this.testTenant,
"global", this.testGlobalNamespaces.get(0).getLocalName()));
try {
Expand All @@ -459,10 +459,10 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception {
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
} finally {
mockZookKeeper.unsetAlwaysFail();
mockZooKeeper.unsetAlwaysFail();
}

mockZookKeeper.failNow(Code.BADVERSION);
mockZooKeeper.failNow(Code.BADVERSION);
try {
namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
Expand All @@ -486,7 +486,7 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}

mockZookKeeper.failNow(Code.SESSIONEXPIRED);
mockZooKeeper.failNow(Code.SESSIONEXPIRED);
pulsar.getConfigurationCache().policiesCache().clear();

// ensure the ZooKeeper read happens, bypassing the cache
Expand Down Expand Up @@ -604,7 +604,7 @@ public void testDeleteNamespaces() throws Exception {

NamespaceName testNs = this.testLocalNamespaces.get(1);
TopicName topicName = TopicName.get(testNs.getPersistentTopicName("my-topic"));
ZkUtils.createFullPathOptimistic(mockZookKeeper, "/managed-ledgers/" + topicName.getPersistenceNamingEncoding(),
ZkUtils.createFullPathOptimistic(mockZooKeeper, "/managed-ledgers/" + topicName.getPersistenceNamingEncoding(),
new byte[0], null, null);

// setup ownership to localhost
Expand All @@ -620,9 +620,9 @@ public void testDeleteNamespaces() throws Exception {
assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.CONFLICT.getStatusCode());

// delete the topic from ZK
mockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1);
mockZooKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1);

ZkUtils.createFullPathOptimistic(mockZookKeeper,
ZkUtils.createFullPathOptimistic(mockZooKeeper,
"/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(),
new byte[0], null, null);

Expand All @@ -633,7 +633,7 @@ public void testDeleteNamespaces() throws Exception {
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.CONFLICT.getStatusCode());

mockZookKeeper.delete("/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(), -1);
mockZooKeeper.delete("/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(), -1);

testNs = this.testGlobalNamespaces.get(0);
// setup ownership to localhost
Expand Down
Loading