Skip to content

Commit

Permalink
Merge pull request #903 from burmanm/openshift_cache
Browse files Browse the repository at this point in the history
Add a method for fetching all the metric definitions (for Openshift)
  • Loading branch information
John Sanda committed Mar 23, 2018
2 parents 8d0f7c5 + 13c97da commit 5074288
Show file tree
Hide file tree
Showing 11 changed files with 393 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.hawkular.metrics.api.jaxrs.filter;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE;
Expand Down Expand Up @@ -72,7 +71,8 @@ public void filter(ContainerRequestContext requestContext) throws IOException {
UriInfo uriInfo = requestContext.getUriInfo();
String path = uriInfo.getPath();

if (path.startsWith("/tenants") || path.startsWith("/admin")) {
if (path.startsWith("/tenants") || path.startsWith("/admin") || path.startsWith("/openshift")) {
// if (path.startsWith("/tenants") || path.startsWith("/admin")) {
String tenant = requestContext.getHeaders().getFirst(TENANT_HEADER_NAME);
if (tenant == null || tenant.trim().isEmpty()) {
// Fail on missing tenant info
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.api.servlet;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.inject.Inject;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.hawkular.metrics.api.servlet.rx.ObservableServlet;
import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.model.Metric;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

import rx.Observable;
import rx.Observer;
import rx.exceptions.Exceptions;
import rx.subjects.PublishSubject;

/**
* @author Michael Burman
*/
@WebServlet(urlPatterns = "/openshift/*", asyncSupported = true)
public class OpenshiftServlet extends HttpServlet {

private static final ObjectMapper objectMapper;
private static final ObjectWriter objectWriter;
private static final byte[] comma = ",".getBytes(Charset.forName("UTF-8"));
private static final String DESCRIPTOR_TAG = "descriptor_name";

static {
objectMapper = new ObjectMapper();
objectWriter = objectMapper.writer();
}

@Inject
private MetricsService metricsService;

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.setCharacterEncoding("UTF-8");
resp.setContentType("application/json");
AsyncContext asyncContext = getAsyncContext(req);

Observable<Metric<Object>> metricObservable = metricsService.scanAllMetricIndexes()
.filter(m -> m.getTags().containsKey(DESCRIPTOR_TAG))
.onBackpressureBuffer();

PublishSubject<byte[]> byteSubject = PublishSubject.create();

// Transform above on the fly to ByteBuffers and write them as soon as we have them
Observable<byte[]> buffers =
metricObservable.map(m -> {
try {
return objectWriter.writeValueAsBytes(m);
} catch (JsonProcessingException e) {
throw Exceptions.propagate(e);
}
}).onBackpressureBuffer();

buffers.subscribe(new Observer<byte[]>() {
AtomicBoolean first = new AtomicBoolean(true);

@Override public void onCompleted() {
if(!first.get()) {
byteSubject.onNext("]".getBytes(Charset.forName("UTF-8")));
}
byteSubject.onCompleted();
}

@Override public void onError(Throwable throwable) {

}

@Override public void onNext(byte[] bytes) {
if(first.compareAndSet(true, false)) {
byteSubject.onNext("[".getBytes(Charset.forName("UTF-8")));
byteSubject.onNext(bytes);
} else {
byteSubject.onNext(comma);
byteSubject.onNext(bytes);
}
}
});

ObservableServlet.write(byteSubject, resp.getOutputStream())
.subscribe(v -> {},
t -> {
t.printStackTrace();
// invoked..
asyncContext.complete();
},
asyncContext::complete);
}

private AsyncContext getAsyncContext(HttpServletRequest req) {
if(req.isAsyncStarted()) {
return req.getAsyncContext();
} else {
return req.startAsync();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.api.servlet.rx;

import java.io.IOException;

import javax.servlet.ServletOutputStream;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;

/**
* An {@link Observable} interface to Servlet API
*
* @author Jitendra Kotamraju
* @author Michael Burman
*/
public class ObservableServlet {

/**
* Observes {@link ServletOutputStream}.
*
* <p>
* This method uses Servlet 3.1 non-blocking API callback mechanisms. When the
* container notifies that HTTP response can be written, the subscribed
* {@code Observer}'s {@link Observer#onNext onNext} method is invoked.
*
* <p>
* Before calling this method, a web application must put the corresponding HTTP
* request into asynchronous mode.
*
* @param out servlet output stream
* @return Observable of HTTP response write ready events
*/
public static Observable<Void> create(final ServletOutputStream out) {
return Observable.unsafeCreate(subscriber -> {
final ServletWriteListener listener = new ServletWriteListener(subscriber, out);
out.setWriteListener(listener);
});
}

/**
* Writes the given Observable data to ServletOutputStream.
*
* <p>
* This method uses Servlet 3.1 non-blocking API callback mechanisms. When the HTTP
* request data becomes available to be read, the subscribed {@code Observer}'s
* {@link Observer#onNext onNext} method is invoked. Similarly, when all data for the
* HTTP request has been read, the subscribed {@code Observer}'s
* {@link Observer#onCompleted onCompleted} method is invoked.
*
* <p>
* Before calling this method, a web application must put the corresponding HTTP request
* into asynchronous mode.
*
* @param data
* @param out servlet output stream
* @return
*/
public static Observable<Void> write(final Observable<byte[]> data, final ServletOutputStream out) {
return Observable.create(new Observable.OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
Observable<Void> events = create(out).onBackpressureDrop();
Observable<Void> writeobs = Observable.zip(data, events, (b, aVoid) -> {
try {
out.write(b);
} catch (IOException ioe) {
ioe.printStackTrace();
}
return null;
});
writeobs.subscribe(subscriber);
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.api.servlet.rx;

import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;

import rx.Subscriber;

/**
* A servlet {@link WriteListener} that pushes Observable events that indicate
* when HTTP response data can be written
*
* @author Jitendra Kotamraju
* @author Michael Burman
*/
class ServletWriteListener implements WriteListener {

private final Subscriber<? super Void> subscriber;
private final ServletOutputStream out;

ServletWriteListener(Subscriber<? super Void> subscriber, final ServletOutputStream out) {
this.subscriber = subscriber;
this.out = out;
}

@Override
public void onWritePossible() {
while(out.isReady() && !subscriber.isUnsubscribed()) {
subscriber.onNext(null);
// loop until isReady() false, otherwise container will not call onWritePossible()
}
// If isReady() false, container will call onWritePossible()
// when data can be written.
}

@Override
public void onError(Throwable t) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}

}
16 changes: 16 additions & 0 deletions api/metrics-api-jaxrs/src/main/webapp/WEB-INF/web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,24 @@
</init-param>
</servlet>

<servlet>
<servlet-name>openshiftServlet</servlet-name>
<servlet-class>org.hawkular.metrics.api.servlet.OpenshiftServlet</servlet-class>
<async-supported>true</async-supported>
</servlet>

<servlet-mapping>
<servlet-name>staticContent</servlet-name>
<url-pattern>/static/*</url-pattern>
</servlet-mapping>

<servlet-mapping>
<servlet-name>openshiftServlet</servlet-name>
<url-pattern>/openshift</url-pattern>
</servlet-mapping>

<servlet-mapping>
<servlet-name>openshiftServlet</servlet-name>
<url-pattern>/openshift/*</url-pattern>
</servlet-mapping>
</web-app>
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface DataAccess {

<T> Observable<Row> findMetricInData(MetricId<T> id);

<T> Observable<Observable<Row>> scanMetricsInMetricsIndex();

<T> Observable<Row> findMetricInMetricsIndex(MetricId<T> id);

<T> Observable<Row> getMetricTags(MetricId<T> id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ public StatementType getType() {

private PreparedStatement findMetricInDataCompressed;

private PreparedStatement scanMetricInMetricsIndex;

private PreparedStatement findAllMetricsInData;

private PreparedStatement findAllMetricsInDataCompressed;
Expand Down Expand Up @@ -523,6 +525,13 @@ protected void initPreparedStatements() {
"FROM metrics_idx " +
"WHERE tenant_id = ? AND type = ? AND metric = ?");

scanMetricInMetricsIndex = session.prepare(
"SELECT tenant_id, type, metric, tags, token(tenant_id, type) " +
"FROM metrics_idx " +
"WHERE token(tenant_id, type) > ? AND token(tenant_id, type) <=" +
" ?"
);

getMetricTags = session.prepare(
"SELECT tags " +
"FROM metrics_idx " +
Expand Down Expand Up @@ -754,6 +763,17 @@ public <T> Observable<Row> findMetricInData(MetricId<T> id) {
.take(1);
}

@Override
public <T> Observable<Observable<Row>> scanMetricsInMetricsIndex() {
// TODO MetricsServiceImpl can do tags filtering
return Observable.from(getTokenRanges())
.map(tr -> rxSession.executeAndFetch(
scanMetricInMetricsIndex.bind()
.setToken(0, tr.getStart())
.setToken(1, tr.getEnd())));

}

@Override
public <T> Observable<Row> findMetricInMetricsIndex(MetricId<T> id) {
return rxSession.executeAndFetch(findMetricInMetricsIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public interface MetricsService {
*/
<T> Observable<Metric<T>> findMetrics(String tenantId, MetricType<T> type);

<T> Observable<Metric<T>> scanAllMetricIndexes();

/**
* Find tenant's metrics with filtering abilities. The filtering can take place at the type level or at the
* tag level.
Expand Down
Loading

0 comments on commit 5074288

Please sign in to comment.