Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StorageIceberg and table function iceberg #45384

Merged
merged 44 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
92646e3
initial
ucasfl Jan 14, 2023
a8e1363
implement storage iceberg
ucasfl Jan 18, 2023
2fb32dc
fix and add test
ucasfl Jan 18, 2023
4dcd3cc
fix style
ucasfl Jan 18, 2023
01ae8f5
fix test
ucasfl Jan 18, 2023
c5d07dd
fix
ucasfl Jan 29, 2023
ed6fcfe
refactor
ucasfl Jan 29, 2023
9b517cd
fix conflict
ucasfl Jan 29, 2023
6d9dc83
fix
ucasfl Jan 29, 2023
fc2ce9e
refactor and unify storage data lake
ucasfl Jan 29, 2023
810d3e5
fix comment
ucasfl Jan 29, 2023
2911927
fix style
ucasfl Jan 29, 2023
bb95093
remove unused file
ucasfl Jan 29, 2023
c5581b6
fix
ucasfl Jan 30, 2023
a742c14
fix
ucasfl Jan 30, 2023
ffddc0d
fix conflict
ucasfl Jan 31, 2023
fd1ee98
fix style
ucasfl Jan 31, 2023
0dd8a61
fix conflict
ucasfl Feb 6, 2023
db15634
fix conflict
ucasfl Feb 10, 2023
37a850b
Merge branch 'master' of github.com:ClickHouse/ClickHouse into iceberg
ucasfl Feb 10, 2023
09d7ca2
fix
ucasfl Feb 10, 2023
9743a05
fix style
ucasfl Feb 10, 2023
d3dd942
refactor and get rid of s3
ucasfl Feb 13, 2023
c49a293
refactor and get rid of s3
ucasfl Feb 13, 2023
68748d6
fix
ucasfl Feb 13, 2023
e05b4e9
fix conflict
ucasfl Feb 14, 2023
f314518
fix
ucasfl Feb 15, 2023
e4e3adf
fix
ucasfl Feb 15, 2023
1468e9b
fix
ucasfl Feb 15, 2023
50c0693
fix
ucasfl Feb 15, 2023
18cf721
remove more redundant header files
ucasfl Feb 15, 2023
289c5c6
fix
ucasfl Feb 15, 2023
de11a05
remove redundant header
ucasfl Feb 15, 2023
ecc3997
fix conflict
ucasfl Feb 16, 2023
2968cdc
fix
ucasfl Feb 16, 2023
7f4c23e
fix
ucasfl Feb 16, 2023
eae73a1
fix
ucasfl Feb 16, 2023
a39f6f4
refactor
ucasfl Feb 17, 2023
561b575
fix style
ucasfl Feb 17, 2023
1d5b7eb
fix
ucasfl Feb 17, 2023
4b1d997
fix
ucasfl Feb 17, 2023
b3a9468
fix
ucasfl Feb 17, 2023
ebd88aa
Merge branch 'master' into iceberg
ucasfl Feb 17, 2023
bf6e6a8
Merge branch 'master' into iceberg
kssenii Feb 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/IO/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
# include <aws/core/auth/AWSCredentialsProviderChain.h>
# include <aws/core/auth/STSCredentialsProvider.h>
# include <aws/core/client/SpecifiedRetryableErrorsRetryStrategy.h>
# include <aws/core/http/HttpClientFactory.h>
# include <aws/core/platform/Environment.h>
# include <aws/core/platform/OSVersionInfo.h>
# include <aws/core/utils/HashingUtils.h>
# include <aws/core/utils/UUID.h>
# include <aws/core/utils/json/JsonSerializer.h>
# include <aws/core/utils/logging/LogMacros.h>
# include <aws/core/utils/logging/LogSystemInterface.h>
Expand Down Expand Up @@ -798,8 +801,49 @@ namespace S3
get_request_throttler,
put_request_throttler);
}
}

/// List all objects in `bucket` under `prefix` and return the keys whose
/// filename extension equals `extension` (compared via std::filesystem::path).
/// Paginates through ListObjectsV2 results until the listing is exhausted.
/// NOTE: `key` is only used for error reporting, not for filtering.
std::vector<String>
listFiles(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & prefix, const String & extension)
{
    std::vector<String> matched_keys;

    S3::ListObjectsV2Request request;
    request.SetBucket(bucket);
    request.SetPrefix(prefix);

    bool has_more_pages = true;
    while (has_more_pages)
    {
        auto outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            throw Exception(
                ErrorCodes::S3_ERROR,
                "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
                quoteString(bucket),
                quoteString(key),
                backQuote(outcome.GetError().GetExceptionName()),
                quoteString(outcome.GetError().GetMessage()));

        /// Keep only the objects with the requested filename extension.
        for (const auto & object : outcome.GetResult().GetContents())
        {
            const auto & object_key = object.GetKey();
            if (std::filesystem::path(object_key).extension() == extension)
                matched_keys.push_back(object_key);
        }

        /// Ask for the next page; IsTruncated tells us whether one exists.
        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
        has_more_pages = outcome.GetResult().GetIsTruncated();
    }

    return matched_keys;
}
}
}

#endif
Expand Down
2 changes: 2 additions & 0 deletions src/IO/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class ClientFactory
std::atomic<bool> s3_requests_logging_enabled;
};

/// Return the keys of all objects in `bucket` under `prefix` whose filename
/// extension equals `extension`. `key` is used only in the error message when
/// the listing fails. Throws Exception(S3_ERROR) on a failed ListObjectsV2 call.
std::vector<String>
listFiles(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & prefix, const String & extension);
}
#endif

Expand Down
50 changes: 25 additions & 25 deletions src/Processors/Formats/Impl/AvroRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,35 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}

class InputStreamReadBufferAdapter : public avro::InputStream
/// Hand Avro the next contiguous chunk of the underlying ReadBuffer.
/// Returns false (with *len == 0) once the buffer is exhausted; otherwise
/// exposes the whole currently-available region and advances past it.
bool AvroInputStreamReadBufferAdapter::next(const uint8_t ** data, size_t * len)
{
    if (in.eof())
    {
        *len = 0;
        return false;
    }

    const size_t available = in.available();
    *data = reinterpret_cast<const uint8_t *>(in.position());
    *len = available;

    /// Consume the exposed region so the next call yields fresh data.
    in.position() += available;
    return true;
}

/// Give back the trailing `len` bytes of the chunk obtained from next(),
/// so they will be served again.
void AvroInputStreamReadBufferAdapter::backup(size_t len)
{
    in.position() -= len;
}

/// Advance the stream by up to `len` bytes (tryIgnore stops early at EOF).
void AvroInputStreamReadBufferAdapter::skip(size_t len)
{
    in.tryIgnore(len);
}

/// Number of bytes consumed from the underlying buffer so far.
size_t AvroInputStreamReadBufferAdapter::byteCount() const
{
    return in.count();
}

/// Insert value with conversion to the column of target type.
template <typename T>
Expand Down Expand Up @@ -757,7 +757,7 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_,

void AvroRowInputFormat::readPrefix()
{
file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(*in));
file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*in));
deserializer_ptr = std::make_unique<AvroDeserializer>(
output.getHeader(), file_reader_ptr->dataSchema(), format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default);
file_reader_ptr->init();
Expand Down Expand Up @@ -914,7 +914,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(

void AvroConfluentRowInputFormat::readPrefix()
{
input_stream = std::make_unique<InputStreamReadBufferAdapter>(*in);
input_stream = std::make_unique<AvroInputStreamReadBufferAdapter>(*in);
decoder = avro::binaryDecoder();
decoder->init(*input_stream);
}
Expand Down Expand Up @@ -971,7 +971,7 @@ NamesAndTypesList AvroSchemaReader::readSchema()
}
else
{
auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(in));
auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(in));
root_node = file_reader_ptr->dataSchema().root();
}

Expand Down
19 changes: 18 additions & 1 deletion src/Processors/Formats/Impl/AvroRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}

/// Adapter that exposes a ClickHouse ReadBuffer through the avro::InputStream
/// interface, letting Avro readers/decoders consume data directly from a
/// ReadBuffer without copying. Non-owning: `in_` must outlive the adapter.
class AvroInputStreamReadBufferAdapter : public avro::InputStream
{
public:
    explicit AvroInputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}

    /// Expose the next contiguous chunk of the buffer; false at EOF.
    bool next(const uint8_t ** data, size_t * len) override;

    /// Return the last `len` bytes obtained from next() to the stream.
    void backup(size_t len) override;

    /// Advance by up to `len` bytes (stops early at EOF).
    void skip(size_t len) override;

    /// Total bytes consumed from the underlying buffer.
    size_t byteCount() const override;

private:
    ReadBuffer & in;  /// Borrowed buffer all operations delegate to.
};

class AvroDeserializer
{
public:
Expand Down Expand Up @@ -185,8 +202,8 @@ class AvroSchemaReader : public ISchemaReader

NamesAndTypesList readSchema() override;

static DataTypePtr avroNodeToDataType(avro::NodePtr node);
private:
DataTypePtr avroNodeToDataType(avro::NodePtr node);

bool confluent;
const FormatSettings format_settings;
Expand Down