Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MaterializedPostgreSQL: Support "generated columns" and default values #57568

Merged
merged 9 commits into from Dec 11, 2023
5 changes: 5 additions & 0 deletions src/Core/ExternalResultDescription.cpp
Expand Up @@ -20,6 +20,11 @@ namespace ErrorCodes
extern const int UNKNOWN_TYPE;
}

ExternalResultDescription::ExternalResultDescription(const Block & sample_block_)
{
init(sample_block_);
}

void ExternalResultDescription::init(const Block & sample_block_)
{
sample_block = sample_block_;
Expand Down
3 changes: 3 additions & 0 deletions src/Core/ExternalResultDescription.h
Expand Up @@ -41,6 +41,9 @@ struct ExternalResultDescription
Block sample_block;
std::vector<std::pair<ValueType, bool /* is_nullable */>> types;

ExternalResultDescription() = default;
explicit ExternalResultDescription(const Block & sample_block_);

void init(const Block & sample_block_);
};

Expand Down
8 changes: 4 additions & 4 deletions src/Core/PostgreSQL/insertPostgreSQLValue.cpp
Expand Up @@ -36,7 +36,7 @@ void insertDefaultPostgreSQLValue(IColumn & column, const IColumn & sample_colum
void insertPostgreSQLValue(
IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type,
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx)
const std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx)
{
switch (type)
{
Expand Down Expand Up @@ -125,8 +125,8 @@ void insertPostgreSQLValue(
pqxx::array_parser parser{value};
std::pair<pqxx::array_parser::juncture, std::string> parsed = parser.get_next();

size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions;
const auto parse_value = array_info[idx].pqxx_parser;
size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info.at(idx).num_dimensions;
const auto parse_value = array_info.at(idx).pqxx_parser;
std::vector<Row> dimensions(expected_dimensions + 1);

while (parsed.first != pqxx::array_parser::juncture::done)
Expand All @@ -138,7 +138,7 @@ void insertPostgreSQLValue(
dimensions[dimension].emplace_back(parse_value(parsed.second));

else if (parsed.first == pqxx::array_parser::juncture::null_value)
dimensions[dimension].emplace_back(array_info[idx].default_value);
dimensions[dimension].emplace_back(array_info.at(idx).default_value);

else if (parsed.first == pqxx::array_parser::juncture::row_end)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Core/PostgreSQL/insertPostgreSQLValue.h
Expand Up @@ -23,7 +23,7 @@ struct PostgreSQLArrayInfo
void insertPostgreSQLValue(
IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type,
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx);
const std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx);

void preparePostgreSQLArrayInfo(
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t column_idx, const DataTypePtr data_type);
Expand Down
72 changes: 58 additions & 14 deletions src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp
Expand Up @@ -25,6 +25,7 @@ namespace ErrorCodes
{
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}


Expand Down Expand Up @@ -186,20 +187,25 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
}
else
{
std::tuple<std::string, std::string, std::string, uint16_t, std::string, std::string> row;
std::tuple<std::string, std::string, std::string, uint16_t, std::string, std::string, std::string> row;
while (stream >> row)
{
auto data_type = convertPostgreSQLDataType(
const auto column_name = std::get<0>(row);
const auto data_type = convertPostgreSQLDataType(
std::get<1>(row), recheck_array,
use_nulls && (std::get<2>(row) == /* not nullable */"f"),
std::get<3>(row));

columns.push_back(NameAndTypePair(std::get<0>(row), data_type));
columns.push_back(NameAndTypePair(column_name, data_type));
auto attgenerated = std::get<6>(row);
LOG_TEST(&Poco::Logger::get("kssenii"), "KSSENII: attgenerated: {}", attgenerated);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug logging, I will remove it here - #58198

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New PR - #58237


attributes.emplace_back(
PostgreSQLTableStructure::PGAttribute{
.atttypid = parse<int>(std::get<4>(row)),
.atttypmod = parse<int>(std::get<5>(row)),
attributes.emplace(
column_name,
PostgreSQLTableStructure::PGAttribute{
.atttypid = parse<int>(std::get<4>(row)),
.atttypmod = parse<int>(std::get<5>(row)),
.attgenerated = attgenerated.empty() ? char{} : char(attgenerated[0])
});

++i;
Expand Down Expand Up @@ -255,14 +261,19 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
PostgreSQLTableStructure table;

auto where = fmt::format("relname = {}", quoteString(postgres_table));
if (postgres_schema.empty())
where += " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')";
else
where += fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));

where += postgres_schema.empty()
? " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')"
: fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));

std::string query = fmt::format(
"SELECT attname AS name, format_type(atttypid, atttypmod) AS type, "
"attnotnull AS not_null, attndims AS dims, atttypid as type_id, atttypmod as type_modifier "
"SELECT attname AS name, " /// column name
"format_type(atttypid, atttypmod) AS type, " /// data type
"attnotnull AS not_null, " /// is nullable
"attndims AS dims, " /// array dimensions
"atttypid as type_id, "
"atttypmod as type_modifier, "
"attgenerated as generated " /// if column has GENERATED
"FROM pg_attribute "
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) "
"AND NOT attisdropped AND attnum > 0 "
Expand All @@ -274,11 +285,44 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
if (!table.physical_columns)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table_with_schema);

for (const auto & column : table.physical_columns->columns)
{
table.physical_columns->names.push_back(column.name);
}

bool check_generated = table.physical_columns->attributes.end() != std::find_if(
table.physical_columns->attributes.begin(),
table.physical_columns->attributes.end(),
[](const auto & attr){ return attr.second.attgenerated == 's'; });

if (check_generated)
{
std::string attrdef_query = fmt::format(
"SELECT adnum, pg_get_expr(adbin, adrelid) as generated_expression "
"FROM pg_attrdef "
"WHERE adrelid = (SELECT oid FROM pg_class WHERE {});", where);

pqxx::result result{tx.exec(attrdef_query)};
for (const auto row : result)
{
size_t adnum = row[0].as<int>();
if (!adnum || adnum > table.physical_columns->names.size())
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Received adnum {}, but currently fetched columns list has {} columns",
adnum, table.physical_columns->attributes.size());
}
const auto column_name = table.physical_columns->names[adnum - 1];
table.physical_columns->attributes.at(column_name).attr_def = row[1].as<std::string>();
}
}

if (with_primary_key)
{
/// wiki.postgresql.org/wiki/Retrieve_primary_key_columns
query = fmt::format(
"SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type "
"SELECT a.attname, " /// column name
"format_type(a.atttypid, a.atttypmod) AS data_type " /// data type
"FROM pg_index i "
"JOIN pg_attribute a ON a.attrelid = i.indrelid "
"AND a.attnum = ANY(i.indkey) "
Expand Down
6 changes: 5 additions & 1 deletion src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h
Expand Up @@ -16,13 +16,17 @@ struct PostgreSQLTableStructure
{
Int32 atttypid;
Int32 atttypmod;
bool atthasdef;
char attgenerated;
std::string attr_def;
};
using Attributes = std::vector<PGAttribute>;
using Attributes = std::unordered_map<std::string, PGAttribute>;

struct ColumnsInfo
{
NamesAndTypesList columns;
Attributes attributes;
std::vector<std::string> names;
ColumnsInfo(NamesAndTypesList && columns_, Attributes && attributes_) : columns(columns_), attributes(attributes_) {}
};
using ColumnsInfoPtr = std::shared_ptr<ColumnsInfo>;
Expand Down