Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion phoenix-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

Expand Down
1 change: 0 additions & 1 deletion phoenix-core-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class PhoenixInputSplit extends InputSplit implements Writable {

private List<Scan> scans;
private KeyRange keyRange;
private List<KeyRange> keyRanges;
private String regionLocation = null;
private long splitSize = 0;

Expand Down Expand Up @@ -68,13 +68,43 @@ public List<Scan> getScans() {
return scans;
}

/**
* Returns the overall KeyRange spanning this split. For coalesced splits, spans from the first
* region's lower bound to the last region's upper bound. Computed on-demand from keyRanges.
* @return KeyRange spanning the entire split, or null if keyRanges is empty
*/
public KeyRange getKeyRange() {
return keyRange;
if (keyRanges == null || keyRanges.isEmpty()) {
return null;
}
return KeyRange.getKeyRange(keyRanges.get(0).getLowerRange(),
keyRanges.get(keyRanges.size() - 1).getUpperRange());
}

/**
* Returns all KeyRanges for this split. For coalesced splits, returns multiple KeyRanges (one per
* region). For non-coalesced splits, returns a single-element list.
* @return List of KeyRanges, never null
*/
public List<KeyRange> getKeyRanges() {
return keyRanges;
}

/**
* Checks if this split is coalesced (contains multiple regions).
* @return true if split contains multiple regions
*/
public boolean isCoalesced() {
return keyRanges.size() > 1;
}

private void init() {
this.keyRange =
KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size() - 1).getStopRow());
// Initialize keyRanges from scans
this.keyRanges = Lists.newArrayListWithExpectedSize(scans.size());
for (Scan scan : scans) {
KeyRange kr = KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow());
this.keyRanges.add(kr);
}
}

@Override
Expand Down Expand Up @@ -126,7 +156,8 @@ public String[] getLocations() throws IOException, InterruptedException {
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + keyRange.hashCode();
KeyRange range = getKeyRange();
result = prime * result + (range == null ? 0 : range.hashCode());
return result;
}

Expand All @@ -142,11 +173,13 @@ public boolean equals(Object obj) {
return false;
}
PhoenixInputSplit other = (PhoenixInputSplit) obj;
if (keyRange == null) {
if (other.keyRange != null) {
KeyRange thisRange = getKeyRange();
KeyRange otherRange = other.getKeyRange();
if (thisRange == null) {
if (otherRange != null) {
return false;
}
} else if (!keyRange.equals(other.keyRange)) {
} else if (!thisRange.equals(otherRange)) {
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,6 +53,13 @@ public class PhoenixSyncTableInputFormat extends PhoenixInputFormat<DBWritable>

private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class);

// Sentinel server name used when a region location lookup returns null or has a null server.
// This can happen transiently during a region-in-transition (RIT) event (e.g. a split).
// Splits that cannot be placed on a specific server are coalesced together under this key
// rather than failing the job, since split coalescing is an optimisation, not a correctness
// requirement.
static final String UNKNOWN_SERVER = "UNKNOWN_SERVER";

public PhoenixSyncTableInputFormat() {
super();
}
Expand Down Expand Up @@ -94,15 +107,31 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
} catch (SQLException e) {
throw new RuntimeException(e);
}
if (completedRegions.isEmpty()) {
LOGGER.info("No completed regions for table {} - processing all {} splits", tableName,
allSplits.size());
return allSplits;
}

List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, completedRegions);
LOGGER.info("Found {} completed mapper regions for table {}, {} unprocessed splits remaining",
completedRegions.size(), tableName, unprocessedSplits.size());

boolean enableSplitCoalescing =
conf.getBoolean(PhoenixSyncTableTool.PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
PhoenixSyncTableTool.DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
LOGGER.info("Split coalescing enabled: {}, for table {}", enableSplitCoalescing, tableName);

if (enableSplitCoalescing && unprocessedSplits.size() > 1) {
try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
byte[] physicalTableName = pConn.getTable(tableName).getPhysicalName().getBytes();
List<InputSplit> coalescedSplits =
coalesceSplits(unprocessedSplits, pConn.getQueryServices(), physicalTableName);
LOGGER.info("Split coalescing: {} unprocessed splits {} coalesced splits for table {}",
unprocessedSplits.size(), coalescedSplits.size(), tableName);
return coalescedSplits;
} catch (Exception e) {
throw new IOException(String.format("Failed to coalesce splits for table %s. "
+ "Split coalescing is enabled but failed due to: %s.", tableName, e.getMessage()), e);
}
}

return unprocessedSplits;
}

Expand Down Expand Up @@ -133,6 +162,9 @@ private List<KeyRange> queryCompletedMapperRegions(Configuration conf, String ta
*/
List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
List<KeyRange> completedRegions) {
if (completedRegions.isEmpty()) {
return allSplits;
}
allSplits.sort((s1, s2) -> {
PhoenixInputSplit ps1 = (PhoenixInputSplit) s1;
PhoenixInputSplit ps2 = (PhoenixInputSplit) s2;
Expand Down Expand Up @@ -211,4 +243,103 @@ List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
}
return unprocessedSplits;
}

/**
* Coalesces multiple region splits from the same RegionServer into single InputSplits. All
* regions from the same server are coalesced into one split, regardless of count or size. This
* reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server.
* @param unprocessedSplits Splits remaining after filtering completed regions
* @param queryServices ConnectionQueryServices for querying region locations
* @param physicalTableName Physical HBase table name
* @return Coalesced splits with all regions per server combined into one split
*/
List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits,
ConnectionQueryServices queryServices, byte[] physicalTableName)
throws IOException, InterruptedException, SQLException {
// Group splits by RegionServer location
Map<String, List<PhoenixInputSplit>> splitsByServer =
groupSplitsByServer(unprocessedSplits, queryServices, physicalTableName);

List<InputSplit> coalescedSplits = new ArrayList<>();

// For each RegionServer, create one coalesced split with ALL regions from that server
for (Map.Entry<String, List<PhoenixInputSplit>> entry : splitsByServer.entrySet()) {
String serverName = entry.getKey();
List<PhoenixInputSplit> serverSplits = entry.getValue();

// Sort splits by start key for sequential processing
serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(),
s2.getKeyRange().getLowerRange()));
// Create single coalesced split with ALL regions from this server
coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
}

return coalescedSplits;
}

/**
* Groups splits by RegionServer location for locality-aware coalescing. Uses
* ConnectionQueryServices to determine which server hosts each region.
* <p>
* If the region location is unavailable (null location or null server name), which can happen
* transiently during a region-in-transition (RIT) event such as a split, the split is assigned to
* {@link #UNKNOWN_SERVER} rather than failing the job. Since split coalescing is an optimisation,
* a transient lookup failure should degrade gracefully, not abort the MR job.
* @param splits List of splits to group
* @param queryServices ConnectionQueryServices for querying region locations
* @param physicalTableName Physical HBase table name
* @return Map of server name to list of splits hosted on that server
*/
private Map<String, List<PhoenixInputSplit>> groupSplitsByServer(List<InputSplit> splits,
ConnectionQueryServices queryServices, byte[] physicalTableName)
throws IOException, SQLException {
Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
for (InputSplit split : splits) {
PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
KeyRange keyRange = pSplit.getKeyRange();
HRegionLocation regionLocation =
queryServices.getTableRegionLocation(physicalTableName, keyRange.getLowerRange());
String serverName;
if (regionLocation == null || regionLocation.getServerName() == null) {
LOGGER.warn(
"Could not determine region server for key: {}. "
+ "Region may be in transition. Assigning split to {} bucket.",
Bytes.toStringBinary(keyRange.getLowerRange()), UNKNOWN_SERVER);
serverName = UNKNOWN_SERVER;
} else {
serverName = regionLocation.getServerName().getAddress().toString();
}
splitsByServer.computeIfAbsent(serverName, k -> new ArrayList<>()).add(pSplit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Split {} assigned to server {}",
Bytes.toStringBinary(keyRange.getLowerRange()), serverName);
}
}

return splitsByServer;
}

/**
* Creates a coalesced PhoenixInputSplit containing multiple regions. Combines scans and KeyRanges
* from individual splits into a single split.
* @param splits List of splits to coalesce (from same RegionServer)
* @param serverLocation RegionServer location for data locality
* @return Coalesced PhoenixInputSplit
*/
private PhoenixInputSplit createCoalescedSplit(List<PhoenixInputSplit> splits,
String serverLocation) throws IOException, InterruptedException {

List<Scan> allScans = new ArrayList<>();
long totalSize = 0;
// Extract all scans from individual splits
for (PhoenixInputSplit split : splits) {
allScans.addAll(split.getScans());
totalSize += split.getLength();
}

LOGGER.info("Created coalesced split with {} regions, {} MB from server {}", splits.size(),
totalSize / (1024 * 1024), serverLocation);
// Create a new PhoenixInputSplit, keyRanges will be derived from scans in init()
return new PhoenixInputSplit(allScans, totalSize, serverLocation);
}
}
Loading