Skip to content

Commit

Permalink
advanced filters: introduce interface for more complex filter interac…
Browse files Browse the repository at this point in the history
…tions (#1029)

Description: Introduces platform-bridged filter interfaces that allow stopping and stopping filter iteration, and asynchronous resumption of filter chain. The following conventions are adopted with these new interfaces:
- 'Continue' is the status used to indicate HTTP entities should be forwarded when filter iteration is ongoing.
- 'StopIteration' causes forwarding to halt (though invocations of the current filter will continue).
- The 'ResumeIteration' status or an asynchronous resume call must be used to begin iteration and forwarding again. 'Continue' is not valid when iteration is stopped.
- Using an asynchronous call to resume iteration will result in a special 'onResume' invocation of the filter, that will allow interaction with the HTTP stream on the same thread as other invocations, avoiding the need for synchronization.
- Resume mechanisms offer optional parameters attached to the return status that can be used to provide parts of the stream that have not yet been forwarded. e.g., if you resume during an 'onData' invocation, you can attach headers to the 'ResumeIteration' status (and must do so, if headers have not yet been forwarded).
Risk: Low
Testing: Local and CI

Signed-off-by: Mike Schore <mike.schore@gmail.com>
Signed-off-by: JP Simard <jp@jpsim.com>
  • Loading branch information
goaway authored and jpsim committed Nov 29, 2022
1 parent 1cf99c3 commit ddd89e5
Show file tree
Hide file tree
Showing 24 changed files with 358 additions and 135 deletions.
17 changes: 6 additions & 11 deletions mobile/examples/kotlin/hello_world/DemoFilter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,25 @@ import io.envoyproxy.envoymobile.FilterDataStatus
import io.envoyproxy.envoymobile.FilterHeadersStatus
import io.envoyproxy.envoymobile.FilterTrailersStatus
import io.envoyproxy.envoymobile.ResponseFilter
import io.envoyproxy.envoymobile.ResponseFilterCallbacks
import io.envoyproxy.envoymobile.ResponseHeaders
import io.envoyproxy.envoymobile.ResponseTrailers
import java.nio.ByteBuffer

class DemoFilter : ResponseFilter {

override fun setResponseFilterCallbacks(callbacks: ResponseFilterCallbacks) {
// No-op
}

override fun onResponseHeaders(headers: ResponseHeaders, endStream: Boolean):
FilterHeadersStatus<ResponseHeaders> {
Log.d("DemoFilter", "On headers!")
return FilterHeadersStatus.Continue(headers)
}

override fun onResponseData(body: ByteBuffer, endStream: Boolean): FilterDataStatus {
Log.d("DemoFilter", "On data!")
return FilterDataStatus.Continue(body)
}
override fun onResponseData(body: ByteBuffer, endStream: Boolean):
FilterDataStatus<ResponseHeaders> {
Log.d("DemoFilter", "On data!")
return FilterDataStatus.Continue(body)
}

override fun onResponseTrailers(trailers: ResponseTrailers):
FilterTrailersStatus<ResponseTrailers> {
FilterTrailersStatus<ResponseHeaders, ResponseTrailers> {
Log.d("DemoFilter", "On trailers!")
return FilterTrailersStatus.Continue(trailers)
}
Expand Down
11 changes: 6 additions & 5 deletions mobile/examples/swift/hello_world/DemoFilter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ struct DemoFilter: ResponseFilter {
NSLog("Adding new header!")
let builder = headers.toResponseHeadersBuilder()
builder.add(name: "filter-demo", value: "1")
return .continue(builder.build())
return .continue(headers: builder.build())
}

func setResponseFilterCallbacks(_ callbacks: ResponseFilterCallbacks) {}

func onResponseData(_ body: Data, endStream: Bool) -> FilterDataStatus {
func onResponseData(_ body: Data, endStream: Bool) -> FilterDataStatus<ResponseHeaders> {
// TODO(goaway): Can remove this when we have better integration coverage in place.
NSLog("Saw data chunk of length \(body.count)")
return .continue(body)
return .continue(data: body)
}

func onResponseTrailers(_ trailers: ResponseTrailers) -> FilterTrailersStatus<ResponseTrailers> {
return .continue(trailers)
func onResponseTrailers(_ trailers: ResponseTrailers)
-> FilterTrailersStatus<ResponseHeaders, ResponseTrailers> {
return .continue(trailers: trailers)
}

func onError(_ error: EnvoyError) {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.envoyproxy.envoymobile

import java.nio.ByteBuffer

/*
* RequestFilter supporting asynchronous resumption.
*/
interface AsyncRequestFilter : RequestFilter {
/**
* Called by the filter manager once to initialize the filter callbacks that the filter should
* use.
*
* @param callbacks: The callbacks for this filter to use to interact with the chain.
*/
fun setRequestFilterCallbacks(callbacks: RequestFilterCallbacks)

/**
* Invoked explicitly in response to an asynchronous `resumeRequest()` callback when filter
* iteration has been stopped. The parameters passed to this invocation will be a snapshot
* of any stream state that has not yet been forwarded along the filter chain.
*
* As with other filter invocations, this will be called on Envoy's main thread, and thus
* no additional synchronization is required between this and other invocations.
*
* @param headers: Headers, if `StopIteration` was returned from `onRequestHeaders`.
* @param data: Any data that has been buffered where `StopIterationAndBuffer` was returned.
* @param trailers: Trailers, if `StopIteration` was returned from `onRequestTrailers`.
* @param endStream: True, if the stream ended with the previous (and thus, last) invocation.
*
* @return: The resumption status including any HTTP entities that will be forwarded.
*/
fun onResumeRequest(
headers: RequestHeaders?,
data: ByteBuffer?,
trailers: RequestTrailers?,
endStream: Boolean
): FilterResumeStatus<RequestHeaders, RequestTrailers>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.envoyproxy.envoymobile

import java.nio.ByteBuffer

/*
* ResponseFilter supporting asynchronous resumption.
*/
interface AsyncResponseFilter : ResponseFilter {
/**
* Called by the filter manager once to initialize the filter callbacks that the filter should
* use.
*
* @param callbacks: The callbacks for this filter to use to interact with the chain.
*/
fun setResponseFilterCallbacks(callbacks: ResponseFilterCallbacks)

/**
* Invoked explicitly in response to an asynchronous `resumeResponse()` callback when filter
* iteration has been stopped. The parameters passed to this invocation will be a snapshot
* of any stream state that has not yet been forwarded along the filter chain.
*
* As with other filter invocations, this will be called on Envoy's main thread, and thus
* no additional synchronization is required between this and other invocations.
*
* @param headers: Headers, if `StopIteration` was returned from `onResponseHeaders`.
* @param data: Any data that has been buffered where `StopIterationAndBuffer` was returned.
* @param trailers: Trailers, if `StopIteration` was returned from `onReponseTrailers`.
* @param endStream: True, if the stream ended with the previous (and thus, last) invocation.
*
* @return: The resumption status including any HTTP entities that will be forwarded.
*/
fun onResumeResponse(
headers: ResponseHeaders?,
data: ByteBuffer?,
trailers: ResponseTrailers?,
endStream: Boolean
): FilterResumeStatus<ResponseHeaders, ResponseTrailers>
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal class EnvoyHTTPFilterAdapter(
val result = requestFilter.onRequestHeaders(RequestHeaders(headers), endStream)
return when (result) {
is FilterHeadersStatus.Continue -> arrayOf(0 /*EnvoyHTTPHeadersStatusContinue*/, result.headers.headers)
is FilterHeadersStatus.StopIteration -> arrayOf(1 /*EnvoyHTTPHeadersStatusStopIteration*/, result.headers.headers)
is FilterHeadersStatus.StopIteration -> arrayOf(1 /*EnvoyHTTPHeadersStatusStopIteration*/, headers)
}
}
return arrayOf(0, headers)
Expand All @@ -40,7 +40,7 @@ internal class EnvoyHTTPFilterAdapter(
val result = responseFilter.onResponseHeaders(ResponseHeaders(headers), endStream)
return when (result) {
is FilterHeadersStatus.Continue -> arrayOf(0 /*EnvoyHTTPHeadersStatusContinue*/, result.headers.headers)
is FilterHeadersStatus.StopIteration -> arrayOf(1 /*EnvoyHTTPHeadersStatusStopIteration*/, result.headers.headers)
is FilterHeadersStatus.StopIteration -> arrayOf(1 /*EnvoyHTTPHeadersStatusStopIteration*/, headers)
}
}
return arrayOf(0, headers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,58 @@ package io.envoyproxy.envoymobile
import java.nio.ByteBuffer

/*
* Status returned by filters when transmitting or receiving data.
* Status to be returned by filters when transmitting or receiving data.
*/
sealed class FilterDataStatus {
sealed class FilterDataStatus<T : Headers> {
/**
* Continue filter chain iteration. If headers have not yet been sent to the next filter, they
* will be sent first via `onRequestHeaders()`/`onResponseHeaders()`.
*
* If data has previously been buffered, the data returned will be added to the buffer
* before the entirety is sent to the next filter.
* TODO: add param docs. name param in swift.
* @param data: The (potentially-modified) data to be forwarded along the filter chain.
*/
class Continue(val data: ByteBuffer) : FilterDataStatus()
class Continue<T : Headers>(val data: ByteBuffer) : FilterDataStatus<T>()

/**
* Do not iterate to any of the remaining filters in the chain, and buffer body data for later
* dispatching. The data returned here will be added to the buffer.
* dispatching. The data passed to this invocation will be buffered internally.
*
* This filter will continue to be called with new chunks of data.
* `onData` will continue to be called with any new chunks of data appended to all data that has
* been buffered so far.
*
* Returning `Continue` from `onRequestData()`/`onResponseData()` or calling
* `continueRequest()`/`continueResponse()` MUST be called when continued filter iteration is
* Returning `ResumeIteration` from another filter invocation or calling
* `resumeRequest()`/`resumeResponse()` MUST be called when continued filter iteration is
* desired.
*
* This should be called by filters which must parse a larger block of the incoming data before
* continuing processing.
*/
class StopIterationAndBuffer(val data: ByteBuffer) : FilterDataStatus()
class StopIterationAndBuffer<T : Headers> : FilterDataStatus<T>()

/**
* Do not iterate to any of the remaining filters in the chain, and do not internally buffer
* data.
*
* This filter will continue to be called with new chunks of data.
* `onData` will continue to be called with new chunks of data.
*
* Returning `Continue` from `onRequestData()`/`onResponseData()` or calling
* `continueRequest()`/`continueResponse()` MUST be called when continued filter iteration is
* Returning `ResumeIteration` from another filter invocation or calling
* `resumeRequest()`/`resumeResponse()` MUST be called when continued filter iteration is
* desired.
*
* This may be called by filters which must parse a larger block of the incoming data before
* continuing processing, and will handle their own buffering.
*/
class StopIterationNoBuffer : FilterDataStatus()
class StopIterationNoBuffer<T : Headers> : FilterDataStatus<T>()

/**
* Resume previously-stopped iteration, possibly forwarding headers if iteration was stopped
* during an on*Headers invocation.
*
* It is an error to return `ResumeIteration` if iteration is not currently stopped, and it is
* an error to include headers if headers have already been forwarded to the next filter
* (i.e. iteration was stopped during an on*Data invocation instead of on*Headers).
*
* @param headers: Headers to be forwarded (if needed).
* @param data: Data to be forwarded.
*/
class ResumeIteration<T : Headers>(val headers: T?, val data: ByteBuffer) : FilterDataStatus<T>()
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package io.envoyproxy.envoymobile

/*
* Status returned by filters when transmitting or receiving headers.
* Status to be returned by filters when transmitting or receiving headers.
*/
sealed class FilterHeadersStatus<T : Headers> {
/**
* Continue filter chain iteration, passing the provided headers through.
*
* @param headers: The (potentially-modified) headers to be forwarded along the filter chain.
*/
class Continue<T : Headers>(val headers: T) : FilterHeadersStatus<T>()

/**
* Do not iterate to any of the remaining filters in the chain with headers.
*
* Returning `continue` from `onRequestData()`/`onResponseData()` or calling
* `continueRequest()`/`continueResponse()` MUST occur when continued filter iteration is
* Returning `ResumeIteration` from another filter invocation or calling
* `resumeRequest()`/`resumeResponse()` MUST occur when continued filter iteration is
* desired.
*/
class StopIteration<T : Headers>(val headers: T) : FilterHeadersStatus<T>()
class StopIteration<T : Headers> : FilterHeadersStatus<T>()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.envoyproxy.envoymobile

import java.nio.ByteBuffer

/*
* Status to be returned by filters after resuming iteration asynchronously.
*/
sealed class FilterResumeStatus<T : Headers, U : Headers> {
/**
* Resume previously-stopped iteration, potentially forwarding headers, data, and/or trailers
* that have not yet been passed along the filter chain.
*
* It is an error to return ResumeIteration if iteration is not currently stopped, and it is
* an error to include headers if headers have already been forwarded to the next filter
* (i.e. iteration was stopped during an on*Data invocation instead of on*Headers). It is also
* an error to include data or trailers if `endStream` was previously set or if trailers have
* already been forwarded.
*
* @param headers: Headers to be forwarded (if needed).
* @param data: Data to be forwarded (if needed).
* @param trailers: Trailers to be forwarded (if needed).
*/
class ResumeIteration<T : Headers, U : Headers>(
val headers: T?,
val data: ByteBuffer?,
val trailers: U?
) : FilterResumeStatus<T, U>()
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,43 @@
package io.envoyproxy.envoymobile

import java.nio.ByteBuffer

/*
* Status returned by filters when transmitting or receiving trailers.
* Status to be returned by filters when transmitting or receiving trailers.
*/
// TODO: create abstract Trailers class.
sealed class FilterTrailersStatus<T : Headers> {
sealed class FilterTrailersStatus<T : Headers, U : Headers> {
/**
* Continue filter chain iteration, passing the provided trailers through.
*
* @param trailers: The (potentially-modified) trailers to be forwarded along the filter chain.
*/
class Continue<T : Headers>(val trailers: T) : FilterTrailersStatus<T>()
class Continue<T : Headers, U : Headers>(val trailers: U) : FilterTrailersStatus<T, U>()

/**
* Do not iterate to any of the remaining filters in the chain with trailers.
*
* Calling `continueRequest()`/`continueResponse()` MUST occur when continued filter iteration
* Because trailers are by definition the last HTTP entity of a request or response, only
* asynchronous filters support resumption after returning `StopIteration` from on*Trailers.
* Calling `resumeRequest()`/`resumeResponse()` MUST occur if continued filter iteration
* is desired.
*/
class StopIteration<T : Headers>(val trailers: T) : FilterTrailersStatus<T>()
class StopIteration<T : Headers, U : Headers> : FilterTrailersStatus<T, U>()

/**
* Resume previously-stopped iteration, possibly forwarding headers and data if iteration was
* stopped during an on*Headers or on*Data invocation.
*
* It is an error to return `ResumeIteration` if iteration is not currently stopped, and it is
* an error to include headers if headers have already been forwarded to the next filter
* (i.e. iteration was stopped during an on*Data invocation instead of on*Headers).
*
* @param headers: Headers to be forwarded (if needed).
* @param data: Data to be forwarded (if needed).
* @param trailers: Trailers to be forwarded.
*/
class ResumeIteration<T : Headers, U : Headers>(
val headers: T?,
val data: ByteBuffer?,
val trailers: U?
) : FilterTrailersStatus<T, U>()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@ import java.nio.ByteBuffer
* Filter executed for outbound requests, providing the ability to observe and mutate streams.
*/
interface RequestFilter : Filter {
/**
* Called by the filter manager once to initialize the filter callbacks that the filter should
* use.
*
* @param callbacks: The callbacks for this filter to use to interact with the chain.
*/
fun setRequestFilterCallbacks(callbacks: RequestFilterCallbacks)

/**
* Called once when the request is initiated.
*
Expand All @@ -37,16 +29,16 @@ interface RequestFilter : Filter {
*
* @return: The data status containing body with which to continue or buffer.
*/
fun onRequestData(body: ByteBuffer, endStream: Boolean): FilterDataStatus
fun onRequestData(body: ByteBuffer, endStream: Boolean): FilterDataStatus<RequestHeaders>

/**
* Called at most once when the request is closed from the client with trailers.
*
* Filters may mutate or delay the trailers.
* Filters may mutate or delay the trailers. Note trailers imply the stream has ended.
*
* @param trailers: The outbound trailers.
*
* @return: The trailer status containing body with which to continue or buffer.
*/
fun onRequestTrailers(trailers: RequestTrailers): FilterTrailersStatus<RequestTrailers>
fun onRequestTrailers(trailers: RequestTrailers): FilterTrailersStatus<RequestHeaders, RequestTrailers>
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package io.envoyproxy.envoymobile

interface RequestFilterCallbacks {
/**
* Continue iterating through the filter chain with buffered headers and body data.
* Resume iterating through the filter chain with buffered headers and body data.
*
* This can only be called if the filter has previously returned `stopIteration{...}` from
* `onHeaders()`/`onData()`/`onTrailers()`.
*
* Headers and any buffered body data will be passed to the next filter in the chain.
* This will result in an `onResumeRequest()` callback on the RequestFilter.
*
* If the request is not complete, the filter will still receive `onData()`/`onTrailers()`
* If the request is not complete, the filter may receive further `onData()`/`onTrailers()`
* calls.
*/
fun continueRequest()
fun resumeRequest()
}
Loading

0 comments on commit ddd89e5

Please sign in to comment.