42 changes: 37 additions & 5 deletions src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
@@ -281,14 +281,31 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArr
     ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
     offsets_data.reserve(arrow_column->length());
 
-    for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
+    uint64_t start_offset = 0u;
+
+    for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
     {
         arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
         auto arrow_offsets_array = list_chunk.offsets();
         auto & arrow_offsets = dynamic_cast<arrow::Int32Array &>(*arrow_offsets_array);
-        auto start = offsets_data.back();
+
+        /*
+         * It seems like arrow::ListArray::values() (nested column data) may or may not be shared across chunks.
+         * When it is shared, the offsets are monotonically increasing. Otherwise, the offsets are zero-based.
+         * To account for both cases, the starting offset is updated whenever a zero-based chunk is found.
+         * More info can be found in https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and
+         * https://github.com/ClickHouse/ClickHouse/pull/43297
+         */
+        if (list_chunk.offset() == 0)
+        {
+            start_offset = offsets_data.back();
+        }
+
         for (int64_t i = 1; i < arrow_offsets.length(); ++i)
-            offsets_data.emplace_back(start + arrow_offsets.Value(i));
+        {
+            auto offset = arrow_offsets.Value(i);
+            offsets_data.emplace_back(start_offset + offset);
+        }
     }
     return offsets_column;
 }
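To make the offset handling above concrete, here is a minimal standalone sketch (illustration only, not part of this change) of the same merge over plain vectors. The front() == 0 test stands in for the list_chunk.offset() == 0 check on real Arrow chunks, and mergeChunkOffsets is a hypothetical name:

#include <cstdint>
#include <vector>

// Merge per-chunk Int32 list offsets into one cumulative UInt64 offsets
// column, rebasing zero-based chunks on the running total.
std::vector<uint64_t> mergeChunkOffsets(const std::vector<std::vector<int32_t>> & chunks)
{
    std::vector<uint64_t> merged;
    uint64_t start_offset = 0;
    for (const auto & chunk_offsets : chunks)
    {
        // A zero-based chunk owns its own values buffer, so its offsets must
        // be shifted by everything accumulated from previous chunks.
        if (!chunk_offsets.empty() && chunk_offsets.front() == 0)
            start_offset = merged.empty() ? 0 : merged.back();

        // Element 0 only marks the chunk's own start, so skip it.
        for (size_t i = 1; i < chunk_offsets.size(); ++i)
            merged.push_back(start_offset + static_cast<uint64_t>(chunk_offsets[i]));
    }
    return merged;
}

Two chunks holding [[1, 2], [3]] each yield {2, 3, 5, 6} in both layouts: with a shared values buffer the per-chunk offsets arrive as {0, 2, 3} then {3, 5, 6}, and with per-chunk buffers as {0, 2, 3} then {0, 2, 3}.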
@@ -316,8 +333,23 @@ static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr
     for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
     {
         arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
-        std::shared_ptr<arrow::Array> chunk = list_chunk.values();
-        array_vector.emplace_back(std::move(chunk));
+
+        /*
+         * It seems like arrow::ListArray::values() (nested column data) may or may not be shared across chunks.
+         * Therefore, simply appending arrow::ListArray::values() could lead to duplicated data being appended.
+         * To handle this properly, arrow::ListArray::values() needs to be sliced based on the chunk offsets.
+         * arrow::ListArray::Flatten does exactly that. More info in https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and
+         * https://github.com/ClickHouse/ClickHouse/pull/43297
+         */
+        auto flatten_result = list_chunk.Flatten();
+        if (flatten_result.ok())
+        {
+            array_vector.emplace_back(flatten_result.ValueOrDie());
+        }
+        else
+        {
+            throw Exception(ErrorCodes::INCORRECT_DATA, "Failed to flatten chunk '{}' of column of type '{}'", chunk_i, arrow_column->type()->id());
+        }
     }
     return std::make_shared<arrow::ChunkedArray>(array_vector);
 }
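The values()-versus-Flatten() distinction can be reproduced in isolation. A short sketch (illustration only, not part of this change), assuming the public Arrow C++ API (arrow::ListArray::FromArrays, arrow::Array::Slice, arrow::ListArray::Flatten):

#include <arrow/api.h>

#include <cstdlib>
#include <iostream>
#include <memory>
#include <vector>

std::shared_ptr<arrow::Int32Array> makeInt32(const std::vector<int32_t> & data)
{
    arrow::Int32Builder builder;
    if (!builder.AppendValues(data).ok())
        std::abort();
    std::shared_ptr<arrow::Array> out;
    if (!builder.Finish(&out).ok())
        std::abort();
    return std::static_pointer_cast<arrow::Int32Array>(out);
}

int main()
{
    // List column [[1, 2], [3], [4, 5]]: offsets {0, 2, 3, 5} into values {1..5}.
    auto list = arrow::ListArray::FromArrays(*makeInt32({0, 2, 3, 5}),
                                             *makeInt32({1, 2, 3, 4, 5})).ValueOrDie();

    // A chunk of a ChunkedArray can be exactly this kind of view: the last
    // two lists, sharing the values buffer with the rest of the column.
    auto slice = std::static_pointer_cast<arrow::ListArray>(list->Slice(1, 2));

    std::cout << slice->values()->length() << '\n';                // 5: the whole shared child
    std::cout << slice->Flatten().ValueOrDie()->length() << '\n';  // 3: just [3] and [4, 5]
    return 0;
}

Appending slice->values() for every chunk would repeat elements already covered by earlier chunks, which is exactly the duplication the Flatten() call above avoids.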
@@ -0,0 +1,3 @@
Parquet
3d94071a2fe62a3b3285f170ca6f42e5 -
70000
@@ -0,0 +1,42 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

echo "Parquet"

# File generated with the script below

#import pyarrow as pa
#import pyarrow.parquet as pq
#import random
#
#
#def gen_array(offset):
#    array = []
#    array_length = random.randint(0, 9)
#    for i in range(array_length):
#        array.append(i + offset)
#
#    return array
#
#
#def gen_arrays(number_of_arrays):
#    list_of_arrays = []
#    for i in range(number_of_arrays):
#        list_of_arrays.append(gen_array(i))
#    return list_of_arrays
#
#arr = pa.array(gen_arrays(70000))
#table = pa.table([arr], ["arr"])
#pq.write_table(table, "int-list-zero-based-chunked-array.parquet")

DATA_FILE=$CUR_DIR/data_parquet/int-list-zero-based-chunked-array.parquet
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (arr Array(Int64)) ENGINE = Memory"
cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load"
${CLICKHOUSE_CLIENT} --query="drop table parquet_load"
@@ -0,0 +1,3 @@
Parquet
e1cfe4265689ead763b18489b363344d -
39352
@@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

echo "Parquet"

DATA_FILE=$CUR_DIR/data_parquet/list_monotonically_increasing_offsets.parquet
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (list Array(Int64), json Nullable(String)) ENGINE = Memory"
cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load"
${CLICKHOUSE_CLIENT} --query="drop table parquet_load"
Binary file not shown.
Binary file not shown.