@@ -22,21 +22,18 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
import static org.apache.phoenix.query.BaseTest.generateUniqueName;
import static org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
import static org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -51,6 +48,7 @@
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.FailoverPhoenixConnection;
import org.apache.phoenix.jdbc.HABaseIT;
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.jdbc.HighAvailabilityGroup;
import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
@@ -65,33 +63,20 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;

@Category(NeedsOwnMiniClusterTest.class)
public class ReplicationLogGroupIT {
public class ReplicationLogGroupIT extends HABaseIT {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroupIT.class);
  private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS =
    new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();

  @ClassRule
  public static TemporaryFolder standbyFolder = new TemporaryFolder();
  @ClassRule
  public static TemporaryFolder localFolder = new TemporaryFolder();

  private static Configuration conf1;
  private static Configuration conf2;
  private static URI standbyUri;
  private static URI fallbackUri;
  private static String zkUrl;
  private static String peerZkUrl;

Expand All @@ -106,14 +91,6 @@ public class ReplicationLogGroupIT {

  @BeforeClass
  public static void doSetup() throws Exception {
    conf1 = CLUSTERS.getHBaseCluster1().getConfiguration();
    conf1.setBoolean(SYNCHRONOUS_REPLICATION_ENABLED, true);
    conf2 = CLUSTERS.getHBaseCluster2().getConfiguration();
    conf2.setBoolean(SYNCHRONOUS_REPLICATION_ENABLED, true);
    standbyUri = new Path(standbyFolder.getRoot().toString()).toUri();
    fallbackUri = new Path(localFolder.getRoot().toString()).toUri();
    conf1.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString());
    conf1.set(ReplicationLogGroup.REPLICATION_FALLBACK_HDFS_URL_KEY, fallbackUri.toString());
    conf1.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
    CLUSTERS.start();
    zkUrl = CLUSTERS.getZkUrl1();
@@ -313,6 +290,39 @@ public void testAppendAndSync() throws Exception {
    }
  }

  @Test
  public void testAppendAndSyncNoIndex() throws Exception {
    final String tableName = "T_" + generateUniqueName();
    try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) DriverManager
      .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
      String ddl = String.format("create table %s (id1 integer not null, "
        + "id2 integer not null, val1 varchar, val2 varchar "
        + "constraint pk primary key (id1, id2))", tableName);
      conn.createStatement().execute(ddl);
      conn.commit();
      PreparedStatement stmt =
        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, ?)");
      // upsert 50 rows, committing after every batch of 10
      int rowCount = 50;
      for (int i = 0; i < 5; ++i) {
        for (int j = 0; j < 10; ++j) {
          stmt.setInt(1, i);
          stmt.setInt(2, j);
          stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
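          // binding val2 to null makes Phoenix emit a Delete marker for that
          // column rather than a Put, so each row yields two mutations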
          stmt.setString(4, null);
          stmt.executeUpdate();
        }
        conn.commit();
      }
      // verify replication
      Map<String, Integer> expected = Maps.newHashMap();
      // each upsert sets val1 and nulls out val2, so every row produces one Put
      // and one Delete marker, and both mutations are replicated
      expected.put(tableName, rowCount * 2); // Put + Delete
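      // i.e. 50 rows x (1 Put + 1 Delete) = 100 replicated mutations expected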
      verifyReplication(expected);
    }
  }

  /**
   * This test simulates RS crashes in the middle of write transactions after the edits have been
   * written to the WAL but before they have been replicated to the standby cluster. Those edits