Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-7193 Fix cluster override for mapreduce jobs for non-ZK registries #1810

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,25 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;

/**
* Utility class to return a {@link Connection} .
*/
public class ConnectionUtil {

private static String TEST_PARAM =
PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;

/**
* Retrieve the configured input Connection.
* @param conf configuration containing connection information
* @return the configured input connection
*/
public static Connection getInputConnection(final Configuration conf) throws SQLException {
Preconditions.checkNotNull(conf);
return getInputConnection(conf, new Properties());
}

Expand All @@ -55,22 +50,17 @@ public static Connection getInputConnection(final Configuration conf) throws SQL
*/
public static Connection getInputConnection(final Configuration conf, final Properties props)
throws SQLException {
Preconditions.checkNotNull(conf);
String zkQuorumOverride = PhoenixConfigurationUtilHelper.getInputClusterZkQuorum(conf);
if (zkQuorumOverride != null) {
return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride,
PropertiesUtil.combineProperties(props, conf));
} else {
// FIXME find some better way to get tests working
String zkQuorumForTest = PhoenixConfigurationUtilHelper.getZKQuorum(conf);
if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM)
|| zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) {
return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumForTest,
PropertiesUtil.combineProperties(props, conf));
}
return DriverManager.getConnection("jdbc:phoenix",
PropertiesUtil.combineProperties(props, conf));
String inputQuorum = PhoenixConfigurationUtilHelper.getInputCluster(conf);
if (inputQuorum != null) {
// This will not override the quorum set with setInputClusterUrl
Properties copyProps = PropertiesUtil.deepCopy(props);
copyProps.setProperty(HConstants.CLIENT_ZOOKEEPER_QUORUM, inputQuorum);
return DriverManager.getConnection(
PhoenixConfigurationUtilHelper.getInputClusterUrl(conf),
PropertiesUtil.combineProperties(copyProps, conf));
}
return DriverManager.getConnection(PhoenixConfigurationUtilHelper.getInputClusterUrl(conf),
PropertiesUtil.combineProperties(props, conf));
}

/**
Expand All @@ -82,16 +72,6 @@ public static Connection getOutputConnection(final Configuration conf) throws SQ
return getOutputConnection(conf, new Properties());
}

/**
* Create the configured output Connection.
* @param conf configuration containing the connection information
* @return the configured output connection
*/
public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf,
Set<String> ignoreTheseProps) throws SQLException {
return getOutputConnection(conf, new Properties(), ignoreTheseProps);
}

/**
* Create the configured output Connection.
* @param conf configuration containing the connection information
Expand All @@ -100,42 +80,17 @@ public static Connection getOutputConnectionWithoutTheseProps(final Configuratio
*/
public static Connection getOutputConnection(final Configuration conf, Properties props)
throws SQLException {
return getOutputConnection(conf, props, Collections.<String>emptySet());
}

public static Connection getOutputConnection(final Configuration conf, Properties props,
Set<String> withoutTheseProps) throws SQLException {
Preconditions.checkNotNull(conf);
String zkQuorumOverride = PhoenixConfigurationUtilHelper.getOutputClusterZkQuorum(conf);
if (zkQuorumOverride != null) {
return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride,
PropertiesUtil.combineProperties(props, conf));
} else {
// FIXME find some better way to get tests working
String zkQuorumForTest = PhoenixConfigurationUtilHelper.getZKQuorum(conf);
if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM)
|| zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) {
return DriverManager.getConnection("jdbc:phoenix:" + zkQuorumForTest,
PropertiesUtil.combineProperties(props, conf));
}
return DriverManager.getConnection("jdbc:phoenix",
PropertiesUtil.combineProperties(props, conf));
String outputQuorum = PhoenixConfigurationUtilHelper.getOutputCluster(conf);
if (outputQuorum != null) {
// This will not override the quorum set with setInputClusterUrl
Properties copyProps = PropertiesUtil.deepCopy(props);
copyProps.setProperty(HConstants.CLIENT_ZOOKEEPER_QUORUM, outputQuorum);
return DriverManager.getConnection(
PhoenixConfigurationUtilHelper.getInputClusterUrl(conf),
PropertiesUtil.combineProperties(copyProps, conf));
}
return DriverManager.getConnection(PhoenixConfigurationUtilHelper.getOutputClusterUrl(conf),
PropertiesUtil.combineProperties(props, conf));
}

/**
* Returns the {@link Connection} from a ZooKeeper cluster string.
* @param quorum a ZooKeeper quorum connection string
* @param clientPort a ZooKeeper client port
* @param znodeParent a zookeeper znode parent
* @return a Phoenix connection to the given connection string
*/
@Deprecated
private static Connection getConnection(final String quorum, final Integer clientPort,
final String znodeParent, Properties props) throws SQLException {
Preconditions.checkNotNull(quorum);
return DriverManager.getConnection(QueryUtil.getUrl(quorum, clientPort, znodeParent),
props);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.util.PhoenixRuntime;

public final class PhoenixConfigurationUtilHelper {
// This relies on Hadoop Configuration to handle warning about deprecated configs and
Expand All @@ -29,8 +30,12 @@ public final class PhoenixConfigurationUtilHelper {
Configuration.addDeprecation("phoneix.mapreduce.output.cluster.quorum", PhoenixConfigurationUtilHelper.MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
}

@Deprecated
public static final String MAPREDUCE_INPUT_CLUSTER_QUORUM = "phoenix.mapreduce.input.cluster.quorum";
@Deprecated
public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = "phoenix.mapreduce.output.cluster.quorum";
public static final String MAPREDUCE_INPUT_CLUSTER_URL = "phoenix.mapreduce.input.cluster.url";
public static final String MAPREDUCE_OUTPUT_CLUSTER_URL = "phoenix.mapreduce.output.cluster.url";
public static final String TRANSFORM_MONITOR_ENABLED = "phoenix.transform.monitor.enabled";
public static final boolean DEFAULT_TRANSFORM_MONITOR_ENABLED = true;
/**
Expand Down Expand Up @@ -68,12 +73,31 @@ public static long[] getLongs(Configuration conf, String name) {
public static String getInputCluster(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
if (quorum == null) {
quorum = configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
}
if (quorum == null) {
quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
}
return quorum;
}

/**
* Returns the Phoenix JDBC URL a Phoenix MapReduce job will read
* from. If MAPREDUCE_INPUT_CLUSTER_URL is not set, then it returns the value of
* "jdbc:phoenix"
* @param configuration
* @return URL string
*/
public static String getInputClusterUrl(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String url = configuration.get(MAPREDUCE_INPUT_CLUSTER_URL);
if (url == null) {
url = PhoenixRuntime.JDBC_PROTOCOL;
}
return url;
}

/**
* Returns the HBase Client Port
* @param configuration
Expand Down Expand Up @@ -108,6 +132,9 @@ public static String getZNodeParent(final Configuration configuration) {
public static String getOutputCluster(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
if (quorum == null) {
quorum = configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
}
if (quorum == null) {
quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
}
Expand All @@ -120,27 +147,48 @@ public static String getOutputCluster(final Configuration configuration) {
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
@Deprecated
public static String getInputClusterZkQuorum(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
}


/**
* Returns the Phoenix JDBC URL a Phoenix MapReduce job will write to.
* If MAPREDUCE_OUTPUT_CLUSTER_URL is not set, then it returns the value of
* "jdbc:phoenix"
* @param configuration
* @return URL string
*/
public static String getOutputClusterUrl(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_URL);
if (quorum == null) {
quorum = PhoenixRuntime.JDBC_PROTOCOL;
}
return quorum;
}

/**
* Returns the value of HConstants.ZOOKEEPER_QUORUM.
* For tests only
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
@Deprecated
public static String getZKQuorum(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(HConstants.ZOOKEEPER_QUORUM);
return configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM,
configuration.get(HConstants.ZOOKEEPER_QUORUM));
}

/**
* Returns the ZooKeeper quorum override MAPREDUCE_OUTPUT_CLUSTER_QUORUM for mapreduce jobs
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
@Deprecated
public static String getOutputClusterZkQuorum(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@
*/
public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixOutputFormat.class);
private final Set<String> propsToIgnore;

public PhoenixOutputFormat() {
this(Collections.<String>emptySet());
}

// FIXME Never used, and the ignore feature didn't work anyway
public PhoenixOutputFormat(Set<String> propsToIgnore) {
this.propsToIgnore = propsToIgnore;
}

@Override
Expand All @@ -63,7 +62,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE
@Override
public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
try {
return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore);
return new PhoenixRecordWriter<T>(context.getConfiguration());
} catch (SQLException e) {
LOGGER.error("Error calling PhoenixRecordWriter " + e.getMessage());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public PhoenixRecordWriter(final Configuration configuration) throws SQLExceptio
public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
Connection connection = null;
try {
connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
connection = ConnectionUtil.getOutputConnection(configuration);
this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.statement = connection.prepareStatement(upsertQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ public static void setBatchSize(final Configuration configuration, final Long ba
* @param configuration
* @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will read from
*/
@Deprecated
public static void setInputCluster(final Configuration configuration,
final String quorum) {
Preconditions.checkNotNull(configuration);
Expand All @@ -388,12 +389,35 @@ public static void setInputCluster(final Configuration configuration,
* @param configuration
* @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will write to
*/
@Deprecated
public static void setOutputCluster(final Configuration configuration,
final String quorum) {
Preconditions.checkNotNull(configuration);
configuration.set(PhoenixConfigurationUtilHelper.MAPREDUCE_OUTPUT_CLUSTER_QUORUM, quorum);
}


/**
* Sets which HBase cluster a Phoenix MapReduce job should read from
* @param configuration
* @param url Phoenix JDBC URL
*/
public static void setInputClusterUrl(final Configuration configuration,
final String url) {
Preconditions.checkNotNull(configuration);
configuration.set(PhoenixConfigurationUtilHelper.MAPREDUCE_INPUT_CLUSTER_URL, url);
}

/**
* Sets which HBase cluster a Phoenix MapReduce job should write to
* @param configuration
* @param url Phoenix JDBC URL string for HBase cluster the MapReduce job will write to
*/
public static void setOutputClusterUrl(final Configuration configuration,
final String url) {
Preconditions.checkNotNull(configuration);
configuration.set(PhoenixConfigurationUtilHelper.MAPREDUCE_OUTPUT_CLUSTER_URL, url);
}

public static Class<?> getInputClass(final Configuration configuration) {
return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP;
import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;

import java.sql.Connection;
import java.util.Arrays;
Expand All @@ -33,6 +37,9 @@

import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.IndexUpgradeTool;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.query.ConnectionlessTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.PhoenixRuntime;
Expand All @@ -45,7 +52,7 @@


@RunWith(Parameterized.class)
public class IndexUpgradeToolTest {
public class IndexUpgradeToolTest extends BaseConnectionlessQueryTest{
private static final String INPUT_LIST = "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3";
private final boolean upgrade;
private static final String DUMMY_STRING_VALUE = "anyValue";
Expand Down Expand Up @@ -170,7 +177,11 @@ public static synchronized Collection<Boolean> data() {
}

private void setupConfForConnectionlessQuery(Configuration conf) {
conf.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
String connectionlessUrl = PhoenixRuntime.JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR
+ CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR
+ PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR;
PhoenixConfigurationUtil.setInputClusterUrl(conf, connectionlessUrl);
PhoenixConfigurationUtil.setOutputClusterUrl(conf, connectionlessUrl);
conf.unset(HConstants.ZOOKEEPER_CLIENT_PORT);
conf.unset(HConstants.ZOOKEEPER_ZNODE_PARENT);
}
Expand Down