Skip to content

Commit

Permalink
PHOENIX-3895 Upgrade to Apache Avatica 1.10.0
Browse files Browse the repository at this point in the history
With the Avatica Array support, Phoenix code may see an Array
implementation which _isn't_ PhoenixArray. We can make a
best-effort to convert the provided Array into a PhoenixArray.
  • Loading branch information
joshelser committed Jun 2, 2017
1 parent 7956fb0 commit 6af7b4f
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Array;
import java.sql.SQLException;
import java.sql.Types;
import java.text.Format;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -86,10 +88,34 @@ public static byte getSeparatorByte(boolean rowKeyOrderOptimizable, SortOrder so
public byte[] toBytes(Object object, PDataType baseType, SortOrder sortOrder) {
return toBytes(object, baseType, sortOrder, true);
}

/**
* Ensures that the provided {@code object} is a PhoenixArray, attempting a conversion in the
* case when it is not.
*/
PhoenixArray toPhoenixArray(Object object, PDataType baseType) {
if (object instanceof PhoenixArray) {
return (PhoenixArray) object;
}
if (!(object instanceof Array)) {
throw new IllegalArgumentException("Expected an Array but got " + object.getClass());
}
Array arr = (Array) object;
try {
Object untypedArrayData = arr.getArray();
if (!(untypedArrayData instanceof Object[])) {
throw new IllegalArgumentException("Array data is required to be Object[] but data for "
+ arr.getClass() + " is " + untypedArrayData.getClass());
}
return this.getArrayFactory().newArray(baseType, (Object[]) untypedArrayData);
} catch (SQLException e) {
throw new IllegalArgumentException("Could not convert Array data", e);
}
}

public byte[] toBytes(Object object, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
if (object == null) { throw new ConstraintViolationException(this + " may not be null"); }
PhoenixArray arr = ((PhoenixArray)object);
PhoenixArray arr = toPhoenixArray(object, baseType);
int noOfElements = arr.numElements;
if (noOfElements == 0) { return ByteUtil.EMPTY_BYTE_ARRAY; }
TrustedByteArrayOutputStream byteStream = null;
Expand All @@ -115,7 +141,7 @@ public byte[] toBytes(Object object, PDataType baseType, SortOrder sortOrder, bo
}
DataOutputStream oStream = new DataOutputStream(byteStream);
// Handles bit inversion also
return createArrayBytes(byteStream, oStream, (PhoenixArray)object, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
return createArrayBytes(byteStream, oStream, arr, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
}

public static int serializeNulls(DataOutputStream oStream, int nulls) throws IOException {
Expand Down Expand Up @@ -376,7 +402,7 @@ private static int getOffset(ByteBuffer indexBuffer, int arrayIndex, boolean use

@Override
public Object toObject(Object object, PDataType actualType) {
return object;
return toPhoenixArray(object, arrayBaseType(actualType));
}

public Object toObject(Object object, PDataType actualType, SortOrder sortOrder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;
import java.sql.Array;
import java.sql.SQLException;
import java.text.Format;
import java.util.Random;

Expand Down Expand Up @@ -1152,9 +1154,19 @@ public static PDataType fromLiteral(Object value) {
if (value == null) { return null; }
for (PDataType type : PDataType.values()) {
if (type.isArrayType()) {
PhoenixArray arr = (PhoenixArray)value;
if ((type.getSqlType() == arr.baseType.sqlType + PDataType.ARRAY_TYPE_BASE)
&& type.getJavaClass().isInstance(value)) { return type; }
if (value instanceof PhoenixArray) {
PhoenixArray arr = (PhoenixArray)value;
if ((type.getSqlType() == arr.baseType.sqlType + PDataType.ARRAY_TYPE_BASE)
&& type.getJavaClass().isInstance(value)) { return type; }
} else {
Array arr = (Array) value;
try {
// Does the array's component type make sense for what we were told it is
if (arr.getBaseType() == type.getSqlType() - PDataType.ARRAY_TYPE_BASE) {
return type;
}
} catch (SQLException e) { /* Passthrough to fail */ }
}
} else {
if (type.getJavaClass().isInstance(value)) { return type; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package org.apache.phoenix.schema.types;

import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Map;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -1211,4 +1216,72 @@ public void testIsCoercibleTo() {
}
}
}

@Test
public void testArrayConversion() {
final String[] data = new String[] {"asdf", "qwerty"};
PhoenixArray phxArray = PArrayDataType.instantiatePhoenixArray(PVarchar.INSTANCE, data);
assertTrue("Converting a PhoenixArray to a PhoenixArray should return the same object",
phxArray == PVarcharArray.INSTANCE.toPhoenixArray(phxArray, PVarchar.INSTANCE));
// Create a skeleton of an Array which isn't a PhoenixArray. Make sure we can convert that.
Array customArray = new Array() {

@Override
public String getBaseTypeName() throws SQLException {
return "VARCHAR";
}

@Override
public int getBaseType() throws SQLException {
return Types.VARCHAR;
}

@Override
public Object getArray() throws SQLException {
return data;
}

@Override
public Object getArray(Map<String,Class<?>> map) throws SQLException {
return null;
}

@Override
public Object getArray(long index, int count) throws SQLException {
return null;
}

@Override
public Object getArray(long index, int count, Map<String,Class<?>> map)
throws SQLException {
return null;
}

@Override
public ResultSet getResultSet() throws SQLException {
return null;
}

@Override
public ResultSet getResultSet(Map<String,Class<?>> map) throws SQLException {
return null;
}

@Override
public ResultSet getResultSet(long index, int count) throws SQLException {
return null;
}

@Override
public ResultSet getResultSet(long index, int count, Map<String,Class<?>> map)
throws SQLException {
return null;
}

@Override public void free() throws SQLException {}
};

PhoenixArray copy = PVarcharArray.INSTANCE.toPhoenixArray(customArray, PVarchar.INSTANCE);
assertEquals(phxArray, copy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
Expand All @@ -45,7 +46,9 @@
import org.apache.phoenix.queryserver.client.ThinClientUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
* Smoke test for query server.
Expand All @@ -58,6 +61,9 @@ public class QueryServerBasicsIT extends BaseHBaseManagedTimeIT {
private static Configuration CONF;
private static String CONN_STRING;

@Rule
public TestName name = new TestName();

@BeforeClass
public static void beforeClass() throws Exception {
CONF = getTestClusterConfig();
Expand Down Expand Up @@ -123,7 +129,7 @@ public void testSchemas() throws Exception {

@Test
public void smokeTest() throws Exception {
final String tableName = getClass().getSimpleName().toUpperCase() + System.currentTimeMillis();
final String tableName = name.getMethodName();
try (final Connection connection = DriverManager.getConnection(CONN_STRING)) {
assertThat(connection.isClosed(), is(false));
connection.setAutoCommit(true);
Expand Down Expand Up @@ -162,4 +168,151 @@ public void smokeTest() throws Exception {
}
}
}

@Test
public void arrayTest() throws Exception {
final String tableName = name.getMethodName();
try (Connection conn = DriverManager.getConnection(CONN_STRING);
Statement stmt = conn.createStatement()) {
conn.setAutoCommit(false);
assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
assertFalse(stmt.execute("CREATE TABLE " + tableName + " ("
+ "pk VARCHAR NOT NULL PRIMARY KEY, "
+ "histogram INTEGER[])")
);
conn.commit();
int numRows = 10;
int numEvenElements = 4;
int numOddElements = 6;
for (int i = 0; i < numRows; i++) {
int arrayLength = i % 2 == 0 ? numEvenElements : numOddElements;
StringBuilder sb = new StringBuilder();
for (int arrayOffset = 0; arrayOffset < arrayLength; arrayOffset++) {
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(getArrayValueForOffset(arrayOffset));
}
String updateSql = "UPSERT INTO " + tableName + " values('" + i + "', " + "ARRAY[" + sb.toString() + "])";
assertEquals(1, stmt.executeUpdate(updateSql));
}
conn.commit();
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) {
for (int i = 0; i < numRows; i++) {
assertTrue(rs.next());
assertEquals(i, Integer.parseInt(rs.getString(1)));
Array array = rs.getArray(2);
Object untypedArrayData = array.getArray();
assertTrue("Expected array data to be an int array, but was " + untypedArrayData.getClass(), untypedArrayData instanceof Object[]);
Object[] arrayData = (Object[]) untypedArrayData;
int expectedArrayLength = i % 2 == 0 ? numEvenElements : numOddElements;
assertEquals(expectedArrayLength, arrayData.length);
for (int arrayOffset = 0; arrayOffset < expectedArrayLength; arrayOffset++) {
assertEquals(getArrayValueForOffset(arrayOffset), arrayData[arrayOffset]);
}
}
assertFalse(rs.next());
}
}
}

@Test
public void preparedStatementArrayTest() throws Exception {
final String tableName = name.getMethodName();
try (Connection conn = DriverManager.getConnection(CONN_STRING);
Statement stmt = conn.createStatement()) {
conn.setAutoCommit(false);
assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
assertFalse(stmt.execute("CREATE TABLE " + tableName + " ("
+ "pk VARCHAR NOT NULL PRIMARY KEY, "
+ "histogram INTEGER[])")
);
conn.commit();
int numRows = 10;
int numEvenElements = 4;
int numOddElements = 6;
try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO " + tableName + " values(?, ?)")) {
for (int i = 0; i < numRows; i++) {
pstmt.setString(1, Integer.toString(i));
int arrayLength = i % 2 == 0 ? numEvenElements : numOddElements;
Object[] arrayData = new Object[arrayLength];
for (int arrayOffset = 0; arrayOffset < arrayLength; arrayOffset++) {
arrayData[arrayOffset] = getArrayValueForOffset(arrayOffset);
}
pstmt.setArray(2, conn.createArrayOf("INTEGER", arrayData));
assertEquals(1, pstmt.executeUpdate());
}
conn.commit();
}
conn.commit();
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) {
for (int i = 0; i < numRows; i++) {
assertTrue(rs.next());
assertEquals(i, Integer.parseInt(rs.getString(1)));
Array array = rs.getArray(2);
Object untypedArrayData = array.getArray();
assertTrue("Expected array data to be an int array, but was " + untypedArrayData.getClass(), untypedArrayData instanceof Object[]);
Object[] arrayData = (Object[]) untypedArrayData;
int expectedArrayLength = i % 2 == 0 ? numEvenElements : numOddElements;
assertEquals(expectedArrayLength, arrayData.length);
for (int arrayOffset = 0; arrayOffset < expectedArrayLength; arrayOffset++) {
assertEquals(getArrayValueForOffset(arrayOffset), arrayData[arrayOffset]);
}
}
assertFalse(rs.next());
}
}
}

@Test
public void preparedStatementVarcharArrayTest() throws Exception {
final String tableName = name.getMethodName();
try (Connection conn = DriverManager.getConnection(CONN_STRING);
Statement stmt = conn.createStatement()) {
conn.setAutoCommit(false);
assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
assertFalse(stmt.execute("CREATE TABLE " + tableName + " ("
+ "pk VARCHAR NOT NULL PRIMARY KEY, "
+ "histogram VARCHAR[])")
);
conn.commit();
int numRows = 10;
int numEvenElements = 4;
int numOddElements = 6;
try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO " + tableName + " values(?, ?)")) {
for (int i = 0; i < numRows; i++) {
pstmt.setString(1, Integer.toString(i));
int arrayLength = i % 2 == 0 ? numEvenElements : numOddElements;
Object[] arrayData = new Object[arrayLength];
for (int arrayOffset = 0; arrayOffset < arrayLength; arrayOffset++) {
arrayData[arrayOffset] = Integer.toString(getArrayValueForOffset(arrayOffset));
}
pstmt.setArray(2, conn.createArrayOf("VARCHAR", arrayData));
assertEquals(1, pstmt.executeUpdate());
}
conn.commit();
}
conn.commit();
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) {
for (int i = 0; i < numRows; i++) {
assertTrue(rs.next());
assertEquals(i, Integer.parseInt(rs.getString(1)));
Array array = rs.getArray(2);
Object untypedArrayData = array.getArray();
assertTrue("Expected array data to be an int array, but was " + untypedArrayData.getClass(), untypedArrayData instanceof Object[]);
Object[] arrayData = (Object[]) untypedArrayData;
int expectedArrayLength = i % 2 == 0 ? numEvenElements : numOddElements;
assertEquals(expectedArrayLength, arrayData.length);
for (int arrayOffset = 0; arrayOffset < expectedArrayLength; arrayOffset++) {
assertEquals(Integer.toString(getArrayValueForOffset(arrayOffset)), arrayData[arrayOffset]);
}
}
assertFalse(rs.next());
}
}
}

private int getArrayValueForOffset(int arrayOffset) {
return arrayOffset * 2 + 1;
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
<!-- Do not change jodatime.version until HBASE-15199 is fixed -->
<jodatime.version>1.6</jodatime.version>
<joni.version>2.1.2</joni.version>
<avatica.version>1.9.0</avatica.version>
<avatica.version>1.10.0</avatica.version>
<jettyVersion>8.1.7.v20120910</jettyVersion>
<tephra.version>0.12.0-incubating</tephra.version>
<spark.version>2.0.2</spark.version>
Expand Down

0 comments on commit 6af7b4f

Please sign in to comment.