Skip to content

Commit

Permalink
Merge pull request #56304 from yariks5s/npy_data_caching
Browse files Browse the repository at this point in the history
Implement countRows for Npy data format
  • Loading branch information
Avogar committed Nov 16, 2023
2 parents 1831ecc + 12a8a38 commit 8e78d4c
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 1 deletion.
18 changes: 17 additions & 1 deletion src/Processors/Formats/Impl/NpyRowInputFormat.cpp
Expand Up @@ -303,6 +303,17 @@ NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params par
nested_type = getNestedType(types[0]);
}

size_t NpyRowInputFormat::countRows(size_t max_block_size)
{
size_t count;
if (counted_rows + max_block_size <= size_t(header.shape[0]))
count = max_block_size;
else
count = header.shape[0] - counted_rows;
counted_rows += count;
return count;
}

template <typename ColumnValue, typename DataValue>
void NpyRowInputFormat::readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianness endianness)
{
Expand Down Expand Up @@ -445,13 +456,18 @@ NpySchemaReader::NpySchemaReader(ReadBuffer & in_)

NamesAndTypesList NpySchemaReader::readSchema()
{
NumpyHeader header = parseHeader(in);
header = parseHeader(in);
DataTypePtr nested_type = getDataTypeFromNumpyType(header.numpy_type);
DataTypePtr result_type = createNestedArrayType(nested_type, header.shape.size());

return {{"array", result_type}};
}

std::optional<size_t> NpySchemaReader::readNumberOrRows()
{
return header.shape[0];
}

void registerInputFormatNpy(FormatFactory & factory)
{
factory.registerInputFormat("Npy", [](
Expand Down
6 changes: 6 additions & 0 deletions src/Processors/Formats/Impl/NpyRowInputFormat.h
Expand Up @@ -29,6 +29,9 @@ class NpyRowInputFormat final : public IRowInputFormat
String getName() const override { return "NpyRowInputFormat"; }

private:
bool supportsCountRows() const override { return true; }
size_t countRows(size_t max_block_size) override;

void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension &) override;
void readData(MutableColumns & columns);
Expand All @@ -54,6 +57,7 @@ class NpyRowInputFormat final : public IRowInputFormat

DataTypePtr nested_type;
NumpyHeader header;
size_t counted_rows = 0;
};

class NpySchemaReader : public ISchemaReader
Expand All @@ -62,7 +66,9 @@ class NpySchemaReader : public ISchemaReader
explicit NpySchemaReader(ReadBuffer & in_);

private:
std::optional<size_t> readNumberOrRows() override;
NamesAndTypesList readSchema() override;
NumpyHeader header;
};

}
9 changes: 9 additions & 0 deletions tests/queries/0_stateless/02908_Npy_files_caching.reference
@@ -0,0 +1,9 @@
3
3
3
array Int64
3
1000000
1000000
array Int64
1000000
19 changes: 19 additions & 0 deletions tests/queries/0_stateless/02908_Npy_files_caching.sh
@@ -0,0 +1,19 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/one_dim.npy') settings optimize_count_from_files=0"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/one_dim.npy') settings optimize_count_from_files=1"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/one_dim.npy', auto, 'array Int64') settings optimize_count_from_files=1"
$CLICKHOUSE_LOCAL -nm -q "
desc file('$CURDIR/data_npy/one_dim.npy');
select number_of_rows from system.schema_inference_cache where format='Npy';
"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/npy_big.npy') settings optimize_count_from_files=0"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/npy_big.npy') settings optimize_count_from_files=1"
$CLICKHOUSE_LOCAL -nm -q "
desc file('$CURDIR/data_npy/npy_big.npy');
select number_of_rows from system.schema_inference_cache where format='Npy';
"
Binary file added tests/queries/0_stateless/data_npy/npy_big.npy
Binary file not shown.

0 comments on commit 8e78d4c

Please sign in to comment.