Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple admin server from plugins startup, add proxy status endpoint #181

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,8 +26,8 @@
import com.hotels.styx.api.client.ActiveOrigins;
import com.hotels.styx.api.client.ConnectionPool;
import com.hotels.styx.api.client.Origin;
import com.hotels.styx.api.client.OriginsSnapshot;
import com.hotels.styx.api.client.OriginsChangeListener;
import com.hotels.styx.api.client.OriginsSnapshot;
import com.hotels.styx.api.client.RemoteHost;
import com.hotels.styx.api.metrics.MetricRegistry;
import com.hotels.styx.api.metrics.codahale.CodaHaleMetricRegistry;
Expand Down Expand Up @@ -248,26 +248,26 @@ private void handleSetOriginsEvent(SetOriginsEvent event) {
concat(this.origins.keySet().stream(), newOriginsMap.keySet().stream())
.collect(toSet())
.forEach(originId -> {
Origin origin = newOriginsMap.get(originId);
Origin origin = newOriginsMap.get(originId);

if (isNewOrigin(originId, origin)) {
MonitoredOrigin monitoredOrigin = addMonitoredEndpoint(origin);
originChanges.addOrReplaceOrigin(originId, monitoredOrigin);
if (isNewOrigin(originId, origin)) {
MonitoredOrigin monitoredOrigin = addMonitoredEndpoint(origin);
originChanges.addOrReplaceOrigin(originId, monitoredOrigin);

} else if (isUpdatedOrigin(originId, origin)) {
MonitoredOrigin monitoredOrigin = changeMonitoredEndpoint(origin);
originChanges.addOrReplaceOrigin(originId, monitoredOrigin);
} else if (isUpdatedOrigin(originId, origin)) {
MonitoredOrigin monitoredOrigin = changeMonitoredEndpoint(origin);
originChanges.addOrReplaceOrigin(originId, monitoredOrigin);

} else if (isUnchangedOrigin(originId, origin)) {
LOG.info("Existing origin has been left unchanged. Origin={}:{}", appId, origin);
originChanges.keepExistingOrigin(originId, this.origins.get(originId));
} else if (isUnchangedOrigin(originId, origin)) {
LOG.info("Existing origin has been left unchanged. Origin={}:{}", appId, origin);
originChanges.keepExistingOrigin(originId, this.origins.get(originId));

} else if (isRemovedOrigin(originId, origin)) {
removeMonitoredEndpoint(originId);
originChanges.noteRemovedOrigin();
}
}
);
} else if (isRemovedOrigin(originId, origin)) {
removeMonitoredEndpoint(originId);
originChanges.noteRemovedOrigin();
}
}
);

this.origins = originChanges.updatedOrigins();

Expand Down Expand Up @@ -476,10 +476,12 @@ private OriginState state() {
}
}

@VisibleForTesting
public static Builder newOriginsInventoryBuilder(Id appId) {
return new Builder(appId);
}

@VisibleForTesting
public static Builder newOriginsInventoryBuilder(BackendService backendService) {
return new Builder(backendService.id())
.connectionPoolFactory(simplePoolFactory(backendService, new CodaHaleMetricRegistry()))
Expand Down
98 changes: 98 additions & 0 deletions components/common/src/main/java/com/hotels/styx/common/Result.java
@@ -0,0 +1,98 @@
/*
Copyright (C) 2013-2018 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.common;

import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
* Represents the outcome of attempting to do something.
* On a success a value is stored (defaults to "success" if not specified).
* On a failure an exception is stored (defaults to a new Exception if not specified).
*
* @param <T> type of success value
*/
public class Result<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't checked the code that uses this class or thought about it considering we are doing Java, but I believe we wanted to implement this as an Either/Try monad? So I would expect : map, flatMap... Scala's try monad is nicer (as in simpler) https://mauricio.github.io/2014/02/17/scala-either-try-and-the-m-word.html , but it seems there are a few implementations in Java such as https://github.com/jasongoodwin/better-java-monads/blob/master/src/main/java/com/jasongoodwin/monads/Try.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of this class was simply to add a representation of success or failure that can be stored as a value.

I can see a similarity with the Try.java linked, but I'm not sure it is worth adding a new dependency just for this class.

In terms of API, a difference I see is that the Result class can be instantiated directly with a call to success or failure without needing to wrap a block of code, so it is in that sense more decoupled from the try-catch structure and can be used in situations where try-catch does not appear (e.g. using result values in unit tests).

private final boolean success;
private final T valueOnSuccess;
private final Exception causeOnFailure;

private Result(boolean success, T valueOnSuccess, Exception causeOnFailure) {
this.success = success;
this.valueOnSuccess = valueOnSuccess;
this.causeOnFailure = causeOnFailure;
}

public static <T> Result<T> success(T value) {
return new Result<>(true, requireNonNull(value), null);
}

public static Result<String> success() {
return success("success");
}

public static <T> Result<T> failure(Exception e) {
return new Result<>(false, null, requireNonNull(e));
}

public static <T> Result<T> failure() {
return failure(new Exception("Failure"));
}

public Optional<T> successValue() {
return Optional.ofNullable(valueOnSuccess);
}

public <R> Result<R> mapSuccess(Function<T, R> mapper) {
return success
? success(mapper.apply(valueOnSuccess))
: (Result<R>) this;
}

public Result<T> defaultOnFailure(Function<Exception, T> mapper) {
return success
? this
: success(mapper.apply(causeOnFailure));
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Result<?> result = (Result<?>) o;
return success == result.success
&& Objects.equals(valueOnSuccess, result.valueOnSuccess)
&& Objects.equals(causeOnFailure, result.causeOnFailure);
}

@Override
public int hashCode() {
return Objects.hash(success, valueOnSuccess, causeOnFailure);
}

@Override
public String toString() {
return format("%s[%s]", success ? "SUCCESS" : "FAILURE", success ? valueOnSuccess : causeOnFailure);
}
}
@@ -0,0 +1,94 @@
/*
Copyright (C) 2013-2018 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.configstore;

import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static java.util.Objects.requireNonNull;
import static rx.schedulers.Schedulers.computation;

/**
* Stores data about the current state of the system.
* Added to allow Styx to operate in a more dynamic, adaptive way.
*/
public class ConfigStore {
private final Observer<Update> updates;
private final ConcurrentMap<String, Object> values;
private final Observable<Update> propagation;

public ConfigStore() {
this.values = new ConcurrentHashMap<>();

PublishSubject<Update> updates = PublishSubject.create();
PublishSubject<Update> propagation = PublishSubject.create();

updates.subscribe(update -> {
this.values.put(update.key, update.value);
propagation.onNext(update);
});

// Use more restricted interfaces for fields
this.updates = updates;
this.propagation = propagation;
}

public <T> Optional<T> get(String key) {
return Optional.ofNullable((T) values.get(key));
}

public <T> Optional<T> get(String key, Class<T> type) {
return Optional.ofNullable(values.get(key)).map(type::cast);
}

public void set(String key, Object value) {
this.updates.onNext(new Update(key, value));
}

public <T> Observable<T> watch(String key) {
return propagation.filter(update -> update.key().equals(key))
.map(Update::value)
.map(value -> (T) value)
.observeOn(computation());
}

public <T> Observable<T> watch(String key, Class<T> type) {
return watch(key).cast(type);
}

private static class Update {
private final String key;
private final Object value;

Update(String key, Object value) {
this.key = requireNonNull(key);
this.value = requireNonNull(value);
}

String key() {
return key;
}

Object value() {
return value;
}
}
}
@@ -0,0 +1,122 @@
/*
Copyright (C) 2013-2018 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.configstore;

import com.hotels.styx.support.Latch;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import rx.Observable;

import java.util.concurrent.atomic.AtomicReference;

import static com.hotels.styx.support.matchers.IsOptional.isValue;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class ConfigStoreTest {
private ConfigStore configStore;

@BeforeMethod
public void setUp() {
configStore = new ConfigStore();
}

@Test
public void getsWhatWasSet() {
configStore.set("foo", "bar");
assertThat(configStore.get("foo", String.class), isValue("bar"));
}

@Test
public void listenersReceiveUpdatesWhenValuesChange() {
Latch sync = new Latch(1);
AtomicReference<String> update = new AtomicReference<>();

configStore.watch("foo", String.class)
.subscribe(value -> {
update.set(value);
sync.countDown();
});

configStore.set("foo", "bar");
sync.await(1, SECONDS);
assertThat(update.get(), is("bar"));
}

@Test
public void valueInConfigStoreIsConsistentWithListenersUpdate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test can be folded into the previous one (listenersReceiveUpdatesWhenValuesChange) by adding one line at the end:

assertThat(configStore.get("foo"), is(of("bar")));

That is:

     @Test
    public void valueInConfigStoreIsConsistentWithListenersUpdate() {
        AtomicReference<String> update = new AtomicReference<>();

        Latch sync = new Latch(1);

        configStore.watch("foo", String.class)
                .subscribe(value -> {
                    update.set(value);
                    sync.countDown();
                });

        configStore.set("foo", "bar");
        sync.await(1, SECONDS);
        assertThat(update.get(), is("bar"));
        assertThat(configStore.get("foo"), is(of("bar")));
    }

AtomicReference<String> valueInStore = new AtomicReference<>();
AtomicReference<String> valueInUpdate = new AtomicReference<>();

Latch sync = new Latch(1);

configStore.watch("foo", String.class)
.subscribe(value -> {
valueInStore.set(configStore.get("foo", String.class).orElse(null));
valueInUpdate.set(value);
sync.countDown();
});

configStore.set("foo", "bar");
sync.await(1, SECONDS);
assertThat(valueInStore.get(), is(valueInUpdate.get()));
}

// If this test fails it will cause a deadlock, resulting in a latch timeout
@Test
public void listensOnSeparateThread() {
Latch unlockedByTestThread = new Latch(1);
Latch unlockedBySubscribeThread = new Latch(1);

configStore.watch("foo", String.class)
.subscribe(value -> {
unlockedByTestThread.await(1, SECONDS);
unlockedBySubscribeThread.countDown();
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When registering a watch, a possibly existing value should be propagated straight away. This prevents unnecessary race conditions where a consumer is late to observe the event.

Please confirm if the watch behaves like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not currently the case, I will fix it.


configStore.set("foo", "bar");
unlockedByTestThread.countDown();
unlockedBySubscribeThread.await(2, SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify the purpose of this test please? Is it to show that the watch subscriptions run from a different thread from the config setter thread?

If so, does every watch subscriber run in their own thread? How far will it scale? Say I have say 10 watch subscribes on "foo", then on setting "foo" do they run simultaneously on 10 different threads? Or are they queued to a thread pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it to show that the watch subscriptions run from a different thread from the config setter thread?

Yes.

If so, does every watch subscriber run in their own thread? How far will it scale? Say I have say 10 watch subscribes on "foo", then on setting "foo" do they run simultaneously on 10 different threads? Or are they queued to a thread pool?

That code is already part of this PR. You can look at the watch method in the ConfigStore class. Please review.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok this was just to probe your intentions regarding config store threading model. I think it is ok for now. Perhaps in future we may want to allow consumers to provide a custom executor.

}

// If this test fails it will cause a deadlock, resulting in a latch timeout
@Test
public void multipleListenersCanSubscribeSimultaneously() {
Latch unlockedByListener1 = new Latch(1);
Latch unlockedByListener2 = new Latch(1);
Latch unlockedWhenBothFinish = new Latch(2);

Observable<String> watch = configStore.watch("foo", String.class);

// Listener 1
watch.subscribe(value -> {
unlockedByListener1.countDown();
unlockedByListener2.await(1, SECONDS);
unlockedWhenBothFinish.countDown();
});

// Listener 2
watch.subscribe(value -> {
unlockedByListener1.await(1, SECONDS);
unlockedByListener2.countDown();
unlockedWhenBothFinish.countDown();
});

configStore.set("foo", "bar");
unlockedWhenBothFinish.await(5, SECONDS);
}
}