Skip to content

Commit

Permalink
[TABLE SERVICE] Move integration tests under `stream/tests/integratio…
Browse files Browse the repository at this point in the history
…n` to `tests/integration/cluster`

Descriptions of the changes in this PR:

The original integration tests were written based a non-dockerized standalone stream cluster. Moved them to use
the dockerized integration test framework. So all the integration tests are actually testing the table service run as part of bookies.

This change is based on #1422 .
a371ff2 is the change in this PR to be reviewed.

Master Issue: #1205

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>

This closes #1423 from sijie/move_more_stream_it_tests
  • Loading branch information
sijie committed May 23, 2018
1 parent 206443d commit e34dd38
Show file tree
Hide file tree
Showing 26 changed files with 208 additions and 523 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
serverManager.close();
closeFuture.complete(null);
SharedResourceManager.shared().release(resources.scheduler(), scheduler);
scheduler.shutdown();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.NameResolver;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.clients.resolver.EndpointResolver;
import org.apache.bookkeeper.clients.utils.ClientConstants;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
Expand Down Expand Up @@ -56,6 +57,15 @@ public interface StorageClientSettings {
*/
List<Endpoint> endpoints();

/**
* Return the endpoint resolver for resolving individual endpoints.
*
* <p>The default resolver is an identity resolver.
*
* @return the endpoint resolver for resolving endpoints.
*/
EndpointResolver endpointResolver();

/**
* Returns the builder to create the managed channel.
*
Expand Down Expand Up @@ -99,6 +109,7 @@ class Builder extends StorageClientSettings_Builder {
numWorkerThreads(Runtime.getRuntime().availableProcessors());
usePlaintext(true);
backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
endpointResolver(EndpointResolver.identity());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.resolver.EndpointResolver;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
Expand All @@ -42,8 +44,12 @@
*/
public class StorageServerChannel implements AutoCloseable {

public static Function<Endpoint, StorageServerChannel> factory(boolean usePlaintext) {
return (endpoint) -> new StorageServerChannel(endpoint, Optional.empty(), usePlaintext);
public static Function<Endpoint, StorageServerChannel> factory(StorageClientSettings settings) {
return (endpoint) -> new StorageServerChannel(
endpoint,
Optional.empty(),
settings.usePlaintext(),
settings.endpointResolver());
}

private final Optional<String> token;
Expand All @@ -63,14 +69,17 @@ public static Function<Endpoint, StorageServerChannel> factory(boolean usePlaint
*
* @param endpoint range server endpoint.
* @param token token used to access range server
* @param usePlainText whether to plain text protocol or not
*/
public StorageServerChannel(Endpoint endpoint,
Optional<String> token,
boolean usePlainText) {
boolean usePlainText,
EndpointResolver endpointResolver) {
this.token = token;
Endpoint resolvedEndpoint = endpointResolver.resolve(endpoint);
this.channel = ManagedChannelBuilder.forAddress(
endpoint.getHostname(),
endpoint.getPort())
resolvedEndpoint.getHostname(),
resolvedEndpoint.getPort())
.usePlaintext(usePlainText)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class StorageServerChannelManager implements AutoCloseable {
private final Function<Endpoint, StorageServerChannel> channelFactory;

public StorageServerChannelManager(StorageClientSettings settings) {
this(StorageServerChannel.factory(settings.usePlaintext()));
this(StorageServerChannel.factory(settings));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public StorageServerClientManagerImpl(StorageClientSettings settings,
this(
settings,
schedulerResource,
StorageServerChannel.factory(settings.usePlaintext()));
StorageServerChannel.factory(settings));
}

public StorageServerClientManagerImpl(StorageClientSettings settings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.clients.resolver;

import org.apache.bookkeeper.stream.proto.common.Endpoint;

/**
* Resolve an endpoint to another endpoint.
*
* <p>The resolver can be used for resolving the right ip address for an advertised endpoint. It is typically useful
* in dockerized integration tests, where the test clients are typically outside of the docker network.
*/
public interface EndpointResolver {

/**
* Returns a resolver that always returns its input endpoint.
*
* @return a function that always returns its input endpoint
*/
static EndpointResolver identity() {
return endpoint -> endpoint;
}

/**
* Resolve <tt>endpoint</tt> to another endpoint.
*
* @param endpoint endpoint to resolve
* @return the resolved endpoint.
*/
Endpoint resolve(Endpoint endpoint);

}
1 change: 0 additions & 1 deletion stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
<module>storage</module>
<module>server</module>
<module>cli</module>
<module>tests</module>
</modules>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,16 @@
/**
* Placement policy to place ranges to group.
*/
@FunctionalInterface
public interface StorageContainerPlacementPolicy {

@FunctionalInterface
interface Factory {

StorageContainerPlacementPolicy newPlacementPolicy();

}

long placeStreamRange(long streamId, long rangeId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ private LifecycleComponent startServer() throws Exception {
bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
server = StorageServer.buildStorageServer(
serverConf,
grpcPort,
spec.numServers() * 2);
grpcPort);
server.start();
log.info("Started storage server at (bookie port = {}, grpc port = {})",
bookiePort, grpcPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
import org.apache.commons.configuration.CompositeConfiguration;
Expand Down Expand Up @@ -143,8 +144,7 @@ static int doMain(String[] args) {
try {
storageServer = buildStorageServer(
conf,
grpcPort,
1024);
grpcPort);
} catch (ConfigurationException e) {
log.error("Invalid storage configuration", e);
return ExitCode.INVALID_CONF.code();
Expand All @@ -168,15 +168,13 @@ static int doMain(String[] args) {
}

public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
int grpcPort,
int numStorageContainers)
int grpcPort)
throws UnknownHostException, ConfigurationException {
return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
return buildStorageServer(conf, grpcPort, true, NullStatsLogger.INSTANCE);
}

public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
int grpcPort,
int numStorageContainers,
boolean startBookieAndStartProvider,
StatsLogger externalStatsLogger)
throws ConfigurationException, UnknownHostException {
Expand Down Expand Up @@ -250,12 +248,21 @@ public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
.withStorageConfiguration(storageConf)
// the storage resources shared across multiple components
.withStorageResources(storageResources)
// the number of storage containers
.withNumStorageContainers(numStorageContainers)
// the placement policy
.withStorageContainerPlacementPolicyFactory(() -> {
long numStorageContainers;
try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
curatorProviderService.get(),
ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH)) {
numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
}
return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers);
})
// the default log backend uri
.withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
// with zk-based storage container manager
.withStorageContainerManagerFactory((ignored, storeConf, registry) ->
.withStorageContainerManagerFactory((storeConf, registry) ->
new ZkStorageContainerManager(
myEndpoint,
storageConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public StreamStorageLifecycleComponent(BookieConfiguration conf, StatsLogger sta
this.streamStorage = StorageServer.buildStorageServer(
conf.getUnderlyingConf(),
ssConf.getGrpcPort(),
1024, /* indicator */
false,
statsLogger.scope("stream"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ public interface StorageContainerManagerFactory {
/**
* Create a storage container manager to manage lifecycles of {@link StorageContainer}.
*
* @param numStorageContainers num of storage containers.
* @param conf storage configuration
* @param registry storage container registry
* @return storage container manager.
*/
StorageContainerManager create(int numStorageContainers,
StorageConfiguration conf,
StorageContainerManager create(StorageConfiguration conf,
StorageContainerRegistry registry);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import java.net.URI;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.bookkeeper.stream.storage.api.RangeStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;

/**
Expand All @@ -38,8 +40,9 @@ public static RangeStoreBuilder newBuilder() {
private StorageConfiguration storeConf = null;
private StorageResources storeResources = null;
private StorageContainerManagerFactory scmFactory = null;
private StorageContainerPlacementPolicy.Factory placementPolicyFactory = () ->
StorageContainerPlacementPolicyImpl.of(1024);
private MVCCStoreFactory mvccStoreFactory = null;
private int numStorageContainers = 1024;
private URI defaultBackendUri = null;

private RangeStoreBuilder() {
Expand All @@ -52,7 +55,19 @@ private RangeStoreBuilder() {
* @return range store builder
*/
public RangeStoreBuilder withNumStorageContainers(int numStorageContainers) {
this.numStorageContainers = numStorageContainers;
this.placementPolicyFactory = () -> StorageContainerPlacementPolicyImpl.of(numStorageContainers);
return this;
}

/**
* Build the range store with the provided <tt>placementPolicyFactory</tt>.
*
* @param placementPolicyFactory placement policy factor to create placement policies.
* @return range store builder.
*/
public RangeStoreBuilder withStorageContainerPlacementPolicyFactory(
StorageContainerPlacementPolicy.Factory placementPolicyFactory) {
this.placementPolicyFactory = placementPolicyFactory;
return this;
}

Expand Down Expand Up @@ -130,14 +145,15 @@ public RangeStore build() {
checkNotNull(storeConf, "StorageConfiguration is not provided");
checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
checkNotNull(defaultBackendUri, "Default backend uri is not provided");
checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided");

return new RangeStoreImpl(
storeConf,
storeResources.scheduler(),
scmFactory,
mvccStoreFactory,
defaultBackendUri,
numStorageContainers,
placementPolicyFactory,
statsLogger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;

Expand All @@ -71,15 +70,14 @@ public RangeStoreImpl(StorageConfiguration conf,
StorageContainerManagerFactory factory,
MVCCStoreFactory mvccStoreFactory,
URI defaultBackendUri,
int numStorageContainers,
StorageContainerPlacementPolicy.Factory placementPolicyFactory,
StatsLogger statsLogger) {
super("range-service", conf, statsLogger);
this.schedulerResource = schedulerResource;
this.scheduler = SharedResourceManager.shared().get(schedulerResource);
this.scmFactory = factory;
StorageContainerPlacementPolicy placementPolicy =
StorageContainerPlacementPolicyImpl.of(numStorageContainers);
this.storeFactory = mvccStoreFactory;
StorageContainerPlacementPolicy placementPolicy = placementPolicyFactory.newPlacementPolicy();
this.scRegistry = new StorageContainerRegistryImpl(
new DefaultStorageContainerFactory(
conf,
Expand All @@ -88,7 +86,7 @@ public RangeStoreImpl(StorageConfiguration conf,
storeFactory,
defaultBackendUri),
scheduler);
this.scManager = scmFactory.create(numStorageContainers, conf, scRegistry);
this.scManager = scmFactory.create(conf, scRegistry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void setUp() throws Exception {
rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder()
.withStorageConfiguration(storageConf)
.withStorageResources(storageResources)
.withStorageContainerManagerFactory((numScs, storeConf, rgRegistry)
.withStorageContainerManagerFactory((storeConf, rgRegistry)
-> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2))
.withRangeStoreFactory(storeFactory)
.withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage"))
Expand Down
Loading

0 comments on commit e34dd38

Please sign in to comment.