Skip to content

Commit

Permalink
finagle-stream: ensure that the HttpDechunker sends EOF only after al…
Browse files Browse the repository at this point in the history
…l messages

Github-issue: twitter/finagle#84
Signed-off-by: marius a. eriksen <marius@twitter.com>
  • Loading branch information
stephenjudkins authored and mariusae committed May 18, 2012
1 parent 7722dc9 commit 56a2829
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
19 changes: 9 additions & 10 deletions src/main/scala/com/twitter/finagle/stream/HttpDechunker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,29 @@ class HttpDechunker extends BrokerChannelHandler {
ch: Channel,
out: Broker[ChannelBuffer],
err: Broker[Throwable],
close: Offer[Unit])
{
close: Offer[Unit]
) {
Offer.select(
close { _=>
ch.close()
error(err, EOF)
},

upstreamEvent {
case MessageValue(chunk: HttpChunk, _) =>
val content = chunk.getContent

val sendOf =
if (content.readable)
out.send(content)
else
Offer.const(())
if (content.readable) out.send(content) else Offer.const(())

if (chunk.isLast) {
ch.close()
sendOf andThen error(err, EOF)
sendOf.sync() ensure { error(err, EOF) }
} else {
ch.setReadable(false)
sendOf andThen ch.setReadable(true)
read(ch, out, err, close)
sendOf.sync() ensure {
ch.setReadable(true)
read(ch, out, err, close)
}
}

case MessageValue(invalid, ctx) =>
Expand Down
48 changes: 48 additions & 0 deletions src/test/scala/com/twitter/finagle/stream/HttpDechunkerSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.twitter.finagle.stream

import org.specs.SpecificationWithJUnit
import com.twitter.finagle.{SunkChannel, SunkChannelFactory}
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel.{Channels, MessageEvent}
import org.jboss.netty.handler.codec.http._
import com.twitter.conversions.time._
import java.nio.charset.Charset

class HttpDechunkerSpec extends SpecificationWithJUnit {

"HttpDechunker" should {
"wait until last message is synced before sending EOF" in {
val cf = new SunkChannelFactory
val pipeline = Channels.pipeline
val dechunker = new HttpDechunker
pipeline.addLast("dechunker", dechunker)
val channel = new SunkChannel(cf, pipeline, cf.sink)

val httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
httpResponse.setChunked(true)

Channels.fireMessageReceived(channel, httpResponse)

val streamResponse = (channel.upstreamEvents.headOption match {
case Some(m: MessageEvent) => m.getMessage match {
case s: StreamResponse => s
}
case _ => throw new Exception("No upstream message received")
})

val messages = streamResponse.messages
val error = streamResponse.error

Channels.fireMessageReceived(channel, new DefaultHttpChunk(ChannelBuffers.wrappedBuffer("1".getBytes)))
messages.sync()(1.second).toString(Charset.defaultCharset) must_== "1"

Channels.fireMessageReceived(channel, new DefaultHttpChunk(ChannelBuffers.wrappedBuffer("2".getBytes)))
Channels.fireMessageReceived(channel, HttpChunk.LAST_CHUNK)
val receiveError = error.sync()
receiveError.isDefined must_== false
messages.sync()(1.second).toString(Charset.defaultCharset) must_== "2"
receiveError.isDefined must_== true
receiveError(1.second) must_== EOF
}
}
}

0 comments on commit 56a2829

Please sign in to comment.