Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions ruby/red-arrow/lib/arrow/table-loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
module Arrow
class TableLoader
class << self
def load(path, options={})
new(path, options).load
def load(output, options={})
new(output, options).load
end
end

def initialize(path, options={})
path = path.to_path if path.respond_to?(:to_path)
@path = path
def initialize(output, options={})
output = output.to_path if output.respond_to?(:to_path)
@output = output
@options = options
fill_options
end
Expand All @@ -50,7 +50,7 @@ def load
__send__(custom_load_method)
else
# For backward compatibility.
__send__(custom_load_method, @path)
__send__(custom_load_method, @output)
end
end

Expand All @@ -60,11 +60,15 @@ def fill_options
return
end

extension = PathExtension.new(@path)
info = extension.extract
if @output.is_a?(Buffer)
info = {}
else
extension = PathExtension.new(@output)
info = extension.extract
end
format = info[:format]
@options = @options.dup
if respond_to?("load_as_#{format}", true)
if format and respond_to?("load_as_#{format}", true)
@options[:format] ||= format.to_sym
else
@options[:format] ||= :arrow
Expand All @@ -74,6 +78,14 @@ def fill_options
end
end

# Builds the input stream used by the load_as_* methods.
#
# When @output is a Buffer the stream is a BufferInputStream over it;
# anything else is handed to MemoryMappedInputStream.
def open_input_stream
  stream_class =
    @output.is_a?(Buffer) ? BufferInputStream : MemoryMappedInputStream
  stream_class.new(@output)
end

def load_raw(input, reader)
schema = reader.schema
chunked_arrays = []
Expand All @@ -100,7 +112,7 @@ def load_as_arrow
RecordBatchStreamReader,
]
reader_class_candidates.each do |reader_class_candidate|
input = MemoryMappedInputStream.new(@path)
input = open_input_stream
begin
reader = reader_class_candidate.new(input)
rescue Arrow::Error
Expand All @@ -114,20 +126,20 @@ def load_as_arrow
end

# Creates a RecordBatchFileReader on the input and hands both the input and
# the reader to load_raw.
# NOTE(review): `input` is assigned twice in this view. The
# MemoryMappedInputStream(@path) line looks like the pre-change side of the
# diff and the open_input_stream line its replacement -- confirm against the
# real file.
def load_as_batch
input = MemoryMappedInputStream.new(@path)
input = open_input_stream
reader = RecordBatchFileReader.new(input)
load_raw(input, reader)
end

# Creates a RecordBatchStreamReader on the input and hands both the input and
# the reader to load_raw.
# NOTE(review): `input` is assigned twice in this view. The
# MemoryMappedInputStream(@path) line looks like the pre-change side of the
# diff and the open_input_stream line its replacement -- confirm against the
# real file.
def load_as_stream
input = MemoryMappedInputStream.new(@path)
input = open_input_stream
reader = RecordBatchStreamReader.new(input)
load_raw(input, reader)
end

if Arrow.const_defined?(:ORCFileReader)
def load_as_orc
input = MemoryMappedInputStream.new(@path)
input = open_input_stream
reader = ORCFileReader.new(input)
field_indexes = @options[:field_indexes]
reader.set_field_indexes(field_indexes) if field_indexes
Expand All @@ -140,11 +152,15 @@ def load_as_orc
# Delegates CSV loading to CSVLoader.load, passing a copy of @options with
# the :format key removed.
# NOTE(review): this view contains an unconditional CSVLoader.load call on
# Pathname.new(@path) followed by a Buffer-aware conditional on @output; the
# first call looks like the pre-change side of the diff -- confirm against
# the real file.
def load_as_csv
options = @options.dup
options.delete(:format)
CSVLoader.load(Pathname.new(@path), options)
# A Buffer is passed to CSVLoader as the string form of its data; any other
# value is wrapped in a Pathname.
if @output.is_a?(Buffer)
CSVLoader.load(@output.data.to_s, options)
else
CSVLoader.load(Pathname.new(@output), options)
end
end

def load_as_feather
input = MemoryMappedInputStream.new(@path)
input = open_input_stream
reader = FeatherFileReader.new(input)
table = reader.read
table.instance_variable_set(:@input, input)
Expand Down
66 changes: 38 additions & 28 deletions ruby/red-arrow/lib/arrow/table-saver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
module Arrow
class TableSaver
class << self
def save(table, path, options={})
new(table, path, options).save
def save(table, output, options={})
new(table, output, options).save
end
end

def initialize(table, path, options={})
def initialize(table, output, options={})
@table = table
path = path.to_path if path.respond_to?(:to_path)
@path = path
output = output.to_path if output.respond_to?(:to_path)
@output = output
@options = options
fill_options
end
Expand All @@ -51,7 +51,7 @@ def save
__send__(custom_save_method)
else
# For backward compatibility.
__send__(custom_save_method, @path)
__send__(custom_save_method, @output)
end
end

Expand All @@ -61,11 +61,15 @@ def fill_options
return
end

extension = PathExtension.new(@path)
info = extension.extract
if @output.is_a?(Buffer)
info = {}
else
extension = PathExtension.new(@output)
info = extension.extract
end
format = info[:format]
@options = @options.dup
if respond_to?("save_as_#{format}", true)
if format and respond_to?("save_as_#{format}", true)
@options[:format] ||= format.to_sym
else
@options[:format] ||= :arrow
Expand All @@ -75,8 +79,30 @@ def fill_options
end
end

# Opens the uncompressed destination stream and passes the caller's block to
# the stream class's open method.
#
# A Buffer destination goes through BufferOutputStream.open; anything else
# goes through FileOutputStream.open with `false` as the second argument.
def open_raw_output_stream(&block)
  return BufferOutputStream.open(@output, &block) if @output.is_a?(Buffer)

  FileOutputStream.open(@output, false, &block)
end

# Yields the stream that writers should write to.
#
# Without a :compression option this is the raw stream from
# open_raw_output_stream. With one, the raw stream is wrapped in a
# CompressedOutputStream built from Codec.new(compression) and the wrapped
# stream is yielded instead.
def open_output_stream(&block)
  compression = @options[:compression]
  return open_raw_output_stream(&block) unless compression

  codec = Codec.new(compression)
  open_raw_output_stream do |raw_stream|
    CompressedOutputStream.open(codec, raw_stream) do |compressed_stream|
      yield(compressed_stream)
    end
  end
end

def save_raw(writer_class)
FileOutputStream.open(@path, false) do |output|
open_output_stream do |output|
writer_class.open(output, @table.schema) do |writer|
writer.write_table(@table)
end
Expand All @@ -95,24 +121,8 @@ def save_as_stream
save_raw(RecordBatchStreamWriter)
end

# Yields a writable output for @path to the caller's block.
#
# Without a :compression option the path is opened as a plain Ruby File in
# "w" mode. With one, a FileOutputStream is opened on the path and wrapped in
# a CompressedOutputStream built from Codec.new(compression); the wrapped
# stream is what gets yielded.
def open_output
  compression = @options[:compression]
  unless compression
    return ::File.open(@path, "w") { |file| yield(file) }
  end

  codec = Codec.new(compression)
  FileOutputStream.open(@path, false) do |raw_stream|
    CompressedOutputStream.open(codec, raw_stream) do |compressed_stream|
      yield(compressed_stream)
    end
  end
end

def save_as_csv
open_output do |output|
open_output_stream do |output|
csv = CSV.new(output)
names = @table.schema.fields.collect(&:name)
csv << names
Expand All @@ -125,7 +135,7 @@ def save_as_csv
end

def save_as_feather
FileOutputStream.open(@path, false) do |output|
open_output_stream do |output|
FeatherFileWriter.open(output) do |writer|
writer.write(@table)
end
Expand Down
Loading