Change ServerSentEvent to store data as ByteBuf #267

Closed
NiteshKant opened this issue Nov 3, 2014 · 1 comment

@NiteshKant
Member

The ServerSentEvent object stores its data as a String. This means the event data ends up in heap memory rather than in Netty's pooled ByteBuf. Moving this data into a ByteBuf will reduce the memory footprint of an application that consumes a large number of events.
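To illustrate the difference between holding a payload as a String and holding it as a buffer, here is a minimal sketch. It uses `java.nio.ByteBuffer` from the JDK as a stand-in for Netty's ByteBuf, and the class and method names (`CopyVersusView`, `asString`, `asView`) are hypothetical. It is not the RxNetty decoder, only a model of "copy onto the heap" versus "view over the received memory".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in model: java.nio.ByteBuffer plays the role of the received buffer.
final class CopyVersusView {

    /** Decodes the bytes into a new String, i.e. a separate heap copy per event. */
    static String asString(ByteBuffer received) {
        // Decode from a duplicate so the caller's position and limit are untouched.
        return StandardCharsets.US_ASCII.decode(received.duplicate()).toString();
    }

    /** Returns a view over the same memory; no payload bytes are copied. */
    static ByteBuffer asView(ByteBuffer received) {
        return received.slice();
    }

    public static void main(String[] args) {
        // 10KB off-heap payload, matching the event size used in the benchmark.
        ByteBuffer received = ByteBuffer.allocateDirect(10 * 1024);
        for (int i = 0; i < received.capacity(); i++) {
            received.put(i, (byte) 'c');
        }

        String copy = asString(received);
        ByteBuffer view = asView(received);

        // Changing the underlying memory shows which one shares storage.
        received.put(0, (byte) 'x');
        System.out.println("String copy sees: " + copy.charAt(0));
        System.out.println("Buffer view sees: " + (char) view.get(0));
    }
}
```

The String is independent of the received memory because its characters were copied at decode time, while the view shares it. That sharing is what avoids the per-event allocation, and it is also why the lifetime of the underlying buffer becomes something the consumer has to care about.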

Change in behavior

This means that memory management of a ServerSentEvent becomes the user's responsibility, especially once issue #264 is fixed.

A subtle side effect arises with proxies, where data received from an HttpClient is written to an HttpServer response. Since RxNetty currently auto-releases the ByteBuf (the ServerSentEvent in this case), the ByteBuf must be retained once after it is read from the client.
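The reference-count arithmetic behind the proxy case can be sketched as below. This is a plain-Java model, not RxNetty or Netty code: `CountedEvent`, `deliverAndAutoRelease`, `writeToServerResponse` and `proxyOnce` are hypothetical names, and the model assumes that the client side releases the event once after the callback returns and that the server side releases it once when the write completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Model of retain/release in a proxy; single-threaded and for illustration only.
final class RetainForProxySketch {

    /** Hypothetical stand-in for a reference-counted event. */
    static final class CountedEvent {
        private int refCnt = 1; // a freshly decoded event starts with one reference

        CountedEvent retain() {
            if (refCnt == 0) throw new IllegalStateException("already released");
            refCnt++;
            return this;
        }

        void release() {
            if (refCnt == 0) throw new IllegalStateException("already released");
            refCnt--;
        }

        int refCnt() {
            return refCnt;
        }
    }

    /** Models the client: hand the event to the callback, then auto-release it. */
    static void deliverAndAutoRelease(CountedEvent event, Consumer<CountedEvent> onNext) {
        onNext.accept(event);
        event.release();
    }

    /** Models the server write completing: the writer releases what it was given. */
    static void writeToServerResponse(CountedEvent event) {
        event.release(); // throws if the event was already fully released
    }

    /** Returns the final reference count, or -1 if the write hit a released event. */
    static int proxyOnce(boolean retainBeforeForwarding) {
        CountedEvent event = new CountedEvent();
        List<CountedEvent> pendingWrites = new ArrayList<>();

        // The proxy queues the event for the server response inside the callback.
        deliverAndAutoRelease(event,
                e -> pendingWrites.add(retainBeforeForwarding ? e.retain() : e));

        // The actual write happens after the callback has returned.
        try {
            for (CountedEvent e : pendingWrites) {
                writeToServerResponse(e);
            }
        } catch (IllegalStateException alreadyReleased) {
            return -1;
        }
        return event.refCnt();
    }

    public static void main(String[] args) {
        System.out.println("with retain:    " + proxyOnce(true));
        System.out.println("without retain: " + proxyOnce(false));
    }
}
```

With the extra retain, the count goes 1, 2, 1, 0 and the event is released exactly once by each side. Without it, the auto-release drops the count to 0 before the write runs, which in this model surfaces as the -1 result.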

@NiteshKant
Member Author

PR #266 fixes this issue with a new implementation. The existing implementation is preserved but deprecated (issue #209).

Object allocation compares as follows between the old and new implementations:

Old Implementation

[image: object allocation, old implementation]

New Implementation

[image: object allocation, new implementation]

The benchmark above used a ServerSentEvent data size of 10KB. The test code is below:

Server

package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import rx.functions.Func1;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public final class TestSSEServerStart {

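    // Shared 10KB payload, filled with 'c' and reused for every event.
    private static final ByteBuf data;
    static {
        final byte[] dataBytes = new byte[10 * 1024];
        Arrays.fill(dataBytes, (byte) 'c');
        // The extra retain() keeps the shared buffer alive for the life of the server.
        data = Unpooled.buffer().writeBytes(dataBytes).retain();
    }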

    public static final byte[] DATA_PREFIX = "data: ".getBytes();
    public static final byte[] EOL = "\n\n".getBytes();

    public static void main(String[] args) {
        RxNetty.createHttpServer(8091, new RequestHandler<ByteBuf, ByteBuf>() {
            @Override
            public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                           final HttpServerResponse<ByteBuf> response) {
                return Observable.interval(1, TimeUnit.SECONDS)
                                 .flatMap(new Func1<Long, Observable<Void>>() {
                                     @Override
                                     public Observable<Void> call(Long interval) {
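                                         for (int i = 0; i < 5000; i++) {
                                             response.writeBytes(DATA_PREFIX);
                                             // duplicate() gives each write its own reader/writer indices
                                             // over the shared payload; retain() balances the release
                                             // that happens once the write completes.
                                             response.writeBytes(data.duplicate().retain());
                                             response.writeBytes(EOL);
                                         }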
                                         return response.flush();
                                     }
                                 });
            }
        }).startAndWait();
    }
}

Client (Old)

package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.concurrent.atomic.AtomicLong;

public final class TestSSEDecoderMemoryOld {

    public static void main(String[] args) {
        testOldSSEDecoder(8091);
    }

    private static void testOldSSEDecoder(int serverPort) {
        System.out.println("Testing old SSE decoder. Server port: " + serverPort);
        final AtomicLong counter = new AtomicLong();
        RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder("localhost", serverPort)
               .pipelineConfigurator(PipelineConfigurators.<ByteBuf>sseClientConfigurator())
               .build()
               .submit(HttpClientRequest.createGet("/"))
               .flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
                   @Override
                   public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> clientResponse) {
                       return clientResponse.getContent()
                                            .doOnNext(new Action1<ServerSentEvent>() {
                                                @Override
                                                public void call(ServerSentEvent event) {
                                                    if (counter.incrementAndGet() % 1000 == 0) {
                                                        System.out.println("Received events count: " + counter.get());
                                                    }
                                                }
                                            });
                   }
               }).toBlocking().last();
    }
}

Client (New)

package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.concurrent.atomic.AtomicLong;

public final class TestSSEDecoderMemoryNew {

    public static void main(String[] args) {
        testNewSSEDecoder(8091);
    }

    private static void testNewSSEDecoder(final int serverPort) {
        System.out.println("Testing new SSE decoder. Server port: " + serverPort);
        final AtomicLong counter = new AtomicLong();
        RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder("localhost", serverPort)
               .pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
               .build()
               .submit(HttpClientRequest.createGet("/"))
               .flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
                   @Override
                   public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> response) {
                       return response.getContent()
                                      .doOnNext(new Action1<ServerSentEvent>() {
                                          @Override
                                          public void call(ServerSentEvent serverSentEvent) {
                                              if (counter.incrementAndGet() % 1000 == 0) {
                                                  System.out.println("Received events count: " + counter.get());
                                              }
                                          }
                                      });
                   }
               })
               .toBlocking().last();
    }
}
