Skip to content
Permalink
Browse files
Refactor out DefaultFormatter (#35) (#56)
* Refactor out usage of DefaultFormatter. It is an internal-only unstable
API that is not recommended for users, and as such is unsuitable to
include in examples.
  • Loading branch information
lbschanno authored and milleruntime committed Jul 9, 2019
1 parent 64abb33 commit 09f8d64bbb6d0374ed7858d41353bcc41235e0e5
Showing 4 changed files with 110 additions and 9 deletions.
@@ -36,7 +36,6 @@
<!-- TODO refactor code to remove the following exceptions -->
<allow class="org.apache.accumulo.tracer.TraceDump"/>
<allow class="org.apache.accumulo.core.trace.DistributedTrace"/>
<allow class="org.apache.accumulo.core.util.format.DefaultFormatter"/>
<!-- End TODO section -->

<!-- disallow everything else coming from accumulo -->
@@ -25,7 +25,7 @@
import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.format.DefaultFormatter;
import org.apache.accumulo.examples.util.FormatUtil;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -40,7 +40,7 @@
public class ChunkInputFormat extends InputFormatBase<List<Entry<Key,Value>>,InputStream> {
@Override
public RecordReader<List<Entry<Key,Value>>,InputStream> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
TaskAttemptContext context) {
return new RecordReaderBase<List<Entry<Key,Value>>,InputStream>() {
private PeekingIterator<Entry<Key,Value>> peekingScannerIterator;

@@ -53,22 +53,27 @@ public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IO
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
public boolean nextKeyValue() throws IOException {
log.debug("nextKeyValue called");

currentK.clear();
if (peekingScannerIterator.hasNext()) {
++numKeysRead;
Entry<Key,Value> entry = peekingScannerIterator.peek();
while (!entry.getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
currentK.add(entry);
peekingScannerIterator.next();
if (!peekingScannerIterator.hasNext())
if (!peekingScannerIterator.hasNext()) {
return true;
}
entry = peekingScannerIterator.peek();
}
currentKey = entry.getKey();
((ChunkInputStream) currentV).setSource(peekingScannerIterator);
if (log.isTraceEnabled())
log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true));
if (log.isTraceEnabled()) {
log.trace("Processing key/value pair: " + FormatUtil.formatTableEntry(entry, true));
}

return true;
}
return false;
@@ -25,8 +25,8 @@
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.format.DefaultFormatter;
import org.apache.accumulo.examples.cli.ClientOpts;
import org.apache.accumulo.examples.util.FormatUtil;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
import org.apache.hadoop.fs.Path;
@@ -59,7 +59,7 @@ public static class TTFMapper extends Mapper<Key,Value,NullWritable,Text> {
@Override
public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
Map.Entry<Key,Value> entry = new SimpleImmutableEntry<>(row, data);
context.write(NullWritable.get(), new Text(DefaultFormatter.formatEntry(entry, false)));
context.write(NullWritable.get(), new Text(FormatUtil.formatTableEntry(entry, false)));
context.setStatus("Outputed Value");
}
}
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.util;

import java.util.Map;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;

public final class FormatUtil {

/**
* Format and return the specified table entry as a human-readable String suitable for logging.
* <br/>
* If {@code includeTimestamp} is true, the entry will be formatted as: <br/>
* {@literal <row> <columnFamily>:<columnQualifier> <columnVisibility> <timestamp>\t<value>} <br/>
* If false, the entry will be formatted as: <br/>
* {@literal <row> <columnFamily>:<columnQualifier> <columnVisibility>\t<value>} <br/>
* Examples: <br/>
* {@literal a ~chunk:\x00\x00\x00d\x00\x00\x00\x00 [A&B] 9223372036854775807 asdfjkl;}
* {@literal a ~chunk:\x00\x00\x00d\x00\x00\x00\x00 [A&B] asdfjkl;}
*
* @param entry
* the table entry to format
* @param includeTimestamp
* if true, include the timestamp in the returned result
* @return the specified entry as a formatted String, or null if the entry is null
*/
public static String formatTableEntry(final Map.Entry<Key,Value> entry,
final boolean includeTimestamp) {
if (entry == null) {
return null;
}

Key key = entry.getKey();
StringBuilder sb = new StringBuilder();
Text buffer = new Text();

// Append row.
appendBytes(sb, key.getRow(buffer).getBytes()).append(" ");

// Append column family.
appendBytes(sb, key.getColumnFamily().getBytes()).append(":");

// Append column qualifier.
appendBytes(sb, key.getColumnQualifier().getBytes()).append(" ");

// Append visibility and timestamp.
sb.append(new ColumnVisibility(key.getColumnVisibility(buffer)));

if (includeTimestamp) {
sb.append(" ").append(entry.getKey().getTimestamp());
}

// Append value.
Value value = entry.getValue();
if (value != null && value.getSize() > 0) {
sb.append("\t");
appendBytes(sb, value.get());
}
return sb.toString();
}

private static StringBuilder appendBytes(final StringBuilder sb, final byte[] ba) {
for (byte b : ba) {
int c = 0xff & b;
if (c == '\\') {
sb.append("\\\\");
} else if (c >= 32 && c <= 126) {
sb.append((char) c);
} else {
sb.append("\\x").append(String.format("%02X", c));
}
}
return sb;
}

private FormatUtil() {
throw new UnsupportedOperationException();
}
}

0 comments on commit 09f8d64

Please sign in to comment.