Permalink
Browse files

AVRO-1260. Ruby: Improve read performance. Contributed by Martin Klep…

…pmann.

git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1451689 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 882b0b3 commit 84ad7a72bbf42ae7f1e22fceddc9edf48b81655d @cutting cutting committed Mar 1, 2013
Showing with 172 additions and 158 deletions.
  1. +2 −0 CHANGES.txt
  2. +78 −86 lang/ruby/lib/avro/io.rb
  3. +3 −2 lang/ruby/lib/avro/protocol.rb
  4. +73 −54 lang/ruby/lib/avro/schema.rb
  5. +16 −16 lang/ruby/test/random_data.rb
View
@@ -6,6 +6,8 @@ Trunk (not yet released)
IMPROVEMENTS
+ AVRO-1260. Ruby: Improve read performance. (Martin Kleppmann via cutting)
+
BUG FIXES
Avro 1.7.4 (22 February 2012)
View
@@ -220,51 +220,43 @@ def write(datum)
end
class DatumReader
- def self.check_props(schema_one, schema_two, prop_list)
- prop_list.all? do |prop|
- schema_one.send(prop) == schema_two.send(prop)
- end
- end
-
def self.match_schemas(writers_schema, readers_schema)
- w_type = writers_schema.type
- r_type = readers_schema.type
+ w_type = writers_schema.type_sym
+ r_type = readers_schema.type_sym
# This conditional is begging for some OO love.
- if w_type == 'union' || r_type == 'union'
+ if w_type == :union || r_type == :union
return true
end
if w_type == r_type
- if Schema::PRIMITIVE_TYPES.include?(w_type) &&
- Schema::PRIMITIVE_TYPES.include?(r_type)
- return true
- end
+ return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)
case r_type
- when 'record'
- return check_props(writers_schema, readers_schema, [:fullname])
- when 'error'
- return check_props(writers_schema, readers_schema, [:fullname])
- when 'request'
+ when :record
+ return writers_schema.fullname == readers_schema.fullname
+ when :error
+ return writers_schema.fullname == readers_schema.fullname
+ when :request
return true
- when 'fixed'
- return check_props(writers_schema, readers_schema, [:fullname, :size])
- when 'enum'
- return check_props(writers_schema, readers_schema, [:fullname])
- when 'map'
- return check_props(writers_schema.values, readers_schema.values, [:type])
- when 'array'
- return check_props(writers_schema.items, readers_schema.items, [:type])
+ when :fixed
+ return writers_schema.fullname == readers_schema.fullname &&
+ writers_schema.size == readers_schema.size
+ when :enum
+ return writers_schema.fullname == readers_schema.fullname
+ when :map
+ return writers_schema.values.type == readers_schema.values.type
+ when :array
+ return writers_schema.items.type == readers_schema.items.type
end
end
# Handle schema promotion
- if w_type == 'int' && ['long', 'float', 'double'].include?(r_type)
+ if w_type == :int && [:long, :float, :double].include?(r_type)
return true
- elsif w_type == 'long' && ['float', 'double'].include?(r_type)
+ elsif w_type == :long && [:float, :double].include?(r_type)
return true
- elsif w_type == 'float' && r_type == 'double'
+ elsif w_type == :float && r_type == :double
return true
end
@@ -291,7 +283,7 @@ def read_data(writers_schema, readers_schema, decoder)
# schema resolution: reader's schema is a union, writer's
# schema is not
- if writers_schema.type != 'union' && readers_schema.type == 'union'
+ if writers_schema.type_sym != :union && readers_schema.type_sym == :union
rs = readers_schema.schemas.find{|s|
self.class.match_schemas(writers_schema, s)
}
@@ -301,21 +293,21 @@ def read_data(writers_schema, readers_schema, decoder)
# function dispatch for reading data based on type of writer's
# schema
- case writers_schema.type
- when 'null'; decoder.read_null
- when 'boolean'; decoder.read_boolean
- when 'string'; decoder.read_string
- when 'int'; decoder.read_int
- when 'long'; decoder.read_long
- when 'float'; decoder.read_float
- when 'double'; decoder.read_double
- when 'bytes'; decoder.read_bytes
- when 'fixed'; read_fixed(writers_schema, readers_schema, decoder)
- when 'enum'; read_enum(writers_schema, readers_schema, decoder)
- when 'array'; read_array(writers_schema, readers_schema, decoder)
- when 'map'; read_map(writers_schema, readers_schema, decoder)
- when 'union'; read_union(writers_schema, readers_schema, decoder)
- when 'record', 'error', 'request'; read_record(writers_schema, readers_schema, decoder)
+ case writers_schema.type_sym
+ when :null; decoder.read_null
+ when :boolean; decoder.read_boolean
+ when :string; decoder.read_string
+ when :int; decoder.read_int
+ when :long; decoder.read_long
+ when :float; decoder.read_float
+ when :double; decoder.read_double
+ when :bytes; decoder.read_bytes
+ when :fixed; read_fixed(writers_schema, readers_schema, decoder)
+ when :enum; read_enum(writers_schema, readers_schema, decoder)
+ when :array; read_array(writers_schema, readers_schema, decoder)
+ when :map; read_map(writers_schema, readers_schema, decoder)
+ when :union; read_union(writers_schema, readers_schema, decoder)
+ when :record, :error, :request; read_record(writers_schema, readers_schema, decoder)
else
raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
end
@@ -416,34 +408,34 @@ def read_record(writers_schema, readers_schema, decoder)
def read_default_value(field_schema, default_value)
# Basically a JSON Decoder?
- case field_schema.type
- when 'null'
+ case field_schema.type_sym
+ when :null
return nil
- when 'boolean'
+ when :boolean
return default_value
- when 'int', 'long'
+ when :int, :long
return Integer(default_value)
- when 'float', 'double'
+ when :float, :double
return Float(default_value)
- when 'enum', 'fixed', 'string', 'bytes'
+ when :enum, :fixed, :string, :bytes
return default_value
- when 'array'
+ when :array
read_array = []
default_value.each do |json_val|
item_val = read_default_value(field_schema.items, json_val)
read_array << item_val
end
return read_array
- when 'map'
+ when :map
read_map = {}
default_value.each do |key, json_val|
map_val = read_default_value(field_schema.values, json_val)
read_map[key] = map_val
end
return read_map
- when 'union'
+ when :union
return read_default_value(field_schema.schemas[0], default_value)
- when 'record', 'error'
+ when :record, :error
read_record = {}
field_schema.fields.each do |field|
json_val = default_value[field.name]
@@ -459,37 +451,37 @@ def read_default_value(field_schema, default_value)
end
def skip_data(writers_schema, decoder)
- case writers_schema.type
- when 'null'
+ case writers_schema.type_sym
+ when :null
decoder.skip_null
- when 'boolean'
+ when :boolean
decoder.skip_boolean
- when 'string'
+ when :string
decoder.skip_string
- when 'int'
+ when :int
decoder.skip_int
- when 'long'
+ when :long
decoder.skip_long
- when 'float'
+ when :float
decoder.skip_float
- when 'double'
+ when :double
decoder.skip_double
- when 'bytes'
+ when :bytes
decoder.skip_bytes
- when 'fixed'
+ when :fixed
skip_fixed(writers_schema, decoder)
- when 'enum'
+ when :enum
skip_enum(writers_schema, decoder)
- when 'array'
+ when :array
skip_array(writers_schema, decoder)
- when 'map'
+ when :map
skip_map(writers_schema, decoder)
- when 'union'
+ when :union
skip_union(writers_schema, decoder)
- when 'record', 'error', 'request'
+ when :record, :error, :request
skip_record(writers_schema, decoder)
else
- raise AvroError, "Unknown schema type: #{schm.type}"
+ raise AvroError, "Unknown schema type: #{writers_schema.type}"
end
end
@@ -552,21 +544,21 @@ def write_data(writers_schema, datum, encoder)
end
# function dispatch to write datum
- case writers_schema.type
- when 'null'; encoder.write_null(datum)
- when 'boolean'; encoder.write_boolean(datum)
- when 'string'; encoder.write_string(datum)
- when 'int'; encoder.write_int(datum)
- when 'long'; encoder.write_long(datum)
- when 'float'; encoder.write_float(datum)
- when 'double'; encoder.write_double(datum)
- when 'bytes'; encoder.write_bytes(datum)
- when 'fixed'; write_fixed(writers_schema, datum, encoder)
- when 'enum'; write_enum(writers_schema, datum, encoder)
- when 'array'; write_array(writers_schema, datum, encoder)
- when 'map'; write_map(writers_schema, datum, encoder)
- when 'union'; write_union(writers_schema, datum, encoder)
- when 'record', 'error', 'request'; write_record(writers_schema, datum, encoder)
+ case writers_schema.type_sym
+ when :null; encoder.write_null(datum)
+ when :boolean; encoder.write_boolean(datum)
+ when :string; encoder.write_string(datum)
+ when :int; encoder.write_int(datum)
+ when :long; encoder.write_long(datum)
+ when :float; encoder.write_float(datum)
+ when :double; encoder.write_double(datum)
+ when :bytes; encoder.write_bytes(datum)
+ when :fixed; write_fixed(writers_schema, datum, encoder)
+ when :enum; write_enum(writers_schema, datum, encoder)
+ when :array; write_array(writers_schema, datum, encoder)
+ when :map; write_map(writers_schema, datum, encoder)
+ when :union; write_union(writers_schema, datum, encoder)
+ when :record, :error, :request; write_record(writers_schema, datum, encoder)
else
raise AvroError.new("Unknown type: #{writers_schema.type}")
end
@@ -17,6 +17,7 @@
module Avro
class Protocol
VALID_TYPE_SCHEMA_TYPES = Set.new(%w[enum record error fixed])
+ VALID_TYPE_SCHEMA_TYPES_SYM = Set.new(VALID_TYPE_SCHEMA_TYPES.map(&:to_sym))
class ProtocolParseError < Avro::AvroError; end
attr_reader :name, :namespace, :types, :messages, :md5
@@ -71,7 +72,7 @@ def parse_types(types, type_names)
# FIXME adding type.name to type_names is not defined in the
# spec. Possible bug in the python impl and the spec.
type_object = Schema.real_parse(type, type_names)
- unless VALID_TYPE_SCHEMA_TYPES.include?(type_object.type)
+ unless VALID_TYPE_SCHEMA_TYPES_SYM.include?(type_object.type_sym)
msg = "Type #{type} not an enum, record, fixed or error."
raise ProtocolParseError, msg
end
@@ -142,7 +143,7 @@ def parse_request(request, names)
unless request.is_a?(Array)
raise ProtocolParseError, "Request property not an Array: #{request.inspect}"
end
- Schema::RecordSchema.new(nil, nil, request, names, 'request')
+ Schema::RecordSchema.new(nil, nil, request, names, :request)
end
def parse_response(response, names)
Oops, something went wrong.

0 comments on commit 84ad7a7

Please sign in to comment.