Skip to content

Commit

Permalink
PHOENIX-2373- Change ReserveNSequence Udf to take in zookeeper and te…
Browse files Browse the repository at this point in the history
…ntantId
  • Loading branch information
siddhi1305 authored and jfernandosf committed Nov 10, 2015
1 parent e1e4344 commit 8a5046e
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 37 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Properties;


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.pig.data.Tuple; import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory; import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.UDFContext;
Expand All @@ -52,7 +54,7 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
private static final long MAX_VALUE = 10; private static final long MAX_VALUE = 10;


private static TupleFactory TF; private static TupleFactory TF;
private static Connection conn; private static Connection globalConn;
private static String zkQuorum; private static String zkQuorum;
private static Configuration conf; private static Configuration conf;
private static UDFContext udfContext; private static UDFContext udfContext;
Expand All @@ -65,15 +67,14 @@ public static void setUpBeforeClass() throws Exception {
conf = getTestClusterConfig(); conf = getTestClusterConfig();
zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig()); zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig());
conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
// Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); globalConn = DriverManager.getConnection(getUrl());
conn = DriverManager.getConnection(getUrl());
// Pig variables // Pig variables
TF = TupleFactory.getInstance(); TF = TupleFactory.getInstance();
} }


@Before @Before
public void setUp() throws SQLException { public void setUp() throws SQLException {
createSequence(); createSequence(globalConn);
createUdfContext(); createUdfContext();
} }


Expand Down Expand Up @@ -140,18 +141,69 @@ public void testSequenceNotExisting() throws Exception {
props.setErrorMessage("Sequence undefined"); props.setErrorMessage("Sequence undefined");
doTest(props); doTest(props);
} }

/**
* Test reserving sequence with tenant Id passed to udf.
* @throws Exception
*/
@Test
public void testTenantSequence() throws Exception {
Properties tentantProps = new Properties();
String tenantId = "TENANT";
tentantProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
Connection tenantConn = DriverManager.getConnection(getUrl(), tentantProps);
createSequence(tenantConn);

try {
UDFTestProperties props = new UDFTestProperties(3);

// validates UDF reservation is for that tentant
doTest(tenantConn, props);

// validate global sequence value is still set to 1
assertEquals(1L, getNextSequenceValue(globalConn));
} finally {
dropSequence(tenantConn);
}
}

/**
* Test Use the udf to reserve multiple tuples
*
* @throws Exception
*/
@Test
public void testMultipleTuples() throws Exception {
Tuple tuple = TF.newTuple(2);
tuple.set(0, 2L);
tuple.set(1, SEQUENCE_NAME);


final String tentantId = globalConn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
ReserveNSequence udf = new ReserveNSequence(zkQuorum, tentantId);

for (int i = 0; i < 2; i++) {
udf.exec(tuple);
}
long nextValue = getNextSequenceValue(globalConn);
assertEquals(5L, nextValue);
}

private void doTest(UDFTestProperties props) throws Exception { private void doTest(UDFTestProperties props) throws Exception {
setCurrentValue(props.getCurrentValue()); doTest(globalConn, props);
}

private void doTest(Connection conn, UDFTestProperties props) throws Exception {
setCurrentValue(conn, props.getCurrentValue());
Tuple tuple = TF.newTuple(3); Tuple tuple = TF.newTuple(3);
tuple.set(0, props.getNumToReserve()); tuple.set(0, props.getNumToReserve());
tuple.set(1, props.getSequenceName()); tuple.set(1, props.getSequenceName());
tuple.set(2, zkQuorum); tuple.set(2, zkQuorum);
Long result = null; Long result = null;
try { try {
ReserveNSequence udf = new ReserveNSequence(); final String tenantId = conn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
ReserveNSequence udf = new ReserveNSequence(zkQuorum, tenantId);
result = udf.exec(tuple); result = udf.exec(tuple);
validateReservedSequence(props.getCurrentValue(), props.getNumToReserve(), result); validateReservedSequence(conn, props.getCurrentValue(), props.getNumToReserve(), result);
} catch (Exception e) { } catch (Exception e) {
if (props.isExceptionExpected()) { if (props.isExceptionExpected()) {
assertEquals(props.getExceptionClass(), e.getClass()); assertEquals(props.getExceptionClass(), e.getClass());
Expand All @@ -160,42 +212,40 @@ private void doTest(UDFTestProperties props) throws Exception {
throw e; throw e;
} }
} }

} }


private void createUdfContext() { private void createUdfContext() {
conf.set(ReserveNSequence.SEQUENCE_NAME_CONF_KEY, SEQUENCE_NAME);
udfContext = UDFContext.getUDFContext(); udfContext = UDFContext.getUDFContext();
udfContext.addJobConf(conf); udfContext.addJobConf(conf);
} }


private void validateReservedSequence(Long currentValue, long count, Long result) throws SQLException { private void validateReservedSequence(Connection conn, Long currentValue, long count, Long result) throws SQLException {
Long startIndex = currentValue + 1; Long startIndex = currentValue + 1;
assertEquals("Start index is incorrect", startIndex, result); assertEquals("Start index is incorrect", startIndex, result);
final long newNextSequenceValue = getNextSequenceValue(); final long newNextSequenceValue = getNextSequenceValue(conn);
assertEquals(startIndex + count, newNextSequenceValue); assertEquals(startIndex + count, newNextSequenceValue);
} }


private void createSequence() throws SQLException { private void createSequence(Connection conn) throws SQLException {
conn.createStatement().execute(String.format(CREATE_SEQUENCE_SYNTAX, SEQUENCE_NAME, 1, 1, 1, MAX_VALUE, 1)); conn.createStatement().execute(String.format(CREATE_SEQUENCE_SYNTAX, SEQUENCE_NAME, 1, 1, 1, MAX_VALUE, 1));
conn.commit(); conn.commit();
} }


private void setCurrentValue(long currentValue) throws SQLException { private void setCurrentValue(Connection conn, long currentValue) throws SQLException {
for (int i = 1; i <= currentValue; i++) { for (int i = 1; i <= currentValue; i++) {
getNextSequenceValue(); getNextSequenceValue(conn);
} }
} }


private long getNextSequenceValue() throws SQLException { private long getNextSequenceValue(Connection conn) throws SQLException {
String ddl = new StringBuilder().append("SELECT NEXT VALUE FOR ").append(SEQUENCE_NAME).toString(); String ddl = new StringBuilder().append("SELECT NEXT VALUE FOR ").append(SEQUENCE_NAME).toString();
ResultSet rs = conn.createStatement().executeQuery(ddl); ResultSet rs = conn.createStatement().executeQuery(ddl);
assertTrue(rs.next()); assertTrue(rs.next());
conn.commit(); conn.commit();
return rs.getLong(1); return rs.getLong(1);
} }


private void dropSequence() throws Exception { private void dropSequence(Connection conn) throws Exception {
String ddl = new StringBuilder().append("DROP SEQUENCE ").append(SEQUENCE_NAME).toString(); String ddl = new StringBuilder().append("DROP SEQUENCE ").append(SEQUENCE_NAME).toString();
conn.createStatement().execute(ddl); conn.createStatement().execute(ddl);
conn.commit(); conn.commit();
Expand All @@ -204,12 +254,12 @@ private void dropSequence() throws Exception {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
udfContext.reset(); udfContext.reset();
dropSequence(); dropSequence(globalConn);
} }


@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
conn.close(); globalConn.close();
} }


/** /**
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,17 +1,22 @@
/** /**
* * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.phoenix.pig.udf; package org.apache.phoenix.pig.udf;


/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
* applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import java.io.IOException; import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
Expand All @@ -20,11 +25,16 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.pig.EvalFunc; import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple; import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.UDFContext;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;


/** /**
* UDF to Reserve a chunk of numbers for a given sequence * UDF to Reserve a chunk of numbers for a given sequence
Expand All @@ -34,29 +44,37 @@
*/ */
public class ReserveNSequence extends EvalFunc<Long> { public class ReserveNSequence extends EvalFunc<Long> {


public static final String INVALID_TUPLE_MESSAGE = "Tuple should have correct fields(NumtoReserve,SequenceName,zkquorum."; public static final String INVALID_TUPLE_MESSAGE = "Tuple should have correct fields(NumtoReserve,SequenceName).";
public static final String EMPTY_SEQUENCE_NAME_MESSAGE = "Sequence name should be not null"; public static final String EMPTY_SEQUENCE_NAME_MESSAGE = "Sequence name should be not null";
public static final String EMPTY_ZK_MESSAGE = "ZKQuorum should be not null"; public static final String EMPTY_ZK_MESSAGE = "ZKQuorum should be not null";
public static final String INVALID_NUMBER_MESSAGE = "Number of Sequences to Reserve should be greater than 0"; public static final String INVALID_NUMBER_MESSAGE = "Number of Sequences to Reserve should be greater than 0";
public static final String SEQUENCE_NAME_CONF_KEY = "phoenix.sequence.name";


private final String zkQuorum;
private final String tenantId;
private Configuration configuration;
Connection connection;

public ReserveNSequence(@NonNull String zkQuorum, @Nullable String tenantId) {
Preconditions.checkNotNull(zkQuorum, EMPTY_ZK_MESSAGE);
this.zkQuorum = zkQuorum;
this.tenantId = tenantId;
}
/** /**
* Reserve N next sequences for a sequence name. N is the first field in the tuple. Sequence name is the second * Reserve N next sequences for a sequence name. N is the first field in the tuple. Sequence name is the second
* field in the tuple zkquorum is the third field in the tuple * field in the tuple zkquorum is the third field in the tuple
*/ */
@Override @Override
public Long exec(Tuple input) throws IOException { public Long exec(Tuple input) throws IOException {
Preconditions.checkArgument(input != null && input.size() == 3, INVALID_TUPLE_MESSAGE); Preconditions.checkArgument(input != null && input.size() >= 2, INVALID_TUPLE_MESSAGE);
Long numToReserve = (Long)(input.get(0)); Long numToReserve = (Long)(input.get(0));
Preconditions.checkArgument(numToReserve > 0, INVALID_NUMBER_MESSAGE); Preconditions.checkArgument(numToReserve > 0, INVALID_NUMBER_MESSAGE);
String sequenceName = (String)input.get(1); String sequenceName = (String)input.get(1);
Preconditions.checkNotNull(sequenceName, EMPTY_SEQUENCE_NAME_MESSAGE); Preconditions.checkNotNull(sequenceName, EMPTY_SEQUENCE_NAME_MESSAGE);
String zkquorum = (String)input.get(2); // It will create a connection when called for the first Tuple per task.
Preconditions.checkNotNull(zkquorum, EMPTY_ZK_MESSAGE); // The connection gets cleaned up in finish() method
UDFContext context = UDFContext.getUDFContext(); if (connection == null) {
Configuration configuration = context.getJobConf(); initConnection();
configuration.set(HConstants.ZOOKEEPER_QUORUM, zkquorum); }
Connection connection = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
connection = ConnectionUtil.getOutputConnection(configuration); connection = ConnectionUtil.getOutputConnection(configuration);
Expand All @@ -79,6 +97,38 @@ public Long exec(Tuple input) throws IOException {
} }
} }
} }

/**
* Cleanup to be performed at the end.
* Close connection
*/
@Override
public void finish() {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException("Caught exception while closing connection", e);
}
}
}

private void initConnection() throws IOException {
// Create correct configuration to be used to make phoenix connections
UDFContext context = UDFContext.getUDFContext();
configuration = new Configuration(context.getJobConf());
configuration.set(HConstants.ZOOKEEPER_QUORUM, this.zkQuorum);
if (Strings.isNullOrEmpty(tenantId)) {
configuration.unset(PhoenixRuntime.TENANT_ID_ATTRIB);
} else {
configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
try {
connection = ConnectionUtil.getOutputConnection(configuration);
} catch (SQLException e) {
throw new IOException("Caught exception while creating connection", e);
}
}


private String getNextNSequenceSelectStatement(Long numToReserve, String sequenceName) { private String getNextNSequenceSelectStatement(Long numToReserve, String sequenceName) {
return new StringBuilder().append("SELECT NEXT " + numToReserve + " VALUES" + " FOR ").append(sequenceName) return new StringBuilder().append("SELECT NEXT " + numToReserve + " VALUES" + " FOR ").append(sequenceName)
Expand Down

0 comments on commit 8a5046e

Please sign in to comment.