Merge branch 'STORM-2438' of https://github.com/revans2/incubator-storm into STORM-2438

STORM-2438: added in rebalance changes to support RAS

This closes #2345
Robert Evans committed Oct 10, 2017
2 parents f867e3a + d1e6bbb commit a66c4a5
Showing 52 changed files with 3,982 additions and 1,541 deletions.
23 changes: 12 additions & 11 deletions bin/storm.py
@@ -561,25 +561,26 @@ def deactivate(*args):
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])

def rebalance(*args):
"""Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]
"""Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]* [-r '{"component1": {"resource1": new_amount, "resource2": new_amount, ... }*}'] [-t '{"conf1": newValue, *}']]
Sometimes you may wish to spread out where the workers for a topology
are running. For example, let's say you have a 10 node cluster running
Sometimes you may wish to spread out the workers for a running topology.
For example, let's say you have a 10 node cluster running
4 workers per node, and then let's say you add another 10 nodes to
the cluster. You may wish to have Storm spread out the workers for the
running topology so that each node runs 2 workers. One way to do this
is to kill the topology and resubmit it, but Storm provides a "rebalance"
command that provides an easier way to do this.
Rebalance will first deactivate the topology for the duration of the
message timeout (overridable with the -w flag) and then redistribute
the workers evenly around the cluster. The topology will then return to
its previous state of activation (so a deactivated topology will still
be deactivated and an activated topology will go back to being activated).
The rebalance command can also be used to change the parallelism of a running topology.
Use the -n and -e switches to change the number of workers or number of executors of a component
respectively.
message timeout (overridable with the -w flag), make the requested adjustments to the
topology, and let the scheduler try to find a better schedule based on the
new situation. The topology will then return to its previous state of activation
(so a deactivated topology will still be deactivated and an activated
topology will go back to being activated).
Some of what you can change about a topology includes the number of requested workers (-n flag),
the number of executors for a given component (-e flag), the resources each component
requests as used by the resource aware scheduler (-r flag), and configs (-t flag).
"""
if not args:
print_usage(command="rebalance")
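As a concrete sketch of the new syntax (the topology, component, resource, and config names below are placeholders in the same spirit as the usage string above, not values taken from this commit):

    storm rebalance my-topology -w 30 -n 6 -e component1=8 \
        -r '{"component1": {"resource1": 512.0}}' \
        -t '{"conf1": "newValue"}'

This would deactivate my-topology for 30 seconds, request 6 workers, set component1 to 8 executors, override one of its requested resources, and override one topology config before the scheduler computes a new assignment.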
@@ -117,6 +117,11 @@ public void createStateInZookeeper(String key) {

@Override
public void shutdown() {
close();
}

@Override
public void close() {
if(client != null) {
client.close();
client = null;
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.blobstore;

import org.apache.storm.daemon.Shutdownable;
@@ -45,11 +46,8 @@
*
* For more detailed implementation
* @see org.apache.storm.blobstore.NimbusBlobStore
* @see org.apache.storm.blobstore.LocalFsBlobStore
* @see org.apache.storm.hdfs.blobstore.HdfsClientBlobStore
* @see org.apache.storm.hdfs.blobstore.HdfsBlobStore
*/
public abstract class ClientBlobStore implements Shutdownable {
public abstract class ClientBlobStore implements Shutdownable, AutoCloseable {
protected Map<String, Object> conf;

public interface WithBlobstore {
@@ -58,12 +56,8 @@ public interface WithBlobstore {

public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exception {
Map<String, Object> conf = ConfigUtils.readStormConfig();
ClientBlobStore blobStore = Utils.getClientBlobStore(conf);

try {
try (ClientBlobStore blobStore = Utils.getClientBlobStore(conf)) {
withBlobstore.run(blobStore);
} finally {
blobStore.shutdown();
}
}

@@ -168,6 +162,8 @@ public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exce
*/
public abstract void createStateInZookeeper(String key);

public abstract void close();

/**
* Client facing API to create a blob.
* @param key blob key name.
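Because ClientBlobStore now implements AutoCloseable, with close() declared abstract alongside the existing shutdown(), callers can manage the store with try-with-resources exactly as withConfiguredClient does above. A minimal caller sketch, assuming a standard storm config is readable from the classpath:

    Map<String, Object> conf = ConfigUtils.readStormConfig();
    try (ClientBlobStore blobStore = Utils.getClientBlobStore(conf)) {
        // List every blob key visible to this client.
        Iterator<String> keys = blobStore.listKeys();
        while (keys.hasNext()) {
            System.out.println(keys.next());
        }
    } // close() runs automatically, replacing the old explicit shutdown() call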
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.blobstore;

import java.util.Iterator;
import java.util.Map;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.utils.NimbusClient;

/**
* A Client blob store for LocalMode.
*/
public class LocalModeClientBlobStore extends ClientBlobStore {
private final BlobStore wrapped;

public LocalModeClientBlobStore(BlobStore wrapped) {
this.wrapped = wrapped;
}

@Override
public void shutdown() {
wrapped.shutdown();
}

@Override
public void prepare(Map<String, Object> conf) {
//NOOP - prepare should have already been called
}

@Override
protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException {
return wrapped.createBlob(key, meta, null);
}

@Override
public AtomicOutputStream updateBlob(String key) throws AuthorizationException, KeyNotFoundException {
return wrapped.updateBlob(key, null);
}

@Override
public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException {
return wrapped.getBlobMeta(key, null);
}

@Override
protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException {
wrapped.setBlobMeta(key, meta, null);
}

@Override
public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
wrapped.deleteBlob(key, null);
}

@Override
public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException {
return wrapped.getBlob(key, null);
}

@Override
public Iterator<String> listKeys() {
return wrapped.listKeys();
}

@Override
public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
try {
return wrapped.getBlobReplication(key, null);
} catch (AuthorizationException | KeyNotFoundException rethrow) {
throw rethrow;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
try {
return wrapped.updateBlobReplication(key, replication, null);
} catch (AuthorizationException | KeyNotFoundException rethrow) {
throw rethrow;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public boolean setClient(Map<String, Object> conf, NimbusClient client) {
return true;
}

@Override
public void createStateInZookeeper(String key) {
//NOOP
}

@Override
public void close() {
wrapped.shutdown();
}
}
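For reference, a hedged usage sketch of the new wrapper (the helper name and the idea of borrowing an existing local-mode BlobStore are assumptions for illustration; note that close() delegates to the wrapped store's shutdown()):

    // Hypothetical helper: route local-mode blob access through the
    // ClientBlobStore API by wrapping an in-process BlobStore.
    static void printBlobMeta(BlobStore localStore, String key) throws Exception {
        try (ClientBlobStore clientStore = new LocalModeClientBlobStore(localStore)) {
            ReadableBlobMeta meta = clientStore.getBlobMeta(key);
            System.out.println(meta);
        } // closing the wrapper also shuts down the wrapped BlobStore
    }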
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@@ -279,6 +280,25 @@ public void forceMkdir(File path) throws IOException {
FileUtils.forceMkdir(path);
}

/**
* Makes a directory, including any necessary but nonexistent parent
* directories.
*
* @param path the directory to create
* @throws IOException on any error
*/
public void forceMkdir(Path path) throws IOException {
Files.createDirectories(path);
}

public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
return Files.newDirectoryStream(dir, filter);
}

public DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException {
return Files.newDirectoryStream(dir);
}

/**
* Check if a file exists or not
* @param path the path to check
@@ -289,6 +309,16 @@ public boolean fileExists(File path) throws IOException {
return path.exists();
}

/**
* Check if a file exists or not
* @param path the path to check
* @return true if it exists else false
* @throws IOException on any error.
*/
public boolean fileExists(Path path) throws IOException {
return Files.exists(path);
}

/**
* Get a writer for the given location
* @param file the file to write to
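The new Path-based helpers are thin wrappers over java.nio.file.Files, and the DirectoryStream they return should be closed by the caller, typically with try-with-resources. A small caller sketch (ops and workerRoot are assumed to be an existing AdvancedFSOps instance and an existing directory Path, not names from this commit):

    // Sketch: list only the subdirectories of workerRoot using the filtered variant.
    try (DirectoryStream<Path> entries =
             ops.newDirectoryStream(workerRoot, p -> Files.isDirectory(p))) {
        for (Path entry : entries) {
            System.out.println(entry.getFileName());
        }
    }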
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.nio.file.DirectoryStream;
import java.nio.file.Path;
import java.util.Map;

public interface IAdvancedFSOps {
@@ -115,6 +117,32 @@ public interface IAdvancedFSOps {
*/
void forceMkdir(File path) throws IOException;

/**
* Makes a directory, including any necessary but nonexistent parent
* directories.
*
* @param path the directory to create
* @throws IOException on any error
*/
void forceMkdir(Path path) throws IOException;

/**
* List the contents of a directory.
* @param dir the directory to list the contents of
* @param filter a filter to decide if it should be included or not
* @return A stream of directory entries
* @throws IOException on any error
*/
DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException;

/**
* List the contents of a directory.
* @param dir the directory to list the contents of
* @return A stream of directory entries
* @throws IOException on any error
*/
DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException;

/**
* Check if a file exists or not
* @param path the path to check
@@ -123,6 +151,14 @@ public interface IAdvancedFSOps {
*/
boolean fileExists(File path) throws IOException;

/**
* Check if a file exists or not
* @param path the path to check
* @return true if it exists else false
* @throws IOException on any error.
*/
boolean fileExists(Path path) throws IOException;

/**
* Get a writer for the given location
* @param file the file to write to
