Skip to content

Commit

Permalink
GH-15287: [Ruby] Merge column and add suffix in Table#join (#33654)
Browse files Browse the repository at this point in the history
# Rationale for this change

The current implementation always preserves the join key columns from both sides. It would be more convenient if those columns were merged.

# What changes are included in this PR?

- Columns from left and right are merged if:
  - Join key is a String or a Symbol (<= incompatible Change)
  - Join key is nil (natural join) (<= change in unreleased feature)
- New options `left_suffix=""` and `right_suffix=""` are introduced.
  - If it is empty (by default), join key(s) do not change.
  - If it is not empty, the suffix is appended to join key(s).

# Are these changes tested?

Yes.

# Are there any user-facing changes?

There is an incompatible change when the join key is a String or a Symbol.

* Closes: #15287

Lead-authored-by: Hirokazu SUZUKI <heronshoes877@gmail.com>
Co-authored-by: Sutou Kouhei <kou@clear-code.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
3 people committed Jan 17, 2023
1 parent ef98a97 commit 1a82720
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 14 deletions.
2 changes: 1 addition & 1 deletion c_glib/arrow-glib/compute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5195,7 +5195,7 @@ G_END_DECLS
arrow::Result<arrow::FieldRef>
garrow_field_reference_resolve_raw(const gchar *reference)
{
if (reference && reference[0] == '.') {
if (reference && (reference[0] == '.' || reference[0] == '[')) {
return arrow::FieldRef::FromDotPath(reference);
} else {
arrow::FieldRef arrow_reference(reference);
Expand Down
136 changes: 129 additions & 7 deletions ruby/red-arrow/lib/arrow/table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -472,18 +472,22 @@ def pack
#
# If both of `left_outputs` and `right_outputs` aren't
# specified, all columns in `self` and `right` are
# outputted.
# output.
# @param right_outputs [::Array<String, Symbol>] Output columns in
# `right`.
#
# If both of `left_outputs` and `right_outputs` aren't
# specified, all columns in `self` and `right` are
# outputted.
# output.
# @return [Arrow::Table]
# The joined `Arrow::Table`.
#
# @overload join(right, type: :inner, left_outputs: nil, right_outputs: nil)
# If key(s) are not supplied, common keys in self and right are used.
# If key(s) are not supplied, common keys in self and right are used
# (natural join).
#
# Columns used as keys are merged and remain on the left side
# when both `left_outputs` and `right_outputs` are `nil`.
#
# @macro join_common_before
# @macro join_common_after
Expand All @@ -493,13 +497,19 @@ def pack
# @overload join(right, key, type: :inner, left_outputs: nil, right_outputs: nil)
# Join right by a key.
#
# Columns used as keys are merged and remain on the left side
# when both `left_outputs` and `right_outputs` are `nil`.
#
# @macro join_common_before
# @param key [String, Symbol] A join key.
# @macro join_common_after
#
# @overload join(right, keys, type: :inner, left_outputs: nil, right_outputs: nil)
# @overload join(right, keys, type: :inner, left_suffix: "", right_suffix: "",
# left_outputs: nil, right_outputs: nil)
# Join right by keys.
#
# Key columns can be renamed by appending `left_suffix` or `right_suffix` to their names.
#
# @macro join_common_before
# @param keys [::Array<String, Symbol>] Join keys.
# @macro join_common_after
Expand All @@ -516,8 +526,16 @@ def pack
# @macro join_common_after
#
# @since 7.0.0
def join(right, keys=nil, type: :inner, left_outputs: nil, right_outputs: nil)
def join(right,
keys=nil,
type: :inner,
left_suffix: "",
right_suffix: "",
left_outputs: nil,
right_outputs: nil)
is_natural_join = keys.nil?
keys ||= (column_names & right.column_names)
type = JoinType.try_convert(type) || type
plan = ExecutePlan.new
left_node = plan.build_source_node(self)
right_node = plan.build_source_node(right)
Expand All @@ -533,21 +551,43 @@ def join(right, keys=nil, type: :inner, left_outputs: nil, right_outputs: nil)
hash_join_node_options = HashJoinNodeOptions.new(type,
left_keys,
right_keys)
use_manual_outputs = false
unless left_outputs.nil?
hash_join_node_options.left_outputs = left_outputs
use_manual_outputs = true
end
unless right_outputs.nil?
hash_join_node_options.right_outputs = right_outputs
use_manual_outputs = true
end
hash_join_node = plan.build_hash_join_node(left_node,
right_node,
hash_join_node_options)
type_nick = type.nick
is_filter_join = (type_nick.end_with?("-semi") or
type_nick.end_with?("-anti"))
if use_manual_outputs or is_filter_join
process_node = hash_join_node
elsif is_natural_join
process_node = join_merge_keys(plan, hash_join_node, right, keys)
elsif keys.is_a?(String) or keys.is_a?(Symbol)
process_node = join_merge_keys(plan, hash_join_node, right, [keys.to_s])
elsif !keys.is_a?(Hash) and (left_suffix != "" or right_suffix != "")
process_node = join_rename_keys(plan,
hash_join_node,
right,
keys,
left_suffix,
right_suffix)
else
process_node = hash_join_node
end
sink_node_options = SinkNodeOptions.new
plan.build_sink_node(hash_join_node, sink_node_options)
plan.build_sink_node(process_node, sink_node_options)
plan.validate
plan.start
plan.wait
reader = sink_node_options.get_reader(hash_join_node.output_schema)
reader = sink_node_options.get_reader(process_node.output_schema)
table = reader.read_all
share_input(table)
table
Expand Down Expand Up @@ -620,5 +660,87 @@ def ensure_raw_column(name, data)
raise ArgumentError, message
end
end

# Adds a project node on top of `input_node` that collapses every
# left/right pair of key columns into a single output column.
#
# Columns are addressed by position: the columns of `self` come first
# and the columns of `right` follow at an offset of `column_names.size`
# (assumes `input_node` emits left columns followed by right columns).
#
# For a key column, the projected value is the left value unless it is
# null, in which case the right value is used. The merged column keeps
# the key's name and stays at the left column's position; the right
# key column is dropped. Non-key columns are passed through unchanged.
#
# @param plan [#build_project_node] The plan that receives the node.
# @param input_node The node whose output is projected.
# @param right [#column_names] The right side of the join.
# @param keys [::Array<String, Symbol>] Names of the key columns.
# @return The value returned by `plan.build_project_node`.
def join_merge_keys(plan, input_node, right, keys)
  # Hash used as a set of stringified key names.
  key_names = keys.each_with_object({}) do |key, set|
    set[key.to_s] = true
  end

  left_names = column_names
  right_offset = left_names.size
  left_columns = left_names.each_with_index.map do |name, i|
    {is_key: key_names.key?(name), name: name, index: i, direction: :left}
  end
  right_columns = right.column_names.each_with_index.map do |name, i|
    {
      is_key: key_names.key?(name),
      name: name,
      index: right_offset + i,
      direction: :right,
    }
  end

  # Pair up the left and right occurrence of each key column by name.
  key_pairs = {}
  left_columns.each do |column|
    key_pairs[column[:name]] = {left: column} if column[:is_key]
  end
  right_columns.each do |column|
    key_pairs[column[:name]][:right] = column if column[:is_key]
  end

  expressions = []
  names = []
  (left_columns + right_columns).each do |column|
    if column[:is_key]
      # The right key column is folded into the left one below.
      next if column[:direction] == :right
      pair = key_pairs[column[:name]]
      left_field = FieldExpression.new("[#{pair[:left][:index]}]")
      right_field = FieldExpression.new("[#{pair[:right][:index]}]")
      left_is_null = CallExpression.new("is_null", [left_field])
      # Take the right value only where the left value is null.
      expressions << CallExpression.new("if_else",
                                        [left_is_null, right_field, left_field])
    else
      expressions << FieldExpression.new("[#{column[:index]}]")
    end
    names << column[:name]
  end

  plan.build_project_node(input_node,
                          ProjectNodeOptions.new(expressions, names))
end

# Adds a project node on top of `input_node` that keeps every column
# but renames the key columns by appending a side-specific suffix.
#
# Columns are addressed by position: the columns of `self` come first
# and the columns of `right` follow at an offset of `column_names.size`
# (assumes `input_node` emits left columns followed by right columns).
#
# @param plan [#build_project_node] The plan that receives the node.
# @param input_node The node whose output is projected.
# @param right [#column_names] The right side of the join.
# @param keys [::Array<String, Symbol>] Names of the key columns.
# @param left_suffix [String] Appended to key column names from `self`.
# @param right_suffix [String] Appended to key column names from `right`.
# @return The value returned by `plan.build_project_node`.
def join_rename_keys(plan,
                     input_node,
                     right,
                     keys,
                     left_suffix,
                     right_suffix)
  # Hash used as a set of stringified key names.
  key_names = keys.each_with_object({}) do |key, set|
    set[key.to_s] = true
  end
  # Only key columns receive the suffix; other names pass through as is.
  output_name = lambda do |name, suffix|
    key_names.key?(name) ? "#{name}#{suffix}" : name
  end

  names = column_names.map { |name| output_name.call(name, left_suffix) }
  right.column_names.each do |name|
    names << output_name.call(name, right_suffix)
  end
  # One positional field reference per output column, in order.
  expressions = names.each_index.map do |index|
    FieldExpression.new("[#{index}]")
  end

  plan.build_project_node(input_node,
                          ProjectNodeOptions.new(expressions, names))
end
end
end
Loading

0 comments on commit 1a82720

Please sign in to comment.