Skip to content

Commit

Permalink
Use NewTableConfiguration when creating tables
Browse files Browse the repository at this point in the history
Use Accumulo's NewTableConfiguration when creating new application
tables. This way, the initial table configuration, including locality
group settings and compaction iterators, are already set on the table at
the moment it is created, eliminating the possibility of race conditions
related to setting configuration after the table is created.

This change requires at least Accumulo 1.7.0.

Travis-CI configuration was updated to test against the latest version
of Accumulo (1.8.1) with the corresponding Thrift version (0.9.3)

Include test to verify locality group serialization can be deserialized
by Accumulo's public API, and adds some constants to refer to Accumulo
properties and column family and locality group names.
  • Loading branch information
ctubbsii committed Nov 10, 2017
1 parent 22d17f9 commit eb0889b
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 31 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# the License.
language: java
jdk:
- oraclejdk8
script: mvn verify javadoc:jar
- openjdk8
script: mvn clean verify javadoc:jar -Daccumulo.version=1.8.1 -Dthrift.version=0.9.3
notifications:
irc:
channels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
Expand All @@ -21,5 +21,8 @@ public class AccumuloProps {
public static final String TABLE_CLASSPATH = "table.classpath.context";
public static final String TABLE_BLOCKCACHE_ENABLED = "table.cache.block.enable";
public static final String TABLE_FORMATTER_CLASS = "table.formatter";
public static final String TABLE_GROUP_PREFIX = "table.group.";
public static final String TABLE_GROUPS_ENABLED = "table.groups.enabled";
public static final Object TABLE_ITERATOR_PREFIX = "table.iterator.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
Expand All @@ -32,6 +32,8 @@ public class ColumnConstants {
public static final long DATA_PREFIX = 0xa000000000000000L;
public static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
public static final Bytes NOTIFY_CF = Bytes.of("ntfy");
public static final String NOTIFY_LOCALITY_GROUP_NAME = "notify";
public static final Bytes GC_CF = Bytes.of("gc");

private ColumnConstants() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.iterators.GarbageCollectionIterator;
import org.apache.fluo.accumulo.iterators.NotificationIterator;
Expand All @@ -47,6 +49,7 @@
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.observer.ObserverUtil;
Expand All @@ -56,7 +59,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
Expand All @@ -77,7 +79,6 @@ public class FluoAdminImpl implements FluoAdmin {
public FluoAdminImpl(FluoConfiguration config) {
this.config = config;


appRootDir = ZookeeperUtil.parseRoot(config.getAppZookeepers());
rootCurator = CuratorUtil.newRootFluoCurator(config);
rootCurator.start();
Expand Down Expand Up @@ -145,7 +146,8 @@ public void initialize(InitializationOptions opts)
}

try {
initialize(conn);
initializeApplicationInZooKeeper(conn);
Map<String, String> ntcProps = initializeApplicationTableProps();

String accumuloJars;
if (!config.getAccumuloJars().trim().isEmpty()) {
Expand All @@ -167,17 +169,19 @@ public void initialize(InitializationOptions opts)
String contextName = "fluo-" + config.getApplicationName();
conn.instanceOperations().setProperty(
AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
conn.tableOperations().setProperty(config.getAccumuloTable(), AccumuloProps.TABLE_CLASSPATH,
contextName);
ntcProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
}

if (config.getObserverJarsUrl().isEmpty() && !config.getObserverInitDir().trim().isEmpty()) {
String observerUrl = copyDirToDfs(config.getObserverInitDir().trim(), "lib/observers");
config.setObserverJarsUrl(observerUrl);
}

conn.tableOperations().setProperty(config.getAccumuloTable(),
AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
ntcProps.put(AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");

NewTableConfiguration ntc = new NewTableConfiguration().withoutDefaultIterators();
ntc.setProperties(ntcProps);
conn.tableOperations().create(config.getAccumuloTable(), ntc);

updateSharedConfig();
} catch (NodeExistsException nee) {
Expand All @@ -190,7 +194,7 @@ public void initialize(InitializationOptions opts)
}
}

private void initialize(Connector conn) throws Exception {
private void initializeApplicationInZooKeeper(Connector conn) throws Exception {

final String accumuloInstanceName = conn.getInstance().getInstanceName();
final String accumuloInstanceID = conn.getInstance().getInstanceID();
Expand Down Expand Up @@ -221,23 +225,49 @@ private void initialize(Connector conn) throws Exception {
CuratorUtil.NodeExistsPolicy.FAIL);
CuratorUtil.putData(curator, ZookeeperPath.ORACLE_GC_TIMESTAMP, new byte[] {'0'},
CuratorUtil.NodeExistsPolicy.FAIL);
}

conn.tableOperations().create(config.getAccumuloTable(), false);
Map<String, Set<Text>> groups = new HashMap<>();
groups.put("notify", Collections.singleton(ByteUtil.toText(ColumnConstants.NOTIFY_CF)));
conn.tableOperations().setLocalityGroups(config.getAccumuloTable(), groups);

IteratorSetting gcIter = new IteratorSetting(10, "gc", GarbageCollectionIterator.class);
GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
private String encodeColumnFamily(Bytes cf) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < cf.length(); i++) {
int c = 0xff & cf.byteAt(i);
if (c == '\\') {
sb.append("\\\\");
} else if (c >= 32 && c <= 126 && c != ',') {
sb.append((char) c);
} else {
sb.append("\\x").append(String.format("%02X", c));
}
}
return sb.toString();
}

conn.tableOperations().attachIterator(config.getAccumuloTable(), gcIter,
EnumSet.of(IteratorUtil.IteratorScope.majc, IteratorUtil.IteratorScope.minc));
private Map<String, String> initializeApplicationTableProps() {
Map<String, String> ntcProps = new HashMap<>();
ntcProps.put(AccumuloProps.TABLE_GROUP_PREFIX + ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
encodeColumnFamily(ColumnConstants.NOTIFY_CF));
ntcProps.put(AccumuloProps.TABLE_GROUPS_ENABLED, ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME);

IteratorSetting gcIter =
new IteratorSetting(10, ColumnConstants.GC_CF.toString(), GarbageCollectionIterator.class);
GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
// the order relative to gc iter should not matter
IteratorSetting ntfyIter = new IteratorSetting(11, "ntfy", NotificationIterator.class);
IteratorSetting ntfyIter =
new IteratorSetting(11, ColumnConstants.NOTIFY_CF.toString(), NotificationIterator.class);

for (IteratorSetting setting : new IteratorSetting[] {gcIter, ntfyIter}) {
for (IteratorScope scope : EnumSet.of(IteratorUtil.IteratorScope.majc,
IteratorUtil.IteratorScope.minc)) {
String root = String.format("%s%s.%s", AccumuloProps.TABLE_ITERATOR_PREFIX,
scope.name().toLowerCase(), setting.getName());
for (Entry<String, String> prop : setting.getOptions().entrySet()) {
ntcProps.put(root + ".opt." + prop.getKey(), prop.getValue());
}
ntcProps.put(root, setting.getPriority() + "," + setting.getIteratorClass());
}
}

conn.tableOperations().attachIterator(config.getAccumuloTable(), ntfyIter,
EnumSet.of(IteratorUtil.IteratorScope.majc, IteratorUtil.IteratorScope.minc));
return ntcProps;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.integration.ITBase;
Expand All @@ -39,7 +40,7 @@ public class TimeskippingIT extends ITBase {
@Test
public void testTimestampSkippingIterPerformance() throws Exception {

conn.tableOperations().create("ttsi", false);
conn.tableOperations().create("ttsi", new NewTableConfiguration().withoutDefaultIterators());

BatchWriter bw = conn.createBatchWriter("ttsi", new BatchWriterConfig());
Mutation m = new Mutation("r1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
Expand All @@ -15,7 +15,16 @@

package org.apache.fluo.integration.client;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
Expand All @@ -26,6 +35,7 @@
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -92,6 +102,23 @@ public void testInitializeConfig() throws Exception {
InitializationOptions opts =
new InitializationOptions().setClearZookeeper(true).setClearTable(true);
admin.initialize(opts);

// verify locality groups were set on the table
Instance inst =
new ZooKeeperInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers());
Connector conn = inst.getConnector(config.getAccumuloUser(),
new PasswordToken(config.getAccumuloPassword()));
Map<String, Set<Text>> localityGroups =
conn.tableOperations().getLocalityGroups(config.getAccumuloTable());
Assert.assertEquals("Unexpected locality group count.", 1, localityGroups.size());
Entry<String, Set<Text>> localityGroup = localityGroups.entrySet().iterator().next();
Assert.assertEquals("'notify' locality group not found.",
ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
Assert.assertEquals("'notify' locality group does not contain exactly 1 column family.", 1,
localityGroup.getValue().size());
Text colFam = localityGroup.getValue().iterator().next();
Assert.assertTrue("'notify' locality group does not contain the correct column family.",
ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0, colFam.getLength()));
}

try (FluoClientImpl client = new FluoClientImpl(localConfig)) {
Expand Down Expand Up @@ -156,4 +183,5 @@ public void testInitializeLongChroot() throws Exception {
Assert.assertNotNull(curator.checkExists().forPath(ZookeeperUtil.parseRoot(zk + longPath)));
}
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<url>https://github.com/apache/fluo/issues</url>
</issueManagement>
<properties>
<accumulo.version>1.6.5</accumulo.version>
<accumulo.version>1.7.3</accumulo.version>
<curator.version>2.7.1</curator.version>
<dropwizard.version>0.8.1</dropwizard.version>
<findbugs.maxRank>11</findbugs.maxRank>
Expand Down

0 comments on commit eb0889b

Please sign in to comment.