Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.geaflow.analytics.service.client;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.geaflow.analytics.service.query.QueryResults;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.rpc.HostAndPort;
import org.apache.geaflow.pipeline.service.ServiceType;
import org.testng.Assert;
import org.testng.annotations.Test;

public class AbstractQueryRunnerTest {

@Test
public void testQueryRunnerStatusMethods() {
try (TestQueryRunner runner = new TestQueryRunner()) {
QueryRunnerContext context = QueryRunnerContext.newBuilder()
.setConfiguration(new Configuration())
.setHost(new HostAndPort("localhost", 8080))
.build();

runner.init(context);

Assert.assertTrue(runner.isRunning());
Assert.assertFalse(runner.isAborted());
Assert.assertFalse(runner.isError());
Assert.assertFalse(runner.isFinished());

runner.setStatus(QueryRunnerStatus.ERROR);
Assert.assertFalse(runner.isRunning());
Assert.assertTrue(runner.isError());

runner.setStatus(QueryRunnerStatus.ABORTED);
Assert.assertTrue(runner.isAborted());

runner.setStatus(QueryRunnerStatus.FINISHED);
Assert.assertTrue(runner.isFinished());
}
}

@Test
public void testInitWithHostAndPort() {
try (TestQueryRunner runner = new TestQueryRunner()) {
Configuration config = new Configuration();
HostAndPort hostAndPort = new HostAndPort("test-host", 9090);

QueryRunnerContext context = QueryRunnerContext.newBuilder()
.setConfiguration(config)
.setHost(hostAndPort)
.build();

runner.init(context);

Assert.assertNotNull(runner.getAnalyticsServiceInfo());
Assert.assertEquals(runner.getAnalyticsServiceInfo().getCoordinatorNum(), 1);
Assert.assertEquals(runner.getAnalyticsServiceInfo().getCoordinatorAddresses(0), hostAndPort);
}
}

private static class TestQueryRunner extends AbstractQueryRunner {

@Override
protected void initManagedChannel(HostAndPort address) {
// No-op for test
}

@Override
public QueryResults executeQuery(String queryScript) {
return null;
}

@Override
public ServiceType getServiceType() {
return ServiceType.analytics_http;
}

@Override
public QueryResults cancelQuery(long queryId) {
return null;
}

@Override
public void close() {
// No-op for test
}

public void setStatus(QueryRunnerStatus status) {
if (this.queryRunnerStatus == null) {
this.queryRunnerStatus = new AtomicReference<>(status);
} else {
this.queryRunnerStatus.set(status);
}
}

public AnalyticsServiceInfo getAnalyticsServiceInfo() {
return this.analyticsServiceInfo;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.geaflow.analytics.service.client;

import org.apache.geaflow.analytics.service.config.AnalyticsClientConfigKeys;
import org.apache.geaflow.common.config.Configuration;
import org.testng.Assert;
import org.testng.annotations.Test;

public class AnalyticsClientBuilderTest {

@Test
public void testBuilderWithHost() {
AnalyticsClientBuilder builder = new AnalyticsClientBuilder();
builder.withHost("localhost");
Assert.assertEquals(builder.getHost(), "localhost");
}

@Test
public void testBuilderWithPort() {
AnalyticsClientBuilder builder = new AnalyticsClientBuilder();
builder.withPort(8080);
Assert.assertEquals(builder.getPort(), 8080);
}

@Test
public void testBuilderWithUser() {
AnalyticsClientBuilder builder = new AnalyticsClientBuilder();
builder.withUser("testUser");
Assert.assertEquals(builder.getUser(), "testUser");
}

@Test
public void testBuilderWithTimeoutMs() {
AnalyticsClientBuilder builder = new AnalyticsClientBuilder();
builder.withTimeoutMs(5000);
Assert.assertEquals(
builder.getConfiguration().getString(AnalyticsClientConfigKeys.ANALYTICS_CLIENT_CONNECT_TIMEOUT_MS),
"5000"
);
}

@Test
public void testBuilderWithRetryNum() {
AnalyticsClientBuilder builder = new AnalyticsClientBuilder();
builder.withRetryNum(3);
Assert.assertEquals(builder.getQueryRetryNum(), 3);
Assert.assertEquals(
builder.getConfiguration().getString(AnalyticsClientConfigKeys.ANALYTICS_CLIENT_CONNECT_RETRY_NUM),
"3"
);
}

@Test
public void testBuilderWithInitChannelPools() {
AnalyticsClientBuilder builder = new AnalyticsClientBuilder();
builder.withInitChannelPools(true);
Assert.assertTrue(builder.enableInitChannelPools());
}

@Test
public void testBuilderWithConfiguration() {
Configuration config = new Configuration();
config.put("test.key", "test.value");

AnalyticsClientBuilder builder = new AnalyticsClientBuilder();
builder.withConfiguration(config);

Assert.assertEquals(builder.getConfiguration().getString("test.key"), "test.value");
}

@Test
public void testBuilderChaining() {
AnalyticsClientBuilder builder = new AnalyticsClientBuilder()
.withHost("localhost")
.withPort(8080)
.withUser("testUser")
.withTimeoutMs(5000)
.withRetryNum(3)
.withInitChannelPools(true);

Assert.assertEquals(builder.getHost(), "localhost");
Assert.assertEquals(builder.getPort(), 8080);
Assert.assertEquals(builder.getUser(), "testUser");
Assert.assertEquals(builder.getQueryRetryNum(), 3);
Assert.assertTrue(builder.enableInitChannelPools());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.geaflow.analytics.service.client;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.geaflow.common.rpc.HostAndPort;
import org.testng.Assert;
import org.testng.annotations.Test;

public class AnalyticsServiceInfoTest {

@Test
public void testAnalyticsServiceInfoWithServerName() {
List<HostAndPort> addresses = Arrays.asList(
new HostAndPort("host1", 8080),
new HostAndPort("host2", 8081)
);

AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo("testServer", addresses);

Assert.assertEquals(serviceInfo.getServerName(), "testServer");
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(), addresses);
Assert.assertEquals(serviceInfo.getCoordinatorNum(), 2);
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0), addresses.get(0));
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(1), addresses.get(1));
}

@Test
public void testAnalyticsServiceInfoWithoutServerName() {
List<HostAndPort> addresses = Collections.singletonList(
new HostAndPort("localhost", 9090)
);

AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo(addresses);

Assert.assertNull(serviceInfo.getServerName());
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(), addresses);
Assert.assertEquals(serviceInfo.getCoordinatorNum(), 1);
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0), addresses.get(0));
}

@Test
public void testGetCoordinatorAddressByIndex() {
List<HostAndPort> addresses = Arrays.asList(
new HostAndPort("host1", 8080),
new HostAndPort("host2", 8081),
new HostAndPort("host3", 8082)
);

AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo(addresses);

Assert.assertEquals(serviceInfo.getCoordinatorNum(), 3);
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0).getHost(), "host1");
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0).getPort(), 8080);
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(1).getHost(), "host2");
Assert.assertEquals(serviceInfo.getCoordinatorAddresses(2).getHost(), "host3");
}

@Test
public void testEmptyCoordinatorAddresses() {
List<HostAndPort> addresses = Collections.emptyList();
AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo(addresses);

Assert.assertEquals(serviceInfo.getCoordinatorNum(), 0);
Assert.assertTrue(serviceInfo.getCoordinatorAddresses().isEmpty());
}
}
Loading
Loading