Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
fe779dd
Add mantis publish properties to docs
calvin681 Aug 24, 2020
ab35293
Merge remote-tracking branch 'upstream/master'
calvin681 Aug 25, 2020
75f750d
Merge remote-tracking branch 'upstream/master'
calvin681 Oct 2, 2020
24ac51b
Merge branch 'master' of github.com:Netflix/mantis
calvin681 Oct 10, 2020
7811869
Update MQL docs for using [*]
calvin681 Oct 10, 2020
054f0b9
Merge remote-tracking branch 'upstream/master'
calvin681 Oct 26, 2020
ce18bf0
Add myself as code owners
calvin681 Oct 26, 2020
a35f197
Merge remote-tracking branch 'upstream/master'
calvin681 Oct 27, 2020
789ab06
Merge remote-tracking branch 'upstream/master'
calvin681 Nov 20, 2020
d4d7d4b
Rule base auto scale should not scale down lower than min
calvin681 Dec 3, 2020
ed3c558
add comment
calvin681 Dec 3, 2020
48cf69f
Subscribe to source job drop metrics in job master
calvin681 Dec 8, 2020
3003e93
Merge remote-tracking branch 'upstream/master'
calvin681 Dec 8, 2020
e489031
add copy right, fix tests
calvin681 Dec 8, 2020
2ba5fed
Address review comments. Add parameters to system params.
calvin681 Dec 11, 2020
552c94d
Handle empty string for drop metric param
calvin681 Dec 15, 2020
07741f3
merge upstream
calvin681 Dec 15, 2020
691d291
Merge remote-tracking branch 'upstream/master'
calvin681 Dec 16, 2020
5fc5236
Fix bug in determining source job metric vs job metric
calvin681 Dec 18, 2020
a5b7eae
Merge remote-tracking branch 'upstream/master'
calvin681 Dec 23, 2020
ee3398a
Add ClutchRps auto scaler, outlier detection using sourcejob drop
calvin681 Dec 31, 2020
0074bab
clean up
calvin681 Dec 31, 2020
8d918c7
add copyright
calvin681 Dec 31, 2020
c5bf421
fix tests
calvin681 Dec 31, 2020
fdb8505
make setpoint percentile configurable
calvin681 Dec 31, 2020
234874a
add more test on config generation
calvin681 Jan 1, 2021
0b42dea
add test message
calvin681 Jan 4, 2021
118eeff
Merge remote-tracking branch 'upstream/master'
calvin681 Jan 5, 2021
17ec535
Rope uses absolute values instead of percentage
calvin681 Jan 5, 2021
c6fdaf6
Merge remote-tracking branch 'upstream/master'
calvin681 Jan 12, 2021
2b3db28
Add auto scale strategies to the docs
calvin681 Jan 15, 2021
b95050a
address comments
calvin681 Jan 15, 2021
ff39b50
Add snippet about clutch experimental
calvin681 Jan 19, 2021
b2043db
Change scale of clutch rps config params to be 100.0
calvin681 Jan 20, 2021
d7c449b
Merge remote-tracking branch 'upstream/master'
calvin681 Jan 20, 2021
59584f0
Merge remote-tracking branch 'upstream/master'
calvin681 Jan 30, 2021
cd430e6
Merge remote-tracking branch 'upstream/master'
calvin681 Mar 15, 2021
23ac31b
Merge remote-tracking branch 'upstream/master'
calvin681 Mar 26, 2021
e9776ba
Log mantisEventsDropped metric when there is an exception during proc…
calvin681 Mar 26, 2021
c61fdec
Default to creating a new Map
calvin681 Mar 26, 2021
0d667a7
back to warn
calvin681 Mar 26, 2021
3177bc4
add comment to explain deprecation
calvin681 Mar 29, 2021
3213c46
Merge remote-tracking branch 'upstream/master'
calvin681 May 3, 2021
21117a9
Feeds backpressure back to drop operator on SSE connection
calvin681 May 3, 2021
18b2b4b
Add test for serial consumption
calvin681 May 10, 2021
44ea933
clean up test
calvin681 May 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ public void onError(Throwable e) {
public void onNext(T t) {

if (requested.get() > 0) {
requested.decrementAndGet();
o.onNext(t);
next.increment();
requested.decrementAndGet();

} else {

dropped.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private void resetConnected() {
}
}

private Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> response,
protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> response,
final Action1<Boolean> updateDataRecvngStatus,
final long dataRecvTimeoutSecs, String delimiter) {
long interval = Math.max(1, dataRecvTimeoutSecs / 2);
Expand Down Expand Up @@ -297,7 +297,7 @@ private Observable<MantisServerSentEvent> streamContent(HttpClientResponse<Serve
return Observable.error(new SseException(ErrorType.Retryable, "Got error SSE event: " + t1.contentAsString()));
}
return Observable.just(t1.contentAsString());
})
}, 1)
.filter(data -> {
if (data.startsWith("ping")) {
pingCounter.increment();
Expand All @@ -308,7 +308,7 @@ private Observable<MantisServerSentEvent> streamContent(HttpClientResponse<Serve
.flatMapIterable((data) -> {
boolean useSnappy = true;
return CompressionUtils.decompressAndBase64Decode(data, compressedBinaryInputEnabled, useSnappy, delimiter);
})
}, 1)
.takeUntil(shutdownSubject)
.takeWhile((event) -> !isShutdown);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package io.mantisrx.server.worker.client;

import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.common.metrics.spectator.MetricId;
import io.netty.buffer.Unpooled;
import io.reactivx.mantis.operators.DropOperator;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SseWorkerConnectionTest {
private static final Logger logger = LoggerFactory.getLogger(SseWorkerConnectionTest.class);

@Test
public void testStreamContentDrops() throws Exception {
String metricGroupString = "testmetric";
MetricGroupId metricGroupId = new MetricGroupId(metricGroupString);
SseWorkerConnection workerConnection = new SseWorkerConnection("connection_type",
"hostname",
80,
b -> {},
b -> {},
t -> {},
600,
false,
new CopyOnWriteArraySet<>(),
1,
null,
true,
metricGroupId);
HttpClientResponse<ServerSentEvent> response = mock(HttpClientResponse.class);
Metrics metrics = mock(Metrics.class);
Counter onNextCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onNext.toString()));
Counter onErrorCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onError.toString()));
Counter onCompleteCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.onComplete.toString()));
Counter droppedCounter = new CounterImpl(new MetricId(metricGroupString, DropOperator.Counters.dropped.toString()));
when(metrics.getMetricGroupId()).thenReturn(metricGroupId);
when(metrics.getCounter(DropOperator.Counters.onNext.toString())).thenReturn(onNextCounter);
when(metrics.getCounter(DropOperator.Counters.onError.toString())).thenReturn(onErrorCounter);
when(metrics.getCounter(DropOperator.Counters.onComplete.toString())).thenReturn(onCompleteCounter);
when(metrics.getCounter(DropOperator.Counters.dropped.toString())).thenReturn(droppedCounter);
MetricsRegistry.getInstance().registerAndGet(metrics);
TestScheduler testScheduler = Schedulers.test();

// Events are just "0", "1", "2", ...
Observable<ServerSentEvent> contentObs = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.map(t -> new ServerSentEvent(Unpooled.copiedBuffer(Long.toString(t), Charset.defaultCharset())));

when(response.getContent()).thenReturn(contentObs);

TestSubscriber<MantisServerSentEvent> subscriber = new TestSubscriber<>(1);

workerConnection.streamContent(response, b -> {}, 600, "delimiter").subscribeOn(testScheduler).subscribe(subscriber);

testScheduler.advanceTimeBy(100, TimeUnit.SECONDS);
subscriber.assertValueCount(1);
List<MantisServerSentEvent> events = subscriber.getOnNextEvents();
assertEquals("0", events.get(0).getEventAsString());
logger.info("next: {}", onNextCounter.value());
logger.info("drop: {}", droppedCounter.value());
assertTrue(onNextCounter.value() < 10);
assertTrue(droppedCounter.value() > 90);
}

public static class CounterImpl implements Counter {
private final AtomicLong count = new AtomicLong();
private final MetricId id;

public CounterImpl(MetricId id) {
this.id = id;
}

@Override
public void increment() {
count.incrementAndGet();
}

@Override
public void increment(long x) {
count.addAndGet(x);
}

@Override
public long value() {
return count.get();
}

@Override
public long rateValue() {
return -1;
}

@Override
public long rateTimeInMilliseconds() {
return -1;
}

@Override
public String event() {
return null;
}

@Override
public MetricId id() {
return id;
}
}
}