Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #3874] Add a load balance strategy: source ip hash #3875

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
* LoadBalance Interface
*
* <p> see {@link RandomLoadBalanceSelector}
* <p> see {@link WeightRandomLoadBalanceSelector}
* <p> see {@link WeightRoundRobinLoadBalanceSelector}
* <p> see {@link SourceIPHashLoadBalanceSelector}
*
* @param <T> Target type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
public enum LoadBalanceType {
RANDOM(0, "random load balance strategy"),
WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy"),
WEIGHT_RANDOM(2, "weight random load balance strategy");
WEIGHT_RANDOM(2, "weight random load balance strategy"),
SOURCE_IP_HASH(3, "source IP hash load balance strategy");

private final int code;
private final String desc;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;


import lombok.extern.slf4j.Slf4j;

/**
* Source IP Hash LoadBalance: make the same client always accessing the same server.
*
* @param <T> Target type
*/
@Slf4j
public class SourceIPHashLoadBalanceSelector<T> implements LoadBalanceSelector<T> {

private final transient List<T> servers;

private String clientKey;

public SourceIPHashLoadBalanceSelector(List<T> servers, String clientKey) {
this.servers = servers;
this.clientKey = clientKey;
}

@Override
public T select() {
// Avoid servers being changed during select().
List<T> targets = Collections.unmodifiableList(servers);
if (StringUtils.isBlank(clientKey)) {
clientKey = "127.0.0.1";
log.warn("Blank client IP has been set default {}", clientKey);
}
int hashCode = hash(clientKey);
int index = hashCode % targets.size();
return targets.get(index);
}

@Override
public LoadBalanceType getType() {
return LoadBalanceType.SOURCE_IP_HASH;
}

/**
* FNV hash algorithm that is suitable for hashing some similar strings, like IP.
* @return
*/
private int hash(String data) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < data.length(); i++) {
hash = (hash ^ data.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.utils;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

/**
* A string utils as supplement of org.apache.commons.lang3.StringUtils
*/
public class CommonStringUtils extends StringUtils {

/**
* Compares given string to a CharSequences vararg of searchStrings,
* returning true if the string is equal to all of the searchStrings.
*
* CommonStringUtils.equalsAll("abc", "abc", "def") = false
* CommonStringUtils.equalsAll(null, "abc", "def") = false
* CommonStringUtils.equalsAll(null, (CharSequence[]) null) = true
* CommonStringUtils.equalsAll(null, null, null) = true
* CommonStringUtils.equalsAll("abc", "abc", "abc") = true
*
* @param string
* @param searchStrings
* @return
*/
public static boolean equalsAll(final CharSequence string, final CharSequence... searchStrings) {
if (ArrayUtils.isNotEmpty(searchStrings)) {
for (final CharSequence next : searchStrings) {
if (!equals(string, next)) {
return false;
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import org.apache.eventmesh.common.utils.CommonStringUtils;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SourceIPHashLoadBalanceSelectorTest {

private SourceIPHashLoadBalanceSelector<String> loadBalanceSelector;

private List<String> servers;

private String client1Key;

private String client2Key;

@Before
public void init() {
servers = Arrays.asList(new String[]{
"192.168.1.10", "192.168.1.11",
"192.168.1.12", "192.168.1.13",
"192.168.1.14", "192.168.1.15"
});
client1Key = "192.168.1.1-1-tester1-TLSv1.2";
client2Key = "192.168.1.2-1-tester2-TLSv1.2";
loadBalanceSelector = new SourceIPHashLoadBalanceSelector<>(servers, client1Key);
}

@Test
public void testSelect() {
String target1 = loadBalanceSelector.select();
String target2 = loadBalanceSelector.select();
String target3 = loadBalanceSelector.select();
Assert.assertTrue(CommonStringUtils.equalsAll(target1, target2, target3));

loadBalanceSelector = new SourceIPHashLoadBalanceSelector<>(servers, client2Key);
String target4 = loadBalanceSelector.select();
Assert.assertFalse(StringUtils.equals(target1, target4));
}

@Test
public void testType() {
Assert.assertEquals(LoadBalanceType.SOURCE_IP_HASH, loadBalanceSelector.getType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.utils;

import org.junit.Assert;
import org.junit.Test;

public class CommonStringUtilsTest {

@Test
public void testEqualsAll() {
Assert.assertTrue(CommonStringUtils.equalsAll(null, null));
Assert.assertTrue(CommonStringUtils.equalsAll(null, null, null));
Assert.assertTrue(CommonStringUtils.equalsAll("", "", ""));
Assert.assertTrue(CommonStringUtils.equalsAll("abc", "abc", "abc"));
Assert.assertFalse(CommonStringUtils.equalsAll(null, "abc", "def"));
Assert.assertFalse(CommonStringUtils.equalsAll("abc", "def", "ghi"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.RandomLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.SourceIPHashLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.Weight;
import org.apache.eventmesh.common.loadbalance.WeightRandomLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.WeightRoundRobinLoadBalanceSelector;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.SystemUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -55,6 +59,10 @@ public static LoadBalanceSelector<String> createEventMeshServerLoadBalanceSelect
eventMeshServerSelector = new WeightRoundRobinLoadBalanceSelector<>(buildWeightedClusterGroupFromConfig(
eventMeshHttpClientConfig));
break;
case SOURCE_IP_HASH:
eventMeshServerSelector = new SourceIPHashLoadBalanceSelector<>(
buildClusterGroupFromConfig(eventMeshHttpClientConfig), toClientKey(eventMeshHttpClientConfig));
break;
default:
// ignore
}
Expand All @@ -64,6 +72,27 @@ public static LoadBalanceSelector<String> createEventMeshServerLoadBalanceSelect
return eventMeshServerSelector;
}

/**
* get client unique key(format: IP-pid-userName-sslClientProtocol) from EventMeshHttpClientConfig.
* @param eventMeshHttpClientConfig
* @return
*/
private static String toClientKey(EventMeshHttpClientConfig eventMeshHttpClientConfig) {
String ip = eventMeshHttpClientConfig.getIp();
if (StringUtils.equals(ip, "localhost")) {
ip = IPUtils.getLocalAddress();
}
String pid = eventMeshHttpClientConfig.getPid();
if (StringUtils.isBlank(pid)) {
pid = SystemUtils.getProcessId();
}
return StringUtils.joinWith("-",
ip, pid,
eventMeshHttpClientConfig.getUserName(),
eventMeshHttpClientConfig.getSslClientProtocol()
);
}

private static List<Weight<String>> buildWeightedClusterGroupFromConfig(
final EventMeshHttpClientConfig eventMeshHttpClientConfig)
throws EventMeshException {
Expand Down