Skip to content

Commit

Permalink
Add compaction coordinator and compactor to cluster start / stop scripts
Browse files Browse the repository at this point in the history
Modified the cluster start/stop scripts to use a new file (cluster.yml)
for defining the hosts that will run the different server components. Added
a class (and test) that parses the yaml file into a form that is usable by
the scripts. Added ability to specify and start/stop the external compaction
server processes.

Closes apache#2138
  • Loading branch information
dlmarion committed Aug 20, 2021
1 parent bd46219 commit 232b7bb
Show file tree
Hide file tree
Showing 8 changed files with 536 additions and 112 deletions.
343 changes: 239 additions & 104 deletions assemble/bin/accumulo-cluster

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions assemble/bin/accumulo-service
Expand Up @@ -23,12 +23,14 @@ function print_usage {
Usage: accumulo-service <service> <command>
Services:
gc Accumulo garbage collector
monitor Accumulo monitor
manager Accumulo manager
master Accumulo master (Deprecated)
tserver Accumulo tserver
tracer Accumulo tracter
gc Accumulo garbage collector
monitor Accumulo monitor
manager Accumulo manager
master Accumulo master (Deprecated)
tserver Accumulo tserver
tracer Accumulo tracer
compaction-coordinator Accumulo compaction coordinator (experimental)
compactor Accumulo compactor (experimental)
Commands:
start Starts service
Expand Down Expand Up @@ -77,7 +79,7 @@ function start_service() {
rotate_log "$outfile"
rotate_log "$errfile"

nohup "${bin}/accumulo" "$service" >"$outfile" 2>"$errfile" < /dev/null &
nohup "${bin}/accumulo" "$service" "${@:2}" >"$outfile" 2>"$errfile" < /dev/null &
echo "$!" > "${pid_file}"

# Check the max open files limit and selectively warn
Expand Down Expand Up @@ -146,7 +148,7 @@ function main() {

pid_file="${ACCUMULO_PID_DIR}/accumulo-${service}${ACCUMULO_SERVICE_INSTANCE}.pid"
case "$service" in
gc|manager|monitor|tserver|tracer)
gc|manager|monitor|tserver|tracer|coordinator|compactor)
if [[ -z $2 ]]; then
invalid_args "<command> cannot be empty"
fi
Expand Down
4 changes: 4 additions & 0 deletions core/pom.xml
Expand Up @@ -129,6 +129,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
Expand Down
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.conf.cluster;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.yaml.snakeyaml.Yaml;

public class ClusterConfigParser {

public static Map<String,String> parseConfiguration(String configFile) throws IOException {
Map<String,String> results = new HashMap<>();
try (InputStream fis = Files.newInputStream(Paths.get(configFile), StandardOpenOption.READ)) {
Yaml y = new Yaml();
Map<String,Object> config = y.load(fis);
config.forEach((k, v) -> flatten("", k, v, results));
}
return results;
}

private static String addTheDot(String key) {
return (key.endsWith(".")) ? "" : ".";
}

@SuppressWarnings("unchecked")
private static void flatten(String parentKey, String key, Object value,
Map<String,String> results) {
String parent = (parentKey == null || parentKey == "") ? "" : parentKey + addTheDot(parentKey);
if (value instanceof String) {
results.put(parent + key, (String) value);
return;
} else if (value instanceof List) {
((List<?>) value).forEach(l -> {
if (l instanceof String) {
// remove the [] at the ends of toString()
String val = value.toString();
results.put(parent + key, val.substring(1, val.length() - 1).replace(", ", ","));
return;
} else {
flatten(parent, key, l, results);
}
});
} else if (value instanceof Map) {
((Map<String,Object>) value).forEach((k, v) -> flatten(parent + key, k, v, results));
} else {
throw new RuntimeException("Unhandled object type: " + value.getClass());
}
}

public static void main(String[] args) throws IOException {
if (args == null || args.length != 1) {
System.err.println("Usage: ClusterConfigParser <configFile>");
System.exit(1);
}
parseConfiguration(args[0]).forEach((k, v) -> System.out.println(k + ":" + v));
}

}
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.conf.cluster;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.net.URL;
import java.util.Map;

import org.junit.Test;

public class ClusterConfigParserTest {

@Test
public void testParse() throws Exception {
URL configFile = ClusterConfigParserTest.class.getResource("/cluster.yml");
assertNotNull(configFile);

Map<String,String> contents =
ClusterConfigParser.parseConfiguration(new File(configFile.toURI()).getAbsolutePath());
assertEquals(5, contents.size());
assertTrue(contents.containsKey("managers"));
assertEquals("localhost1,localhost2", contents.get("managers"));
assertTrue(contents.containsKey("monitors"));
assertEquals("localhost1,localhost2", contents.get("monitors"));
assertTrue(contents.containsKey("tracer"));
assertEquals("localhost", contents.get("tracer"));
assertTrue(contents.containsKey("gc"));
assertEquals("localhost", contents.get("gc"));
assertTrue(contents.containsKey("tservers"));
assertEquals("localhost1,localhost2,localhost3,localhost4", contents.get("tservers"));
assertFalse(contents.containsKey("compaction"));
assertFalse(contents.containsKey("compaction.coordinators"));
assertFalse(contents.containsKey("compaction.compactors"));
assertFalse(contents.containsKey("compaction.compactors.queues"));
assertFalse(contents.containsKey("compaction.compactors.q1"));
assertFalse(contents.containsKey("compaction.compactors.q2"));
}

@Test
public void testParseWithExternalCompactions() throws Exception {
URL configFile =
ClusterConfigParserTest.class.getResource("/cluster-with-external-compactions.yml");
assertNotNull(configFile);

Map<String,String> contents =
ClusterConfigParser.parseConfiguration(new File(configFile.toURI()).getAbsolutePath());
assertEquals(9, contents.size());
assertTrue(contents.containsKey("managers"));
assertEquals("localhost1,localhost2", contents.get("managers"));
assertTrue(contents.containsKey("monitors"));
assertEquals("localhost1,localhost2", contents.get("monitors"));
assertTrue(contents.containsKey("tracer"));
assertEquals("localhost", contents.get("tracer"));
assertTrue(contents.containsKey("gc"));
assertEquals("localhost", contents.get("gc"));
assertTrue(contents.containsKey("tservers"));
assertEquals("localhost1,localhost2,localhost3,localhost4", contents.get("tservers"));
assertFalse(contents.containsKey("compaction"));
assertTrue(contents.containsKey("compaction.coordinators"));
assertEquals("localhost1,localhost2", contents.get("compaction.coordinators"));
assertFalse(contents.containsKey("compaction.compactors"));
assertTrue(contents.containsKey("compaction.compactors.queues"));
assertEquals("q1,q2", contents.get("compaction.compactors.queues"));
assertTrue(contents.containsKey("compaction.compactors.q1"));
assertEquals("localhost1,localhost2", contents.get("compaction.compactors.q1"));
assertTrue(contents.containsKey("compaction.compactors.q2"));
assertEquals("localhost1,localhost2", contents.get("compaction.compactors.q2"));
}

}
53 changes: 53 additions & 0 deletions core/src/test/resources/cluster-with-external-compactions.yml
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

managers:
- localhost1
- localhost2

monitors:
- localhost1
- localhost2

tracer:
- localhost

gc:
- localhost

tservers:
- localhost1
- localhost2
- localhost3
- localhost4

compaction:
coordinators:
- localhost1
- localhost2
compactors:
- queues:
- q1
- q2
- q1:
- localhost1
- localhost2
- q2:
- localhost1
- localhost2
53 changes: 53 additions & 0 deletions core/src/test/resources/cluster.yml
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

managers:
- localhost1
- localhost2

monitors:
- localhost1
- localhost2

tracer:
- localhost

gc:
- localhost

tservers:
- localhost1
- localhost2
- localhost3
- localhost4

#compaction:
# coordinators:
# - localhost1
# - localhost2
# compactors:
# - queues:
# - q1
# - q2
# - q1:
# - localhost1
# - localhost2
# - q2:
# - localhost1
# - localhost2
5 changes: 5 additions & 0 deletions pom.xml
Expand Up @@ -566,6 +566,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.29</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
Expand Down

0 comments on commit 232b7bb

Please sign in to comment.