Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ucasfl committed Feb 17, 2023
1 parent eae73a1 commit a39f6f4
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 73 deletions.
103 changes: 40 additions & 63 deletions src/Storages/IStorageDataLake.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,125 +5,102 @@
#if USE_AWS_S3

# include <Common/logger_useful.h>

# include <QueryPipeline/Pipe.h>

# include <Storages/IStorage.h>
# include <Storages/StorageS3.h>

# include <filesystem>

# include <fmt/format.h>

# include <IO/S3/URI.h>

namespace DB
{

template <typename Name, typename MetadataParser>
class IStorageDataLake : public IStorage
template <typename Storage, typename Name, typename MetadataParser>
class IStorageDataLake : public Storage
{
public:
using Configuration = StorageS3::Configuration;
using Configuration = typename Storage::Configuration;
// 1. Parses internal file structure of table
// 2. Finds out parts with latest version
// 3. Creates url for underlying StorageS3 enigne to handle reads
IStorageDataLake(
const StorageS3::Configuration & configuration_,
const typename Storage::Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_}
, log(&Poco::Logger::get(fmt::format("Storage{}({})", name, table_id_.table_name)))
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);

auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, log);

if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(new_configuration, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);

storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);

s3engine = std::make_shared<StorageS3>(
new_configuration,
: Storage(
getAdjustedConfiguration(
context_, Storage::updateConfiguration(context_, configuration_), &Poco::Logger::get("Storage" + String(name))),
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
format_settings_)
{
}

static constexpr auto name = Name::name;
String getName() const override { return name; }

// Reads latest version of Lake Table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override
{
StorageS3::updateS3Configuration(context, base_configuration);

return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}

static ColumnsDescription getTableStructureFromData(
StorageS3::Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
typename Storage::Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
StorageS3::updateS3Configuration(ctx, configuration);
auto new_configuration = getAdjustedS3Configuration(ctx, configuration, &Poco::Logger::get("Storage" + String(name)));
Storage::updateConfiguration(ctx, configuration);

auto new_configuration = getAdjustedConfiguration(ctx, configuration, &Poco::Logger::get("Storage" + String(name)));

return StorageS3::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr);
return Storage::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr);
}

static StorageS3::Configuration
getAdjustedS3Configuration(const ContextPtr & context, StorageS3::Configuration & configuration, Poco::Logger * log)
static typename Storage::Configuration
getAdjustedConfiguration(const ContextPtr & context, const typename Storage::Configuration & configuration, Poco::Logger * log)
{
MetadataParser parser{configuration, context};

auto keys = parser.getFiles();
String new_uri = std::filesystem::path(configuration.url.uri.toString()) / Name::data_directory_prefix
/ MetadataParser::generateQueryFromKeys(keys, configuration.format);

StorageS3::Configuration new_configuration(configuration);
typename Storage::Configuration new_configuration(configuration);

/// The only S3 related stuff remain: S3::URI
new_configuration.url = S3::URI(new_uri);

LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, new_uri);

return new_configuration;
}


static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
static typename Storage::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
{
auto configuration = StorageS3::getConfiguration(engine_args, local_context, false /* get_format_from_file */);
auto configuration = Storage::getConfiguration(engine_args, local_context, false /* get_format_from_file */);

if (configuration.format == "auto")
configuration.format = "Parquet";

return configuration;
}

private:
StorageS3::Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;
SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName());
}

void truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*local_context*/,
TableExclusiveLockHolder &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
}

NamesAndTypesList getVirtuals() const override { return {}; }
};

}
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/StorageDeltaLake.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ struct StorageDeltaLakeName
static constexpr auto data_directory_prefix = "";
};

using StorageDeltaLake = IStorageDataLake<StorageDeltaLakeName, DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
using StorageDeltaLake
= IStorageDataLake<StorageS3, StorageDeltaLakeName, DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}

#endif
3 changes: 2 additions & 1 deletion src/Storages/StorageHudi.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ struct StorageHudiName
static constexpr auto data_directory_prefix = "";
};

using StorageHudi = IStorageDataLake<StorageHudiName, HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
using StorageHudi
= IStorageDataLake<StorageS3, StorageHudiName, HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}

#endif
4 changes: 3 additions & 1 deletion src/Storages/StorageIceberg.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# include <Storages/IStorageDataLake.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <Storages/StorageS3.h>

namespace DB
{
Expand Down Expand Up @@ -43,7 +44,8 @@ struct StorageIcebergName
static constexpr auto data_directory_prefix = "data";
};

using StorageIceberg = IStorageDataLake<StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
using StorageIceberg
= IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}

#endif
12 changes: 12 additions & 0 deletions src/Storages/StorageS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,18 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
}


StorageS3::Configuration StorageS3::updateConfiguration(ContextPtr local_context, const StorageS3::Configuration & configuration)
{
StorageS3::Configuration new_configuration(configuration);
updateS3Configuration(local_context, new_configuration);
return new_configuration;
}

void StorageS3::updateConfiguration(ContextPtr local_context, StorageS3::Configuration & configuration)
{
updateS3Configuration(local_context, configuration);
}

void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::Configuration & upd)
{
auto settings = ctx->getStorageS3Settings().getSettings(upd.url.uri.toString());
Expand Down
17 changes: 10 additions & 7 deletions src/Storages/StorageS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,22 @@ class StorageS3 : public IStorage, WithContext

bool supportsPartitionBy() const override;

static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);

using ObjectInfos = StorageS3Source::ObjectInfos;

static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);

static SchemaCache & getSchemaCache(const ContextPtr & ctx);

static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);

static ColumnsDescription getTableStructureFromData(
StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
ObjectInfos * object_infos = nullptr);

static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);

static SchemaCache & getSchemaCache(const ContextPtr & ctx);

static void updateS3Configuration(ContextPtr, Configuration &);
static StorageS3::Configuration updateConfiguration(ContextPtr local_context, const Configuration & configuration);
static void updateConfiguration(ContextPtr local_context, Configuration & configuration);

private:
friend class StorageS3Cluster;
Expand All @@ -319,6 +320,8 @@ class StorageS3 : public IStorage, WithContext

ObjectInfos object_infos;

static void updateS3Configuration(ContextPtr, Configuration &);

static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const Configuration & s3_configuration,
const std::vector<String> & keys,
Expand Down

0 comments on commit a39f6f4

Please sign in to comment.