Skip to content

Commit

Permalink
PHOENIX-4666 Persistent subquery cache for hash joins
Browse files Browse the repository at this point in the history
Signed-off-by: Josh Elser <elserj@apache.org>
  • Loading branch information
ortutay authored and joshelser committed Sep 10, 2018
1 parent 180b8ca commit 87cc9b4
Show file tree
Hide file tree
Showing 21 changed files with 773 additions and 117 deletions.
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.join;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.Properties;

import org.apache.phoenix.end2end.join.HashJoinCacheIT.InvalidateHashCache;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;

public class HashJoinPersistentCacheIT extends BaseJoinIT {

@Override
protected String getTableName(Connection conn, String virtualName) throws Exception {
String realName = super.getTableName(conn, virtualName);
TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName),
InvalidateHashCache.class);
return realName;
}

@Test
public void testPersistentCache() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);

createTestTable(getUrl(),
"CREATE TABLE IF NOT EXISTS states (state CHAR(2) " +
"NOT NULL, name VARCHAR NOT NULL CONSTRAINT my_pk PRIMARY KEY (state, name))");
createTestTable(getUrl(),
"CREATE TABLE IF NOT EXISTS cities (state CHAR(2) " +
"NOT NULL, city VARCHAR NOT NULL, population BIGINT " +
"CONSTRAINT my_pk PRIMARY KEY (state, city))");

conn.prepareStatement(
"UPSERT INTO states VALUES ('CA', 'California')").executeUpdate();
conn.prepareStatement(
"UPSERT INTO states VALUES ('AZ', 'Arizona')").executeUpdate();
conn.prepareStatement(
"UPSERT INTO cities VALUES ('CA', 'San Francisco', 50000)").executeUpdate();
conn.prepareStatement(
"UPSERT INTO cities VALUES ('CA', 'Sacramento', 3000)").executeUpdate();
conn.prepareStatement(
"UPSERT INTO cities VALUES ('AZ', 'Phoenix', 20000)").executeUpdate();
conn.commit();

/* First, run query without using the persistent cache. This should return
* different results after an UPSERT takes place.
*/
ResultSet rs = conn.prepareStatement(
"SELECT SUM(population) FROM states s "
+"JOIN cities c ON c.state = s.state").executeQuery();
rs.next();
int population1 = rs.getInt(1);

conn.prepareStatement("UPSERT INTO cities VALUES ('CA', 'Mt View', 1500)").executeUpdate();
conn.commit();
rs = conn.prepareStatement(
"SELECT SUM(population) FROM states s " +
"JOIN cities c ON c.state = s.state").executeQuery();
rs.next();
int population2 = rs.getInt(1);

assertEquals(73000, population1);
assertEquals(74500, population2);

/* Second, run query using the persistent cache. This should return the
* same results after an UPSERT takes place.
*/
rs = conn.prepareStatement(
"SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) FROM states s " +
"JOIN cities c ON c.state = s.state").executeQuery();
rs.next();
int population3 = rs.getInt(1);

conn.prepareStatement(
"UPSERT INTO cities VALUES ('CA', 'Palo Alto', 2000)").executeUpdate();
conn.commit();

rs = conn.prepareStatement(
"SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " +
"FROM states s JOIN cities c ON c.state = s.state").executeQuery();
rs.next();
int population4 = rs.getInt(1);
rs = conn.prepareStatement(
"SELECT SUM(population) FROM states s JOIN cities c ON c.state = s.state")
.executeQuery();
rs.next();
int population5 = rs.getInt(1);

assertEquals(74500, population3);
assertEquals(74500, population4);
assertEquals(76500, population5);

/* Let's make sure caches can be used across queries. We'll set up a
* cache, and make sure it is used on two different queries with the
* same subquery.
*/

String sumQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " +
"FROM cities c JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " +
"ON sq.state = c.state";
String distinctQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ " +
"COUNT(DISTINCT(c.city)) FROM cities c " +
"JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " +
"ON sq.state = c.state";
String sumQueryUncached = sumQueryCached.replace(
"/*+ USE_PERSISTENT_CACHE */", "");
String distinctQueryUncached = distinctQueryCached.replace(
"/*+ USE_PERSISTENT_CACHE */", "");

rs = conn.prepareStatement(sumQueryCached).executeQuery();
rs.next();
int population6 = rs.getInt(1);
rs = conn.prepareStatement(distinctQueryCached).executeQuery();
rs.next();
int distinct1 = rs.getInt(1);
assertEquals(4, distinct1);

// Add a new city that matches the queries. This should not affect results
// using persistent caching.
conn.prepareStatement("UPSERT INTO states VALUES ('CO', 'Colorado')").executeUpdate();
conn.prepareStatement("UPSERT INTO cities VALUES ('CO', 'Denver', 6000)").executeUpdate();
conn.commit();

rs = conn.prepareStatement(sumQueryCached).executeQuery();
rs.next();
int population7 = rs.getInt(1);
assertEquals(population6, population7);
rs = conn.prepareStatement(distinctQueryCached).executeQuery();
rs.next();
int distinct2 = rs.getInt(1);
assertEquals(distinct1, distinct2);

// Finally, make sure uncached queries give up to date results
rs = conn.prepareStatement(sumQueryUncached).executeQuery();
rs.next();
int population8 = rs.getInt(1);
assertEquals(population8, 62500);
rs = conn.prepareStatement(distinctQueryUncached).executeQuery();
rs.next();
int distinct3 = rs.getInt(1);
assertEquals(5, distinct3);
}
}
Expand Up @@ -148,7 +148,12 @@ private static long getMaxMemorySize(Configuration config) {

private GlobalCache(Configuration config) {
super(new GlobalMemoryManager(getMaxMemorySize(config)),
config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
config.getInt(
QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS),
config.getInt(
QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS));
this.config = config;
}

Expand All @@ -164,9 +169,18 @@ public Configuration getConfig() {
public TenantCache getChildTenantCache(ImmutableBytesPtr tenantId) {
TenantCache tenantCache = perTenantCacheMap.get(tenantId);
if (tenantCache == null) {
int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
TenantCacheImpl newTenantCache = new TenantCacheImpl(new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), maxServerCacheTimeToLive);
int maxTenantMemoryPerc = config.getInt(
MAX_TENANT_MEMORY_PERC_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
int maxServerCacheTimeToLive = config.getInt(
QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
int maxServerCachePersistenceTimeToLive = config.getInt(
QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS);
TenantCacheImpl newTenantCache = new TenantCacheImpl(
new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc),
maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive);
tenantCache = perTenantCacheMap.putIfAbsent(tenantId, newTenantCache);
if (tenantCache == null) {
tenantCache = newTenantCache;
Expand Down
Expand Up @@ -41,6 +41,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Table;
Expand All @@ -50,6 +51,7 @@
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
Expand All @@ -59,6 +61,7 @@
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheFactory;
Expand All @@ -75,6 +78,8 @@
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;

import com.google.protobuf.ByteString;

/**
*
* Client for sending cache to each region server
Expand Down Expand Up @@ -215,22 +220,46 @@ public void close() throws SQLException {
}
}
}

}

public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException {
return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false);

public ServerCache createServerCache(byte[] cacheId, QueryPlan delegate)
throws SQLException, IOException {
PTable cacheUsingTable = delegate.getTableRef().getTable();
ConnectionQueryServices services = delegate.getContext().getConnection().getQueryServices();
List<HRegionLocation> locations = services.getAllTableRegions(
cacheUsingTable.getPhysicalName().getBytes());
int nRegions = locations.size();
Set<HRegionLocation> servers = new HashSet<>(nRegions);
cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
return new ServerCache(cacheId, servers, new ImmutableBytesWritable(
new byte[]{}), services, false);
}

public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, boolean storeCacheOnClient)

public ServerCache addServerCache(
ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
final ServerCacheFactory cacheFactory, final PTable cacheUsingTable)
throws SQLException {
return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false);
}

public ServerCache addServerCache(
ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
final ServerCacheFactory cacheFactory, final PTable cacheUsingTable,
boolean storeCacheOnClient) throws SQLException {
final byte[] cacheId = ServerCacheClient.generateId();
return addServerCache(keyRanges, cacheId, cachePtr, txState, cacheFactory,
cacheUsingTable, false, storeCacheOnClient);
}

public ServerCache addServerCache(
ScanRanges keyRanges, final byte[] cacheId, final ImmutableBytesWritable cachePtr,
final byte[] txState, final ServerCacheFactory cacheFactory,
final PTable cacheUsingTable, final boolean usePersistentCache,
boolean storeCacheOnClient) throws SQLException {
ConnectionQueryServices services = connection.getQueryServices();
List<Closeable> closeables = new ArrayList<Closeable>();
ServerCache hashCacheSpec = null;
SQLException firstException = null;
final byte[] cacheId = generateId();
/**
* Execute EndPoint in parallel on each server to send compressed hash cache
*/
Expand All @@ -251,7 +280,7 @@ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWrit
byte[] regionEndKey = entry.getRegion().getEndKey();
if ( ! servers.contains(entry) &&
keyRanges.intersectRegion(regionStartKey, regionEndKey,
cacheUsingTable.getIndexType() == IndexType.LOCAL)) {
cacheUsingTable.getIndexType() == IndexType.LOCAL)) {
// Call RPC once per server
servers.add(entry);
if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
Expand All @@ -262,7 +291,7 @@ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWrit

@Override
public Boolean call() throws Exception {
return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState);
return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState, usePersistentCache);
}

/**
Expand Down Expand Up @@ -291,7 +320,7 @@ public TaskExecutionMetricsHolder getTaskExecutionMetric() {
for (Future<Boolean> future : futures) {
future.get(timeoutMs, TimeUnit.MILLISECONDS);
}

cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
success = true;
} catch (SQLException e) {
Expand Down Expand Up @@ -444,7 +473,7 @@ public boolean addServerCache(byte[] startkeyOfRegion, ServerCache cache, HashCa
}
if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) {
success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
txState);
txState, false);
}
return success;
} finally {
Expand All @@ -453,7 +482,7 @@ public boolean addServerCache(byte[] startkeyOfRegion, ServerCache cache, HashCa
}

public boolean addServerCache(Table htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId,
final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState)
final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState, final boolean usePersistentCache)
throws Exception {
byte[] keyInRegion = getKeyInRegion(key);
final Map<byte[], AddServerCacheResponse> results;
Expand Down Expand Up @@ -483,6 +512,7 @@ public AddServerCacheResponse call(ServerCachingService instance) throws IOExcep
builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
}
builder.setCacheId(ByteStringer.wrap(cacheId));
builder.setUsePersistentCache(usePersistentCache);
builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
builder.setHasProtoBufIndexMaintainer(true);
ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory
Expand All @@ -501,7 +531,6 @@ public AddServerCacheResponse call(ServerCachingService instance) throws IOExcep
}
if (results != null && results.size() == 1) { return results.values().iterator().next().getReturn(); }
return false;

}

}
Expand Up @@ -36,7 +36,7 @@
public interface TenantCache {
MemoryManager getMemoryManager();
Closeable getServerCache(ImmutableBytesPtr cacheId);
Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException;
Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException;
void removeServerCache(ImmutableBytesPtr cacheId);
void removeAllServerCache();
}

0 comments on commit 87cc9b4

Please sign in to comment.