Skip to content

Commit

Permalink
Demonstrate the servlet 3.1 async I/O.
Browse files Browse the repository at this point in the history
For GET (demonstrates only async writing), try:

curl -XGET -v http://localhost:8080/rx/hello

For POST (demonstrate only async reading), try:

curl -XPOST -d@file.txt -v http://localhost:8080/rx/hello

For POST (demonstrate reading and writing), try:

curl -XPOST -d@file.txt -v http://localhost:8080/rx/return
  • Loading branch information
Michael Burman committed Aug 16, 2015
1 parent 121f04d commit b1ada07
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.hawkular.metrics.api.servlet;

import org.hawkular.metrics.api.servlet.rx.ObservableServlet;
import rx.Observable;
import rx.functions.Func2;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharsetEncoder;
import java.util.Arrays;

/**
* Created by miburman on 8/13/15.
*/
@WebServlet(asyncSupported = true, urlPatterns = "/rx/*")
public class RxServlet extends HttpServlet {

// Emulate asynchronous fetching from a datasource and return using async I/O
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// Emulate a data fetching from somewhere.. this isn't fetched until subscribe is called anyway
Observable<ByteBuffer> fakeDataSource = Observable.just("Return string")
.map(s -> ByteBuffer.wrap(s.getBytes()));

// Start async part
final AsyncContext asyncContext = req.startAsync();

// Write it to the outputStream (ignore errors for now)
ObservableServlet.write(fakeDataSource, resp.getOutputStream())
.subscribe(v -> {},
t -> {},
() -> {
resp.setStatus(HttpServletResponse.SC_OK);
asyncContext.complete();
});
}

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// Fetch the input buffer
// Lets gather up all the ByteBuffers for simpler prototype and transform to String..
Observable<ByteBuffer> byteBufferObservable = ObservableServlet.create(req.getInputStream())
.reduce((byteBuffer, byteBuffer2) -> {
ByteBuffer backingBuffer;

if (byteBuffer2.remaining() > (byteBuffer.capacity() - byteBuffer.limit())) {
backingBuffer = ByteBuffer.allocate(byteBuffer.capacity() + byteBuffer2.remaining() * 2); // It's virtual memory anyway..
backingBuffer.put(byteBuffer).put(byteBuffer2);
} else {
backingBuffer = byteBuffer.put(byteBuffer2);
}

return backingBuffer;
});

// Start the async, nothing is coming until we subscribe to byteBufferObservable
final AsyncContext asyncContext = req.startAsync();

if(req.getRequestURI().contains("return")) {

// Our business logic
Observable<ByteBuffer> fakeDataSource = byteBufferObservable
.map(bb -> {
StringBuilder sb = new StringBuilder();
sb.append("=== START OF THE PAYLOAD ===");
sb.append("InputFileSize: " + bb.remaining());
sb.append("=== END OF THE PAYLOAD ===");
return sb.toString();
})
.map(s -> ByteBuffer.wrap(s.getBytes()));

// Write to listener
ObservableServlet.write(fakeDataSource, resp.getOutputStream())
.subscribe(v -> {},
t -> {},
() -> {
resp.setStatus(HttpServletResponse.SC_OK);
asyncContext.complete();
});
} else {
byteBufferObservable
.subscribe(v -> {},
t -> {},
() -> {
resp.setStatus(HttpServletResponse.SC_CREATED);
asyncContext.complete();
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.hawkular.metrics.api.servlet.rx;

/**
* Copyright 2013-2014 Jitendra Kotamraju.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;

import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.logging.Logger;

/**
* An {@link Observable} interface to Servlet API
*
* @author Jitendra Kotamraju
*/
public class ObservableServlet {

/**
* Observes {@link ServletInputStream}.
*
* <p>
* This method uses Servlet 3.1 non-blocking API callback mechanisms. When the HTTP
* request data becomes available to be read, the subscribed {@code Observer}'s
* {@link Observer#onNext onNext} method is invoked. Similarly, when all data for the
* HTTP request has been read, the subscribed {@code Observer}'s
* {@link Observer#onCompleted onCompleted} method is invoked.
*
* <p>
* Before calling this method, a web application must put the corresponding HTTP request
* into asynchronous mode.
*
* @param in servlet input stream
* @return Observable of HTTP request data
*/
public static Observable<ByteBuffer> create(final ServletInputStream in) {
return Observable.create(new OnSubscribe<ByteBuffer>() {
@Override
public void call(Subscriber<? super ByteBuffer> subscriber) {
final ServletReadListener listener = new ServletReadListener(in, subscriber);
in.setReadListener(listener);
}
});
}

/**
* Observes {@link ServletOutputStream}.
*
* <p>
* This method uses Servlet 3.1 non-blocking API callback mechanisms. When the
* container notifies that HTTP response can be written, the subscribed
* {@code Observer}'s {@link Observer#onNext onNext} method is invoked.
*
* <p>
* Before calling this method, a web application must put the corresponding HTTP
* request into asynchronous mode.
*
* @param out servlet output stream
* @return Observable of HTTP response write ready events
*/
public static Observable<Void> create(final ServletOutputStream out) {

return Observable.create(new OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
final ServletWriteListener listener = new ServletWriteListener(subscriber, out);
out.setWriteListener(listener);
}
});
}

/**
* Writes the given Observable data to ServletOutputStream.
*
* <p>
* This method uses Servlet 3.1 non-blocking API callback mechanisms. When the HTTP
* request data becomes available to be read, the subscribed {@code Observer}'s
* {@link Observer#onNext onNext} method is invoked. Similarly, when all data for the
* HTTP request has been read, the subscribed {@code Observer}'s
* {@link Observer#onCompleted onCompleted} method is invoked.
*
* <p>
* Before calling this method, a web application must put the corresponding HTTP request
* into asynchronous mode.
*
* @param data
* @param out servlet output stream
* @return
*/
public static Observable<Void> write(final Observable<ByteBuffer> data, final ServletOutputStream out) {
return Observable.create(new OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
Observable<Void> events = create(out);
Observable<Void> writeobs = Observable.zip(data, events, (byteBuffer, aVoid) -> {
try {
byte[] b = new byte[byteBuffer.remaining()];
byteBuffer.get(b);
out.write(b);
} catch (IOException ioe) {
ioe.printStackTrace();
}
return null;
});
writeobs.subscribe(subscriber);
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.hawkular.metrics.api.servlet.rx;

/**
* Copyright 2013-2014 Jitendra Kotamraju.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import rx.Observer;
import rx.Subscriber;

import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A servlet {@link ReadListener} that pushes HTTP request data to an {@link Observer}
*
* @author Jitendra Kotamraju
* @author miburman
*/
class ServletReadListener implements ReadListener {
private final Subscriber<? super ByteBuffer> subscriber;
private final ServletInputStream in;

ServletReadListener(ServletInputStream in, Subscriber<? super ByteBuffer> subscriber) {
this.in = in;
this.subscriber = subscriber;
}

@Override
public void onDataAvailable() throws IOException {
// Original RxServlet has a bug here, in.isFinished() needs to be called, as
// in.isReady() returns true on EOF
while(in.isReady() && !in.isFinished() && !subscriber.isUnsubscribed()) {
byte[] buf = new byte[4096];
int len = in.read(buf);
if (len != -1) {
subscriber.onNext(ByteBuffer.wrap(buf, 0, len));
}
}
}

@Override
public void onAllDataRead() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}

@Override
public void onError(Throwable t) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.hawkular.metrics.api.servlet.rx;

/**
* Copyright 2014 Jitendra Kotamraju.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import rx.Subscriber;

import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;

/**
* A servlet {@link WriteListener} that pushes Observable events that indicate
* when HTTP response data can be written
*
* @author Jitendra Kotamraju
*/
class ServletWriteListener implements WriteListener {

private final Subscriber<? super Void> subscriber;
private final ServletOutputStream out;

ServletWriteListener(Subscriber<? super Void> subscriber, final ServletOutputStream out) {
this.subscriber = subscriber;
this.out = out;
}

@Override
public void onWritePossible() {
do {
subscriber.onNext(null);
// loop until isReady() false, otherwise container will not call onWritePossible()
} while (!subscriber.isUnsubscribed() && out.isReady());
// If isReady() false, container will call onWritePossible()
// when data can be written.
}

@Override
public void onError(Throwable t) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}

}

1 comment on commit b1ada07

@jsanda
Copy link
Contributor

@jsanda jsanda commented on b1ada07 Aug 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting this together. RxServlet is exactly what I was looking for. I hope we can take advantage of the fact that we know we are only dealing with JSON (at least for now) and can encapsulate the ByteBuffer logic in doGet and in doPost so that the servlet for reading/writing data points for example only has to worry about mapping JSON to our REST model classes and vice versa.

Please sign in to comment.