
PHOENIX-5478 IndexTool mapper task should not timeout #603

Closed · wants to merge 1 commit

Changes from all commits
@@ -82,7 +82,6 @@

@RunWith(Parameterized.class)
public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
private static final Logger LOGGER = LoggerFactory.getLogger(PartialIndexRebuilderIT.class);
private final boolean localIndex;
private final boolean mutable;
private final boolean transactional;
@@ -120,6 +119,7 @@ public static void setup() throws Exception {
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
@@ -67,6 +67,8 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
public static final String GROUP_BY_LIMIT = "_GroupByLimit";
public static final String LOCAL_INDEX = "_LocalIndex";
public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
// The number of index rows to be rebuilt in one RPC call
public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
/*
* Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
* Needed for backward compatibility purposes. TODO: get rid of this in next major release.
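Taken together, the new _IndexRebuildPaging attribute is a simple opt-in flag: the rebuild client tags the scan with it, and the region observer enables row-count paging only when the attribute is present. A minimal sketch of both sides, mirroring the hunks below (the local variable names here are illustrative):

// Client side: mark the rebuild scan as paged (see the input-format hunk below).
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, PDataType.TRUE_BYTES);

// Server side: honor the flag by capping the number of rows rebuilt per RPC.
long pageSizeInRows = Long.MAX_VALUE; // unpaged unless the attribute is set
if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
    pageSizeInRows = config.getLong(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
            QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
}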
@@ -21,6 +21,7 @@
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
@@ -130,7 +131,6 @@
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
import org.apache.phoenix.schema.stats.StatisticsScanner;
import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
@@ -1087,116 +1087,136 @@ private static PTable deserializeTable(byte[] b) {
throw new RuntimeException(e);
}
}

private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
Configuration config) throws IOException {
byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
boolean useProto = true;
// for backward compatibility fall back to look up by the old attribute
if (indexMetaData == null) {
useProto = false;
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
private class IndexRebuildRegionScanner extends BaseRegionScanner {
private long pageSizeInRows = Long.MAX_VALUE;
private boolean hasMore;
private final int maxBatchSize;
private MutationList mutations;
private final long maxBatchSizeBytes;
private final long blockingMemstoreSize;
private final byte[] clientVersionBytes;
private List<Cell> results = new ArrayList<Cell>();
private byte[] indexMetaData;
private boolean useProto = true;
private Scan scan;
private RegionScanner innerScanner;
final Region region;

IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final Configuration config) {
super(innerScanner);
if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
}

maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
mutations = new MutationList(maxBatchSize);
maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
blockingMemstoreSize = getBlockingMemstoreSize(region, config);
clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
if (indexMetaData == null) {
useProto = false;
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
this.scan = scan;
this.innerScanner = innerScanner;
this.region = region;
}
byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
boolean hasMore;
int rowCount = 0;
try {
int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
final long blockingMemstoreSize = getBlockingMemstoreSize(region, config);
MutationList mutations = new MutationList(maxBatchSize);
region.startRegionOperation();
byte[] uuidValue = ServerCacheClient.generateId();
synchronized (innerScanner) {
do {
List<Cell> results = new ArrayList<Cell>();
hasMore = innerScanner.nextRaw(results);
if (!results.isEmpty()) {
Put put = null;
Delete del = null;
for (Cell cell : results) {

if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
mutations.add(put);
// Since we're replaying existing mutations, it makes no sense to write them to the wal
put.setDurability(Durability.SKIP_WAL);
}
put.add(cell);
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
mutations.add(del);
// Since we're replaying existing mutations, it makes no sense to write them to the wal
del.setDurability(Durability.SKIP_WAL);
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
}

@Override
public boolean isFilterDone() { return hasMore; }

@Override
public void close() throws IOException { innerScanner.close(); }

private void setMutationAttributes(Mutation m, byte[] uuidValue) {
m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
// Since we're replaying existing mutations, it makes no sense to write them to the wal
m.setDurability(Durability.SKIP_WAL);
}

@Override
public boolean next(List<Cell> results) throws IOException {
int rowCount = 0;
try {
byte[] uuidValue = ServerCacheClient.generateId();
synchronized (innerScanner) {
do {
List<Cell> row = new ArrayList<Cell>();
hasMore = innerScanner.nextRaw(row);
if (!row.isEmpty()) {
Put put = null;
Delete del = null;
for (Cell cell : row) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
setMutationAttributes(put, uuidValue);
mutations.add(put);
}
put.add(cell);
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
setMutationAttributes(del, uuidValue);
mutations.add(del);
}
del.addDeleteMarker(cell);
}
del.addDeleteMarker(cell);
}
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
checkForRegionClosing();
commitBatchWithRetries(region, mutations, blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
mutations.clear();
}
rowCount++;
}
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
checkForRegionClosing();
commitBatchWithRetries(region, mutations, blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
mutations.clear();
}
rowCount++;

} while (hasMore && rowCount < pageSizeInRows);
if (!mutations.isEmpty()) {
checkForRegionClosing();
commitBatchWithRetries(region, mutations, blockingMemstoreSize);
}

} while (hasMore);
if (!mutations.isEmpty()) {
checkForRegionClosing();
commitBatchWithRetries(region, mutations, blockingMemstoreSize);
}
} catch (IOException e) {
hasMore = false;
LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
throw e;
} finally {
if (!hasMore) {
region.closeRegionOperation();
}
}
} catch (IOException e) {
LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
throw e;
} finally {
region.closeRegionOperation();
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
binshi-bing (Contributor) commented on Oct 22, 2019:
Does rowCount here mean "the total count of rows successfully committed by commitBatchWithRetries"? If so, we might have an inconsistency issue -- suppose the loop above called commitBatchWithRetries multiple times but the last call failed (threw an IOException); then rowCount here would also count those uncommitted rows (up to maxBatchSize - 1) -- is this expected?

The author (Contributor Author) replied:
The row count is used to show progress for the MR job; it is not really for correctness. However, if there is an exception, the mapper task will get that exception and no row count will be returned to it. The mapper task will then start from the beginning and rebuild those rows again. When that happens, I expect the row count to be wrong, because the mapper task will rebuild and double-count the rows that are rebuilt again. I will think about improving this, maybe not in this JIRA.

final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
results.add(aggKeyValue);
return hasMore;
}
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);

RegionScanner scanner = new BaseRegionScanner(innerScanner) {
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
}

@Override
public boolean isFilterDone() {
return true;
}

@Override
public void close() throws IOException {
innerScanner.close();
}

@Override
public boolean next(List<Cell> results) throws IOException {
results.add(aggKeyValue);
return false;
}
@Override
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
}

private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
final Configuration config) throws IOException {

@Override
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
};
region.startRegionOperation();
RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config);
return scanner;
}
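With this change, each next() call on the rebuild scanner (one RPC) rebuilds at most pageSizeInRows rows and returns a single aggregate cell carrying that page's row count, with hasMore indicating whether another page remains, so no single RPC has to outlive the scanner or mapper timeouts. A minimal sketch of how a caller holding such a RegionScanner could drain it page by page and accumulate the total row count (the helper below is illustrative only and not part of this patch; the real client path goes through Phoenix's result iterators):

// Illustrative helper, not part of this PR: drain a paged rebuild scanner.
private static long drainRebuildScanner(RegionScanner scanner) throws IOException {
    long totalRows = 0;
    boolean hasMore;
    List<Cell> page = new ArrayList<Cell>();
    do {
        page.clear();
        // Each call rebuilds at most pageSizeInRows index rows on the server.
        hasMore = scanner.next(page);
        if (!page.isEmpty()) {
            // The scanner emits one aggregate cell whose value is the PLong-encoded row count.
            byte[] value = CellUtil.cloneValue(page.get(0));
            totalRows += ((Long) PLong.INSTANCE.toObject(value)).longValue();
        }
    } while (hasMore);
    return totalRows;
}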

@@ -252,10 +252,6 @@ public byte[] getRowKey() {
return result.getRow();
}
};
for (Cell cell : result.rawCells()) {
String cellString = cell.toString();
LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
}
byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
indexRowKey, 0, indexRowKey.length) != 0) {
@@ -42,6 +42,7 @@
import com.google.common.base.Preconditions;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;

/**
* {@link InputFormat} implementation from Phoenix for building index
@@ -89,6 +90,7 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration
Scan scan = queryPlan.getContext().getScan();
try {
scan.setTimeRange(0, scn);
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
} catch (IOException e) {
throw new SQLException(e);
}
@@ -501,17 +501,6 @@ private Job configureJobForAysncIndex()
private Job configureJobForServerBuildIndex()
throws Exception {

long indexRebuildQueryTimeoutMs =
configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
// Set various phoenix and hbase level timeouts and rpc retries
configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
Long.toString(indexRebuildQueryTimeoutMs));
configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
Long.toString(indexRebuildQueryTimeoutMs));
configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY,
Long.toString(indexRebuildQueryTimeoutMs));

PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);

@@ -527,7 +516,6 @@ private Job configureJobForServerBuildIndex()
fs = outputPath.getFileSystem(configuration);
fs.delete(outputPath, true);
}
configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));
final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
final Job job = Job.getInstance(configuration, jobName);
job.setJarByClass(IndexTool.class);
@@ -352,6 +352,8 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled";
// Enable support for long view index(default is false)
public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
// The number of index rows to be rebuilt in one RPC call
public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";


// Before 4.15 when we created a view we included the parent table column metadata in the view
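Deployments can tune how many rows are rebuilt per RPC by overriding this property in the server-side HBase configuration; when it is not set, the scanner falls back to DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS. A hedged example of setting it programmatically (the value 50000 is purely illustrative):

Configuration conf = HBaseConfiguration.create();
// Rebuild at most 50,000 index rows per RPC call (illustrative value).
conf.setLong(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, 50000L);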
@@ -208,7 +208,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 mins
// 30 min rpc timeout * 5 tries, with 2100ms total pause time between retries
public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = 60000 * 60 * 24; // 24 hrs
public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100;
A reviewer (Contributor) asked:
Where does this number come from?

The author (Contributor Author) replied:
I reverted the change that I introduced in a previous JIRA for increasing timeouts.

A reviewer (Contributor) added:
Also matches the comment on line 210 right above.

public static final long DEFAULT_INDEX_REBUILD_RPC_TIMEOUT = 30000 * 60; // 30 mins
public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins
public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level
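For reference, the reverted default works out exactly to the retry budget described in the comment above it: five 30-minute RPC attempts plus 2100 ms of total pause between retries. A quick sanity check of the arithmetic (a standalone sketch, not code from this patch):

long rpcTimeoutMs = 30000L * 60;                // 30 minutes per RPC attempt
int rpcRetries = 5;                             // total tries at the RPC level
long totalPauseMs = 2100L;                      // total pause time between retries
long queryTimeoutMs = rpcRetries * rpcTimeoutMs + totalPauseMs;
System.out.println(queryTimeoutMs);             // 9002100 ms == (5 * 30000 * 60) + 2100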
@@ -357,6 +357,7 @@ public class QueryServicesOptions {

public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 1024*1024;

public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
