Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.phoenix.end2end.index;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
Expand All @@ -26,8 +30,14 @@
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.bouncycastle.util.Strings;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -39,6 +49,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.phoenix.coprocessor.MetaDataProtocol.DEFAULT_LOG_TTL;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -94,6 +106,36 @@ public void testReadIndexVerificationOutputRow() throws Exception {

}

@Test
public void testTTLOnOutputTable() throws SQLException, IOException {
String mockString = "mock_value";
byte[] mockStringBytes = Strings.toByteArray(mockString);
ManualEnvironmentEdge customClock = new ManualEnvironmentEdge();
customClock.setValue(1);
EnvironmentEdgeManager.injectEdge(customClock);
try (Connection conn = DriverManager.getConnection(getUrl())) {
HTable hTable = new HTable(config, OUTPUT_TABLE_NAME_BYTES);

IndexVerificationOutputRepository
outputRepository =
new IndexVerificationOutputRepository(mockStringBytes, conn);

outputRepository.createOutputTable(conn);
TestUtil.assertTableHasTtl(conn, TableName.valueOf(OUTPUT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL);
outputRepository.logToIndexToolOutputTable(mockStringBytes, mockStringBytes,
1, 2, mockString, mockStringBytes, mockStringBytes,
EnvironmentEdgeManager.currentTimeMillis(), mockStringBytes, true);

Comment thread
swaroopak marked this conversation as resolved.
Assert.assertEquals(1, TestUtil.getRowCount(hTable, false));

customClock.incrementValue(1000*(DEFAULT_LOG_TTL+5));
EnvironmentEdgeManager.injectEdge(customClock);
int count = TestUtil.getRowCount(hTable, false);

Assert.assertEquals(0, count);
}
}

public void verifyOutputRow(IndexVerificationOutputRepository outputRepository, long scanMaxTs,
byte[] indexNameBytes, IndexVerificationOutputRow expectedRow)
throws IOException {
Expand Down Expand Up @@ -169,4 +211,17 @@ private void createTableAndIndexes(Connection conn, String dataTableName,
dataTableName + " (val1) include (val2, val3)");
conn.commit();
}

@After
public void dropOutputTable() throws Exception {
try(Connection conn = DriverManager.getConnection(getUrl())) {
ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
Admin admin = queryServices.getAdmin();
TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME_BYTES);
if (admin.tableExists(outputTableName)) {
((HBaseAdmin) admin).disableTable(OUTPUT_TABLE_NAME_BYTES);
((HBaseAdmin) admin).deleteTable(OUTPUT_TABLE_NAME_BYTES);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,36 @@
*/
package org.apache.phoenix.end2end.index;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.bouncycastle.util.Strings;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

import static org.apache.phoenix.coprocessor.MetaDataProtocol.DEFAULT_LOG_TTL;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

Expand All @@ -50,24 +65,53 @@ public void testReadResultRow() throws Exception {
byte[] indexNameBytes = Bytes.toBytes(indexName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
createTableAndIndex(conn, tableName, indexName);
IndexVerificationResultRepository resultRepository =
new IndexVerificationResultRepository(conn, indexNameBytes);
resultRepository.createResultTable(conn);
byte[] regionOne = Bytes.toBytes("a.1.00000000000000000000");
byte[] regionTwo = Bytes.toBytes("a.2.00000000000000000000");
long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
IndexToolVerificationResult expectedResult = getExpectedResult(scanMaxTs);
resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
regionOne);
resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
regionTwo);
IndexVerificationResultRepository resultRepository = setupResultRepository(conn, indexNameBytes, expectedResult);
IndexToolVerificationResult actualResult =
resultRepository.getVerificationResult(conn, scanMaxTs);
assertVerificationResult(expectedResult, actualResult);
}
}

@Test
public void testTTLOnResultTable() throws SQLException, IOException {
String mockString = "mock_value";
byte[] mockStringBytes = Strings.toByteArray(mockString);
ManualEnvironmentEdge customClock = new ManualEnvironmentEdge();
customClock.setValue(1);
EnvironmentEdgeManager.injectEdge(customClock);
try (Connection conn = DriverManager.getConnection(getUrl())) {
HTable hTable = new HTable(config, RESULT_TABLE_NAME_BYTES);
long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
IndexToolVerificationResult expectedResult = getExpectedResult(scanMaxTs);
setupResultRepository(conn, mockStringBytes,expectedResult);

Assert.assertEquals(2, TestUtil.getRowCount(hTable, false));

customClock.incrementValue(1000*(DEFAULT_LOG_TTL+5));
EnvironmentEdgeManager.injectEdge(customClock);
int count = TestUtil.getRowCount(hTable, false);

Assert.assertEquals(0, count);
}
}

private IndexVerificationResultRepository setupResultRepository(Connection conn, byte[] indexNameBytes,IndexToolVerificationResult expectedResult)
throws SQLException, IOException {
IndexVerificationResultRepository resultRepository =
new IndexVerificationResultRepository(conn, indexNameBytes);
resultRepository.createResultTable(conn);
TestUtil.assertTableHasTtl(conn, TableName.valueOf(RESULT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL);
byte[] regionOne = Bytes.toBytes("a.1.00000000000000000000");
byte[] regionTwo = Bytes.toBytes("a.2.00000000000000000000");
resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
regionOne);
resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
regionTwo);
return resultRepository;
}

private void assertVerificationResult(IndexToolVerificationResult expectedResult, IndexToolVerificationResult actualResult) {
assertEquals(expectedResult.getScanMaxTs(), actualResult.getScanMaxTs());
assertArrayEquals(expectedResult.getStartRow(), actualResult.getStartRow());
Expand Down Expand Up @@ -139,4 +183,17 @@ private void createTableAndIndex(Connection conn, String dataTableName,
dataTableName + " (val1) include (val2, val3)");
conn.commit();
}

@After
public void dropResultTable() throws Exception {
try(Connection conn = DriverManager.getConnection(getUrl())) {
ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
Admin admin = queryServices.getAdmin();
TableName outputTableName = TableName.valueOf(RESULT_TABLE_NAME_BYTES);
if (admin.tableExists(outputTableName)) {
((HBaseAdmin) admin).disableTable(RESULT_TABLE_NAME_BYTES);
((HBaseAdmin) admin).deleteTable(RESULT_TABLE_NAME_BYTES);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ public void createOutputTable(Connection connection) throws IOException, SQLExce
if (!admin.tableExists(outputTableName)) {
HTableDescriptor tableDescriptor = new
HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
tableDescriptor.setValue(HColumnDescriptor.TTL,
String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
columnDescriptor.setValue(HColumnDescriptor.TTL,
String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
tableDescriptor.addFamily(columnDescriptor);
admin.createTable(tableDescriptor);
outputTable = admin.getConnection().getTable(outputTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ public void createResultTable(Connection connection) throws IOException, SQLExce
if (!admin.tableExists(resultTableName)) {
HTableDescriptor tableDescriptor = new
HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME));
tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY);
columnDescriptor.setValue(HColumnDescriptor.TTL,
String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
tableDescriptor.addFamily(columnDescriptor);
admin.createTable(tableDescriptor);
setResultTable(admin.getConnection().getTable(resultTableName));
Expand Down