Skip to content

Commit

Permalink
PHOENIX-4010 Hash Join cache may not be sent to all regionservers whe…
Browse files Browse the repository at this point in the history
…n we have stale HBase meta cache
  • Loading branch information
ankitsinghal committed Sep 9, 2017
1 parent 44c0034 commit 7865a59
Show file tree
Hide file tree
Showing 28 changed files with 1,033 additions and 285 deletions.
Expand Up @@ -17,6 +17,7 @@
*/ */
package org.apache.phoenix.end2end; package org.apache.phoenix.end2end;


import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;


Expand All @@ -28,16 +29,23 @@
import java.sql.Timestamp; import java.sql.Timestamp;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern; import java.util.regex.Pattern;


import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.StringUtil;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;


import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;


public abstract class BaseJoinIT extends ParallelStatsDisabledIT { public abstract class BaseJoinIT extends ParallelStatsDisabledIT {

protected static final String JOIN_SCHEMA = "Join"; protected static final String JOIN_SCHEMA = "Join";
protected static final String JOIN_ORDER_TABLE = "OrderTable"; protected static final String JOIN_ORDER_TABLE = "OrderTable";
protected static final String JOIN_CUSTOMER_TABLE = "CustomerTable"; protected static final String JOIN_CUSTOMER_TABLE = "CustomerTable";
Expand Down Expand Up @@ -442,6 +450,12 @@ private static void initValues(Connection conn, String virtualName, String realN
conn.commit(); conn.commit();
} }


/**
 * Returns a fresh Phoenix test connection with hash-join server-cache
 * resend enabled, so joins in subclass tests exercise the PHOENIX-4010
 * path (re-sending the join cache when the HBase meta cache is stale).
 *
 * @return a new JDBC connection to the test cluster URL
 * @throws SQLException if the connection cannot be established
 */
protected Connection getConnection() throws SQLException {
// Copy the shared test properties so this connection's overrides do not
// leak into other tests.
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
// Force the client to resend the hash-join cache to each server rather
// than assuming the first broadcast reached every region server.
props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true");
return DriverManager.getConnection(getUrl(), props);
}

protected void createIndexes(Connection conn, String virtualName, String realName) throws Exception { protected void createIndexes(Connection conn, String virtualName, String realName) throws Exception {
if (indexDDL != null && indexDDL.length > 0) { if (indexDDL != null && indexDDL.length > 0) {
for (String ddl : indexDDL) { for (String ddl : indexDDL) {
Expand Down

Large diffs are not rendered by default.

122 changes: 57 additions & 65 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java

Large diffs are not rendered by default.

Expand Up @@ -690,7 +690,7 @@ public void testDisableIndexDuringRebuild() throws Throwable {
conn.commit(); conn.commit();
doneSignal.await(30, TimeUnit.SECONDS); doneSignal.await(30, TimeUnit.SECONDS);
// Install coprocessor that will simulate an index write failure during index rebuild // Install coprocessor that will simulate an index write failure during index rebuild
addWriteFailingCoprocessor(conn,fullIndexName); TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class);
clock.time += WAIT_AFTER_DISABLED; clock.time += WAIT_AFTER_DISABLED;
doneSignal.await(30, TimeUnit.SECONDS); doneSignal.await(30, TimeUnit.SECONDS);
WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS); WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
Expand Down Expand Up @@ -843,27 +843,6 @@ public void run() {
t.start(); t.start();
} }


/**
 * Installs {@code WriteFailingRegionObserver} on the given table to simulate
 * index write failures, then polls until the modified table descriptor is
 * visible (or the retry budget is exhausted).
 *
 * NOTE(review): this helper was removed by this commit in favor of the shared
 * {@code TestUtil.addCoprocessor(...)} (see the hunk around L78).
 *
 * @param conn      Phoenix connection used to reach the cluster's query services
 * @param tableName HBase table to attach the failing coprocessor to
 * @throws Exception if the descriptor change is not observed after the retries
 */
private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
// Higher priority than Phoenix's own coprocessors so the failure injector
// runs ahead of them.
int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null);
int numTries = 10;
// try-with-resources ensures the HBaseAdmin handle is closed even on failure.
try (HBaseAdmin admin = services.getAdmin()) {
admin.modifyTable(Bytes.toBytes(tableName), descriptor);
// Poll (1s apart) until the cluster reports the updated descriptor.
while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
&& numTries > 0) {
numTries--;
if (numTries == 0) {
// NOTE(review): numTries is 0 here, so the message reports
// "after 0 retries" even though 10 attempts were made.
throw new Exception(
"Check to detect if delaying co-processor was added failed after "
+ numTries + " retries.");
}
Thread.sleep(1000);
}
}
}

private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception { private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
Expand Down
Expand Up @@ -18,14 +18,18 @@
package org.apache.phoenix.iterate; package org.apache.phoenix.iterate;


import java.sql.SQLException; import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.tuple.Tuple;
Expand All @@ -41,16 +45,16 @@ public DelayedTableResultIteratorFactory(long delay) {
@Override @Override
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder, return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder,
renewLeaseThreshold, plan, scanGrouper); renewLeaseThreshold, plan, scanGrouper, caches);
} }


private class DelayedTableResultIterator extends TableResultIterator { private class DelayedTableResultIterator extends TableResultIterator {
public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan,
ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan,
ParallelScanGrouper scanGrouper) throws SQLException { ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper); super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches);
} }


@Override @Override
Expand Down

0 comments on commit 7865a59

Please sign in to comment.