Skip to content

Commit

Permalink
Kubernetes Discovery SPI implementation based on fabric8io java client
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Sep 30, 2015
0 parents commit a248712
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 0 deletions.
14 changes: 14 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
target/
build/
.gradle/
.project
.classpath
.settings/
.idea/
.patch
.diff
*.iml
*.ipr
*.iws
.DS_Store
atlassian-ide-plugin.xml
34 changes: 34 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.noctarius.hazelcast</groupId>
<artifactId>hazelcast-kubernetes-discovery</artifactId>
<version>0.0.1-SNAPSHOT</version>

<properties>
<hazelcast.version>3.6-SNAPSHOT</hazelcast.version>
<kubernetes.version>1.3.3</kubernetes.version>
<dnsjava.version>2.1.7</dnsjava.version>
</properties>

<dependencies>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>${hazelcast.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.version}</version>
</dependency>
<dependency>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
<version>${dnsjava.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2015, Christoph Engelbert (aka noctarius) and
* contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.noctarius.hazelcast.kubernetes;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.discovery.DiscoveredNode;
import com.hazelcast.spi.discovery.SimpleDiscoveredNode;
import org.xbill.DNS.AAAARecord;
import org.xbill.DNS.ARecord;
import org.xbill.DNS.Lookup;
import org.xbill.DNS.Record;
import org.xbill.DNS.TextParseException;
import org.xbill.DNS.Type;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static com.noctarius.hazelcast.kubernetes.KubernetesProperties.IpType;

final class DnsEndpointResolver extends HazelcastKubernetesDiscoveryStrategy.EndpointResolver {

private static final ILogger LOGGER = Logger.getLogger(DnsEndpointResolver.class);

private final String serviceDns;
private final IpType serviceDnsIpType;

public DnsEndpointResolver(String serviceDns, IpType serviceDnsIpType) {
this.serviceDns = serviceDns;
this.serviceDnsIpType = serviceDnsIpType;
}

List<DiscoveredNode> resolve() {
try {
Lookup lookup = buildLookup();
Record[] records = lookup.run();

if (lookup.getResult() != Lookup.SUCCESSFUL) {
LOGGER.warning("DNS lookup for serviceDns '" + serviceDns + "' failed");
return Collections.emptyList();
}

List<DiscoveredNode> discoveredNodes = new ArrayList<DiscoveredNode>();
for (Record record : records) {
if (record.getType() != Type.A && record.getType() != Type.AAAA) {
continue;
}

InetAddress inetAddress = getInetAddress(record);

int port = getServicePort(null);

Address address = new Address(inetAddress, port);
discoveredNodes.add(new SimpleDiscoveredNode(address, Collections.<String, Object>emptyMap()));
}

return discoveredNodes;
} catch (TextParseException e) {
throw new RuntimeException("Could not resolve services via DNS", e);
}
}

private InetAddress getInetAddress(Record record) {
if (record.getType() == Type.A) {
return ((ARecord) record).getAddress();
}
return ((AAAARecord) record).getAddress();
}

private Lookup buildLookup() throws TextParseException {
if (serviceDnsIpType == IpType.IPV6) {
return new Lookup(serviceDns, Type.AAAA);
}
return new Lookup(serviceDns, Type.A);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2015, Christoph Engelbert (aka noctarius) and
* contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.noctarius.hazelcast.kubernetes;

import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.properties.PropertyDefinition;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.discovery.DiscoveredNode;
import com.hazelcast.spi.discovery.DiscoveryMode;
import com.hazelcast.spi.discovery.DiscoveryStrategy;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;

import static com.noctarius.hazelcast.kubernetes.KubernetesProperties.IpType;

public final class HazelcastKubernetesDiscoveryStrategy implements DiscoveryStrategy {

private static final ILogger LOGGER = Logger.getLogger(HazelcastKubernetesDiscoveryStrategy.class);

private static final String HAZELCAST_SERVICE_PORT = "hazelcast-service-port";

private final EndpointResolver endpointResolver;

public HazelcastKubernetesDiscoveryStrategy(Map<String, Comparable> properties) {
String serviceDns = getOrNull(properties, KubernetesProperties.SERVICE_DNS);
IpType serviceDnsIpType = getOrDefault(properties, KubernetesProperties.SERVICE_DNS_IP_TYPE, IpType.IPV4);
String serviceName = getOrNull(properties, KubernetesProperties.SERVICE_NAME);
String namespace = getOrNull(properties, KubernetesProperties.NAMESPACE);

if (serviceDns != null && (serviceName == null || namespace == null)) {
throw new RuntimeException("For kubernetes discovery either 'service-dns' or " +
"'service-name' and 'namespace' must be set");
}

EndpointResolver endpointResolver;
if (serviceDns != null) {
endpointResolver = new DnsEndpointResolver(serviceDns, serviceDnsIpType);
} else {
endpointResolver = new ServiceEndpointResolver(serviceName, namespace);
}
this.endpointResolver = endpointResolver;
}

public void start(DiscoveryMode discoveryMode) {
}

public Iterable<DiscoveredNode> discoverNodes() {
return endpointResolver.resolve();
}

public void destroy() {
endpointResolver.destroy();
}

private <T extends Comparable> T getOrNull(Map<String, Comparable> properties, PropertyDefinition property) {
return getOrDefault(properties, property, null);
}

private <T extends Comparable> T getOrDefault(Map<String, Comparable> properties,
PropertyDefinition property, T defaultValue) {

if (properties == null || property == null) {
return defaultValue;
}

Comparable value = properties.get(property.key());
if (value == null) {
return defaultValue;
}

return (T) value;
}

static abstract class EndpointResolver {
abstract List<DiscoveredNode> resolve();

void destroy() {
}

protected InetAddress mapAddress(String address) {
if (address == null) {
return null;
}
try {
return InetAddress.getByName(address);
} catch (UnknownHostException e) {
LOGGER.warning("Address '" + address + "' could not be resolved");
}
return null;
}

protected int getServicePort(Map<String, Object> properties) {
int port = NetworkConfig.DEFAULT_PORT;
if (properties != null) {
String servicePort = (String) properties.get(HAZELCAST_SERVICE_PORT);
if (servicePort != null) {
port = Integer.parseInt(servicePort);
}
}
return port;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2015, Christoph Engelbert (aka noctarius) and
* contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.noctarius.hazelcast.kubernetes;

import com.hazelcast.config.properties.PropertyDefinition;
import com.hazelcast.spi.discovery.DiscoveryStrategy;
import com.hazelcast.spi.discovery.DiscoveryStrategyFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HazelcastKubernetesDiscoveryStrategyFactory implements DiscoveryStrategyFactory {

private static final Collection<PropertyDefinition> PROPERTY_DEFINITIONS;

static {
List<PropertyDefinition> propertyDefinitions = new ArrayList<PropertyDefinition>();
propertyDefinitions.add(KubernetesProperties.SERVICE_DNS);
propertyDefinitions.add(KubernetesProperties.SERVICE_DNS_IP_TYPE);
propertyDefinitions.add(KubernetesProperties.SERVICE_NAME);
propertyDefinitions.add(KubernetesProperties.NAMESPACE);
PROPERTY_DEFINITIONS = Collections.unmodifiableCollection(propertyDefinitions);
}

public Class<? extends DiscoveryStrategy> getDiscoveryStrategyType() {
return HazelcastKubernetesDiscoveryStrategy.class;
}

public DiscoveryStrategy newDiscoveryStrategy(Map<String, Comparable> properties) {
return new HazelcastKubernetesDiscoveryStrategy(properties);
}

public Collection<PropertyDefinition> getConfigurationProperties() {
return PROPERTY_DEFINITIONS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2015, Christoph Engelbert (aka noctarius) and
* contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.noctarius.hazelcast.kubernetes;

import com.hazelcast.config.properties.PropertyDefinition;
import com.hazelcast.config.properties.SimplePropertyDefinition;
import com.hazelcast.core.TypeConverter;

import static com.hazelcast.config.properties.PropertyTypeConverter.STRING;

public final class KubernetesProperties {

public static final PropertyDefinition SERVICE_DNS = property("service-dns", STRING);
public static final PropertyDefinition SERVICE_DNS_IP_TYPE = property("service-dns-ip-type", new IpTypeConverter());
public static final PropertyDefinition SERVICE_NAME = property("service-name", STRING);
public static final PropertyDefinition NAMESPACE = property("namespace", STRING);

// Prevent instantiation
private KubernetesProperties() {
}

private static PropertyDefinition property(String key, TypeConverter typeConverter) {
return new SimplePropertyDefinition(key, true, typeConverter);
}

public enum IpType {
IPV4,
IPV6
}

private static class IpTypeConverter implements TypeConverter {

public Comparable convert(Comparable value) {
if (!(value instanceof String)) {
throw new RuntimeException("Cannot convert from type '" + value.getClass() + "'");
}

String v = (String) value;
if (v == null || v.length() == 0) {
return IpType.IPV4;
}

IpType ipType = IpType.valueOf(v);
if (ipType == null) {
throw new RuntimeException("service-name-ip-type must either be IPV4, IPV6 or empty");
}

return ipType;
}
}
}
Loading

0 comments on commit a248712

Please sign in to comment.