Skip to content

Commit

Permalink
Make zookeeper, kafka, controller, broker use random port for integration tests (#6872)
Browse files Browse the repository at this point in the history

* Make zookeeper, kafka, controller use random port for integration tests

* Address comments
  • Loading branch information
xiangfu0 committed May 5, 2021
1 parent fe10fc1 commit fe596b6
Show file tree
Hide file tree
Showing 32 changed files with 237 additions and 82 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pinot_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jobs:
- name: Unit Test
env:
RUN_INTEGRATION_TESTS: false
MAVEN_OPTS: -Xmx2G -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=3 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
run: .github/workflows/scripts/.pinot_test.sh
- name: Upload coverage to Codecov
run: |
Expand All @@ -65,6 +66,7 @@ jobs:
- name: Integration Test
env:
RUN_INTEGRATION_TESTS: true
MAVEN_OPTS: -Xmx2G -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=3 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
run: .github/workflows/scripts/.pinot_test.sh
- name: Upload coverage to Codecov
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.helix.ControllerTest;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void setUp()
properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, 18099);

_brokerStarter =
new HelixBrokerStarter(new PinotConfiguration(properties), getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
new HelixBrokerStarter(new PinotConfiguration(properties), getHelixClusterName(), getZkUrl());
_brokerStarter.start();

addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS - 1, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ public void beforeTest() {

private HelixManager initHelixManager(String helixClusterName) {
return new FakeHelixManager(helixClusterName, BROKER_INSTANCE_ID, InstanceType.PARTICIPANT,
ZkStarter.DEFAULT_ZK_STR);
_zookeeperInstance.getZkUrl());
}

public class FakeHelixManager extends ZKHelixManager {
private ZkHelixPropertyStore<ZNRecord> _propertyStore;

FakeHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddress) {
super(clusterName, instanceName, instanceType, zkAddress);
super._zkclient = new ZkClient(StringUtil.join("/", StringUtils.chomp(ZkStarter.DEFAULT_ZK_STR, "/")),
super._zkclient = new ZkClient(StringUtil.join("/", StringUtils.chomp(_zookeeperInstance.getZkUrl(), "/")),
ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
_zkclient.deleteRecursively("/" + clusterName + "/PROPERTYSTORE");
_zkclient.createPersistent("/" + clusterName + "/PROPERTYSTORE", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class SegmentPrunerTest {
public void setUp() {
_zkInstance = ZkStarter.startLocalZkServer();
_zkClient =
new ZkClient(ZkStarter.DEFAULT_ZK_STR, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
new ZkClient(_zkInstance.getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
new ZNRecordSerializer());
_propertyStore =
new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), "/SegmentPrunerTest/PROPERTYSTORE", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class TimeBoundaryManagerTest {
public void setUp() {
_zkInstance = ZkStarter.startLocalZkServer();
_zkClient =
new ZkClient(ZkStarter.DEFAULT_ZK_STR, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
new ZkClient(_zkInstance.getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
new ZNRecordSerializer());
_propertyStore =
new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), "/TimeBoundaryManagerTest/PROPERTYSTORE", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.admin.AdminServer;
Expand All @@ -34,16 +35,21 @@
public class ZkStarter {
private static final Logger LOGGER = LoggerFactory.getLogger(ZkStarter.class);
public static final int DEFAULT_ZK_TEST_PORT = 2191;
public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;
private static final int DEFAULT_ZK_CLIENT_RETRIES = 10;

public static class ZookeeperInstance {
private PublicZooKeeperServerMain _serverMain;
private String _dataDirPath;
private int _port;

private ZookeeperInstance(PublicZooKeeperServerMain serverMain, String dataDirPath) {
private ZookeeperInstance(PublicZooKeeperServerMain serverMain, String dataDirPath, int port) {
_serverMain = serverMain;
_dataDirPath = dataDirPath;
_port = port;
}

public String getZkUrl() {
return "localhost:" + _port;
}
}

Expand Down Expand Up @@ -130,7 +136,11 @@ public void shutdown() {
* Starts an empty local Zk instance on the default port
*/
public static ZookeeperInstance startLocalZkServer() {
return startLocalZkServer(DEFAULT_ZK_TEST_PORT);
return startLocalZkServer(NetUtils.findOpenPort(DEFAULT_ZK_TEST_PORT));
}

/**
 * Returns the ZK connection string built from the fixed default test port.
 * NOTE(review): this always points at DEFAULT_ZK_TEST_PORT, not at the port an
 * actually-running instance bound; tests that start ZK via startLocalZkServer()
 * should prefer ZookeeperInstance.getZkUrl() to avoid port mismatches.
 */
public static String getDefaultZkStr() {
return "localhost:" + DEFAULT_ZK_TEST_PORT;
}

/**
Expand Down Expand Up @@ -180,7 +190,7 @@ public void run() {
}
}
}
return new ZookeeperInstance(zookeeperServerMain, dataDirPath);
return new ZookeeperInstance(zookeeperServerMain, dataDirPath, port);
} catch (Exception e) {
LOGGER.warn("Caught exception while starting ZK", e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ public void testMultiGet() {
}
} catch (IOException e) {
++errors;
} finally {
if (getMethod != null) {
getMethod.releaseConnection();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public static Map<String, Object> getDefaultControllerConfiguration() {
properties.put(ControllerConf.CONTROLLER_HOST, LOCAL_HOST);
properties.put(ControllerConf.CONTROLLER_PORT, DEFAULT_CONTROLLER_PORT);
properties.put(ControllerConf.DATA_DIR, DEFAULT_DATA_DIR);
properties.put(ControllerConf.ZK_STR, ZkStarter.DEFAULT_ZK_STR);
properties.put(ControllerConf.ZK_STR, _zookeeperInstance.getZkUrl());
properties.put(ControllerConf.HELIX_CLUSTER_NAME, getHelixClusterName());

return properties;
Expand Down Expand Up @@ -234,7 +234,7 @@ public static void addFakeBrokerInstanceToAutoJoinHelixCluster(String instanceId
throws Exception {
HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT,
ZkStarter.DEFAULT_ZK_STR);
_zookeeperInstance.getZkUrl());
helixManager.getStateMachineEngine()
.registerStateModelFactory(FakeBrokerResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
FakeBrokerResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
Expand Down Expand Up @@ -333,7 +333,7 @@ protected static void addFakeServerInstanceToAutoJoinHelixCluster(String instanc
int adminPort) throws Exception {
HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT,
ZkStarter.DEFAULT_ZK_STR);
_zookeeperInstance.getZkUrl());
helixManager.getStateMachineEngine()
.registerStateModelFactory(FakeSegmentOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
FakeSegmentOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ public void setup() {
@Test
public void testHelixResourceManagerDuringControllerStart() {
// Verifies the controller starts and stops cleanly; the Helix resource
// manager is exercised as part of the controller start/stop lifecycle.
startController();
// Stop within the test so the class-level teardown is not responsible for it.
stopController();
}

@AfterClass
public void teardown() {
// Shut down the components this test class started, controller first,
// then the local ZooKeeper instance it depends on.
stopController();
stopZk();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -132,13 +133,17 @@ protected void stopZk() {
}
}

/**
 * Returns the host:port connection string of the ZooKeeper instance started
 * for this test, so components connect to the actual (possibly randomized)
 * port rather than a hard-coded default.
 */
protected String getZkUrl() {
return _zookeeperInstance.getZkUrl();
}

public Map<String, Object> getDefaultControllerConfiguration() {
Map<String, Object> properties = new HashMap<>();

properties.put(ControllerConf.CONTROLLER_HOST, LOCAL_HOST);
properties.put(ControllerConf.CONTROLLER_PORT, DEFAULT_CONTROLLER_PORT);
properties.put(ControllerConf.CONTROLLER_PORT, NetUtils.findOpenPort(DEFAULT_CONTROLLER_PORT));
properties.put(ControllerConf.DATA_DIR, DEFAULT_DATA_DIR);
properties.put(ControllerConf.ZK_STR, ZkStarter.DEFAULT_ZK_STR);
properties.put(ControllerConf.ZK_STR, getZkUrl());
properties.put(ControllerConf.HELIX_CLUSTER_NAME, getHelixClusterName());

return properties;
Expand Down Expand Up @@ -193,6 +198,10 @@ protected ControllerStarter getControllerStarter(ControllerConf config) {
return new ControllerStarter(config);
}

/**
 * Returns the port the test controller is running on.
 * NOTE(review): appears to be assigned from NetUtils.findOpenPort(...) during
 * configuration, so it may differ from DEFAULT_CONTROLLER_PORT — confirm
 * against where _controllerPort is set.
 */
protected int getControllerPort() {
return _controllerPort;
}

protected void stopController() {
_controllerStarter.stop();
_controllerStarter = null;
Expand All @@ -210,7 +219,7 @@ protected void addFakeBrokerInstanceToAutoJoinHelixCluster(String instanceId, bo
throws Exception {
HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT,
ZkStarter.DEFAULT_ZK_STR);
getZkUrl());
helixManager.getStateMachineEngine()
.registerStateModelFactory(FakeBrokerResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
FakeBrokerResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
Expand Down Expand Up @@ -295,7 +304,7 @@ protected void addFakeServerInstanceToAutoJoinHelixCluster(String instanceId, bo
throws Exception {
HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT,
ZkStarter.DEFAULT_ZK_STR);
getZkUrl());
helixManager.getStateMachineEngine()
.registerStateModelFactory(FakeSegmentOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
FakeSegmentOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
Expand Down Expand Up @@ -394,7 +403,7 @@ protected void addFakeMinionInstanceToAutoJoinHelixCluster(String instanceId)
throws Exception {
HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT,
ZkStarter.DEFAULT_ZK_STR);
getZkUrl());
helixManager.getStateMachineEngine()
.registerStateModelFactory(FakeMinionResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
FakeMinionResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
Expand Down Expand Up @@ -114,7 +113,6 @@ public void testDualModeController() {
// Start the second dual-mode controller
properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CONTROLLER_MODE, ControllerConf.ControllerMode.DUAL);
properties.put(ControllerConf.CONTROLLER_PORT, DEFAULT_CONTROLLER_PORT + 1);
ControllerStarter secondDualModeController = getControllerStarter(new ControllerConf(properties));
secondDualModeController.start();
TestUtils
Expand Down Expand Up @@ -153,15 +151,14 @@ public void testDualModeController() {
return result;
}, TIMEOUT_IN_MS, "No one should be the partition leader for tables");

ZkClient zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
ZkClient zkClient = new ZkClient(getZkUrl());
TestUtils
.waitForCondition(aVoid -> !zkClient.exists("/" + getHelixClusterName() + "/CONTROLLER/LEADER"), TIMEOUT_IN_MS,
"No cluster leader should be shown in Helix cluster");
zkClient.close();

properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CONTROLLER_MODE, ControllerConf.ControllerMode.DUAL);
properties.put(ControllerConf.CONTROLLER_PORT, DEFAULT_CONTROLLER_PORT + 2);

ControllerStarter thirdDualModeController = getControllerStarter(new ControllerConf(properties));
thirdDualModeController.start();
Expand Down Expand Up @@ -215,7 +212,6 @@ public void testPinotOnlyController() {
// Start a Helix-only controller
properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CONTROLLER_MODE, ControllerConf.ControllerMode.HELIX_ONLY);
properties.put(ControllerConf.CONTROLLER_PORT, DEFAULT_CONTROLLER_PORT + 1);

ControllerStarter helixOnlyController = new ControllerStarter(new ControllerConf(properties));
helixOnlyController.start();
Expand All @@ -235,7 +231,6 @@ public void testPinotOnlyController() {
// Start the second Pinot-only controller
properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CONTROLLER_MODE, ControllerConf.ControllerMode.PINOT_ONLY);
properties.put(ControllerConf.CONTROLLER_PORT, DEFAULT_CONTROLLER_PORT + 2);

ControllerStarter secondPinotOnlyController = getControllerStarter(new ControllerConf(properties));
secondPinotOnlyController.start();
Expand Down Expand Up @@ -291,7 +286,7 @@ private void checkInstanceState(HelixAdmin helixAdmin) {

@AfterMethod
public void cleanUpCluster() {
ZkClient zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
ZkClient zkClient = new ZkClient(getZkUrl());
if (zkClient.exists("/" + getHelixClusterName())) {
zkClient.deleteRecursive("/" + getHelixClusterName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ public void testRemoveDeletedSegments()
createTestFileWithAge(dummyDir2.getAbsolutePath() + File.separator + "file" + i, i);
}

// Sleep 1 second to ensure the clock moves.
Thread.sleep(1000L);

// Check that dummy directories and files are successfully created.
Assert.assertEquals(dummyDir1.list().length, 3);
Assert.assertEquals(dummyDir2.list().length, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.Request;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
Expand Down Expand Up @@ -74,6 +74,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected static final long DEFAULT_COUNT_STAR_RESULT = 115545L;
protected static final int DEFAULT_LLC_SEGMENT_FLUSH_SIZE = 5000;
protected static final int DEFAULT_HLC_SEGMENT_FLUSH_SIZE = 20000;
protected static final int DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS = 3;
protected static final int DEFAULT_LLC_NUM_KAFKA_BROKERS = 2;
protected static final int DEFAULT_HLC_NUM_KAFKA_BROKERS = 1;
protected static final int DEFAULT_LLC_NUM_KAFKA_PARTITIONS = 2;
Expand Down Expand Up @@ -147,6 +148,9 @@ protected int getRealtimeSegmentFlushSize() {
}

protected int getNumKafkaBrokers() {
if (useKafkaTransaction()) {
return DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS;
}
if (useLlc()) {
return DEFAULT_LLC_NUM_KAFKA_BROKERS;
} else {
Expand All @@ -159,7 +163,7 @@ protected int getBaseKafkaPort() {
}

protected String getKafkaZKAddress() {
return KafkaStarterUtils.DEFAULT_ZK_STR;
return getZkUrl() + "/kafka";
}

protected int getNumKafkaPartitions() {
Expand Down Expand Up @@ -315,7 +319,7 @@ protected Map<String, String> getStreamConfigMap() {
StreamConfig.ConsumerType.LOWLEVEL.toString());
streamConfigMap.put(KafkaStreamConfigProperties
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST),
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
"localhost:" + _kafkaStarters.get(0).getPort());
if (useKafkaTransaction()) {
streamConfigMap.put(KafkaStreamConfigProperties
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL),
Expand All @@ -328,10 +332,10 @@ protected Map<String, String> getStreamConfigMap() {
StreamConfig.ConsumerType.HIGHLEVEL.toString());
streamConfigMap.put(KafkaStreamConfigProperties
.constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_ZK_CONNECTION_STRING),
KafkaStarterUtils.DEFAULT_ZK_STR);
getKafkaZKAddress());
streamConfigMap.put(KafkaStreamConfigProperties
.constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER),
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
"localhost:" + _kafkaStarters.get(0).getPort());
}
streamConfigMap.put(StreamConfigProperties
.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
Expand Down Expand Up @@ -398,7 +402,7 @@ protected TableConfig getRealtimeTableConfig() {
*/
protected org.apache.pinot.client.Connection getPinotConnection() {
if (_pinotConnection == null) {
_pinotConnection = ConnectionFactory.fromZookeeper(ZkStarter.DEFAULT_ZK_STR + "/" + getHelixClusterName());
_pinotConnection = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName());
}
return _pinotConnection;
}
Expand Down Expand Up @@ -500,8 +504,9 @@ protected List<File> getRealtimeAvroFiles(List<File> avroFiles, int numRealtimeS
}

protected void startKafka() {
Properties kafkaConfig = KafkaStarterUtils.getDefaultKafkaConfiguration();
_kafkaStarters = KafkaStarterUtils.startServers(getNumKafkaBrokers(), getBaseKafkaPort(), getKafkaZKAddress(),
KafkaStarterUtils.getDefaultKafkaConfiguration());
kafkaConfig);
_kafkaStarters.get(0)
.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
}
Expand Down

0 comments on commit fe596b6

Please sign in to comment.