Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
Merge pull request #145 from ppalaga/HAWKULAR-235
Browse files Browse the repository at this point in the history
HAWKULAR-235 Make Pinger listen to URLs removed
  • Loading branch information
pilhuhn committed May 28, 2015
2 parents 104c57c + 54469cf commit cee313c
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 77 deletions.
1 change: 1 addition & 0 deletions .travis.force.build
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Change this file to force Travis to perform a rebuild and redeploy to repo
-
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.hawkular.inventory.api.Inventory;
import org.hawkular.inventory.api.filters.With;
import org.hawkular.inventory.api.model.Resource;
import rx.functions.Action1;

import javax.annotation.PostConstruct;
import javax.ejb.EJB;
Expand All @@ -30,8 +29,8 @@
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Startup;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -51,50 +50,6 @@
@Singleton
public class PingManager {

/**
* Collects new URLs reported by {@link PingManager#inventory} and synchronizes the various threads reporting the
* new URLs and those ones consuming them.
*
* @author <a href="https://github.com/ppalaga">Peter Palaga</a>
*/
static class NewUrlsCollector implements Action1<Resource> {
private final Object lock = new Object();
private List<PingDestination> newUrls = new ArrayList<>();

/**
* A callback for the {@link Inventory} that collects newly added URLs. It is safe to call this method
* concurrently from any random thread.
*
* @see rx.functions.Action1#call(java.lang.Object)
*/
@Override
public void call(Resource r) {
if (PingDestination.isUrl(r)) {
synchronized (lock) {
newUrls.add(PingDestination.from(r));
}
}
}

/**
* Returns the list of {@link PingDestination}s collected by this {@link NewUrlsCollector}. It is safe to call
* this method concurrently from any random thread.
*
* @return the list of {@link PingDestination}s
*/
public List<PingDestination> getNewUrls() {
synchronized (lock) {
if (this.newUrls.isEmpty()) {
return Collections.emptyList();
} else {
List<PingDestination> result = this.newUrls;
this.newUrls = new ArrayList<>();
return result;
}
}
}
}

/** How many rounds a WAIT_MILLIS do we wait for results to come in? */
private static final int ROUNDS = 15;
/** How long do we wait between each round in milliseconds */
Expand All @@ -117,22 +72,25 @@ public List<PingDestination> getNewUrls() {
TraitsPublisher traitsPublisher;

@javax.annotation.Resource(lookup = "java:global/Hawkular/ObservableInventory")
private Inventory.Mixin.Observable inventory;
Inventory.Mixin.Observable inventory;

final NewUrlsCollector newUrlsCollector = new NewUrlsCollector();
final UrlChangesCollector urlChangesCollector = new UrlChangesCollector();

@PostConstruct
public void startUp() {

/*
* Add the observer before reading the existing URLs from the inventory so that we do not loose the URLs that
* could have been added between those two calls.
* Add the observers before reading the existing URLs from the inventory so that we do not loose the URLs that
* could have been added or removed between those two calls.
*/
inventory.observable(Interest.in(Resource.class).being(Action.created())).subscribe(newUrlsCollector);

//we use just an observable inventory here, because it allows us to see all the tenants. This essentially
//circumvents any authz present on the inventory.
//We need that though because pinger doesn't have storage of its own and is considered "trusted", so it's ok.
inventory.observable(Interest.in(Resource.class).being(Action.created())).subscribe(
urlChangesCollector.getUrlCreatedAction());
inventory.observable(Interest.in(Resource.class).being(Action.deleted())).subscribe(
urlChangesCollector.getUrlDeletedAction());

// we use just an observable inventory here, because it allows us to see all the tenants. This essentially
// circumvents any authz present on the inventory.
// We need that though because pinger doesn't have storage of its own and is considered "trusted", so it's ok.
Set<Resource> urls = inventory.tenants().getAll().resourceTypes().getAll(With.id(PingDestination.URL_TYPE))
.resources().getAll().entities();
Log.LOG.infof("About to initialize Hawkular Pinger with %d URLs", urls.size());
Expand All @@ -157,8 +115,8 @@ public void startUp() {
@Schedule(minute = "*", hour = "*", second = "0,20,40", persistent = false)
public void scheduleWork() {

List<PingDestination> newUrls = newUrlsCollector.getNewUrls();
destinations.addAll(newUrls);
/* Apply URL additions and removals collected in between. */
urlChangesCollector.apply(this.destinations);

if (destinations.size() == 0) {
return;
Expand All @@ -171,8 +129,7 @@ public void scheduleWork() {
* Runs the pinging work on the provided list of destinations. The actual pings are scheduled to run in parallel in
* a thread pool. After ROUNDS*WAIT_MILLIS, remaining pings are cancelled and an error
*
* @param destinations
* Set of destinations to ping
* @param destinations Set of destinations to ping
*/
private void doThePing(Set<PingDestination> destinations) {
List<PingStatus> results = new ArrayList<>(destinations.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.component.pinger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.hawkular.inventory.api.Action;
import org.hawkular.inventory.api.Inventory;
import org.hawkular.inventory.api.model.Resource;

import rx.functions.Action1;

/**
* Collects URL additions and removals reported by {@link PingManager#inventory} and synchronizes the various threads
* reporting the new URLs and those ones consuming them.
*
* @author <a href="https://github.com/ppalaga">Peter Palaga</a>
*/
public class UrlChangesCollector {
/**
* An add or remove operation that can be performed on a {@link Set}.
*
* @author <a href="https://github.com/ppalaga">Peter Palaga</a>
*/
private static class UrlChange {

/** The nature of the change - {@link Action#created()} or {@link Action#deleted()} */
private final Action<?, ?> action;

/** The {@link PingDestination} that the change is related to */
private final PingDestination destination;

/**
* Creates a new {@link UrlChange}.
*
* @param action the nature of the change - {@link Action#created()} or {@link Action#deleted()}
* @param destination the {@link PingDestination} that the change is related to
*/
public UrlChange(Action<?, ?> action, PingDestination destination) {
super();
this.action = action;
this.destination = destination;
}

}

/**
* A callback for the {@link Inventory} that collects newly added URLs.
*/
private final Action1<Resource> urlCreatedAction = new Action1<Resource>() {
/**
* It is safe to call this method concurrently from any random thread.
*
* @see rx.functions.Action1#call(java.lang.Object)
*/
@Override
public void call(Resource r) {
if (PingDestination.isUrl(r)) {
synchronized (lock) {
changes.add(new UrlChange(Action.created(), PingDestination.from(r)));
}
}
}
};

/**
* A callback for the {@link Inventory} that collects URL removals.
*/
private final Action1<Resource> urlDeletedAction = new Action1<Resource>() {
/**
* It is safe to call this method concurrently from any random thread.
*
* @see rx.functions.Action1#call(java.lang.Object)
*/
@Override
public void call(Resource r) {
if (PingDestination.isUrl(r)) {
synchronized (lock) {
changes.add(new UrlChange(Action.deleted(), PingDestination.from(r)));
}
}
}
};

private final Object lock = new Object();
private List<UrlChangesCollector.UrlChange> changes = new ArrayList<>();

/**
* Applies the {@link UrlChange}s collected by this {@link UrlChangesCollector} to the given {@link Set} of
* {@link PingDestination}s - i.e. {@link PingDestination}s are either added or removed to/from the Set.
*
* @param destinations the {@link Set} of {@link PingDestination}s that the changes should be applied to.
*/
public void apply(Set<PingDestination> destinations) {
List<UrlChange> changesCopy = getChanges();

for (UrlChange change : changesCopy) {
if (Action.created().equals(change.action)) {
destinations.add(change.destination);
} else if (Action.deleted().equals(change.action)) {
destinations.remove(change.destination);
} else {
throw new IllegalStateException("Unexpected action '" + change.action
+ "'; expected Action.created() or Action.deleted()");
}
}
}

/**
* Returns the list of {@link UrlChange}s collected by this {@link UrlChangesCollector}. It is safe to call this
* method concurrently from any random thread.
*
* @return the list of {@link PingDestination}s
*/
private List<UrlChangesCollector.UrlChange> getChanges() {
synchronized (lock) {
if (this.changes.isEmpty()) {
return Collections.emptyList();
} else {
List<UrlChangesCollector.UrlChange> result = this.changes;
this.changes = new ArrayList<>();
return result;
}
}
}

/**
* Returns a callback for the {@link Inventory} that collects newly added URLs.
*/
public Action1<Resource> getUrlCreatedAction() {
return urlCreatedAction;
}

/**
* Returns a callback for the {@link Inventory} that collects URL removals.
*/
public Action1<Resource> getUrlDeletedAction() {
return urlDeletedAction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
*/
package org.hawkular.component.pinger;

import java.util.HashMap;
import java.util.Map;

import org.hawkular.component.pinger.PingDestination.ResourceField;
import org.hawkular.component.pinger.Traits.TraitHeader;
import org.hawkular.inventory.api.model.Resource;
import org.hawkular.inventory.api.model.ResourceType;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand All @@ -38,31 +35,22 @@
*/
public class PingManagerTest {

private static final String TEST_RESOURCE_ID = "test-rsrc";
private static final String TEST_TENANT_ID = "test-tenat";
private static final String TEST_ENVIRONMENT_ID = "test-env";
private static final String TEST_URL = "http://hawkular.github.io";
private static final String GET_METHOD = "GET";

@Test
public void testScheduleWork() throws Exception {

PingManager manager = new PingManager();
manager.pinger = new Pinger();
Map<String, Object> props = new HashMap<>();
props.put(ResourceField.url.name(), TEST_URL);
props.put(ResourceField.method.name(), GET_METHOD);
ResourceType urlType = new ResourceType(TEST_TENANT_ID, PingDestination.URL_TYPE, "0");
Resource urlResource = new Resource(TEST_TENANT_ID, TEST_ENVIRONMENT_ID, null, TEST_RESOURCE_ID, urlType,
props);
manager.newUrlsCollector.call(urlResource);

Resource urlResource = PingerTestUtils.createTestResource();

manager.urlChangesCollector.getUrlCreatedAction().call(urlResource);

manager.metricPublisher = Mockito.mock(MetricPublisher.class);
manager.traitsPublisher = Mockito.mock(TraitsPublisher.class);

manager.scheduleWork();

PingDestination expectedDest = new PingDestination(TEST_TENANT_ID, TEST_ENVIRONMENT_ID, TEST_RESOURCE_ID,
TEST_URL, GET_METHOD);
PingDestination expectedDest = PingerTestUtils.createTestPingDestination();

Map<TraitHeader, String> expectedTraitsItems = new ImmutableMap.Builder<TraitHeader, String>().put(
TraitHeader.SERVER, "GitHub.com").build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.component.pinger;

import java.util.HashMap;
import java.util.Map;

import org.hawkular.component.pinger.PingDestination.ResourceField;
import org.hawkular.inventory.api.model.Resource;
import org.hawkular.inventory.api.model.ResourceType;

/**
* @author <a href="https://github.com/ppalaga">Peter Palaga</a>
*/
public class PingerTestUtils {

public static final String TEST_RESOURCE_ID = "test-rsrc";
public static final String TEST_TENANT_ID = "test-tenat";
public static final String TEST_ENVIRONMENT_ID = "test-env";
public static final String TEST_URL = "http://hawkular.github.io";
public static final String GET_METHOD = "GET";

public static Resource createTestResource() {
Map<String, Object> props = new HashMap<>();
props.put(ResourceField.url.name(), PingerTestUtils.TEST_URL);
props.put(ResourceField.method.name(), PingerTestUtils.GET_METHOD);
ResourceType urlType = new ResourceType(PingerTestUtils.TEST_TENANT_ID, PingDestination.URL_TYPE, "0");
Resource urlResource = new Resource(PingerTestUtils.TEST_TENANT_ID, PingerTestUtils.TEST_ENVIRONMENT_ID, null,
PingerTestUtils.TEST_RESOURCE_ID, urlType, props);
return urlResource;
}

/**
* @return
*/
public static PingDestination createTestPingDestination() {
return new PingDestination(PingerTestUtils.TEST_TENANT_ID, PingerTestUtils.TEST_ENVIRONMENT_ID,
PingerTestUtils.TEST_RESOURCE_ID, PingerTestUtils.TEST_URL, PingerTestUtils.GET_METHOD);
}
}

0 comments on commit cee313c

Please sign in to comment.