Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,84 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.customizedstate.CustomizedStateProvider;
import org.apache.helix.customizedstate.CustomizedStateProviderFactory;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.model.CustomizedState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
private static Logger LOG = LoggerFactory.getLogger(TestCustomizedStateUpdate.class);
private final String CUSTOMIZE_STATE_NAME = "testState1";
private final String PARTITION_NAME1 = "testPartition1";
private final String PARTITION_NAME2 = "testPartition2";
private final String RESOURCE_NAME = "testResource1";
private final String PARTITION_STATE = "partitionState";
private static HelixManager _manager;
private static CustomizedStateProvider _mockProvider;

@Test
public void testUpdateCustomizedState() throws Exception {
HelixManager manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin",
InstanceType.ADMINISTRATOR, ZK_ADDR);
manager.connect();
@BeforeClass
public void beforeClass() throws Exception {
super.beforeClass();
_manager = HelixManagerFactory
.getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
_manager.connect();
_participants[0].connect();
_mockProvider = CustomizedStateProviderFactory.getInstance()
.buildCustomizedStateProvider(_manager, _participants[0].getInstanceName());
}

@AfterClass
public void afterClass() throws Exception {
super.afterClass();
_manager.disconnect();
}

HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
@BeforeMethod
public void beforeMethod() {
HelixDataAccessor dataAccessor = _manager.getHelixDataAccessor();
PropertyKey propertyKey = dataAccessor.keyBuilder()
.customizedStates(_participants[0].getInstanceName(), CUSTOMIZE_STATE_NAME);
CustomizedState customizedStates = manager.getHelixDataAccessor().getProperty(propertyKey);
dataAccessor.removeProperty(propertyKey);
CustomizedState customizedStates = dataAccessor.getProperty(propertyKey);
Assert.assertNull(customizedStates);
}

CustomizedStateProvider mockProvider = CustomizedStateProviderFactory.getInstance()
.buildCustomizedStateProvider(manager, _participants[0].getInstanceName());
@Test
public void testUpdateCustomizedState() {

// test adding customized state for a partition
Map<String, String> customizedStateMap = new HashMap<>();
customizedStateMap.put("PREVIOUS_STATE", "STARTED");
customizedStateMap.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
_mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
customizedStateMap);

CustomizedState customizedState =
mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
Map<String, Map<String, String>> mapView = customizedState.getRecord().getMapFields();
Expand All @@ -79,10 +109,10 @@ public void testUpdateCustomizedState() throws Exception {
// test partial update customized state for previous partition
Map<String, String> stateMap1 = new HashMap<>();
stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
stateMap1);
_mockProvider
.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1, stateMap1);

customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
Expand All @@ -96,10 +126,10 @@ public void testUpdateCustomizedState() throws Exception {
stateMap1 = new HashMap<>();
stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
stateMap1.put("CURRENT_STATE", "COMPLETED");
mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
stateMap1);
_mockProvider
.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1, stateMap1);

customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
Expand All @@ -113,40 +143,176 @@ public void testUpdateCustomizedState() throws Exception {
Map<String, String> stateMap2 = new HashMap<>();
stateMap2.put("PREVIOUS_STATE", "STARTED");
stateMap2.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2,
stateMap2);
_mockProvider
.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2, stateMap2);

customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 2);
Assert.assertEqualsNoOrder(mapView.keySet().toArray(), new String[] {
PARTITION_NAME1, PARTITION_NAME2
});
Assert.assertEqualsNoOrder(mapView.keySet().toArray(),
new String[]{PARTITION_NAME1, PARTITION_NAME2});

Map<String, String> partitionMap1 = mockProvider
Map<String, String> partitionMap1 = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
Assert.assertEquals(partitionMap1.keySet().size(), 2);
Assert.assertEquals(partitionMap1.get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
Assert.assertEquals(partitionMap1.get("CURRENT_STATE"), "COMPLETED");

Map<String, String> partitionMap2 = mockProvider
Map<String, String> partitionMap2 = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2);
Assert.assertEquals(partitionMap2.keySet().size(), 2);
Assert.assertEquals(partitionMap2.get("PREVIOUS_STATE"), "STARTED");
Assert.assertEquals(partitionMap2.get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");

// test delete customized state for a partition
mockProvider.deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1);
customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
_mockProvider
.deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME2);
}

@Test
public void testUpdateSinglePartitionCustomizedState() {
_mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
PARTITION_STATE);

// get customized state
CustomizedState customizedState =
_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertEquals(
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE)
.size(), 1);
Map<String, String> map = new HashMap<>();
map.put(PARTITION_NAME1, null);
Assert.assertEquals(customizedState
.getPartitionStateMap(CustomizedState.CustomizedStateProperty.PREVIOUS_STATE), map);
Assert.assertEquals(
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME),
map);
Assert.assertEquals(
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.END_TIME),
map);
Assert.assertEquals(customizedState.getState(PARTITION_NAME1), PARTITION_STATE);
Assert.assertNull(customizedState.getState(PARTITION_NAME2));
Assert.assertTrue(customizedState.isValid());

// get per partition customized state
map = new HashMap<>();
map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), PARTITION_STATE);
Map<String, String> partitionCustomizedState = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
Assert.assertEquals(partitionCustomizedState, map);
Assert.assertNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
}

@Test
public void testUpdateSinglePartitionCustomizedStateWithNullField() {
_mockProvider
.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1, (String) null);

// get customized state
CustomizedState customizedState =
_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Map<String, String> map = new HashMap<>();
map.put(PARTITION_NAME1, null);
Assert.assertEquals(
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE),
map);
Assert.assertEquals(customizedState.getState(PARTITION_NAME1), null);
Assert.assertTrue(customizedState.isValid());

// get per partition customized state
map = new HashMap<>();
map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), null);
Map<String, String> partitionCustomizedState = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
Assert.assertEquals(partitionCustomizedState, map);
Assert.assertNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
}

@Test
public void testUpdateCustomizedStateWithEmptyMap() {
_mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
new HashMap<>());

// get customized state
CustomizedState customizedState =
_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNull(customizedState.getState(PARTITION_NAME1));
Map<String, String> partitionStateMap =
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE);
Assert.assertNotNull(partitionStateMap);
Assert.assertTrue(partitionStateMap.containsKey(PARTITION_NAME1));
Assert.assertNull(partitionStateMap.get(PARTITION_NAME1));
Assert.assertNull(customizedState.getState(PARTITION_NAME1));
Assert.assertFalse(partitionStateMap.containsKey(PARTITION_NAME2));
Assert.assertTrue(customizedState.isValid());

// get per partition customized state
Map<String, String> partitionCustomizedState = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
Assert.assertEquals(partitionCustomizedState.size(), 0);
Assert.assertNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
}

@Test
public void testDeleteNonExistingPerPartitionCustomizedState() {
_mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
PARTITION_STATE);
_mockProvider
.deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2);
Assert.assertNotNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1));
Assert.assertNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
}

@Test
public void testSimultaneousUpdateCustomizedState() {
int n = 10;

List<Callable<Boolean>> threads = new ArrayList<>();
for (int i = 0; i < n; i++) {
threads.add(new TestSimultaneousUpdate());
}
Map<String, Boolean> resultMap = TestHelper.startThreadsConcurrently(threads, 1000);
Assert.assertEquals(resultMap.size(), n);
Boolean[] results = new Boolean[n];
Arrays.fill(results, true);
Assert.assertEqualsNoOrder(resultMap.values().toArray(), results);
}

private static class TestSimultaneousUpdate implements Callable<Boolean> {
private Random rand = new Random();

manager.disconnect();
@Override
public Boolean call() {
String customizedStateName = "testState";
String resourceName = "resource" + String.valueOf(rand.nextInt(10));
String partitionName = "partition" + String.valueOf(rand.nextInt(10));
String partitionState = "Updated";
try {
_mockProvider.updateCustomizedState(customizedStateName, resourceName, partitionName,
partitionState);
} catch (Exception e) {
return false;
}
Map<String, String> states = _mockProvider
.getPerPartitionCustomizedState(customizedStateName, resourceName, partitionName);
if (states == null) {
return false;
}
return states.get(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name())
.equals(partitionState);
}
}
}