diff --git a/lib/http/2/client.rb b/lib/http/2/client.rb index 93b5136..e289ed8 100644 --- a/lib/http/2/client.rb +++ b/lib/http/2/client.rb @@ -35,10 +35,11 @@ def initialize(**settings) # @param frame [Hash] def send(frame) if @state == :connection_header - emit(:frame, CONNECTION_HEADER) @state = :connected + emit(:frame, CONNECTION_HEADER) - settings(stream_limit: @stream_limit, window_limit: @window_limit) + payload = @settings.select {|k,v| v != SPEC_DEFAULT_CONNECTION_SETTINGS[k]} + settings(payload) end super(frame) diff --git a/lib/http/2/connection.rb b/lib/http/2/connection.rb index 33b458a..4bccc8a 100644 --- a/lib/http/2/connection.rb +++ b/lib/http/2/connection.rb @@ -3,8 +3,33 @@ module HTTP2 # Default connection and stream flow control window (64KB). DEFAULT_FLOW_WINDOW = 65535 + # Default header table size + DEFAULT_HEADER_SIZE = 4096 + + # Default stream_limit + DEFAULT_MAX_CONCURRENT_STREAMS = 100 + + # Default values for SETTINGS frame, as defined by the spec. + SPEC_DEFAULT_CONNECTION_SETTINGS = { + settings_header_table_size: 4096, + settings_enable_push: 1, # enabled for servers + settings_max_concurrent_streams: Framer::MAX_STREAM_ID, # unlimited + settings_initial_window_size: 65535, + settings_max_frame_size: 16384, + settings_max_header_list_size: 2**31 - 1, # unlimited + }.freeze + + DEFAULT_CONNECTIONS_SETTINGS = { + settings_header_table_size: 4096, + settings_enable_push: 1, # enabled for servers + settings_max_concurrent_streams: 100, + settings_initial_window_size: 65535, # + settings_max_frame_size: 16384, + settings_max_header_list_size: 2**31 - 1, # unlimited + }.freeze + # Default stream priority (lower values are higher priority). - DEFAULT_PRIORITY = 2**30 + DEFAULT_WEIGHT = 16 # Default connection "fast-fail" preamble string as defined by the spec. CONNECTION_HEADER = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" @@ -30,9 +55,18 @@ class Connection # infinity, but is automatically updated on receipt of peer settings). attr_reader :window - # Maximum number of concurrent streams allowed by the peer (automatically - # updated on receipt of peer settings). - attr_reader :stream_limit + # Max frame size + attr_reader :max_frame_size + def max_frame_size=(size) + @framer.max_frame_size = @max_frame_size = size + end + + # Current value of connection SETTINGS + def settings_value; @settings; end + + # Pending settings value + # Sent but not ack'ed settings + attr_reader :pending_settings # Number of active streams between client and server (reserved streams # are not counted towards the stream limit). @@ -40,18 +74,22 @@ class Connection # Initializes new connection object. # - def initialize(streams: 100, window: DEFAULT_FLOW_WINDOW, **settings) - @stream_limit = streams + def initialize(**settings) + @settings = DEFAULT_CONNECTIONS_SETTINGS.merge(settings) @compressor = Header::Compressor.new(settings) @decompressor = Header::Decompressor.new(settings) @active_stream_count = 0 @streams = {} + @pending_settings = [] @framer = Framer.new - @window = window - @window_limit = window + + @window_limit = @settings[:settings_initial_window_size] + @window = @window_limit + + self.max_frame_size = @settings[:settings_max_frame_size] @recv_buffer = Buffer.new @send_buffer = [] @@ -64,11 +102,11 @@ def initialize(streams: 100, window: DEFAULT_FLOW_WINDOW, **settings) # @param priority [Integer] # @param window [Integer] # @param parent [Stream] - def new_stream(priority: DEFAULT_PRIORITY, parent: nil) + def new_stream(**args) raise ConnectionClosed.new if @state == :closed - raise StreamLimitExceeded.new if @active_stream_count == @stream_limit + raise StreamLimitExceeded.new if @active_stream_count == @settings[:settings_max_concurrent_streams] - stream = activate_stream(@stream_id, priority, parent) + stream = activate_stream(id: @stream_id, **args) @stream_id += 2 stream @@ -101,20 +139,14 @@ def goaway(error = :no_error, payload = nil) @state = :closed end - # Sends a connection SETTINGS frame to the peer. Setting window size - # to Float::INFINITY disables flow control. + # Sends a connection SETTINGS frame to the peer. # - # @param stream_limit [Integer] maximum number of concurrent streams - # @param window_limit [Float] maximum flow window size - def settings(stream_limit: @stream_limit, window_limit: @window_limit) - payload = { settings_max_concurrent_streams: stream_limit } - if window_limit.to_f.infinite? - payload[:settings_flow_control_options] = 1 - else - payload[:settings_initial_window_size] = window_limit - end - + # @param settings [Array or Hash] + def settings(payload) + payload = payload.to_a + @pending_settings << payload send({type: :settings, stream: 0, payload: payload}) + @pending_settings << payload end # Decodes incoming bytes into HTTP 2.0 frames and routes them to @@ -144,11 +176,14 @@ def receive(data) raise HandshakeError.new else @state = :connection_header - settings(stream_limit: @stream_limit, window_limit: @window_limit) + payload = @settings.select {|k,v| v != SPEC_DEFAULT_CONNECTION_SETTINGS[k]} + settings(payload) end end while frame = @framer.parse(@recv_buffer) do + emit(:frame_received, frame) + # Header blocks MUST be transmitted as a contiguous sequence of frames # with no interleaved frames of any other type, or from any other stream. if !@continuation.empty? @@ -160,16 +195,13 @@ def receive(data) @continuation << frame return if !frame[:flags].include? :end_headers - headers = @continuation.collect do |chunk| - decode_headers(chunk) - chunk[:payload] - end.flatten(1) + payload = @continuation.map {|f| f[:payload]}.join frame = @continuation.shift @continuation.clear frame.delete(:length) - frame[:payload] = headers + frame[:payload] = Buffer.new(payload) frame[:flags] << :end_headers end @@ -200,8 +232,10 @@ def receive(data) stream = @streams[frame[:stream]] if stream.nil? - stream = activate_stream(frame[:stream], - frame[:priority] || DEFAULT_PRIORITY) + stream = activate_stream(id: frame[:stream], + weight: frame[:weight] || DEFAULT_WEIGHT, + dependency: frame[:dependency] || 0, + exclusive: frame[:exclusive] || false) emit(:stream, stream) end @@ -248,7 +282,7 @@ def receive(data) end end - stream = activate_stream(pid, DEFAULT_PRIORITY, parent) + stream = activate_stream(id: pid, parent: parent) emit(:promise, stream) stream << frame else @@ -277,6 +311,7 @@ def receive(data) # @note all frames are currently delivered in FIFO order. # @param frame [Hash] def send(frame) + emit(:frame_sent, frame) if frame[:type] == :data send_data(frame, true) @@ -288,7 +323,9 @@ def send(frame) goaway(frame[:error]) end else - emit(:frame, encode(frame)) + # HEADERS and PUSH_PROMISE may generate CONTINUATION + frames = encode(frame) + frames.each {|f| emit(:frame, f) } end end end @@ -296,14 +333,16 @@ def send(frame) # Applies HTTP 2.0 binary encoding to the frame. # # @param frame [Hash] - # @return [Buffer] encoded frame + # @return [Array of Buffer] encoded frame def encode(frame) if frame[:type] == :headers || frame[:type] == :push_promise - encode_headers(frame) + frames = encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame + else + frames = [frame] # otherwise one frame end - @framer.generate(frame) + frames.map {|f| @framer.generate(f) } end # Check if frame is a connection frame: SETTINGS, PING, GOAWAY, and any @@ -329,15 +368,14 @@ def connection_management(frame) case @state when :connection_header # SETTINGS frames MUST be sent at the start of a connection. - connection_settings(frame) @state = :connected + connection_settings(frame) when :connected case frame[:type] when :settings connection_settings(frame) when :window_update - flow_control_allowed? @window += frame[:increment] send_data(nil, true) when :ping @@ -355,7 +393,8 @@ def connection_management(frame) # for new streams. @state = :closed emit(:goaway, frame[:last_stream], frame[:error], frame[:payload]) - + when :altsvc, :blocked + emit(frame[:type], frame) else connection_error end @@ -372,10 +411,18 @@ def connection_settings(frame) connection_error end - frame[:payload].each do |key,v| + settings, ack_received = \ + if frame[:flags].include?(:ack) + # Process pending settings we have sent. + [@pending_settings.shift, true] + else + [frame[:payload], false] + end + + settings.each do |key,v| + @settings[key] = v case key when :settings_max_concurrent_streams - @stream_limit = v # A change to SETTINGS_INITIAL_WINDOW_SIZE could cause the available # space in a flow control window to become negative. A sender MUST @@ -383,7 +430,7 @@ def connection_settings(frame) # controlled frames until it receives WINDOW_UPDATE frames that cause # the flow control window to become positive. when :settings_initial_window_size - flow_control_allowed? + v > 0x7fffffff and connection_error @window = @window - @window_limit + v @streams.each do |id, stream| stream.emit(:window, stream.window - @window_limit + v) @@ -391,18 +438,43 @@ def connection_settings(frame) @window_limit = v - # Flow control can be disabled the entire connection using the - # SETTINGS_FLOW_CONTROL_OPTIONS setting. This setting ends all forms - # of flow control. An implementation that does not wish to perform - # flow control can use this in the initial SETTINGS exchange. - when :settings_flow_control_options - flow_control_allowed? + # Setting header table size might cause some headers evicted + when :settings_header_table_size + @compressor.set_table_size(v) - if v == 1 - @window = @window_limit = Float::INFINITY + when :settings_enable_push + if @stream_id % 2 == 1 + # This is client. Peer (server) is not allowed to change settings_enable_push. + unless v == 0 + connection_error + end + else + # This is server. Peer (client) can set either 0 or 1. + unless v == 0 || v == 1 + connection_error + end + end + + when :settings_max_frame_size + self.max_frame_size = v + + when :settings_compress_data + # This is server. Peer (client) can set either 0 or 1. + unless v == 0 || v == 1 + connection_error end + + else + # ignore unknown settings end end + + if ack_received + emit(:settings_ack, frame, @pending_settings.size) + elsif @state != :closed + # send ack + send({type: :settings, stream: 0, payload: [], flags: [:ack]}) + end end # Decode headers payload and update connection decompressor state. @@ -425,21 +497,34 @@ def decode_headers(frame) # Encode headers payload and update connection compressor state. # # @param frame [Hash] + # @return [Array of Frame] def encode_headers(frame) - if !frame[:payload].is_a? String - frame[:payload] = @compressor.encode(frame[:payload]) + payload = frame[:payload] + unless payload.is_a? String + payload = @compressor.encode(payload) end - rescue Exception => e - connection_error(:compression_error, msg: e.message) - end + frames = [] - # Once disabled, no further flow control operations are permitted. - # - def flow_control_allowed? - if @window_limit == Float::INFINITY - connection_error(:flow_control_error) + while payload.size > 0 + cont = frame.dup + cont[:type] = :continuation + cont[:flags] = [] + cont[:payload] = payload.slice!(0, max_frame_size) + frames << cont end + if frames.empty? + frames = [frame] + else + frames.first[:type] = frame[:type] + frames.first[:flags] = frame[:flags] - [:end_headers] + frames.last[:flags] << :end_headers + end + + frames + + rescue Exception => e + [connection_error(:compression_error, msg: e.message)] end # Activates new incoming or outgoing stream and registers appropriate @@ -449,12 +534,12 @@ def flow_control_allowed? # @param priority [Integer] # @param window [Integer] # @param parent [Stream] - def activate_stream(id, priority, parent = nil) + def activate_stream(id: nil, **args) if @streams.key?(id) connection_error(msg: 'Stream ID already exists') end - stream = Stream.new(id, priority, @window_limit, parent) + stream = Stream.new({connection: self, id: id, window: @window_limit}.merge(args)) # Streams that are in the "open" state, or either of the "half closed" # states count toward the maximum number of streams that an endpoint is diff --git a/lib/http/2/flow_buffer.rb b/lib/http/2/flow_buffer.rb index c848bef..2ee228d 100644 --- a/lib/http/2/flow_buffer.rb +++ b/lib/http/2/flow_buffer.rb @@ -1,8 +1,5 @@ module HTTP2 - # Maximum size of a DATA payload (16383 bytes, ~16K). - MAX_FRAME_SIZE = 2**14-1 - # Implementation of stream and connection DATA flow control: frames may # be split and / or may be buffered based on current flow control window. # @@ -54,8 +51,8 @@ def send_data(frame = nil, encode = false) sent = frame_size end - frame = encode(frame) if encode - emit(:frame, frame) + frames = encode ? encode(frame) : [frame] + frames.each {|f| emit(:frame, f) } @window -= sent end end diff --git a/lib/http/2/framer.rb b/lib/http/2/framer.rb index 355c67c..048ae1e 100644 --- a/lib/http/2/framer.rb +++ b/lib/http/2/framer.rb @@ -1,12 +1,15 @@ module HTTP2 - # Performs encoding, decoding, and validation of binary HTTP 2.0 frames. + # Performs encoding, decoding, and validation of binary HTTP/2 frames. # class Framer include Error - # Maximum frame size (16383 bytes) - MAX_PAYLOAD_SIZE = 2**14-1 + # Default value of max frame size (16384 bytes) + DEFAULT_MAX_FRAME_SIZE = 2**14 + + # Current maximum frame size + attr_accessor :max_frame_size # Maximum stream ID (2^31) MAX_STREAM_ID = 0x7fffffff @@ -14,7 +17,7 @@ class Framer # Maximum window increment value (2^31) MAX_WINDOWINC = 0x7fffffff - # HTTP 2.0 frame type mapping as defined by the spec + # HTTP/2 frame type mapping as defined by the spec FRAME_TYPES = { data: 0x0, headers: 0x1, @@ -24,36 +27,45 @@ class Framer push_promise: 0x5, ping: 0x6, goaway: 0x7, - window_update: 0x9, - continuation: 0xa + window_update: 0x8, + continuation: 0x9, + altsvc: 0xa, } + FRAME_TYPES_WITH_PADDING = [ :data, :headers, :push_promise ] + # Per frame flags as defined by the spec FRAME_FLAGS = { data: { - end_stream: 0, reserved: 1 + end_stream: 0, + padded: 3, compressed: 5 }, headers: { - end_stream: 0, reserved: 1, - end_headers: 2, priority: 3 + end_stream: 0, end_headers: 2, + padded: 3, priority: 5, }, priority: {}, rst_stream: {}, - settings: {}, - push_promise: { end_headers: 2 }, + settings: { ack: 0 }, + push_promise: { + end_headers: 2, + padded: 3, + }, ping: { ack: 0 }, goaway: {}, window_update:{}, - continuation: { - end_stream: 0, end_headers: 2 - } + continuation: { end_headers: 2 }, + altsvc: {}, } # Default settings as defined by the spec DEFINED_SETTINGS = { - settings_max_concurrent_streams: 4, - settings_initial_window_size: 7, - settings_flow_control_options: 10 + settings_header_table_size: 1, + settings_enable_push: 2, + settings_max_concurrent_streams: 3, + settings_initial_window_size: 4, + settings_max_frame_size: 5, + settings_max_header_list_size: 6, } # Default error types as defined by the spec @@ -62,22 +74,35 @@ class Framer protocol_error: 1, internal_error: 2, flow_control_error: 3, + settings_timeout: 4, stream_closed: 5, - frame_too_large: 6, + frame_size_error: 6, refused_stream: 7, cancel: 8, - compression_error: 9 + compression_error: 9, + connect_error: 10, + enhance_your_calm: 11, + inadequate_security: 12, } RBIT = 0x7fffffff RBYTE = 0x0fffffff - HEADERPACK = "CnCCN" - UINT32 = "N" - + EBIT = 0x80000000 + UINT32 = "N".freeze + UINT16 = "n".freeze + UINT8 = "C".freeze + HEADERPACK = (UINT8 + UINT16 + UINT8 + UINT8 + UINT32).freeze FRAME_LENGTH_HISHIFT = 16 FRAME_LENGTH_LOMASK = 0xFFFF + BINARY = 'binary'.freeze + + private_constant :RBIT, :RBYTE, :EBIT, :HEADERPACK, :UINT32, :UINT16, :UINT8, :BINARY - private_constant :RBIT, :RBYTE, :HEADERPACK, :UINT32 + # Initializes new framer object. + # + def initialize + @max_frame_size = DEFAULT_MAX_FRAME_SIZE + end # Generates common 9-byte frame header. # - http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-4.1 @@ -91,10 +116,14 @@ def commonHeader(frame) raise CompressionError.new("Invalid frame type (#{frame[:type]})") end - if frame[:length] > MAX_PAYLOAD_SIZE + if frame[:length] > @max_frame_size raise CompressionError.new("Frame size is too large: #{frame[:length]}") end + if frame[:length] < 0 + raise CompressionError.new("Frame size is invalid: #{frame[:length]}") + end + if frame[:stream] > MAX_STREAM_ID raise CompressionError.new("Stream ID (#{frame[:stream]}) is too large") end @@ -129,16 +158,18 @@ def readCommonHeader(buf) frame[:length] = (len_hi << FRAME_LENGTH_HISHIFT) | len_lo frame[:type], _ = FRAME_TYPES.select { |t,pos| type == pos }.first - frame[:flags] = FRAME_FLAGS[frame[:type]].reduce([]) do |acc, (name, pos)| - acc << name if (flags & (1 << pos)) > 0 - acc + if frame[:type] + frame[:flags] = FRAME_FLAGS[frame[:type]].reduce([]) do |acc, (name, pos)| + acc << name if (flags & (1 << pos)) > 0 + acc + end end frame[:stream] = stream & RBIT frame end - # Generates encoded HTTP 2.0 frame. + # Generates encoded HTTP/2 frame. # - http://tools.ietf.org/html/draft-ietf-httpbis-http2 # # @param frame [Hash] @@ -155,21 +186,31 @@ def generate(frame) length += frame[:payload].bytesize when :headers - if frame[:priority] + if frame[:weight] || frame[:stream_dependency] || !frame[:exclusive].nil? + unless frame[:weight] && frame[:stream_dependency] && !frame[:exclusive].nil? + raise CompressionError.new("Must specify all of priority parameters for #{frame[:type]}") + end frame[:flags] += [:priority] if !frame[:flags].include? :priority end if frame[:flags].include? :priority - bytes << [frame[:priority] & RBIT].pack(UINT32) - length += 4 + bytes << [(frame[:exclusive] ? EBIT : 0) | + (frame[:stream_dependency] & RBIT)].pack(UINT32) + bytes << [frame[:weight] - 1].pack(UINT8) + length += 5 end bytes << frame[:payload] length += frame[:payload].bytesize when :priority - bytes << [frame[:priority] & RBIT].pack(UINT32) - length += 4 + unless frame[:weight] && frame[:stream_dependency] && !frame[:exclusive].nil? + raise CompressionError.new("Must specify all of priority parameters for #{frame[:type]}") + end + bytes << [(frame[:exclusive] ? EBIT : 0) | + (frame[:stream_dependency] & RBIT)].pack(UINT32) + bytes << [frame[:weight] - 1].pack(UINT8) + length += 5 when :rst_stream bytes << pack_error(frame[:error]) @@ -181,7 +222,9 @@ def generate(frame) end frame[:payload].each do |(k,v)| - if !k.is_a? Integer + if k.is_a? Integer + DEFINED_SETTINGS.has_value?(k) or next + else k = DEFINED_SETTINGS[k] if k.nil? @@ -189,9 +232,9 @@ def generate(frame) end end - bytes << [k & RBYTE].pack(UINT32) + bytes << [k].pack(UINT16) bytes << [v].pack(UINT32) - length += 8 + length += 6 end when :push_promise @@ -224,13 +267,60 @@ def generate(frame) when :continuation bytes << frame[:payload] length += frame[:payload].bytesize + + when :altsvc + bytes << [frame[:max_age], frame[:port]].pack(UINT32 + UINT16) + length += 6 + if frame[:proto] + frame[:proto].bytesize > 255 and raise CompressionError.new("Proto too long") + bytes << [frame[:proto].bytesize].pack(UINT8) << frame[:proto].force_encoding(BINARY) + length += 1 + frame[:proto].bytesize + else + bytes << [0].pack(UINT8) + length += 1 + end + if frame[:host] + frame[:host].bytesize > 255 and raise CompressionError.new("Host too long") + bytes << [frame[:host].bytesize].pack(UINT8) << frame[:host].force_encoding(BINARY) + length += 1 + frame[:host].bytesize + else + bytes << [0].pack(UINT8) + length += 1 + end + if frame[:origin] + bytes << frame[:origin] + length += frame[:origin].bytesize + end + end + + # Process padding. + # frame[:padding] gives number of extra octets to be added. + # - http://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-6.1 + if frame[:padding] + unless FRAME_TYPES_WITH_PADDING.include?(frame[:type]) + raise CompressionError.new("Invalid padding flag for #{frame[:type]}") + end + + padlen = frame[:padding] + if padlen <= 0 || padlen > 256 || padlen + length > @max_frame_size + raise CompressionError.new("Invalid padding #{padlen}") + end + + length += padlen + bytes.prepend([padlen -= 1].pack(UINT8)) + frame[:flags] << :padded + + # Padding: Padding octets that contain no application semantic value. + # Padding octets MUST be set to zero when sending and ignored when + # receiving. + bytes << "\0" * padlen end frame[:length] = length bytes.prepend(commonHeader(frame)) end - # Decodes complete HTTP 2.0 frame from provided buffer. If the buffer + # Decodes complete HTTP/2 frame from provided buffer. If the buffer # does not contain enough data, no further work is performed. # # @param buf [Buffer] @@ -242,28 +332,64 @@ def parse(buf) buf.read(9) payload = buf.read(frame[:length]) + # Implementations MUST discard frames + # that have unknown or unsupported types. + # - http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-5.5 + return nil if frame[:type].nil? + + # Process padding + padlen = 0 + if FRAME_TYPES_WITH_PADDING.include?(frame[:type]) + padded = frame[:flags].include?(:padded) + if padded + padlen = payload.read(1).unpack(UINT8).first + frame[:padding] = padlen + 1 + padlen > payload.bytesize and raise ProtocolError.new("padding too long") + padlen > 0 and payload.slice!(-padlen,padlen) + frame[:length] -= frame[:padding] + frame[:flags].delete(:padded) + end + end + case frame[:type] when :data frame[:payload] = payload.read(frame[:length]) when :headers if frame[:flags].include? :priority - frame[:priority] = payload.read_uint32 & RBIT + e_sd = payload.read_uint32 + frame[:stream_dependency] = e_sd & RBIT + frame[:exclusive] = (e_sd & EBIT) != 0 + frame[:weight] = payload.getbyte + 1 end frame[:payload] = payload.read(frame[:length]) when :priority - frame[:priority] = payload.read_uint32 & RBIT + e_sd = payload.read_uint32 + frame[:stream_dependency] = e_sd & RBIT + frame[:exclusive] = (e_sd & EBIT) != 0 + frame[:weight] = payload.getbyte + 1 when :rst_stream frame[:error] = unpack_error payload.read_uint32 when :settings - frame[:payload] = {} - (frame[:length] / 8).times do - id = payload.read_uint32 & RBYTE + # NOTE: frame[:length] might not match the number of frame[:payload] + # because unknown extensions are ignored. + frame[:payload] = [] + unless frame[:length] % 6 == 0 + raise ProtocolError.new("Invalid settings payload length") + end + + if frame[:stream] != 0 + raise ProtocolError.new("Invalid stream ID (#{frame[:stream]})") + end + + (frame[:length] / 6).times do + id = payload.read(2).unpack(UINT16).first val = payload.read_uint32 # Unsupported or unrecognized settings MUST be ignored. + # Here we send it along. name, _ = DEFINED_SETTINGS.select { |name, v| v == id }.first - frame[:payload][name] = val if name + frame[:payload] << [name, val] if name end when :push_promise frame[:promise_stream] = payload.read_uint32 & RBIT @@ -274,12 +400,26 @@ def parse(buf) frame[:last_stream] = payload.read_uint32 & RBIT frame[:error] = unpack_error payload.read_uint32 - size = frame[:length] - 8 + size = frame[:length] - 8 # for last_stream and error frame[:payload] = payload.read(size) if size > 0 when :window_update frame[:increment] = payload.read_uint32 & RBIT when :continuation frame[:payload] = payload.read(frame[:length]) + when :altsvc + frame[:max_age], frame[:port] = payload.read(6).unpack(UINT32 + UINT16) + + len = payload.getbyte + len > 0 and frame[:proto] = payload.read(len) + + len = payload.getbyte + len > 0 and frame[:host] = payload.read(len) + + if payload.size > 0 + frame[:origin] = payload.read(payload.size) + end + else + # Unknown frame type is explicitly allowed end frame diff --git a/lib/http/2/stream.rb b/lib/http/2/stream.rb index 5ab7492..86c59b2 100644 --- a/lib/http/2/stream.rb +++ b/lib/http/2/stream.rb @@ -24,7 +24,7 @@ module HTTP2 # | | ,-------|:active |-------. | | # | | H / ES | | ES \ H | | # | v v +--------+ v v | - # | +-----------+ | +-_---------+ | + # | +-----------+ | +-----------+ | # | |:half_close| | |:half_close| | # | | (remote) | | | (local) | | # | +-----------+ | +-----------+ | @@ -49,7 +49,8 @@ class Stream attr_reader :parent # Stream priority as set by initiator. - attr_reader :priority + attr_reader :weight + attr_reader :dependency # Size of current stream flow control window. attr_reader :window @@ -64,13 +65,18 @@ class Stream # will emit new stream objects, when new stream frames are received. # # @param id [Integer] - # @param priority [Integer] + # @param weight [Integer] + # @param dependency [Integer] + # @param exclusive [Boolean] # @param window [Integer] # @param parent [Stream] - def initialize(id, priority, window, parent = nil) - @id = id - @priority = priority - @window = window + def initialize(connection: nil, id: nil, weight: 16, dependency: 0, exclusive: false, window: nil, parent: nil) + @connection = connection or raise ArgumentError("missing mandatory argument connection") + @id = id or raise ArgumentError("missing mandatory argument id") + @weight = weight + @dependency = dependency + process_priority({weight: weight, stream_dependency: dependency, exclusive: exclusive}) + @window = window or raise ArgumentError("missing mandatory argument window") @parent = parent @state = :idle @error = false @@ -90,17 +96,14 @@ def receive(frame) when :data emit(:data, frame[:payload]) if !frame[:ignore] when :headers, :push_promise - if frame[:payload].is_a? Array - emit(:headers, Hash[*frame[:payload].flatten]) if !frame[:ignore] - else - emit(:headers, frame[:payload]) if !frame[:ignore] - end + emit(:headers, frame[:payload]) if !frame[:ignore] when :priority - @priority = frame[:priority] - emit(:priority, @priority) + process_priority(frame) when :window_update @window += frame[:increment] send_data + when :altsvc, :blocked + emit(frame[:type], frame) end complete_transition(frame) @@ -116,7 +119,7 @@ def send(frame) transition(frame, true) frame[:stream] ||= @id - @priority = frame[:priority] if frame[:type] == :priority + process_priority(frame) if frame[:type] == :priority if frame[:type] == :data send_data(frame) @@ -129,7 +132,7 @@ def send(frame) # Sends a HEADERS frame containing HTTP response headers. # - # @param headers [Hash] + # @param headers [Array or Hash] Array of key-value pairs or Hash # @param end_headers [Boolean] indicates that no more headers will be sent # @param end_stream [Boolean] indicates that no payload will be sent def headers(headers, end_headers: true, end_stream: false) @@ -150,10 +153,11 @@ def promise(headers, end_headers: true, &block) # Sends a PRIORITY frame with new stream priority value (can only be # performed by the client). # - # @param p [Integer] new stream priority value - def reprioritize(p) + # @param weight [Integer] new stream weight value + # @param dependency [Integer] new stream dependency stream + def reprioritize(weight: 16, dependency: 0, exclusive: false) stream_error if @id.even? - send({type: :priority, priority: p}) + send({type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive}) end # Sends DATA frame containing response payload. @@ -164,8 +168,8 @@ def data(payload, end_stream: true) flags = [] flags << :end_stream if end_stream - while payload.bytesize > MAX_FRAME_SIZE do - chunk = payload.slice!(0, MAX_FRAME_SIZE) + while payload.bytesize > @connection.max_frame_size do + chunk = payload.slice!(0, @connection.max_frame_size) send({type: :data, payload: chunk}) end @@ -344,8 +348,11 @@ def transition(frame, sending) # frame bearing the END_STREAM flag is sent. when :half_closed_local if sending - if frame[:type] == :rst_stream + case frame[:type] + when :rst_stream event(:local_rst) + when :priority + process_priority(frame) else stream_error end @@ -354,7 +361,9 @@ def transition(frame, sending) when :data, :headers, :continuation event(:remote_closed) if end_stream?(frame) when :rst_stream then event(:remote_rst) - when :window_update, :priority + when :priority + process_priority(frame) + when :window_update frame[:ignore] = true end end @@ -380,6 +389,8 @@ def transition(frame, sending) case frame[:type] when :rst_stream then event(:remote_rst) when :window_update then frame[:ignore] = true + when :priority + process_priority(frame) else stream_error(:stream_closed); end end @@ -407,15 +418,21 @@ def transition(frame, sending) if sending case frame[:type] when :rst_stream then # ignore + when :priority then + process_priority(frame) else stream_error(:stream_closed) if !(frame[:type] == :rst_stream) end else - case @closed - when :remote_rst, :remote_closed - stream_error(:stream_closed) if !(frame[:type] == :rst_stream) - when :local_rst, :local_closed - frame[:ignore] = true + if frame[:type] == :priority + process_priority(frame) + else + case @closed + when :remote_rst, :remote_closed + stream_error(:stream_closed) if !(frame[:type] == :rst_stream) + when :local_rst, :local_closed + frame[:ignore] = true + end end end end @@ -455,6 +472,20 @@ def complete_transition(frame) end end + def process_priority(frame) + @weight = frame[:weight] + @dependency = frame[:stream_dependency] + emit(:priority, + weight: frame[:weight], + dependency: frame[:stream_dependency], + exclusive: frame[:exclusive]) + # TODO: implement dependency tree housekeeping + # Latest draft defines a fairly complex priority control. + # See https://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-5.3 + # We currently have no prioritization among streams. + # We should add code here. + end + def end_stream?(frame) case frame[:type] when :data, :headers, :continuation @@ -469,6 +500,5 @@ def stream_error(error = :stream_error, msg: nil) klass = error.to_s.split('_').map(&:capitalize).join raise Error.const_get(klass).new(msg) end - end end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 04a64da..cc4e8d4 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -24,13 +24,13 @@ it "should initialize client with custom connection settings" do frames = [] - @client = Client.new(streams: 200) + @client = Client.new(:settings_max_concurrent_streams => 200) @client.on(:frame) { |bytes| frames << bytes } @client.ping("12345678") frame = f.parse(frames[1]) frame[:type].should eq :settings - frame[:payload][:settings_max_concurrent_streams].should eq 200 + frame[:payload].should include([:settings_max_concurrent_streams, 200]) end end diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index d3bf0c9..849952f 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -26,12 +26,12 @@ context "stream management" do it "should initialize to default stream limit (100)" do - @conn.stream_limit.should eq 100 + @conn.settings_value[:settings_max_concurrent_streams].should eq 100 end it "should change stream limit to received SETTINGS value" do @conn << f.generate(SETTINGS) - @conn.stream_limit.should eq 10 + @conn.settings_value[:settings_max_concurrent_streams].should eq 10 end it "should count open streams against stream limit" do @@ -78,12 +78,68 @@ @conn << f.generate(SETTINGS) stream, headers = nil, HEADERS.dup - headers[:priority] = 20 + headers[:weight] = 20 + headers[:stream_dependency] = 0 + headers[:exclusive] = false @conn.on(:stream) {|s| stream = s } @conn << f.generate(headers) - stream.priority.should eq 20 + stream.weight.should eq 20 + end + end + + context "Headers pre/post processing" do + it "should not concatenate multiple occurences of a header field with the same name" do + input = [ + ["Content-Type", "text/html"], + ["Cache-Control", "max-age=60, private"], + ["Cache-Control", "must-revalidate"], + ] + expected = [ + ["content-type", "text/html"], + ["cache-control", "max-age=60, private"], + ["cache-control", "must-revalidate"], + ] + headers = [] + @conn.on(:frame) do |bytes| + bytes.force_encoding('binary') + # bytes[3]: frame's type field + [1,5,9].include?(bytes[3].ord) and headers << f.parse(bytes) + end + + stream = @conn.new_stream + stream.headers(input) + + headers.size.should eq 1 + emitted = Decompressor.new.decode(headers.first[:payload]) + emitted.should match_array(expected) + end + + it "should not split zero-concatenated header field values" do + input = [ + ["cache-control", "max-age=60, private\0must-revalidate"], + ["content-type", "text/html"], + ["cookie", "a=b\0c=d; e=f"], + ] + expected = [ + ["cache-control", "max-age=60, private\0must-revalidate"], + ["content-type", "text/html"], + ["cookie", "a=b\0c=d; e=f"], + ] + + result = nil + @conn.on(:stream) do |stream| + stream.on(:headers) {|h| result = h} + end + + srv = Server.new + srv.on(:frame) {|bytes| @conn << bytes} + stream = srv.new_stream + stream.headers(input) + + result.should eq expected + end end @@ -94,7 +150,7 @@ it "should update connection and stream windows on SETTINGS" do settings, data = SETTINGS.dup, DATA.dup - settings[:payload] = { settings_initial_window_size: 1024 } + settings[:payload] = [[:settings_initial_window_size, 1024]] data[:payload] = 'x'*2048 stream = @conn.new_stream @@ -111,29 +167,15 @@ it "should initialize streams with window specified by peer" do settings = SETTINGS.dup - settings[:payload] = { settings_initial_window_size: 1024 } + settings[:payload] = [[:settings_initial_window_size, 1024]] @conn << f.generate(settings) @conn.new_stream.window.should eq 1024 end - it "should support global disable of flow control" do - @conn << f.generate(SETTINGS) - @conn.window.should eq Float::INFINITY - end - - it "should raise error on flow control after disabling it" do - expect { @conn << f.generate(SETTINGS) }.to_not raise_error - expect { - [WINDOW_UPDATE, SETTINGS].each do |frame| - @conn.dup << f.generate(frame) - end - }.to raise_error(FlowControlError) - end - it "should observe connection flow control" do settings, data = SETTINGS.dup, DATA.dup - settings[:payload] = { settings_initial_window_size: 1000 } + settings[:payload] = [[:settings_initial_window_size, 1000]] @conn << f.generate(settings) s1 = @conn.new_stream @@ -157,7 +199,7 @@ context "framing" do it "should buffer incomplete frames" do settings = SETTINGS.dup - settings[:payload] = { settings_initial_window_size: 1000 } + settings[:payload] = [[:settings_initial_window_size, 1000]] @conn << f.generate(settings) frame = f.generate(WINDOW_UPDATE.merge({stream: 0, increment: 1000})) @@ -197,11 +239,14 @@ cc = Compressor.new h1, h2 = HEADERS.dup, CONTINUATION.dup - h1[:payload] = cc.encode([req_headers.first]) + + # Header block fragment might not complete for decompression + payload = cc.encode(req_headers) + h1[:payload] = payload.slice!(0, payload.size/2) # first half h1[:stream] = 5 h1[:flags] = [] - h2[:payload] = cc.encode([req_headers.last]) + h2[:payload] = payload # the remaining h2[:stream] = 5 @conn << f.generate(SETTINGS) @@ -238,15 +283,15 @@ it "should raise connection error on decode exception" do @conn << f.generate(SETTINGS) frame = f.generate(HEADERS.dup) - frame[2] = 0.chr - + @conn.instance_eval{@framer}.should_receive(:parse).and_raise(CompressionError.new) expect { @conn << frame }.to raise_error(ProtocolError) end it "should emit encoded frames via on(:frame)" do bytes = nil @conn.on(:frame) {|d| bytes = d } - @conn.settings(stream_limit: 10, window_limit: Float::INFINITY) + @conn.settings(settings_max_concurrent_streams: 10, + settings_initial_window_size: 0x7fffffff) bytes.should eq f.generate(SETTINGS) end @@ -267,6 +312,78 @@ ':path' => '/resource' }) end + + it "should generate CONTINUATION if HEADERS is too long" do + headers = [] + @conn.on(:frame) do |bytes| + bytes.force_encoding('binary') + # bytes[3]: frame's type field + [1,5,9].include?(bytes[3].ord) and headers << f.parse(bytes) + end + + stream = @conn.new_stream + stream.headers({ + ':method' => 'get', + ':scheme' => 'http', + ':authority' => 'www.example.org', + ':path' => '/resource', + 'custom' => 'q' * 44000, + }, end_stream: true) + headers.size.should eq 3 + headers[0][:type].should eq :headers + headers[1][:type].should eq :continuation + headers[2][:type].should eq :continuation + headers[0][:flags].should eq [:end_stream] + headers[1][:flags].should eq [] + headers[2][:flags].should eq [:end_headers] + end + + it "should not generate CONTINUATION if HEADERS fits exactly in a frame" do + headers = [] + @conn.on(:frame) do |bytes| + bytes.force_encoding('binary') + # bytes[3]: frame's type field + [1,5,9].include?(bytes[3].ord) and headers << f.parse(bytes) + end + + stream = @conn.new_stream + stream.headers({ + ':method' => 'get', + ':scheme' => 'http', + ':authority' => 'www.example.org', + ':path' => '/resource', + 'custom' => 'q' * 18682, # this number should be updated when Huffman table is changed + }, end_stream: true) + headers[0][:length].should eq @conn.max_frame_size + headers.size.should eq 1 + headers[0][:type].should eq :headers + headers[0][:flags].should include(:end_headers) + headers[0][:flags].should include(:end_stream) + end + + it "should generate CONTINUATION if HEADERS exceed the max payload by one byte" do + headers = [] + @conn.on(:frame) do |bytes| + bytes.force_encoding('binary') + [1,5,9].include?(bytes[3].ord) and headers << f.parse(bytes) + end + + stream = @conn.new_stream + stream.headers({ + ':method' => 'get', + ':scheme' => 'http', + ':authority' => 'www.example.org', + ':path' => '/resource', + 'custom' => 'q' * 18683, # this number should be updated when Huffman table is changed + }, end_stream: true) + headers[0][:length].should eq @conn.max_frame_size + headers[1][:length].should eq 1 + headers.size.should eq 2 + headers[0][:type].should eq :headers + headers[1][:type].should eq :continuation + headers[0][:flags].should eq [:end_stream] + headers[1][:flags].should eq [:end_headers] + end end context "connection management" do @@ -344,14 +461,17 @@ it "should send GOAWAY frame on connection error" do stream = @conn.new_stream - @conn.stub(:encode) + @conn.should_receive(:encode) do |frame| + frame[:type].should eq :settings + [frame] + end @conn.should_receive(:encode) do |frame| frame[:type].should eq :goaway frame[:last_stream].should eq stream.id frame[:error].should eq :protocol_error + [frame] end - @conn << f.generate(SETTINGS) expect { @conn << f.generate(DATA) }.to raise_error(ProtocolError) end end @@ -360,14 +480,15 @@ it ".settings should emit SETTINGS frames" do @conn.should_receive(:send) do |frame| frame[:type].should eq :settings - frame[:payload].should eq({ - settings_max_concurrent_streams: 10, - settings_flow_control_options: 1 - }) + frame[:payload].should eq([ + [:settings_max_concurrent_streams, 10], + [:settings_initial_window_size, 0x7fffffff], + ]) frame[:stream].should eq 0 end - @conn.settings(stream_limit: 10, window_limit: Float::INFINITY) + @conn.settings(settings_max_concurrent_streams: 10, + settings_initial_window_size: 0x7fffffff) end it ".ping should generate PING frames" do diff --git a/spec/framer_spec.rb b/spec/framer_spec.rb index 86b5ff4..263feac 100644 --- a/spec/framer_spec.rb +++ b/spec/framer_spec.rb @@ -9,12 +9,12 @@ { length: 4, type: :headers, - flags: [:end_stream, :reserved, :end_headers], + flags: [:end_stream, :end_headers], stream: 15, } } - let(:bytes) { [0,0x04, 0x01, 0x7, 0x0000000F].pack("CnCCN") } + let(:bytes) { [0,0x04, 0x01, 0x5, 0x0000000F].pack("CnCCN") } it "should generate common 9 byte header" do f.commonHeader(frame).should eq bytes @@ -24,7 +24,21 @@ f.readCommonHeader(Buffer.new(bytes)).should eq frame end - it "should raise exception on invalid frame type" do + it "should generate a large frame" do + f = Framer.new + f.max_frame_size = 2**24-1 + frame = { + length: 2**18 + 2**16 + 17, + type: :headers, + flags: [:end_stream, :end_headers], + stream: 15, + } + bytes = [5, 17, 0x01, 0x5, 0x0000000F].pack("CnCCN") + f.commonHeader(frame).should eq bytes + f.readCommonHeader(Buffer.new(bytes)).should eq frame + end + + it "should raise exception on invalid frame type when sending" do expect { frame[:type] = :bogus f.commonHeader(frame) @@ -47,7 +61,7 @@ it "should raise exception on invalid frame size" do expect { - frame[:length] = 2**14 + frame[:length] = 2**24 f.commonHeader(frame) }.to raise_error(CompressionError, /too large/) end @@ -58,13 +72,13 @@ frame = { length: 4, type: :data, - flags: [:end_stream, :reserved], + flags: [:end_stream], stream: 1, payload: 'text' } bytes = f.generate(frame) - bytes.should eq [0,0x4,0x0,0x3,0x1,*'text'.bytes].pack("CnCCNC*") + bytes.should eq [0,0x4,0x0,0x1,0x1,*'text'.bytes].pack("CnCCNC*") f.parse(bytes).should eq frame end @@ -75,13 +89,13 @@ frame = { length: 12, type: :headers, - flags: [:end_stream, :reserved, :end_headers], + flags: [:end_stream, :end_headers], stream: 1, payload: 'header-block' } bytes = f.generate(frame) - bytes.should eq [0,0xc,0x1,0x7,0x1,*'header-block'.bytes].pack("CnCCNC*") + bytes.should eq [0,0xc,0x1,0x5,0x1,*'header-block'.bytes].pack("CnCCNC*") f.parse(bytes).should eq frame end @@ -89,14 +103,16 @@ frame = { length: 16, type: :headers, - flags: [:end_headers, :priority], + flags: [:end_headers], stream: 1, - priority: 15, + stream_dependency: 15, + weight: 12, + exclusive: false, payload: 'header-block' } bytes = f.generate(frame) - bytes.should eq [0,0x10,0x1,0xc,0x1,0xf,*'header-block'.bytes].pack("CnCCNNC*") + bytes.should eq [0,0x11,0x1,0x24,0x1,0xf,0xb,*'header-block'.bytes].pack("CnCCNNCC*") f.parse(bytes).should eq frame end end @@ -104,14 +120,16 @@ context "PRIORITY" do it "should generate and parse bytes" do frame = { - length: 4, + length: 5, type: :priority, stream: 1, - priority: 15 + stream_dependency: 15, + weight: 12, + exclusive: true, } bytes = f.generate(frame) - bytes.should eq [0,0x4,0x2,0x0,0x1,0xf].pack("CnCCNN") + bytes.should eq [0,0x5,0x2,0x0,0x1,0x8000000f,0xb].pack("CnCCNNC") f.parse(bytes).should eq frame end end @@ -134,47 +152,88 @@ context "SETTINGS" do let(:frame) { { - length: 8, type: :settings, flags: [], stream: 0, - payload: { - settings_max_concurrent_streams: 10 - } + payload: [ + [:settings_max_concurrent_streams, 10], + [:settings_header_table_size, 2048], + ] } } it "should generate and parse bytes" do + bytes = f.generate(frame) + bytes.should eq [0,12,0x4,0x0,0x0,3,10,1,2048].pack("CnCCNnNnN") + parsed = f.parse(bytes) + parsed.delete(:length) + frame.delete(:length) + parsed.should eq frame + end + it "should generate settings when id is given as an integer" do + frame[:payload][1][0] = 1 bytes = f.generate(frame) - bytes.should eq [0,0x8,0x4,0x0,0x0,0x4,0xa].pack("CnCCNNN") - f.parse(bytes).should eq frame + bytes.should eq [0,12,0x4,0x0,0x0,3,10,1,2048].pack("CnCCNnNnN") end - it "should ignore custom settings" do - frame[:length] = 8*2 - frame[:payload] = { - settings_max_concurrent_streams: 10, - settings_initial_window_size: 20 - } + it "should ignore custom settings when sending" do + frame[:payload] = [ + [:settings_max_concurrent_streams, 10], + [:settings_initial_window_size, 20], + [55, 30], + ] - buf = f.generate(frame.merge({55 => 30})) + buf = f.generate(frame) + frame[:payload].slice!(2) # cut off the extension + frame[:length] = 12 # frame length should be computed WITHOUT extensions f.parse(buf).should eq frame end - it "should raise exception on invalid stream ID" do + it "should ignore custom settings when receiving" do + frame[:payload] = [ + [:settings_max_concurrent_streams, 10], + [:settings_initial_window_size, 20], + ] + + buf = f.generate(frame) + buf.setbyte(2, 18) # add 6 to the frame length + buf << "\x00\x37\x00\x00\x00\x1e" + parsed = f.parse(buf) + parsed.delete(:length) + frame.delete(:length) + parsed.should eq frame + end + + it "should raise exception on sending invalid stream ID" do expect { frame[:stream] = 1 f.generate(frame) }.to raise_error(CompressionError, /Invalid stream ID/) end - it "should raise exception on invalid setting" do + it "should raise exception on receiving invalid stream ID" do + expect { + buf = f.generate(frame) + buf.setbyte(8, 1) + f.parse(buf) + }.to raise_error(ProtocolError, /Invalid stream ID/) + end + + it "should raise exception on sending invalid setting" do expect { - frame[:payload] = {random: 23} + frame[:payload] = [[:random, 23]] f.generate(frame) }.to raise_error(CompressionError, /Unknown settings ID/) end + + it "should raise exception on receiving invalid payload length" do + expect { + buf = f.generate(frame) + buf.setbyte(2, 11) # change payload length + f.parse(buf) + }.to raise_error(ProtocolError, /Invalid settings payload length/) + end end context "PUSH_PROMISE" do @@ -256,7 +315,7 @@ } bytes = f.generate(frame) - bytes.should eq [0,0x4,0x9,0x0,0x0,0xa].pack("CnCCNN") + bytes.should eq [0,0x4,0x8,0x0,0x0,0xa].pack("CnCCNN") f.parse(bytes).should eq frame end end @@ -267,23 +326,120 @@ length: 12, type: :continuation, stream: 1, - flags: [:end_stream, :end_headers], + flags: [:end_headers], payload: 'header-block' } bytes = f.generate(frame) - bytes.should eq [0,0xc,0xa,0x5,0x1,*'header-block'.bytes].pack("CnCCNC*") + bytes.should eq [0,0xc,0x9,0x4,0x1,*'header-block'.bytes].pack("CnCCNC*") + f.parse(bytes).should eq frame + end + end + + context "ALTSVC" do + it "should generate and parse bytes" do + frame = { + length: 44, + type: :altsvc, + stream: 1, + max_age: 1402290402, # 4 + port: 8080, # 2 + proto: 'h2-13', # 1 + 5 + host: 'www.example.com', # 1 + 15 + origin: 'www.example.com', # 15 + } + bytes = f.generate(frame) + expected = [0, 43, 0xa, 0, 1, 1402290402, 8080].pack("CnCCNNn") + expected << [5, *'h2-13'.bytes].pack("CC*") + expected << [15, *'www.example.com'.bytes].pack("CC*") + expected << [*'www.example.com'.bytes].pack("C*") + bytes.should eq expected f.parse(bytes).should eq frame end end + context "Padding" do + [:data, :headers, :push_promise].each do |type| + [1,256].each do |padlen| + context "generating #{type} frame padded #{padlen}" do + before do + @frame = { + length: 12, + type: type, + stream: 1, + payload: 'example data', + } + type == :push_promise and @frame[:promise_stream] = 2 + @normal = f.generate(@frame) + @padded = f.generate(@frame.merge(:padding => padlen)) + end + it "should generate a frame with padding" do + @padded.bytesize.should eq @normal.bytesize + padlen + end + it "should fill padded octets with zero" do + trailer_len = padlen - 1 + @padded[-trailer_len, trailer_len].should match(/\A\0*\z/) + end + it "should parse a frame with padding" do + f.parse(Buffer.new(@padded)).should eq \ + f.parse(Buffer.new(@normal)).merge(:padding => padlen) + end + it "should preserve payload" do + f.parse(Buffer.new(@padded))[:payload].should eq @frame[:payload] + end + end + end + end + context "generating with invalid padding length" do + before do + @frame = { + length: 12, + type: :data, + stream: 1, + payload: 'example data', + } + end + [0, 257,1334].each do |padlen| + it "should raise error on trying to generate data frame padded with invalid #{padlen}" do + expect { + f.generate(@frame.merge(:padding => padlen)) + }.to raise_error(CompressionError, /padding/i) + end + end + it "should raise error when adding a padding would make frame too large" do + @frame[:payload] = 'q' * (f.max_frame_size - 200) + @frame[:length] = @frame[:payload].size + @frame[:padding] = 210 # would exceed 4096 + expect { + f.generate(@frame) + }.to raise_error(CompressionError, /padding/i) + end + end + context "parsing frames with invalid paddings" do + before do + @frame = { + length: 12, + type: :data, + stream: 1, + payload: 'example data', + } + @padlen = 123 + @padded = f.generate(@frame.merge(:padding => @padlen)) + end + it "should raise exception when the given padding is longer than the payload" do + @padded.setbyte(9,240) + expect { f.parse(Buffer.new(@padded)) }.to raise_error(ProtocolError, /padding/) + end + end + end + it "should determine frame length" do frames = [ [{type: :data, stream: 1, flags: [:end_stream], payload: "abc"}, 3], [{type: :headers, stream: 1, payload: "abc"}, 3], - [{type: :priority, stream: 3, priority: 30}, 4], + [{type: :priority, stream: 3, stream_dependency: 30, exclusive: false, weight: 1}, 5], [{type: :rst_stream, stream: 3, error: 100}, 4], - [{type: :settings, payload: {settings_max_concurrent_streams: 10}}, 8], + [{type: :settings, payload: [[:settings_max_concurrent_streams, 10]]}, 6], [{type: :push_promise, promise_stream: 5, payload: "abc"}, 7], [{type: :ping, payload: "blob"*2}, 8], [{type: :goaway, last_stream: 5, error: 20, payload: "blob"}, 12], @@ -293,7 +449,8 @@ frames.each do |(frame, size)| bytes = f.generate(frame) - bytes.slice(0,3).unpack("Cn")[1].should eq size + bytes.slice(1,2).unpack("n").first.should eq size + bytes.readbyte(0).should eq 0 end end @@ -318,4 +475,15 @@ bytes.should be_empty end + it "should ignore unknown extension frames" do + frame = {type: :headers, stream: 1, payload: "headers"} + bytes = f.generate(frame) + bytes = Buffer.new(bytes + bytes) # Two HEADERS frames in bytes + bytes.setbyte(3, 42) # Make the first unknown type 42 + + f.parse(bytes).should be_nil # first frame should be ignored + f.parse(bytes).should eq frame # should generate only one HEADERS + bytes.should be_empty + end + end diff --git a/spec/helper.rb b/spec/helper.rb index ebfbc86..1d5e9e2 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -47,7 +47,9 @@ PRIORITY = { type: :priority, stream: 1, - priority: 15 + exclusive: false, + stream_dependency: 0, + weight: 20, } RST_STREAM = { @@ -59,10 +61,10 @@ SETTINGS = { type: :settings, stream: 0, - payload: { - settings_max_concurrent_streams: 10, - settings_flow_control_options: 1 - } + payload: [ + [:settings_max_concurrent_streams, 10], + [:settings_initial_window_size, 0x7fffffff], + ] } PUSH_PROMISE = { @@ -104,9 +106,18 @@ payload: '-second-block' } +ALTSVC = { + type: :altsvc, + max_age: 1402290402, # 4 + port: 8080, # 2 reserved 1 + proto: 'h2-12', # 1 + 5 + host: 'www.example.com', # 1 + 15 + origin: 'www.example.com', # 15 +} + FRAME_TYPES = [ DATA, HEADERS, PRIORITY, RST_STREAM, SETTINGS, PUSH_PROMISE, - PING, GOAWAY, WINDOW_UPDATE, CONTINUATION + PING, GOAWAY, WINDOW_UPDATE, CONTINUATION, ALTSVC ] def set_stream_id(bytes, id) diff --git a/spec/server_spec.rb b/spec/server_spec.rb index 3134d66..c715815 100644 --- a/spec/server_spec.rb +++ b/spec/server_spec.rb @@ -23,14 +23,15 @@ it "should initialize client with custom connection settings" do frames = [] - @srv = Server.new(streams: 200, window: 2**10) + @srv = Server.new(settings_max_concurrent_streams: 200, + settings_initial_window_size: 2**10) @srv.on(:frame) { |recv| frames << recv } @srv << CONNECTION_HEADER frame = f.parse(frames[0]) frame[:type].should eq :settings - frame[:payload][:settings_max_concurrent_streams].should eq 200 - frame[:payload][:settings_initial_window_size].should eq 2**10 + frame[:payload].should include([:settings_max_concurrent_streams, 200]) + frame[:payload].should include([:settings_initial_window_size, 2**10]) end end @@ -40,7 +41,7 @@ @srv.on(:stream) do |stream| expect { - stream.promise({}) {} + stream.promise(':method' => 'GET') {} }.to_not raise_error end diff --git a/spec/stream_spec.rb b/spec/stream_spec.rb index d42ad3d..20c32a6 100644 --- a/spec/stream_spec.rb +++ b/spec/stream_spec.rb @@ -12,8 +12,8 @@ end it "should set custom stream priority" do - stream = @client.new_stream(priority: 3) - stream.priority.should eq 3 + stream = @client.new_stream(weight: 3, dependency: 2, exclusive: true) + stream.weight.should eq 3 end context "reserved (local)" do @@ -55,8 +55,8 @@ end it "should reprioritize stream on PRIORITY" do - @stream.receive PRIORITY.merge({priority: 30}) - @stream.priority.should eq 30 + expect { @stream.receive PRIORITY }.to_not raise_error + @stream.weight.should eq 20 end end @@ -95,8 +95,8 @@ end it "should reprioritize stream on PRIORITY" do - @stream.send PRIORITY - @stream.priority.should eq 15 + expect { @stream.send PRIORITY }.to_not raise_error + @stream.weight.should eq 20 end end @@ -116,7 +116,7 @@ end it "should transition to half closed (local) if sending END_STREAM" do - [DATA, HEADERS, CONTINUATION].each do |frame| + [DATA, HEADERS].each do |frame| s, f = @stream.dup, frame.dup f[:flags] = [:end_stream] @@ -126,7 +126,7 @@ end it "should transition to half closed (remote) if receiving END_STREAM" do - [DATA, HEADERS, CONTINUATION].each do |frame| + [DATA, HEADERS].each do |frame| s, f = @stream.dup, frame.dup f[:flags] = [:end_stream] @@ -236,7 +236,7 @@ before(:each) { @stream.send HEADERS_END_STREAM } it "should raise error on attempt to send frames" do - (FRAME_TYPES - [RST_STREAM]).each do |frame| + (FRAME_TYPES - [PRIORITY, RST_STREAM]).each do |frame| expect { @stream.dup.send frame }.to raise_error StreamError end end @@ -267,6 +267,11 @@ @stream.state.should eq :half_closed_local end + it "should reprioritize stream on PRIORITY" do + expect { @stream.send PRIORITY }.to_not raise_error + @stream.weight.should eq 20 + end + it "should emit :half_close event on transition" do order = [] stream = @client.new_stream @@ -294,7 +299,7 @@ before(:each) { @stream.receive HEADERS_END_STREAM } it "should raise STREAM_CLOSED error on reciept of frames" do - (FRAME_TYPES - [RST_STREAM, WINDOW_UPDATE]).each do |frame| + (FRAME_TYPES - [PRIORITY, RST_STREAM, WINDOW_UPDATE]).each do |frame| expect { @stream.dup.receive frame }.to raise_error(StreamClosed) @@ -302,7 +307,7 @@ end it "should transition to closed if END_STREAM flag is sent" do - [DATA, HEADERS, CONTINUATION].each do |frame| + [DATA, HEADERS].each do |frame| s, f = @stream.dup, frame.dup f[:flags] = [:end_stream] @@ -327,6 +332,11 @@ @stream.state.should eq :half_closed_remote end + it "should reprioritize stream on PRIORITY" do + expect { @stream.receive PRIORITY }.to_not raise_error + @stream.weight.should eq 20 + end + it "should emit :half_close event on transition" do order = [] stream = @client.new_stream @@ -358,7 +368,7 @@ end it "should raise STREAM_CLOSED on attempt to send frames" do - (FRAME_TYPES - [RST_STREAM]).each do |frame| + (FRAME_TYPES - [PRIORITY, RST_STREAM]).each do |frame| expect { @stream.dup.send frame }.to raise_error(StreamClosed) @@ -366,20 +376,28 @@ end it "should raise STREAM_CLOSED on receipt of frame" do - (FRAME_TYPES - [RST_STREAM]).each do |frame| + (FRAME_TYPES - [PRIORITY, RST_STREAM, WINDOW_UPDATE]).each do |frame| expect { @stream.dup.receive frame }.to raise_error(StreamClosed) end end - it "should allow RST_STREAM to be sent" do + it "should allow PRIORITY, RST_STREAM to be sent" do + expect { @stream.send PRIORITY }.to_not raise_error expect { @stream.send RST_STREAM }.to_not raise_error end - it "should not send RST_STREAM on receipt of RST_STREAM" do + it "should allow PRIORITY, RST_STREAM to be received" do + expect { @stream.receive PRIORITY }.to_not raise_error expect { @stream.receive RST_STREAM }.to_not raise_error end + + it "should reprioritize stream on PRIORITY" do + expect { @stream.receive PRIORITY }.to_not raise_error + @stream.weight.should eq 20 + end + end context "local closed via RST_STREAM frame" do @@ -407,23 +425,28 @@ # We're auto RST'ing PUSH streams in connection class, hence # skipping this transition for now. #end + end - context "local closed via END_STREAM flag" do - before(:each) do - @stream.send HEADERS # open - @stream.send DATA # contains end_stream flag - end + # FIXME: Isn't this test same as "half closed (local)"? + # context "local closed via END_STREAM flag" do + # before(:each) do + # @stream.send HEADERS # open + # @stream.send DATA # contains end_stream flag + # end + + # it "should ignore received frames" do + # FRAME_TYPES.each do |frame| + # expect { @stream.dup.receive frame }.to_not raise_error + # end + # end + # end - it "should ignore received frames" do - FRAME_TYPES.each do |frame| - expect { @stream.dup.receive frame }.to_not raise_error - end - end - end end end # end stream states + # TODO: add test cases to ensure on(:priority) emitted after close + context "flow control" do it "should initialize to default flow control window" do @stream.window.should eq DEFAULT_FLOW_WINDOW @@ -455,7 +478,7 @@ it "should observe session flow control" do settings, data = SETTINGS.dup, DATA.dup - settings[:payload] = { settings_initial_window_size: 1000 } + settings[:payload] = [[:settings_initial_window_size, 1000]] settings[:stream] = 0 framer = Framer.new @@ -482,17 +505,17 @@ it ".reprioritize should emit PRIORITY frame" do @stream.should_receive(:send) do |frame| frame[:type].should eq :priority - frame[:priority].should eq 30 + frame[:weight].should eq 30 end - @stream.reprioritize 30 + @stream.reprioritize weight: 30 end it ".reprioritize should raise error if invoked by server" do srv = Server.new stream = srv.new_stream - expect { stream.reprioritize(10) }.to raise_error(StreamError) + expect { stream.reprioritize(weight: 10) }.to raise_error(StreamError) end it ".headers should emit HEADERS frames" do @@ -528,7 +551,7 @@ end it ".data should split large DATA frames" do - data = "x" * HTTP2::MAX_FRAME_SIZE * 2 + data = "x" * 16384 * 2 @stream.stub(:send) @stream.should_receive(:send).exactly(3).times @@ -564,7 +587,7 @@ end it "should emit received headers via on(:headers)" do - headers, recv = {"header" => "value"}, nil + headers, recv = [["header", "value"]], nil @srv.on(:stream) do |stream| stream.on(:headers) {|h| recv = h} end @@ -585,16 +608,21 @@ @client_stream.data(payload) end - it "should emit received priority via on(:priority)" do - new_priority, recv = 15, 0 + it "should emit received priority parameters via on(:priority)" do + new_weight, new_dependency = 15, @client_stream.id + 2 + callback_called = false @srv.on(:stream) do |stream| stream.on(:priority) do |pri| - pri.should eq new_priority + callback_called = true + pri.is_a?(Hash).should be + pri[:weight].should eq new_weight + pri[:dependency].should eq new_dependency end end @client_stream.headers({"key" => "value"}) - @client_stream.reprioritize(new_priority) + @client_stream.reprioritize(weight: new_weight, dependency: new_dependency) + callback_called.should be end context "push" do @@ -604,7 +632,6 @@ @server_stream = stream end - # @srv << @frm.generate(SETTINGS) @client_stream.headers({"key" => "value"}) end @@ -643,7 +670,7 @@ end it "client: headers > active > headers > .. > data > close" do - order, headers = [], {} + order, headers = [], [] @client.on(:promise) do |push| order << :reserved @@ -654,7 +681,7 @@ push.on(:headers) do |h| order << :headers - headers.merge!(h) + headers += h end push.id.should be_even @@ -665,7 +692,7 @@ push.data("somedata") end - headers.should eq({"key" => "val", "key2" => "val2"}) + headers.should eq([["key", "val"], ["key2", "val2"]]) order.should eq [:reserved, :headers, :active, :headers, :half_close, :data, :close] end