Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8da5933
Added support for grouping ranges per tablet when using AccumuloInput…
echeipesh Mar 23, 2015
47a2fae
Avoid casts by using AccumuloInputSplit interface
echeipesh Mar 25, 2015
b4101a3
upgrade warning to exception if batch scan is requested but not avail…
echeipesh Mar 25, 2015
ad83769
close underlying scanner with the RecordReader
echeipesh Mar 25, 2015
46a45c3
Upgrade AccumuloInputSplit to abstract class for code reuse
echeipesh Mar 31, 2015
7a09974
merging upstream master
echeipesh Mar 31, 2015
41e79cf
fix merge errors
echeipesh Mar 31, 2015
7b3beeb
add batch scan setters/getters to AccumuloInputFormat
echeipesh Mar 31, 2015
c84c8c0
fix casting error
echeipesh Mar 31, 2015
3549514
make findbugs happy
echeipesh Mar 31, 2015
7ac1cef
test BatchInputSplit generation
echeipesh Mar 31, 2015
9a47722
match code flow between mapred and mapreduce
echeipesh Mar 31, 2015
23f6d40
test all splits for correct type, not just head
echeipesh Mar 31, 2015
17fa276
Test MR job with BatchScan option enable
echeipesh Mar 31, 2015
4614efe
remove unused import
echeipesh Apr 1, 2015
4df19d7
Merge branch 'master' of https://github.com/apache/accumulo into ACCU…
echeipesh Apr 1, 2015
bc2a8ef
fix: broken merge, getting instance from job context
echeipesh Apr 1, 2015
6a0bbff
fix: creating iterator before column family set
echeipesh Apr 6, 2015
a23d8dc
move BatchInputSplit and AccumuloInputSplit to 'impl' package
echeipesh Apr 6, 2015
c5d1728
BatchInputSplit constructor public
echeipesh Apr 6, 2015
100813f
Deprecating RecordReaderBase.setupIterators with Scanner arguments
echeipesh Apr 6, 2015
15bbef9
expand test for BatchScan option
echeipesh Apr 6, 2015
b2a8997
factor out setting common split configuration
echeipesh Apr 6, 2015
cde47bc
fix: isBatchScan is public and correct
echeipesh Apr 6, 2015
ecc659c
improve progress reporting on BatchInputSplit
echeipesh Apr 6, 2015
ab322a0
fix: null reference in BatchInputSplit.getProgress
echeipesh Apr 6, 2015
951afd4
Preserving API of AbstractRecordReader.setupIterators
echeipesh Apr 6, 2015
070aef0
fix: use the imported type
echeipesh Apr 6, 2015
095b825
make deprecated AbstractRecordReader.setupIterators concrete with cal…
echeipesh Apr 6, 2015
fe8a1c2
unused import
echeipesh Apr 6, 2015
3d4cb40
ACCUMULO-3602 Get findbugs passing on new IT
joshelser Apr 7, 2015
6ed0175
Merge branch 'master' of https://github.com/apache/accumulo into ACCU…
echeipesh Apr 7, 2015
089afec
Merge pull request #1 from joshelser/ACCUMULO-3602
echeipesh Apr 7, 2015
00e7aa6
remove redundant conjunction
echeipesh Apr 7, 2015
77702c1
AbstractRecordReader.setupIterators mad concrete and private, abstrac…
echeipesh Apr 10, 2015
0157382
Check AccumuloInputFormat.getSplits for test success between testing …
echeipesh Apr 10, 2015
d3d628e
Removing configurable number of threads for BatchScan
echeipesh Apr 10, 2015
1299fdf
Removing trailing space and unused imports
echeipesh Apr 10, 2015
7a5c8c0
Expanded javadoc for setBatchScan
echeipesh Apr 10, 2015
0f23369
Added mapred.InputFormatBase.setBatchan and isBatchScan
echeipesh Apr 10, 2015
eaeef80
killing whitespace
echeipesh Apr 10, 2015
0b84321
merging upstream
echeipesh Apr 16, 2015
4409db9
Added mention and a warning about AutoAdjustRanges
echeipesh Apr 17, 2015
e86541a
Added warning to AutoAdjustRanges and upgraded to exception
echeipesh Apr 17, 2015
79af3e5
fix: missing semicolons
echeipesh Apr 17, 2015
964bf3d
moving check down to after autoAdjust config is read
echeipesh Apr 17, 2015
c264325
tailing whitespace
echeipesh Apr 17, 2015
a1c59a9
re-adding setupIterators overload for 1.6 compatability
echeipesh Apr 17, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/findbugs/exclude-filter.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<Package name="org.apache.accumulo.core.iterators" />
<Package name="org.apache.accumulo.core.trace" />
<Class name="org.apache.accumulo.core.client.mapred.RangeInputSplit" />
<Class name="org.apache.accumulo.core.client.mapred.impl.BatchInputSplit" />
<Class name="org.apache.accumulo.core.util.AddressUtil" />
<Class name="org.apache.accumulo.core.zookeeper.ZooUtil" />
</Or>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ public RecordReader<Key,Value> getRecordReader(InputSplit split, JobConf job, Re
log.setLevel(getLogLevel(job));

// Override the log level from the configuration: if the split carries its own log level, that is the more correct one to use.
if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) {
org.apache.accumulo.core.client.mapreduce.RangeInputSplit risplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split;
Level level = risplit.getLogLevel();
if (split instanceof org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit) {
org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit accSplit = (org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit) split;
Level level = accSplit.getLogLevel();
if (null != level) {
log.setLevel(level);
}
} else {
throw new IllegalArgumentException("No RecordReader for " + split.getClass().toString());
}

RecordReaderBase<Key,Value> recordReader = new RecordReaderBase<Key,Value>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public static void setAutoAdjustRanges(JobConf job, boolean enableFeature) {

/**
* Determines whether a configuration has auto-adjust ranges enabled.
* Must be enabled when {@link #setBatchScan(JobConf, boolean)} is true.
*
* @param job
* the Hadoop context for the configured job
Expand Down Expand Up @@ -296,6 +297,48 @@ protected static boolean isOfflineScan(JobConf job) {
return InputConfigurator.isOfflineScan(CLASS, job);
}

/**
* Controls the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job.
* Using this feature will group Ranges by their source tablet, producing an InputSplit per tablet
* rather than per Range. This batching helps to reduce overhead when querying a large number of small ranges
* (e.g. when doing quad-tree decomposition for spatial queries).
* <p>
* In order to achieve good locality of InputSplits this option always clips the input Ranges to tablet boundaries.
* This may result in one input Range contributing to several InputSplits.
* <p>
* Note that the value of {@link #setAutoAdjustRanges(JobConf, boolean)} is ignored and is assumed to be true when the BatchScan option is enabled.
* <p>
* This configuration is incompatible with:
* <ul>
* <li>{@link #setOfflineTableScan(JobConf, boolean)}</li>
* <li>{@link #setLocalIterators(JobConf, boolean)}</li>
* <li>{@link #setScanIsolation(JobConf, boolean)}</li>
* </ul>
* <p>
* By default, this feature is <b>disabled</b>.
*
* @param job
* the Hadoop job instance to be configured
* @param enableFeature
* the feature is enabled if true, disabled otherwise
* @since 1.7.0
*/
public static void setBatchScan(JobConf job, boolean enableFeature) {
// NOTE(review): the auto-adjust-ranges Javadoc in this class states that setting "Must be enabled" when batch scan is on,
// while the Javadoc above says its value is ignored -- confirm which statement matches the implementation.
// The flag itself is stored by InputConfigurator; this method only forwards CLASS, the job and the flag to it.
InputConfigurator.setBatchScan(CLASS, job, enableFeature);
}

/**
* Determines whether a configuration has the {@link org.apache.accumulo.core.client.BatchScanner} feature enabled.
*
* @param job
* the Hadoop context for the configured job
* @return the value reported by {@code InputConfigurator.isBatchScan} for this job; true indicates the feature is enabled
* @since 1.7.0
* @see #setBatchScan(JobConf, boolean)
*/
public static boolean isBatchScan(JobConf job) {
// Pure delegation: the lookup is performed by InputConfigurator using CLASS and the job.
return InputConfigurator.isBatchScan(CLASS, job);
}

/**
* Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration.
*
Expand All @@ -315,19 +358,8 @@ protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoun
protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> {

@Override
protected void setupIterators(JobConf job, Scanner scanner, String tableName, org.apache.accumulo.core.client.mapred.RangeInputSplit split) {
List<IteratorSetting> iterators = null;

if (null == split) {
iterators = getIterators(job);
} else {
iterators = split.getIterators();
if (null == iterators) {
iterators = getIterators(job);
}
}

setupIterators(iterators, scanner);
protected List<IteratorSetting> jobIterators(JobConf job, String tableName) {
return getIterators(job);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing where this is implemented. Am I just blind?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

n/m, pulled it into Eclipse and realized it's just me :)

}

/**
Expand All @@ -337,7 +369,9 @@ protected void setupIterators(JobConf job, Scanner scanner, String tableName, or
* the iterators to set
* @param scanner
* the scanner to configure
* @deprecated since 1.7.0; Use {@link #jobIterators} instead.
*/
@Deprecated
protected void setupIterators(List<IteratorSetting> iterators, Scanner scanner) {
for (IteratorSetting iterator : iterators) {
scanner.addScanIterator(iterator);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.mapred.impl;

import java.io.IOException;
import java.util.Collection;

import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.mapred.InputSplit;

/**
 * An input split that encapsulates Accumulo ranges for use in Map Reduce jobs; a single split may
 * hold several Ranges.
 * <p>
 * This class extends {@link org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit} and
 * additionally implements the {@link org.apache.hadoop.mapred.InputSplit} interface. It declares
 * no state or methods of its own: every constructor simply forwards to the matching superclass
 * constructor.
 */
public class BatchInputSplit extends org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit implements InputSplit {

  /**
   * Creates a split from its parts by forwarding them, unchanged and in the same order, to the
   * superclass constructor.
   *
   * @param table
   *          passed through to the superclass (presumably the table name; confirm against the superclass)
   * @param tableId
   *          passed through to the superclass (presumably the table's internal id; confirm against the superclass)
   * @param ranges
   *          the Ranges this split covers
   * @param location
   *          passed through to the superclass (presumably the hosts serving the data; confirm against the superclass)
   */
  public BatchInputSplit(String table, String tableId, Collection<Range> ranges, String[] location) {
    super(table, tableId, ranges, location);
  }

  /**
   * Copy constructor; hands the given split to the superclass copy constructor.
   *
   * @param split
   *          the split to copy
   * @throws IOException
   *           as declared; presumably propagated from the superclass copy constructor
   */
  public BatchInputSplit(BatchInputSplit split) throws IOException {
    super(split);
  }

  /**
   * Creates an empty split. The superclass no-argument constructor is invoked implicitly.
   */
  public BatchInputSplit() {}
}
Loading