Skip to content

Commit

Permalink
Renamed "property" into "tenant" (#1578)
Browse files Browse the repository at this point in the history
* Renamed "property" into "tenant"

* Fixed merging issue

* Fixed tests

* Fixed typo
  • Loading branch information
merlimat committed Apr 16, 2018
1 parent d8af136 commit cc9a9d9
Show file tree
Hide file tree
Showing 105 changed files with 1,364 additions and 1,230 deletions.
2 changes: 1 addition & 1 deletion deployment/terraform-ansible/deploy-pulsar.yaml
Expand Up @@ -154,7 +154,7 @@
tasks:
- name: Create default property and namespace
shell: |
bin/pulsar-admin properties create public \
bin/pulsar-admin tenants create public \
--allowed-clusters local \
--admin-roles all
bin/pulsar-admin namespaces create public/local/default
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
Expand All @@ -53,7 +53,7 @@ public class ConfigurationCacheService {
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationCacheService.class);

private final ZooKeeperCache cache;
private ZooKeeperDataCache<PropertyAdmin> propertiesCache;
private ZooKeeperDataCache<TenantInfo> propertiesCache;
private ZooKeeperDataCache<Policies> policiesCache;
private ZooKeeperDataCache<ClusterData> clustersCache;
private ZooKeeperChildrenCache clustersListCache;
Expand All @@ -76,10 +76,10 @@ public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterN

initZK();

this.propertiesCache = new ZooKeeperDataCache<PropertyAdmin>(cache) {
this.propertiesCache = new ZooKeeperDataCache<TenantInfo>(cache) {
@Override
public PropertyAdmin deserialize(String path, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, PropertyAdmin.class);
public TenantInfo deserialize(String path, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);
}
};

Expand Down Expand Up @@ -170,7 +170,7 @@ public ZooKeeperCache cache() {
return cache;
}

public ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
public ZooKeeperDataCache<TenantInfo> propertiesCache() {
return this.propertiesCache;
}

Expand Down
Expand Up @@ -20,10 +20,13 @@

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Lists;
import java.io.IOException;

import java.io.IOException;
import java.util.List;

import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
Expand All @@ -32,7 +35,7 @@
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
Expand All @@ -44,9 +47,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

/**
* Setup the metadata for a new Pulsar cluster
*/
Expand Down Expand Up @@ -150,12 +150,12 @@ public static void main(String[] args) throws Exception {
}

// Create public tenant
PropertyAdmin publicProperty = new PropertyAdmin();
byte[] publicPropertyDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty);
TenantInfo publicTenant = new TenantInfo();
byte[] publicPropertyDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant);
try {
ZkUtils.createFullPathOptimistic(
globalZk,
POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY,
POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT,
publicPropertyDataJson,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Expand All @@ -170,7 +170,7 @@ public static void main(String[] args) throws Exception {
try {
ZkUtils.createFullPathOptimistic(
globalZk,
POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY + "/" + TopicName.DEFAULT_NAMESPACE,
POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE,
defaultNamespaceDataJson,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Expand Down
Expand Up @@ -33,7 +33,7 @@
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
Expand Down Expand Up @@ -229,9 +229,9 @@ void start() throws Exception {
admin.clusters().createCluster(globalCluster, new ClusterData(null, null));
}

if (!admin.properties().getProperties().contains(property)) {
admin.properties().createProperty(property,
new PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
if (!admin.tenants().getTenants().contains(property)) {
admin.tenants().createTenant(property,
new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
}

if (!admin.namespaces().getNamespaces(property).contains(namespace)) {
Expand All @@ -242,13 +242,12 @@ void start() throws Exception {
}

// Create a public tenant and default namespace
final String publicTenant = TopicName.PUBLIC_PROPERTY;
final String defaultNamespace = TopicName.PUBLIC_PROPERTY + "/" + TopicName.DEFAULT_NAMESPACE;
final String publicTenant = TopicName.PUBLIC_TENANT;
final String defaultNamespace = TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE;
try {
if (!admin.properties().getProperties().contains(publicTenant)) {
admin.properties().createProperty(
publicTenant,
new PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
if (!admin.tenants().getTenants().contains(publicTenant)) {
admin.tenants().createTenant(publicTenant,
new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
}
if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
admin.namespaces().createNamespace(defaultNamespace);
Expand Down
Expand Up @@ -25,7 +25,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.IOException;
import java.net.URI;
import java.util.List;
Expand All @@ -47,13 +49,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
Expand All @@ -74,13 +69,20 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
Expand All @@ -103,8 +105,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response;

/**
* Main class for Pulsar broker service
*/
Expand Down Expand Up @@ -832,7 +832,7 @@ private void startWorkerService() throws InterruptedException, IOException, Keep
this.getGlobalZkCache().getZooKeeper().create(
AdminResource.path(POLICIES, property),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
new PropertyAdmin(
new TenantInfo(
Sets.newHashSet(config.getSuperUserRoles()),
Sets.newHashSet(cluster))),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Expand Down
Expand Up @@ -50,7 +50,7 @@
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -123,8 +123,8 @@ protected void validateSuperUserAccess() {

// This is a stub method for Mockito
@Override
protected void validateAdminAccessOnProperty(String property) {
super.validateAdminAccessOnProperty(property);
protected void validateAdminAccessForTenant(String property) {
super.validateAdminAccessForTenant(property);
}

// This is a stub method for Mockito
Expand Down Expand Up @@ -312,7 +312,7 @@ public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}

public ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
public ZooKeeperDataCache<TenantInfo> tenantsCache() {
return pulsar().getConfigurationCache().propertiesCache();
}

Expand Down Expand Up @@ -371,7 +371,7 @@ protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicNa
try {
checkConnect(topicName);
} catch (WebApplicationException e) {
validateAdminAccessOnProperty(topicName.getProperty());
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
Expand Down

0 comments on commit cc9a9d9

Please sign in to comment.