Skip to content

Commit

Permalink
Merge 4a426f9 into 9d9c067
Browse files Browse the repository at this point in the history
  • Loading branch information
dongzl committed Feb 12, 2020
2 parents 9d9c067 + 4a426f9 commit 53e8295
Show file tree
Hide file tree
Showing 19 changed files with 601 additions and 391 deletions.
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -82,6 +82,7 @@
<curator.version>2.10.0</curator.version>
<opentracing.version>0.30.0</opentracing.version>
<apollo.client.version>1.5.0</apollo.client.version>
<nacos.client.verison>1.1.4</nacos.client.verison>

<lombok.version>1.16.20</lombok.version>

Expand Down
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.orchestration.center.util;

/**
* Config key utils.
*
* @author dongzonglei
*/
public final class ConfigKeyUtils {

private static final String DOT_SEPARATOR = ".";

private static final String PATH_SEPARATOR = "/";

/**
* Convert path to key.
*
* @param path config path
* @return config key
*/
public static String path2Key(final String path) {
String key = path.replace(PATH_SEPARATOR, DOT_SEPARATOR);
return key.substring(key.indexOf(DOT_SEPARATOR) + 1);
}

/**
* Convert key to path.
*
* @param key config key
* @return config path
*/
public static String key2Path(final String key) {
return new StringBuilder(PATH_SEPARATOR).append(key.replace(DOT_SEPARATOR, PATH_SEPARATOR)).toString();
}
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.orchestration.center.util;

import org.junit.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

public final class ConfigKeyUtilsTest {

@Test
public void assertPath2Key() {
assertThat(ConfigKeyUtils.path2Key("/orchestration_ds/config/schema/test/rule"), is("orchestration_ds.config.schema.test.rule"));
}

@Test
public void assertKey2Path() {
assertThat(ConfigKeyUtils.key2Path("orchestration_ds.config.schema.test.rule"), is("/orchestration_ds/config/schema/test/rule"));
}
}
Expand Up @@ -17,177 +17,101 @@

package org.apache.shardingsphere.orchestration.center.instance;

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.enums.PropertyChangeType;
import com.ctrip.framework.apollo.model.ConfigChange;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import com.ctrip.framework.apollo.openapi.client.ApolloOpenApiClient;
import com.ctrip.framework.apollo.openapi.client.constant.ApolloOpenApiConstants;
import com.ctrip.framework.apollo.openapi.dto.NamespaceReleaseDTO;
import com.ctrip.framework.apollo.openapi.dto.OpenItemDTO;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.orchestration.center.api.ConfigCenter;
import org.apache.shardingsphere.orchestration.center.configuration.InstanceConfiguration;
import org.apache.shardingsphere.orchestration.center.instance.node.ConfigTreeNode;
import org.apache.shardingsphere.orchestration.center.instance.wrapper.ApolloConfigWrapper;
import org.apache.shardingsphere.orchestration.center.instance.wrapper.ApolloOpenApiWrapper;
import org.apache.shardingsphere.orchestration.center.listener.DataChangedEvent;
import org.apache.shardingsphere.orchestration.center.listener.DataChangedEventListener;
import org.apache.shardingsphere.orchestration.center.util.ConfigKeyUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* Registry center for Apollo.
* Config center for Apollo.
*
* @author dongzonglei
*/
@Slf4j
public final class ApolloInstance implements ConfigCenter {

private static final String SHARDING_SPHERE_KEY_ROOT = "/";
private final Map<String, DataChangedEventListener> caches = new HashMap<>();

private static final String SHARDING_SPHERE_KEY_SEPARATOR = "/";
private ApolloConfigWrapper configWrapper;

private static final String APOLLO_KEY_SEPARATOR = ".";

private static final String APOLLO_KEY_APP_ID = "app.id";

private static final String APOLLO_KEY_ENV = "env";

private static final String APOLLO_KEY_CLUSTER = ConfigConsts.APOLLO_CLUSTER_KEY;

private static final String APOLLO_KEY_META = ConfigConsts.APOLLO_META_KEY;

private String namespace;

private String appId;

private String env;

private String clusterName;

private String administrator;

private Config apolloConfig;

private ApolloOpenApiClient client;

private ConfigTreeNode tree;
private ApolloOpenApiWrapper openApiWrapper;

@Getter
@Setter
private Properties properties = new Properties();

@Override
public void init(final InstanceConfiguration config) {
initApolloConfig(config);
initApolloOpenApiClient();
initKeysRelationship();
}

private void initApolloConfig(final InstanceConfiguration config) {
namespace = config.getNamespace();
appId = properties.getProperty("appId", "APOLLO_SHARDING_SPHERE");
env = properties.getProperty("env", "DEV");
clusterName = properties.getProperty("clusterName", ConfigConsts.CLUSTER_NAME_DEFAULT);
System.setProperty(APOLLO_KEY_APP_ID, appId);
System.setProperty(APOLLO_KEY_ENV, env);
System.setProperty(APOLLO_KEY_CLUSTER, clusterName);
System.setProperty(APOLLO_KEY_META, config.getServerLists());
apolloConfig = ConfigService.getConfig(namespace);
}

private void initApolloOpenApiClient() {
administrator = properties.getProperty("administrator");
String apolloToken = properties.getProperty("token");
String portalUrl = properties.getProperty("portalUrl");
Integer connectTimeout = Ints.tryParse(properties.getProperty("connectTimeout"));
Integer readTimeout = Ints.tryParse(properties.getProperty("readTimeout"));
client = ApolloOpenApiClient.newBuilder().withPortalUrl(portalUrl)
.withConnectTimeout(connectTimeout == null ? ApolloOpenApiConstants.DEFAULT_CONNECT_TIMEOUT : connectTimeout)
.withReadTimeout(readTimeout == null ? ApolloOpenApiConstants.DEFAULT_READ_TIMEOUT : readTimeout)
.withToken(apolloToken).build();
}

private void initKeysRelationship() {
List<OpenItemDTO> items = client.getNamespace(appId, env, clusterName, namespace).getItems();
Set<String> keys = Sets.newHashSet();
for (OpenItemDTO each : items) {
keys.add(each.getKey());
}
tree = ConfigTreeNode.create(keys, ".");
configWrapper = new ApolloConfigWrapper(config, properties);
openApiWrapper = new ApolloOpenApiWrapper(config, properties);
}

@Override
public String get(final String key) {
return apolloConfig.getProperty(convertKey(key), "");
}

private String convertKey(final String shardingSphereKey) {
return shardingSphereKey.replace(SHARDING_SPHERE_KEY_SEPARATOR, APOLLO_KEY_SEPARATOR).substring(1);
}

private String deConvertKey(final String apolloKey) {
return new StringBuilder(SHARDING_SPHERE_KEY_ROOT).append(apolloKey.replace(APOLLO_KEY_SEPARATOR, SHARDING_SPHERE_KEY_SEPARATOR)).toString();
String value = configWrapper.getProperty(ConfigKeyUtils.path2Key(key));
return Strings.isNullOrEmpty(value) ? openApiWrapper.getValue(ConfigKeyUtils.path2Key(key)) : value;
}

@Override
public List<String> getChildrenKeys(final String key) {
return new ArrayList<>(tree.getChildrenKeys(key));
return null;
}

@Override
public void persist(final String key, final String value) {
String apolloKey = convertKey(key);
updateKey(apolloKey, value);
publishNamespace();
tree.refresh(apolloKey, APOLLO_KEY_SEPARATOR);
}

private void updateKey(final String key, final String value) {
OpenItemDTO openItem = new OpenItemDTO();
openItem.setKey(key);
openItem.setValue(value);
openItem.setComment("ShardingSphere create or update config");
openItem.setDataChangeCreatedBy(administrator);
client.createOrUpdateItem(appId, env, clusterName, namespace, openItem);
}

private void publishNamespace() {
NamespaceReleaseDTO release = new NamespaceReleaseDTO();
release.setReleaseTitle("ShardingSphere namespace release");
release.setReleaseComment("ShardingSphere namespace release");
release.setReleasedBy(administrator);
release.setEmergencyPublish(true);
client.publishNamespace(appId, env, clusterName, namespace, release);
try {
openApiWrapper.persist(ConfigKeyUtils.path2Key(key), value);
// CHECKSTYLE:OFF
} catch (Exception ex) {
// CHECKSTYLE:ON
log.error("Apollo persist key '{}' throw exception: {}", key, ex);
}
}

@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
apolloConfig.addChangeListener(new ConfigChangeListener() {
String apolloKey = ConfigKeyUtils.path2Key(key);
caches.put(apolloKey, dataChangedEventListener);
ConfigChangeListener listener = new ConfigChangeListener() {

@Override
public void onChange(final ConfigChangeEvent changeEvent) {
for (String key : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(key);
for (String changeKey : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(changeKey);
DataChangedEvent.ChangedType changedType = getChangedType(change.getChangeType());
if (DataChangedEvent.ChangedType.IGNORED != changedType) {
dataChangedEventListener.onChange(new DataChangedEvent(deConvertKey(key), change.getNewValue(), changedType));
if (DataChangedEvent.ChangedType.IGNORED == changedType) {
continue;
}
if (caches.get(changeKey) == null) {
continue;
}
caches.get(changeKey).onChange(new DataChangedEvent(ConfigKeyUtils.key2Path(changeKey), change.getNewValue(), changedType));
}
}
}, Sets.newHashSet(key));
};
configWrapper.addChangeListener(listener, Sets.newHashSet(apolloKey), Sets.newHashSet(apolloKey));
}

private DataChangedEvent.ChangedType getChangedType(final PropertyChangeType changeType) {
switch (changeType) {
case ADDED:
case MODIFIED:
return DataChangedEvent.ChangedType.UPDATED;
case DELETED:
Expand Down

0 comments on commit 53e8295

Please sign in to comment.