GEODE-7407: Remove AttributesFactory (1/4) #4297

Merged · 2 commits · Nov 11, 2019

Changes from 1 commit
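This commit replaces the deprecated AttributesFactory pattern (build RegionAttributes, then call Cache.createRegion(name, attributes)) with the fluent RegionFactory API obtained from Cache.createRegionFactory(), as the diff below shows for both the replicate and the partitioned test regions. A minimal before/after sketch of that pattern, assuming a locally created peer cache; the class name, region name, and sample entry are illustrative, not part of this PR:

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;

public class RegionFactoryMigrationSketch {
  public static void main(String[] args) {
    Cache cache = new CacheFactory().create();

    // Before (deprecated): build RegionAttributes with AttributesFactory,
    // then hand them to Cache.createRegion(name, attributes):
    //   AttributesFactory factory = new AttributesFactory();
    //   factory.setScope(Scope.DISTRIBUTED_ACK);
    //   factory.setDataPolicy(DataPolicy.REPLICATE);
    //   Region<Object, Object> region =
    //       cache.createRegion("sampleRRegion", factory.create());

    // After: configure and create the region in one fluent RegionFactory chain.
    Region<Object, Object> region = cache.<Object, Object>createRegionFactory()
        .setDataPolicy(DataPolicy.REPLICATE)
        .setScope(Scope.DISTRIBUTED_ACK)
        .create("sampleRRegion");

    region.put("key-1", "value-1");
    cache.close();
  }
}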
@@ -14,28 +14,23 @@
*/
package org.apache.geode.rest.internal.web.controllers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
@@ -50,6 +45,7 @@
import org.apache.geode.internal.cache.PartitionAttributesImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionTestHelper;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.rest.internal.web.RestFunctionTemplate;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.RestAPITest;
@@ -61,14 +57,12 @@
* @since GemFire 8.0
*/

@Category({RestAPITest.class})
@Category(RestAPITest.class)
@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase {

private final String REPLICATE_REGION_NAME = "sampleRRegion";

private final String PR_REGION_NAME = "samplePRRegion";
private final String REPLICATE_REGION_NAME = "sampleRRegion";

@Parameterized.Parameter
public String urlContext;
@@ -84,26 +78,27 @@ public RestAPIOnRegionFunctionExecutionDUnitTest() {


private void createPeer(DataPolicy policy) {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(policy);
Region region =
CacheFactory.getAnyInstance().createRegion(REPLICATE_REGION_NAME, factory.create());
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Region Created :" + region);
assertNotNull(region);
Region<Object, Object> region = CacheFactory.getAnyInstance()
.createRegionFactory()
.setDataPolicy(policy)
.setScope(Scope.DISTRIBUTED_ACK)
.create(REPLICATE_REGION_NAME);

assertThat(region).isNotNull();
}

private boolean createPeerWithPR() {
RegionAttributes ra = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 10);
AttributesFactory raf = new AttributesFactory(ra);
PartitionAttributesImpl pa = new PartitionAttributesImpl();
pa.setAll(ra.getPartitionAttributes());
pa.setTotalNumBuckets(17);
raf.setPartitionAttributes(pa);

Region region = CacheFactory.getAnyInstance().createRegion(PR_REGION_NAME, raf.create());
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Region Created :" + region);
assertNotNull(region);
Region<Object, Object> region = CacheFactory.getAnyInstance()
.createRegionFactory()
.setPartitionAttributes(pa)
.create(PR_REGION_NAME);

assertThat(region).isNotNull();
return Boolean.TRUE;
}

@@ -117,20 +112,20 @@ private void populatePRRegion() {
}
// Assert there is data in each bucket
for (int bid = 0; bid < pr.getTotalNumberOfBuckets(); bid++) {
assertTrue(pr.getBucketKeys(bid).size() > 0);
assertThat(pr.getBucketKeys(bid).size() > 0).isTrue();
}
}

private void populateRRRegion() {
Region region = CacheFactory.getAnyInstance().getRegion(REPLICATE_REGION_NAME);
assertNotNull(region);
Region<String, Integer> region = CacheFactory.getAnyInstance().getRegion(REPLICATE_REGION_NAME);
assertThat(region).isNotNull();

final HashSet testKeys = new HashSet();
final HashSet<String> testKeys = new HashSet<>();
for (int i = 17 * 3; i > 0; i--) {
testKeys.add("execKey-" + i);
}
int j = 0;
for (final Object testKey : testKeys) {
for (final String testKey : testKeys) {
region.put(testKey, j++);
}

@@ -165,12 +160,12 @@ public void testOnRegionExecutionWithReplicateRegion() {
vm1.invoke("createPeer", () -> createPeer(DataPolicy.REPLICATE));
vm2.invoke("createPeer", () -> createPeer(DataPolicy.REPLICATE));

vm3.invoke("populateRRRegion", () -> populateRRRegion());
vm3.invoke("populateRRRegion", this::populateRRRegion);

CloseableHttpResponse response = executeFunctionThroughRestCall("SampleFunction",
REPLICATE_REGION_NAME, null, null, null, null);
assertEquals(200, response.getStatusLine().getStatusCode());
assertNotNull(response.getEntity());
assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200);
assertThat(response.getEntity()).isNotNull();

assertCorrectInvocationCount("SampleFunction", 1, vm0, vm1, vm2, vm3);

@@ -179,35 +174,35 @@ public void testOnRegionExecutionWithReplicateRegion() {
}

@Test
public void testOnRegionExecutionWithPartitionRegion() throws Exception {
public void testOnRegionExecutionWithPartitionRegion() {
createCacheAndRegisterFunction();

createPeersWithPR(vm0, vm1, vm2, vm3);

vm3.invoke("populatePRRegion", () -> populatePRRegion());
vm3.invoke("populatePRRegion", this::populatePRRegion);

CloseableHttpResponse response =
executeFunctionThroughRestCall("SampleFunction", PR_REGION_NAME, null, null, null, null);
assertEquals(200, response.getStatusLine().getStatusCode());
assertNotNull(response.getEntity());
assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200);
assertThat(response.getEntity()).isNotNull();

assertCorrectInvocationCount("SampleFunction", 4, vm0, vm1, vm2, vm3);

restURLs.clear();
}

@Test
public void testOnRegionWithFilterExecutionWithPartitionRegion() throws Exception {
public void testOnRegionWithFilterExecutionWithPartitionRegion() {
createCacheAndRegisterFunction();

createPeersWithPR(vm0, vm1, vm2, vm3);

vm3.invoke("populatePRRegion", () -> populatePRRegion());
vm3.invoke("populatePRRegion", this::populatePRRegion);

CloseableHttpResponse response =
executeFunctionThroughRestCall("SampleFunction", PR_REGION_NAME, "key2", null, null, null);
assertEquals(200, response.getStatusLine().getStatusCode());
assertNotNull(response.getEntity());
assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200);
assertThat(response.getEntity()).isNotNull();

assertCorrectInvocationCount("SampleFunction", 1, vm0, vm1, vm2, vm3);

@@ -216,17 +211,17 @@ public void testOnRegionWithFilterExecutionWithPartitionRegion() throws Exceptio

private void createPeersWithPR(VM... vms) {
for (final VM vm : vms) {
vm.invoke("createPeerWithPR", () -> createPeerWithPR());
vm.invoke("createPeerWithPR", this::createPeerWithPR);
}
}

@Test
public void testOnRegionWithFilterExecutionWithPartitionRegionJsonArgs() throws Exception {
public void testOnRegionWithFilterExecutionWithPartitionRegionJsonArgs() {
createCacheAndRegisterFunction();

createPeersWithPR(vm0, vm1, vm2, vm3);

vm3.invoke("populatePRRegion", () -> populatePRRegion());
vm3.invoke("populatePRRegion", this::populatePRRegion);

String jsonBody = "[" + "{\"@type\": \"double\",\"@value\": 210}"
+ ",{\"@type\":\"org.apache.geode.rest.internal.web.controllers.Item\","
@@ -235,8 +230,8 @@ public void testOnRegionWithFilterExecutionWithPartitionRegionJsonArgs() throws

CloseableHttpResponse response = executeFunctionThroughRestCall("SampleFunction",
PR_REGION_NAME, null, jsonBody, null, null);
assertEquals(200, response.getStatusLine().getStatusCode());
assertNotNull(response.getEntity());
assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200);
assertThat(response.getEntity()).isNotNull();

// Assert that only 1 node has executed the function.
assertCorrectInvocationCount("SampleFunction", 4, vm0, vm1, vm2, vm3);
@@ -253,36 +248,36 @@ public void testOnRegionWithFilterExecutionWithPartitionRegionJsonArgs() throws

response = executeFunctionThroughRestCall("SampleFunction", PR_REGION_NAME, "key2", jsonBody,
null, null);
assertEquals(200, response.getStatusLine().getStatusCode());
assertNotNull(response.getEntity());
assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200);
assertThat(response.getEntity()).isNotNull();

// Assert that only 1 node has executed the function.
assertCorrectInvocationCount("SampleFunction", 1, vm0, vm1, vm2, vm3);

restURLs.clear();
}

private class SampleFunction extends RestFunctionTemplate {

public static final String Id = "SampleFunction";
private static class SampleFunction extends RestFunctionTemplate {
static final String Id = "SampleFunction";

@Override
@SuppressWarnings("unchecked")
public void execute(FunctionContext context) {
invocationCount++;
if (context instanceof RegionFunctionContext) {
RegionFunctionContext rfContext = (RegionFunctionContext) context;
rfContext.getDataSet().getCache().getLogger()
rfContext.getCache().getLogger()
.info("Executing function : SampleFunction.execute(hasResult=true) with filter: "
+ rfContext.getFilter() + " " + rfContext);
if (rfContext.getArguments() instanceof Boolean) {
/* return rfContext.getArguments(); */
if (hasResult()) {
rfContext.getResultSender().lastResult((Serializable) rfContext.getArguments());
rfContext.getResultSender().lastResult(rfContext.getArguments());
} else {
rfContext.getDataSet().getCache().getLogger()
rfContext.getCache().getLogger()
.info("Executing function : SampleFunction.execute(hasResult=false) " + rfContext);
while (!rfContext.getDataSet().isDestroyed()) {
rfContext.getDataSet().getCache().getLogger().info("For Bug43513 ");
rfContext.getCache().getLogger().info("For Bug43513 ");
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
@@ -298,7 +293,7 @@ public void execute(FunctionContext context) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
rfContext.getDataSet().getCache().getLogger()
rfContext.getCache().getLogger()
.warning("Got Exception : Thread Interrupted" + e);
}
}
@@ -308,9 +303,9 @@ public void execute(FunctionContext context) {
* rfContext).get(key);
*/
rfContext.getResultSender().lastResult(
(Serializable) PartitionRegionHelper.getLocalDataForContext(rfContext).get(key));
PartitionRegionHelper.getLocalDataForContext(rfContext).get(key));
} else {
rfContext.getResultSender().lastResult((Serializable) rfContext.getDataSet().get(key));
rfContext.getResultSender().lastResult(rfContext.getDataSet().get(key));
}
/* return (Serializable)rfContext.getDataSet().get(key); */
} else if (rfContext.getArguments() instanceof Set) {
@@ -326,8 +321,8 @@ public void execute(FunctionContext context) {
/* return vals; */
} else if (rfContext.getArguments() instanceof HashMap) {
HashMap putData = (HashMap) rfContext.getArguments();
for (Iterator i = putData.entrySet().iterator(); i.hasNext();) {
Map.Entry me = (Map.Entry) i.next();
for (Object o : putData.entrySet()) {
Map.Entry me = (Map.Entry) o;
rfContext.getDataSet().put(me.getKey(), me.getValue());
}
rfContext.getResultSender().lastResult(Boolean.TRUE);
@@ -339,11 +334,12 @@ public void execute(FunctionContext context) {
context.getResultSender().lastResult(Boolean.FALSE);
} else {
DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
LogWriter logger = ds.getLogWriter();
assertThat(ds).isNotNull();
Logger logger = LogService.getLogger();
logger.info("Executing in SampleFunction on Server : " + ds.getDistributedMember()
+ "with Context : " + context);
while (ds.isConnected()) {
logger.fine("Just executing function in infinite loop for Bug43513");
logger.debug("Just executing function in infinite loop for Bug43513");
try {
Thread.sleep(250);
} catch (InterruptedException e) {
@@ -374,5 +370,4 @@ public boolean isHA() {
return false;
}
}

}
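Beyond the AttributesFactory cleanup, the diff above also moves the test from JUnit's assertEquals/assertNotNull/assertTrue to AssertJ's assertThat, and from the deprecated LogWriter to the Log4j Logger returned by Geode's internal LogService (so LogWriter.fine() becomes logger.debug()). A minimal sketch of both idioms as they appear in the diff; the class name and sample values are illustrative:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.logging.log4j.Logger;

import org.apache.geode.logging.internal.log4j.api.LogService;

class AssertAndLogSketch {
  // Log4j logger obtained through Geode's (internal) LogService,
  // replacing DistributedSystem.getLogWriter().
  private static final Logger logger = LogService.getLogger();

  void check(int statusCode, Object entity) {
    // AssertJ replacements for assertEquals / assertNotNull / assertTrue.
    assertThat(statusCode).isEqualTo(200);
    assertThat(entity).isNotNull();
    assertThat(statusCode > 0).isTrue();

    logger.info("status code checked: {}", statusCode);
    // LogWriter.fine(...) maps to the Log4j debug level.
    logger.debug("entity: {}", entity);
  }
}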