Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DatumReader as a generic datum reader #64

Closed
Closed
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 51 additions & 24 deletions lang/ruby/lib/avro/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -45,7 +45,7 @@ def initialize(reader)
def byte!
@reader.read(1).unpack('C').first
end

def read_null
# null is written as zero byte's
nil
Expand Down Expand Up @@ -106,6 +106,38 @@ def read(len)
@reader.read(len)
end

def read_array_start
read_item_count
end

def read_array_next
read_item_count
end

def read_map_start
read_item_count
end

def read_map_next
read_item_count
end

def read_index
read_long
end

def read_enum
read_int
end

def read_item_count
block_count = read_long
if block_count < 0
block_count = -block_count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the block count is negative, then the size of the block in bytes is encoded afterwards. It looks like that was accidentally removed, which breaks reads because the size would be read in place of the next data value.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah! I had accidentally broken it. It's fixed now.

end
block_count
end

def skip_null
nil
end
Expand Down Expand Up @@ -159,7 +191,7 @@ def write_null(datum)
nil
end

# a boolean is written as a single byte
# a boolean is written as a single byte
# whose value is either 0 (false) or 1 (true).
def write_boolean(datum)
on_disk = datum ? 1.chr : 0.chr
Expand Down Expand Up @@ -318,7 +350,7 @@ def read_fixed(writers_schema, readers_schema, decoder)
end

def read_enum(writers_schema, readers_schema, decoder)
index_of_symbol = decoder.read_int
index_of_symbol = decoder.read_enum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this pass the writer's schema so that the JSON decoder can find the appropriate offset?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. As per specs An enum is encoded by a int, representing the zero-based position of the symbol in the schema..

read_symbol = writers_schema.symbols[index_of_symbol]

# TODO(jmhodges): figure out what unset means for resolution
Expand All @@ -332,57 +364,52 @@ def read_enum(writers_schema, readers_schema, decoder)

def read_array(writers_schema, readers_schema, decoder)
read_items = []
block_count = decoder.read_long
while block_count != 0
if block_count < 0
block_count = -block_count
block_size = decoder.read_long
end
block_count.times do
items_count = decoder.read_array_start
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why rename this to items_count? The spec refers to this as the "block's count" so I think the original name is more clear.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it back.

while items_count != 0
items_count.times do
read_items << read_data(writers_schema.items,
readers_schema.items,
decoder)
end
block_count = decoder.read_long
items_count = decoder.read_array_next
end

read_items
end

def read_map(writers_schema, readers_schema, decoder)
read_items = {}
block_count = decoder.read_long
while block_count != 0
if block_count < 0
block_count = -block_count
block_size = decoder.read_long
end
block_count.times do
items_count = decoder.read_map_start
while items_count != 0
items_count.times do
key = decoder.read_string
read_items[key] = read_data(writers_schema.values,
readers_schema.values,
decoder)
end
block_count = decoder.read_long
items_count = decoder.read_map_next
end

read_items
end

def read_union(writers_schema, readers_schema, decoder)
index_of_schema = decoder.read_long
index_of_schema = decoder.read_index
selected_writers_schema = writers_schema.schemas[index_of_schema]

read_data(selected_writers_schema, readers_schema, decoder)
end

def read_field_data(field, readers_field, decoder)
read_data(field.type, readers_field.type, decoder)
end

def read_record(writers_schema, readers_schema, decoder)
readers_fields_hash = readers_schema.fields_hash
read_record = {}
writers_schema.fields.each do |field|
if readers_field = readers_fields_hash[field.name]
field_val = read_data(field.type, readers_field.type, decoder)
read_record[field.name] = field_val
read_record[field.name] = read_field_data(field, readers_field, decoder)
else
skip_data(field.type, decoder)
end
Expand Down