Skip to content

Commit

Permalink
Merge pull request ClickHouse#51876 from evillique/fix-mongodb-inserts
Browse files Browse the repository at this point in the history
Fix inserts into MongoDB tables
  • Loading branch information
robot-ch-test-poll committed Jul 16, 2023
2 parents d900aa4 + 7e6551d commit 3b0a404
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
60 changes: 57 additions & 3 deletions src/Storages/StorageMongoDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <unordered_set>

#include <DataTypes/DataTypeArray.h>

namespace DB
{

Expand Down Expand Up @@ -127,9 +129,7 @@ class StorageMongoDBSink : public SinkToStorage

for (const auto j : collections::range(0, num_cols))
{
WriteBufferFromOwnString ostr;
data_types[j]->getDefaultSerialization()->serializeText(*columns[j], i, ostr, FormatSettings{});
document->add(data_names[j], ostr.str());
insertValueIntoMongoDB(*document, data_names[j], *data_types[j], *columns[j], i);
}

documents.push_back(std::move(document));
Expand All @@ -151,6 +151,60 @@ class StorageMongoDBSink : public SinkToStorage
}

private:

void insertValueIntoMongoDB(
Poco::MongoDB::Document & document,
const std::string & name,
const IDataType & data_type,
const IColumn & column,
size_t idx)
{
WhichDataType which(data_type);

if (which.isArray())
{
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();

size_t offset = offsets[idx - 1];
size_t next_offset = offsets[idx];

const IColumn & nested_column = column_array.getData();

const auto * array_type = assert_cast<const DataTypeArray *>(&data_type);
const DataTypePtr & nested_type = array_type->getNestedType();

Poco::MongoDB::Array::Ptr array = new Poco::MongoDB::Array();
for (size_t i = 0; i + offset < next_offset; ++i)
{
insertValueIntoMongoDB(*array, Poco::NumberFormatter::format(i), *nested_type, nested_column, i + offset);
}

document.add(name, array);
return;
}

/// MongoDB does not support UInt64 type, so just cast it to Int64
if (which.isNativeUInt())
document.add(name, static_cast<Poco::Int64>(column.getUInt(idx)));
else if (which.isNativeInt())
document.add(name, static_cast<Poco::Int64>(column.getInt(idx)));
else if (which.isFloat32())
document.add(name, static_cast<Float64>(column.getFloat32(idx)));
else if (which.isFloat64())
document.add(name, static_cast<Float64>(column.getFloat64(idx)));
else if (which.isDate())
document.add(name, Poco::Timestamp(DateLUT::instance().fromDayNum(DayNum(column.getUInt(idx))) * 1000000));
else if (which.isDateTime())
document.add(name, Poco::Timestamp(column.getUInt(idx) * 1000000));
else
{
WriteBufferFromOwnString ostr;
data_type.getDefaultSerialization()->serializeText(column, idx, ostr, FormatSettings{});
document.add(name, ostr.str());
}
}

String collection_name;
String db_name;
StorageMetadataPtr metadata_snapshot;
Expand Down
6 changes: 6 additions & 0 deletions tests/integration/test_storage_mongodb/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ def test_arrays(started_cluster):
== "[]\n"
)

# Test INSERT SELECT
node.query("INSERT INTO arrays_mongo_table SELECT * FROM arrays_mongo_table")

assert node.query("SELECT COUNT() FROM arrays_mongo_table") == "200\n"
assert node.query("SELECT COUNT(DISTINCT *) FROM arrays_mongo_table") == "100\n"

node.query("DROP TABLE arrays_mongo_table")
arrays_mongo_table.drop()

Expand Down

0 comments on commit 3b0a404

Please sign in to comment.