Skip to content

Commit

Permalink
PHOENIX-7193 Fix cluster override for mapreduce jobs for NON-ZK regis…
Browse files Browse the repository at this point in the history
…tries
  • Loading branch information
stoty committed Feb 1, 2024
1 parent 8a8db78 commit e3ecf16
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,25 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;

/**
* Utility class to return a {@link Connection} .
*/
public class ConnectionUtil {

private static String TEST_PARAM =
PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;

/**
* Retrieve the configured input Connection.
* @param conf configuration containing connection information
* @return the configured input connection
*/
public static Connection getInputConnection(final Configuration conf) throws SQLException {
Preconditions.checkNotNull(conf);
return getInputConnection(conf, new Properties());
}

Expand All @@ -55,22 +50,13 @@ public static Connection getInputConnection(final Configuration conf) throws SQL
*/
public static Connection getInputConnection(final Configuration conf, final Properties props)
throws SQLException {
Preconditions.checkNotNull(conf);
String zkQuorumOverride = PhoenixConfigurationUtilHelper.getInputClusterZkQuorum(conf);
if (zkQuorumOverride != null) {
return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride,
PropertiesUtil.combineProperties(props, conf));
} else {
// FIXME find some better way to get tests working
String zkQuorumForTest = PhoenixConfigurationUtilHelper.getZKQuorum(conf);
if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM)
|| zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) {
return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumForTest,
PropertiesUtil.combineProperties(props, conf));
}
return DriverManager.getConnection("jdbc:phoenix",
PropertiesUtil.combineProperties(props, conf));
String outputQuorum = PhoenixConfigurationUtilHelper.getOutputCluster(conf);
if (outputQuorum != null) {
// This will not override the quorum set with setOutputClusterUrl
conf.set(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, outputQuorum);
}
return DriverManager.getConnection(PhoenixConfigurationUtilHelper.getInputClusterUrl(conf),
PropertiesUtil.combineProperties(props, conf));
}

/**
Expand All @@ -82,16 +68,6 @@ public static Connection getOutputConnection(final Configuration conf) throws SQ
return getOutputConnection(conf, new Properties());
}

/**
* Create the configured output Connection.
* @param conf configuration containing the connection information
* @return the configured output connection
*/
public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf,
Set<String> ignoreTheseProps) throws SQLException {
return getOutputConnection(conf, new Properties(), ignoreTheseProps);
}

/**
* Create the configured output Connection.
* @param conf configuration containing the connection information
Expand All @@ -100,42 +76,14 @@ public static Connection getOutputConnectionWithoutTheseProps(final Configuratio
*/
public static Connection getOutputConnection(final Configuration conf, Properties props)
throws SQLException {
return getOutputConnection(conf, props, Collections.<String>emptySet());
}

public static Connection getOutputConnection(final Configuration conf, Properties props,
Set<String> withoutTheseProps) throws SQLException {
Preconditions.checkNotNull(conf);
String zkQuorumOverride = PhoenixConfigurationUtilHelper.getOutputClusterZkQuorum(conf);
if (zkQuorumOverride != null) {
return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride,
PropertiesUtil.combineProperties(props, conf));
} else {
// FIXME find some better way to get tests working
String zkQuorumForTest = PhoenixConfigurationUtilHelper.getZKQuorum(conf);
if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM)
|| zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) {
return DriverManager.getConnection("jdbc:phoenix:" + zkQuorumForTest,
PropertiesUtil.combineProperties(props, conf));
}
return DriverManager.getConnection("jdbc:phoenix",
PropertiesUtil.combineProperties(props, conf));
String inputQuorum = PhoenixConfigurationUtilHelper.getInputCluster(conf);
if (inputQuorum != null) {
// This will not override the quorum set with setOutputClusterUrl
conf.set(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, inputQuorum);
}
}

/**
* Returns the {@link Connection} from a ZooKeeper cluster string.
* @param quorum a ZooKeeper quorum connection string
* @param clientPort a ZooKeeper client port
* @param znodeParent a zookeeper znode parent
* @return a Phoenix connection to the given connection string
*/
@Deprecated
private static Connection getConnection(final String quorum, final Integer clientPort,
final String znodeParent, Properties props) throws SQLException {
Preconditions.checkNotNull(quorum);
return DriverManager.getConnection(QueryUtil.getUrl(quorum, clientPort, znodeParent),
props);
return DriverManager.getConnection(PhoenixConfigurationUtilHelper.getOutputClusterUrl(conf),
PropertiesUtil.combineProperties(props, conf));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.util.PhoenixRuntime;

public final class PhoenixConfigurationUtilHelper {
// This relies on Hadoop Configuration to handle warning about deprecated configs and
Expand All @@ -29,8 +30,12 @@ public final class PhoenixConfigurationUtilHelper {
Configuration.addDeprecation("phoneix.mapreduce.output.cluster.quorum", PhoenixConfigurationUtilHelper.MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
}

@Deprecated
public static final String MAPREDUCE_INPUT_CLUSTER_QUORUM = "phoenix.mapreduce.input.cluster.quorum";
@Deprecated
public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = "phoenix.mapreduce.output.cluster.quorum";
public static final String MAPREDUCE_INPUT_CLUSTER_URL = "phoenix.mapreduce.input.cluster.url";
public static final String MAPREDUCE_OUTPUT_CLUSTER_URL = "phoenix.mapreduce.output.cluster.url";
public static final String TRANSFORM_MONITOR_ENABLED = "phoenix.transform.monitor.enabled";
public static final boolean DEFAULT_TRANSFORM_MONITOR_ENABLED = true;
/**
Expand Down Expand Up @@ -74,6 +79,22 @@ public static String getInputCluster(final Configuration configuration) {
return quorum;
}

/**
* Returns the Phoenix JDBC URL a Phoenix MapReduce job will read
* from. If MAPREDUCE_INPUT_CLUSTER_URL is not set, then it returns the value of
* "jdbc:phoenix"
* @param configuration
* @return URL string
*/
public static String getInputClusterUrl(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_URL);
if (quorum == null) {
quorum = PhoenixRuntime.JDBC_PROTOCOL;
}
return quorum;
}

/**
* Returns the HBase Client Port
* @param configuration
Expand Down Expand Up @@ -120,17 +141,36 @@ public static String getOutputCluster(final Configuration configuration) {
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
@Deprecated
public static String getInputClusterZkQuorum(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
}


/**
* Returns the Phoenix JDBC URL a Phoenix MapReduce job will write to.
* If MAPREDUCE_OUTPUT_CLUSTER_URL is not set, then it returns the value of
* "jdbc:phoenix"
* @param configuration
* @return URL string
*/
public static String getOutputClusterUrl(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_URL);
if (quorum == null) {
quorum = PhoenixRuntime.JDBC_PROTOCOL;
}
return quorum;
}

/**
* Returns the value of HConstants.ZOOKEEPER_QUORUM.
* For tests only
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
@Deprecated
public static String getZKQuorum(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(HConstants.ZOOKEEPER_QUORUM);
Expand All @@ -141,6 +181,7 @@ public static String getZKQuorum(final Configuration configuration) {
* @param configuration
* @return ZooKeeper quorum string if defined, null otherwise
*/
@Deprecated
public static String getOutputClusterZkQuorum(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@
*/
public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixOutputFormat.class);
private final Set<String> propsToIgnore;

public PhoenixOutputFormat() {
this(Collections.<String>emptySet());
}

// FIXME Never used, and the ignore feature didn't work anyway
public PhoenixOutputFormat(Set<String> propsToIgnore) {
this.propsToIgnore = propsToIgnore;
}

@Override
Expand All @@ -63,7 +62,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE
@Override
public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
try {
return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore);
return new PhoenixRecordWriter<T>(context.getConfiguration());
} catch (SQLException e) {
LOGGER.error("Error calling PhoenixRecordWriter " + e.getMessage());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public PhoenixRecordWriter(final Configuration configuration) throws SQLExceptio
public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
Connection connection = null;
try {
connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
connection = ConnectionUtil.getOutputConnection(configuration);
this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.statement = connection.prepareStatement(upsertQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ public static void setBatchSize(final Configuration configuration, final Long ba
* @param configuration
* @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will read from
*/
@Deprecated
public static void setInputCluster(final Configuration configuration,
final String quorum) {
Preconditions.checkNotNull(configuration);
Expand All @@ -388,12 +389,35 @@ public static void setInputCluster(final Configuration configuration,
* @param configuration
* @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will write to
*/
@Deprecated
public static void setOutputCluster(final Configuration configuration,
final String quorum) {
Preconditions.checkNotNull(configuration);
configuration.set(PhoenixConfigurationUtilHelper.MAPREDUCE_OUTPUT_CLUSTER_QUORUM, quorum);
}


/**
* Sets which HBase cluster a Phoenix MapReduce job should read from
* @param configuration
* @param url Phoenix JDBC URL
*/
public static void setInputClusterUrl(final Configuration configuration,
final String url) {
Preconditions.checkNotNull(configuration);
configuration.set(PhoenixConfigurationUtilHelper.MAPREDUCE_INPUT_CLUSTER_URL, url);
}

/**
* Sets which HBase cluster a Phoenix MapReduce job should write to
* @param configuration
* @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will write to
*/
public static void setOutputClusterUrl(final Configuration configuration,
final String url) {
Preconditions.checkNotNull(configuration);
configuration.set(PhoenixConfigurationUtilHelper.MAPREDUCE_OUTPUT_CLUSTER_URL, url);
}

public static Class<?> getInputClass(final Configuration configuration) {
return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP;
import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;

import java.sql.Connection;
import java.util.Arrays;
Expand All @@ -33,6 +37,9 @@

import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.IndexUpgradeTool;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.query.ConnectionlessTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.PhoenixRuntime;
Expand All @@ -45,7 +52,7 @@


@RunWith(Parameterized.class)
public class IndexUpgradeToolTest {
public class IndexUpgradeToolTest extends BaseConnectionlessQueryTest{
private static final String INPUT_LIST = "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3";
private final boolean upgrade;
private static final String DUMMY_STRING_VALUE = "anyValue";
Expand Down Expand Up @@ -170,7 +177,11 @@ public static synchronized Collection<Boolean> data() {
}

private void setupConfForConnectionlessQuery(Configuration conf) {
conf.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
String connectionlessUrl = PhoenixRuntime.JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR
+ CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR
+ PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR;
PhoenixConfigurationUtil.setInputClusterUrl(conf, connectionlessUrl);
PhoenixConfigurationUtil.setOutputClusterUrl(conf, connectionlessUrl);
conf.unset(HConstants.ZOOKEEPER_CLIENT_PORT);
conf.unset(HConstants.ZOOKEEPER_ZNODE_PARENT);
}
Expand Down

0 comments on commit e3ecf16

Please sign in to comment.