From aa35f6a8674fc263415bab13304624a0b14b832a Mon Sep 17 00:00:00 2001
From: JLDLaughlin
Date: Fri, 2 Apr 2021 13:33:38 -0400
Subject: [PATCH] sql,sql-parser: parse CREATE SOURCES FROM POSTGRES

Parses statements like:

    CREATE SOURCES FROM POSTGRES HOST '...' PUBLICATION '...' NAMESPACE '...' TABLES (...)

where each of the upstream sources shares a HOST, PUBLICATION, and
NAMESPACE, but has its own table name, alias, and optional column
definitions.

Leaves planning and sequencing unimplemented.
---
 src/coord/src/coord.rs                   |   1 +
 src/sql-parser/src/ast/defs/ddl.rs       |  54 +++++++++--
 src/sql-parser/src/ast/defs/name.rs      |   8 ++
 src/sql-parser/src/ast/defs/statement.rs |  16 ++++
 src/sql-parser/src/parser.rs             | 112 +++++++++++++++------
 src/sql-parser/tests/testdata/ddl        |  41 ++++++++-
 src/sql/src/normalize.rs                 |  19 +++-
 src/sql/src/plan/statement.rs            |   2 +
 src/sql/src/plan/statement/ddl.rs        |  34 +++++--
 src/sql/src/postgres_util.rs             |  17 +---
 src/sql/src/pure.rs                      | 110 ++++++++++++----------
 test/pg-cdc/pg-cdc.td                    |  28 ++++--
 12 files changed, 319 insertions(+), 123 deletions(-)

diff --git a/src/coord/src/coord.rs b/src/coord/src/coord.rs
index 3ba84dd291bc6..3c60b65469287 100644
--- a/src/coord/src/coord.rs
+++ b/src/coord/src/coord.rs
@@ -755,6 +755,7 @@ impl Coordinator {
             | Statement::CreateSchema(_)
             | Statement::CreateSink(_)
             | Statement::CreateSource(_)
+            | Statement::CreateSources(_)
             | Statement::CreateTable(_)
             | Statement::CreateType(_)
             | Statement::CreateView(_)
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs
index 2ce7113362113..6ccd36a722fae 100644
--- a/src/sql-parser/src/ast/defs/ddl.rs
+++ b/src/sql-parser/src/ast/defs/ddl.rs
@@ -306,10 +306,12 @@ pub enum Connector<T: AstInfo> {
         publication: String,
         /// The namespace the synced table belongs to
         namespace: String,
-        /// The name of the table to sync
-        table: String,
-        /// The expected column schema of the synced table
-        columns: Vec<ColumnDef<T>>,
+        /// The table to sync, if only a single table.
+        /// Will be set for `CREATE SOURCE`, but not `CREATE SOURCES`.
+        table: Option<PgTable<T>>,
+        /// The tables to sync, if multiple tables.
+        /// Will be set for `CREATE SOURCES`, but not `CREATE SOURCE`.
+        tables: Option<Vec<PgTable<T>>>,
     },
     PubNub {
         /// PubNub's subscribe key
@@ -376,7 +378,7 @@ impl<T: AstInfo> AstDisplay for Connector<T> {
                 publication,
                 namespace,
                 table,
-                columns,
+                tables,
             } => {
                 f.write_str("POSTGRES HOST '");
                 f.write_str(&display::escape_single_quote_string(conn));
                 f.write_str("' PUBLICATION '");
                 f.write_str(&display::escape_single_quote_string(publication));
                 f.write_str("' NAMESPACE '");
                 f.write_str(&display::escape_single_quote_string(namespace));
-                f.write_str("' TABLE '");
-                f.write_str(&display::escape_single_quote_string(table));
-                f.write_str("' (");
-                f.write_node(&display::comma_separated(columns));
-                f.write_str(")");
+                if let Some(table) = table {
+                    f.write_str("' TABLE ");
+                    f.write_str(&table.to_ast_string());
+                }
+                if let Some(tables) = tables {
+                    f.write_str("' TABLES (");
+                    f.write_node(&display::comma_separated(tables));
+                    f.write_str(")");
+                }
             }
             Connector::PubNub {
                 subscribe_key,
@@ -405,6 +411,34 @@
 }
 impl_display_t!(Connector);
 
+/// Information about upstream Postgres tables used for replication sources
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PgTable<T: AstInfo> {
+    /// The name of the table to sync
+    pub name: UnresolvedObjectName,
+    /// The name for the table in Materialize. This field will
+    /// be set for `CREATE SOURCES`, but not for `CREATE SOURCE`.
+    pub alias: Option<T::ObjectName>,
+    /// The expected column schema of the synced table
+    pub columns: Vec<ColumnDef<T>>,
+}
+
+impl<T: AstInfo> AstDisplay for PgTable<T> {
+    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
+        f.write_node(&self.name);
+        if let Some(alias) = &self.alias {
+            f.write_str(" AS ");
+            f.write_str(&alias.to_ast_string());
+        }
+        if !self.columns.is_empty() {
+            f.write_str(" (");
+            f.write_node(&display::comma_separated(&self.columns));
+            f.write_str(")");
+        }
+    }
+}
+impl_display_t!(PgTable);
+
 /// The key sources specified in the S3 source's `OBJECTS FROM` clause.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum S3KeySource {
diff --git a/src/sql-parser/src/ast/defs/name.rs b/src/sql-parser/src/ast/defs/name.rs
index 052a130134992..2d2006b930dae 100644
--- a/src/sql-parser/src/ast/defs/name.rs
+++ b/src/sql-parser/src/ast/defs/name.rs
@@ -117,6 +117,14 @@ impl UnresolvedObjectName {
         assert!(n.len() <= 3 && n.len() > 0);
         UnresolvedObjectName(n.iter().map(|n| (*n).into()).collect::<Vec<_>>())
     }
+
+    /// Returns the item name of an `UnresolvedObjectName`.
+    pub fn item(&self) -> &str {
+        if self.0.is_empty() {
+            return "";
+        }
+        self.0[self.0.len() - 1].as_str()
+    }
 }
 
 impl AstDisplay for UnresolvedObjectName {
diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs
index b21aae88a2e9a..4abb4e32b1c88 100644
--- a/src/sql-parser/src/ast/defs/statement.rs
+++ b/src/sql-parser/src/ast/defs/statement.rs
@@ -36,6 +36,7 @@ pub enum Statement<T: AstInfo> {
     CreateDatabase(CreateDatabaseStatement),
     CreateSchema(CreateSchemaStatement),
     CreateSource(CreateSourceStatement<T>),
+    CreateSources(CreateSourcesStatement<T>),
     CreateSink(CreateSinkStatement<T>),
     CreateView(CreateViewStatement<T>),
     CreateTable(CreateTableStatement<T>),
@@ -90,6 +91,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
             Statement::CreateDatabase(stmt) => f.write_node(stmt),
             Statement::CreateSchema(stmt) => f.write_node(stmt),
             Statement::CreateSource(stmt) => f.write_node(stmt),
+            Statement::CreateSources(stmt) => f.write_node(stmt),
             Statement::CreateSink(stmt) => f.write_node(stmt),
             Statement::CreateView(stmt) => f.write_node(stmt),
             Statement::CreateTable(stmt) => f.write_node(stmt),
@@ -397,6 +399,20 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
 }
 impl_display_t!(CreateSourceStatement);
 
+/// `CREATE SOURCES`
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct CreateSourcesStatement<T: AstInfo> {
+    pub connector: Connector<T>,
+}
+
+impl<T: AstInfo> AstDisplay for CreateSourcesStatement<T> {
+    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
+        f.write_str("CREATE SOURCES FROM ");
+        f.write_node(&self.connector);
+    }
+}
+impl_display_t!(CreateSourcesStatement);
+
 /// `CREATE SINK`
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct CreateSinkStatement<T: AstInfo> {
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index 92040d783c2fd..0075d59b9f439 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -100,6 +100,11 @@ enum IsLateral {
 }
 use IsLateral::*;
 
+enum PostgresSourceTables {
+    Single,
+    Multiple,
+}
+
 #[derive(Debug, Clone, PartialEq)]
 pub struct ParserError {
     /// The error message.
@@ -1420,6 +1425,8 @@ impl<'a> Parser<'a> {
         } else if self.parse_keyword(SOURCE) {
             self.prev_token();
             self.parse_create_source()
+        } else if self.parse_keyword(SOURCES) {
+            self.parse_create_sources()
         } else if self.parse_keyword(SINK) {
             self.parse_create_sink()
         } else if self.parse_keyword(DEFAULT) {
@@ -1659,6 +1666,16 @@ impl<'a> Parser<'a> {
         }))
     }
 
+    fn parse_create_sources(&mut self) -> Result<Statement<Raw>, ParserError> {
+        self.expect_keyword(FROM)?;
+        self.expect_keyword(POSTGRES)?;
+        let connector = self.parse_postgres_connector(PostgresSourceTables::Multiple)?;
+
+        Ok(Statement::CreateSources(CreateSourcesStatement {
+            connector,
+        }))
+    }
+
     fn parse_create_sink(&mut self) -> Result<Statement<Raw>, ParserError> {
         let if_not_exists = self.parse_if_not_exists()?;
         let name = self.parse_object_name()?;
@@ -1722,34 +1739,7 @@ impl<'a> Parser<'a> {
                     channel,
                 })
             }
-            POSTGRES => {
-                self.expect_keyword(HOST)?;
-                let conn = self.parse_literal_string()?;
-                self.expect_keyword(PUBLICATION)?;
-                let publication = self.parse_literal_string()?;
-                self.expect_keyword(NAMESPACE)?;
-                let namespace = self.parse_literal_string()?;
-                self.expect_keyword(TABLE)?;
-                let table = self.parse_literal_string()?;
-
-                let (columns, constraints) = self.parse_columns(Optional)?;
-
-                if !constraints.is_empty() {
-                    return parser_err!(
-                        self,
-                        self.peek_prev_pos(),
-                        "Cannot specify constraints in Postgres table definition"
-                    );
-                }
-
-                Ok(Connector::Postgres {
-                    conn,
-                    publication,
-                    namespace,
-                    table,
-                    columns,
-                })
-            }
+            POSTGRES => self.parse_postgres_connector(PostgresSourceTables::Single),
             FILE => {
                 let path = self.parse_literal_string()?;
                 let compression = if self.parse_keyword(COMPRESSION) {
@@ -1825,6 +1815,72 @@ impl<'a> Parser<'a> {
         }
     }
 
+    fn parse_postgres_connector(
+        &mut self,
+        typ: PostgresSourceTables,
+    ) -> Result<Connector<Raw>, ParserError> {
+        self.expect_keyword(HOST)?;
+        let conn = self.parse_literal_string()?;
+        self.expect_keyword(PUBLICATION)?;
+        let publication = self.parse_literal_string()?;
+        self.expect_keyword(NAMESPACE)?;
+        let namespace = self.parse_literal_string()?;
+        let (table, tables) = match typ {
+            PostgresSourceTables::Single => {
+                self.expect_keyword(TABLE)?;
+                (
+                    Some(self.parse_postgres_table(PostgresSourceTables::Single)?),
+                    None,
+                )
+            }
+            PostgresSourceTables::Multiple => {
+                self.expect_keyword(TABLES)?;
+                self.expect_token(&Token::LParen)?;
+                let tables = self.parse_comma_separated(|parser| {
+                    parser.parse_postgres_table(PostgresSourceTables::Multiple)
+                })?;
+                self.expect_token(&Token::RParen)?;
+                (None, Some(tables))
+            }
+        };
+        Ok(Connector::Postgres {
+            conn,
+            publication,
+            namespace,
+            table,
+            tables,
+        })
+    }
+
+    fn parse_postgres_table(
+        &mut self,
+        typ: PostgresSourceTables,
+    ) -> Result<PgTable<Raw>, ParserError> {
+        let name = self.parse_object_name()?;
+        let alias = match typ {
+            PostgresSourceTables::Single => None,
+            PostgresSourceTables::Multiple => {
+                self.expect_keyword(AS)?;
+                Some(RawName::Name(self.parse_object_name()?))
+            }
+        };
+        let (columns, constraints) = self.parse_columns(Optional)?;
+
+        if !constraints.is_empty() {
+            return parser_err!(
+                self,
+                self.peek_prev_pos(),
+                "Cannot specify constraints in Postgres table definition"
+            );
+        }
+
+        Ok(PgTable {
+            name,
+            alias,
+            columns,
+        })
+    }
+
     fn parse_create_view(&mut self) -> Result<Statement<Raw>, ParserError> {
         let mut if_exists = if self.parse_keyword(OR) {
             self.expect_keyword(REPLACE)?;
diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl
index 422ee87ce35ef..132016503322f 100644
--- a/src/sql-parser/tests/testdata/ddl
+++ b/src/sql-parser/tests/testdata/ddl
@@ -497,11 +497,11 @@ CREATE SOURCE source FROM KAFKA BROKER 'broker' TOPIC 'topic' WITH (start_offset
 CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("source")]), col_names: [], connector: Kafka { broker: "broker", topic: "topic", key: None }, with_options: [Value { name: Ident("start_offset"), value: Array([Number("2"), Number("40000000")]) }], format: Some(Avro(Schema { schema: File("path"), with_options: [] })), envelope: Upsert(Some(Text)), if_not_exists: false, materialized: false })
 
 parse-statement
-CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int NOT NULL, evolution int);
+CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE "psychic" (pokedex_id int NOT NULL, evolution int);
 ----
-CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int4 NOT NULL, evolution int4)
+CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE psychic (pokedex_id int4 NOT NULL, evolution int4)
 =>
-CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: "psychic", columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false })
+CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: Some(PgTable { name: UnresolvedObjectName([Ident("psychic")]), alias: None, columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }), tables: None }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false })
 
 parse-statement
 CREATE SOURCE psychic FROM PUBNUB SUBSCRIBE KEY 'subscribe_key' CHANNEL 'channel';
@@ -524,6 +524,41 @@ error: Expected NOT, found EXISTS
 CREATE SOURCE IF EXISTS foo FROM FILE 'bar' USING SCHEMA ''
                  ^
 
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
+----
+error: Expected identifier, found right parenthesis
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
+                                                                                                                                             ^
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
+----
+error: Expected AS, found number
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
+                                                                                                                                                    ^
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
+----
+error: Expected AS, found left parenthesis
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
+                                                                                                                                                              ^
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (psychic as "public.psychic" (pokedex_id int NOT NULL, evolution int))
+----
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (psychic AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4))
+=>
+CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: None, tables: Some([PgTable { name: UnresolvedObjectName([Ident("psychic")]), alias: Some(Name(UnresolvedObjectName([Ident("public.psychic")]))), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }]) } })
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("random.psychic" as "public.psychic" (pokedex_id int NOT NULL, evolution int), "another_table" as "another_one")
+----
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("random.psychic" AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4), another_table AS another_one)
+=>
+CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: None, tables: Some([PgTable { name: UnresolvedObjectName([Ident("random.psychic")]), alias: Some(Name(UnresolvedObjectName([Ident("public.psychic")]))), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, PgTable { name: UnresolvedObjectName([Ident("another_table")]), alias: Some(Name(UnresolvedObjectName([Ident("another_one")]))), columns: [] }]) } })
+
 parse-statement
 CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES
 ----
diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs
index 04da7736c41e9..614c67a0a06b5 100644
--- a/src/sql/src/normalize.rs
+++ b/src/sql/src/normalize.rs
@@ -23,6 +23,7 @@ use aws_util::aws;
 use repr::ColumnName;
 use sql_parser::ast::display::AstDisplay;
 use sql_parser::ast::visit_mut::{self, VisitMut};
+use sql_parser::ast::PgTable;
 use sql_parser::ast::{
     AstInfo, Connector, CreateIndexStatement, CreateSinkStatement, CreateSourceStatement,
     CreateTableStatement, CreateTypeStatement, CreateViewStatement, Function, FunctionArgs, Ident,
@@ -266,10 +267,22 @@ pub fn create_statement(
             *name = allocate_name(name)?;
             *if_not_exists = false;
             *materialized = false;
-            if let Connector::Postgres { columns, .. } = connector {
+            if let Connector::Postgres { table, tables, .. } = connector {
+                let visit_table_columns =
+                    |normalizer: &mut QueryNormalizer, table: &mut PgTable<Raw>| {
+                        for column in table.columns.iter_mut() {
+                            normalizer.visit_column_def_mut(column)
+                        }
+                    };
+
                 let mut normalizer = QueryNormalizer::new(scx);
-                for c in columns {
-                    normalizer.visit_column_def_mut(c);
+                if let Some(table) = table {
+                    visit_table_columns(&mut normalizer, table)
+                }
+                if let Some(tables) = tables {
+                    for table in tables {
+                        visit_table_columns(&mut normalizer, table)
+                    }
                 }
             }
         }
diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs
index cbcfe8676e2a7..062a8effccf04 100644
--- a/src/sql/src/plan/statement.rs
+++ b/src/sql/src/plan/statement.rs
@@ -109,6 +109,7 @@ pub fn describe(
         Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
         Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
         Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
+        Statement::CreateSources(stmt) => ddl::describe_create_sources(&scx, stmt)?,
         Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
         Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
         Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
@@ -193,6 +194,7 @@ pub fn plan(
         Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
         Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
         Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
+        Statement::CreateSources(stmt) => ddl::plan_create_sources(scx, stmt),
         Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt, params),
         Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
         Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index d14ed786f643c..8ca1f47080892 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -55,10 +55,10 @@ use crate::ast::{
     AlterIndexOptionsList, AlterIndexOptionsStatement, AlterObjectRenameStatement, AvroSchema,
     ColumnOption, Compression, Connector, CreateDatabaseStatement, CreateIndexStatement,
     CreateRoleOption, CreateRoleStatement, CreateSchemaStatement, CreateSinkStatement,
-    CreateSourceStatement, CreateTableStatement, CreateTypeAs, CreateTypeStatement,
-    CreateViewStatement, DataType, DropDatabaseStatement, DropObjectsStatement, Envelope, Expr,
-    Format, Ident, IfExistsBehavior, ObjectType, Raw, SqlOption, Statement, UnresolvedObjectName,
-    Value, WithOption,
+    CreateSourceStatement, CreateSourcesStatement, CreateTableStatement, CreateTypeAs,
+    CreateTypeStatement, CreateViewStatement, DataType, DropDatabaseStatement,
+    DropObjectsStatement, Envelope, Expr, Format, Ident, IfExistsBehavior, ObjectType, Raw,
+    SqlOption, Statement, UnresolvedObjectName, Value, WithOption,
 };
 use crate::catalog::{CatalogItem, CatalogItemType};
 use crate::kafka_util;
@@ -227,6 +227,13 @@ pub fn describe_create_source(
     Ok(StatementDesc::new(None))
 }
 
+pub fn describe_create_sources(
+    _: &StatementContext,
+    _: CreateSourcesStatement<Raw>,
+) -> Result<StatementDesc, anyhow::Error> {
+    Ok(StatementDesc::new(None))
+}
+
 // Flatten one Debezium entry ("before" or "after")
 // into its corresponding data fields, plus an extra column for the diff.
 fn plan_dbz_flatten_one(
@@ -709,24 +716,28 @@ pub fn plan_create_source(
             publication,
             namespace,
             table,
-            columns,
+            ..
         } => {
             scx.require_experimental_mode("Postgres Sources")?;
+            let table = table
+                .clone()
+                .expect("invalid Postgres connector for CREATE SOURCE");
             let _connector = ExternalSourceConnector::Postgres(PostgresSourceConnector {
                 conn: conn.clone(),
                 publication: publication.clone(),
                 namespace: namespace.clone(),
-                table: table.clone(),
+                table: table.name.to_ast_string(),
             });
 
             // Build the expected relation description
-            let col_names: Vec<_> = columns
+            let col_names: Vec<_> = table
+                .columns
                 .iter()
                 .map(|c| Some(normalize::column_name(c.name.clone())))
                 .collect();
 
             let mut col_types = vec![];
-            for c in columns {
+            for c in table.columns {
                 if let Some(collation) = &c.collation {
                     unsupported!(format!(
                         "CREATE SOURCE FROM POSTGRES with column collation: {}",
@@ -1027,6 +1038,13 @@ pub fn plan_create_source(
     })
 }
 
+pub fn plan_create_sources(
+    _scx: &StatementContext,
+    _stmt: CreateSourcesStatement<Raw>,
+) -> Result<Plan, anyhow::Error> {
+    unsupported!("CREATE SOURCES");
+}
+
 pub fn describe_create_view(
     _: &StatementContext,
     _: CreateViewStatement<Raw>,
diff --git a/src/sql/src/postgres_util.rs b/src/sql/src/postgres_util.rs
index 9f52d42312630..a89574dd94ffd 100644
--- a/src/sql/src/postgres_util.rs
+++ b/src/sql/src/postgres_util.rs
@@ -28,24 +28,13 @@ pub struct PgColumn {
 ///
 /// - Invalid connection string, user information, or user permissions.
 /// - Upstream table does not exist or contains invalid values.
-pub async fn fetch_columns(
-    conn: &str,
-    namespace: &str,
-    table: &str,
-) -> Result<Vec<PgColumn>, anyhow::Error> {
+pub async fn fetch_columns(conn: &str, table: &str) -> Result<Vec<PgColumn>, anyhow::Error> {
     let (client, connection) = tokio_postgres::connect(&conn, NoTls).await?;
     tokio::spawn(connection);
 
+    let query = format!("SELECT '{}'::regclass::oid", table);
     let rel_id: u32 = client
-        .query(
-            "SELECT c.oid
-            FROM pg_catalog.pg_class c
-            INNER JOIN pg_catalog.pg_namespace n
-                ON (c.relnamespace = n.oid)
-            WHERE n.nspname = $1
-                AND c.relname = $2;",
-            &[&namespace, &table],
-        )
+        .query(query.as_str(), &[])
         .await?
         .get(0)
         .ok_or_else(|| anyhow!("table not found in the upstream catalog"))?
diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs
index 45722aa2a0187..e4c98d9668d1d 100644
--- a/src/sql/src/pure.rs
+++ b/src/sql/src/pure.rs
@@ -23,7 +23,7 @@ use tokio::time::Duration;
 use repr::strconv;
 use sql_parser::ast::{
     AvroSchema, ColumnOption, ColumnOptionDef, Connector, CreateSourceStatement, CsrSeed, Format,
-    Ident, Raw, Statement,
+    Ident, PgTable, Raw, Statement,
 };
 use sql_parser::parser::parse_columns;
 
@@ -106,58 +106,18 @@ pub async fn purify(mut stmt: Statement<Raw>) -> Result<Statement<Raw>, anyhow::
             }
             Connector::Postgres {
                 conn,
-                namespace,
                 table,
-                columns,
+                tables,
                 ..
             } => {
-                let fetched_columns = postgres_util::fetch_columns(conn, namespace, table).await?;
-                let (upstream_columns, _constraints) =
-                    parse_columns(&postgres_util::format_columns(fetched_columns))?;
-                if columns.is_empty() {
-                    *columns = upstream_columns;
-                } else {
-                    if columns != &upstream_columns {
-                        if columns.len() != upstream_columns.len() {
-                            bail!(
-                                "incorrect column specification: {} columns were specified, upstream has {}: {}",
-                                columns.len(),
-                                upstream_columns.len(),
-                                upstream_columns
-                                    .iter()
-                                    .map(|u| u.name.to_string())
-                                    .collect::<Vec<_>>()
-                                    .join(", ")
-                            )
-                        }
-                        for (c, u) in columns.into_iter().zip(upstream_columns) {
-                            // By default, Postgres columns are nullable. This means that both
-                            // of the following column definitions are nullable:
-                            //     example_col bool
-                            //     example_col bool NULL
-                            // Fetched upstream column information, on the other hand, will always
-                            // include NULL if a column is nullable. In order to compare the user-provided
-                            // columns with the fetched columns, we need to append a NULL constraint
-                            // to any user-provided columns that are implicitly NULL.
-                            if !c.options.contains(&ColumnOptionDef {
-                                name: None,
-                                option: ColumnOption::NotNull,
-                            }) && !c.options.contains(&ColumnOptionDef {
-                                name: None,
-                                option: ColumnOption::Null,
-                            }) {
-                                c.options.push(ColumnOptionDef {
-                                    name: None,
-                                    option: ColumnOption::Null,
-                                });
-                            }
-                            if c != &u {
-                                bail!("incorrect column specification: specified column does not match upstream source, specified: {}, upstream: {}", c, u);
-                            }
-                        }
-                    }
+                if let Some(table) = table {
+                    purify_postgres_connector(conn, table).await?
+                }
+                if let Some(tables) = tables {
+                    for table in tables.iter_mut() {
+                        purify_postgres_connector(conn, table).await?
+                    }
                 }
-                ()
             }
             Connector::PubNub { .. } => (),
         }
@@ -170,6 +130,60 @@ pub async fn purify(mut stmt: Statement<Raw>) -> Result<Statement<Raw>, anyhow::
     Ok(stmt)
 }
 
+async fn purify_postgres_connector(
+    conn: &str,
+    table: &mut PgTable<Raw>,
+) -> Result<(), anyhow::Error> {
+    let fetched_columns = postgres_util::fetch_columns(conn, &table.name.item()).await?;
+    let (upstream_columns, _constraints) =
+        parse_columns(&postgres_util::format_columns(fetched_columns))?;
+    if table.columns.is_empty() {
+        table.columns = upstream_columns;
+    } else {
+        if table.columns != upstream_columns {
+            if table.columns.len() != upstream_columns.len() {
+                bail!(
+                    "incorrect column specification: {} columns were specified, upstream has {}: {}",
+                    table.columns.len(),
+                    upstream_columns.len(),
+                    upstream_columns
+                        .iter()
+                        .map(|u| u.name.to_string())
+                        .collect::<Vec<_>>()
+                        .join(", ")
+                )
+            }
+
+            for (c, u) in table.columns.iter_mut().zip(upstream_columns) {
+                // By default, Postgres columns are nullable. This means that both
+                // of the following column definitions are nullable:
+                //     example_col bool
+                //     example_col bool NULL
+                // Fetched upstream column information, on the other hand, will always
+                // include NULL if a column is nullable. In order to compare the user-provided
+                // columns with the fetched columns, we need to append a NULL constraint
+                // to any user-provided columns that are implicitly NULL.
+                if !c.options.contains(&ColumnOptionDef {
+                    name: None,
+                    option: ColumnOption::NotNull,
+                }) && !c.options.contains(&ColumnOptionDef {
+                    name: None,
+                    option: ColumnOption::Null,
+                }) {
+                    c.options.push(ColumnOptionDef {
+                        name: None,
+                        option: ColumnOption::Null,
+                    });
+                }
+                if c != &u {
+                    bail!("incorrect column specification: specified column does not match upstream source, specified: {}, upstream: {}", c, u);
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
 async fn purify_format(
     format: &mut Option<Format<Raw>>,
     connector: &mut Connector<Raw>,
diff --git a/test/pg-cdc/pg-cdc.td b/test/pg-cdc/pg-cdc.td
index fb9f5f98bc408..0cc1a5c3ee4f5 100644
--- a/test/pg-cdc/pg-cdc.td
+++ b/test/pg-cdc/pg-cdc.td
@@ -20,7 +20,7 @@ INSERT INTO numbers_${testdrive.seed} VALUES (5, true, 'five');
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}'
+    TABLE "public.numbers_${testdrive.seed}"
 Postgres sources not yet supported
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -28,7 +28,7 @@ Postgres sources not yet supported
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}' ()
+    TABLE numbers_${testdrive.seed} ()
 Postgres sources not yet supported
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -36,7 +36,7 @@ Postgres sources not yet supported
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}' (number int NOT NULL, is_prime bool NULL, name text NULL)
+    TABLE "numbers_${testdrive.seed}" (number int NOT NULL, is_prime bool NULL, name text NULL)
 Postgres sources not yet supported
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -44,7 +44,7 @@ Postgres sources not yet supported
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
    PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}'(number int NOT NULL, is_prime bool, name text)
+    TABLE "numbers_${testdrive.seed}" (number int NOT NULL, is_prime bool, name text)
 Postgres sources not yet supported
 
 ## CREATE with incorrect information should fail purification.
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -54,7 +54,7 @@ Postgres sources not yet supported
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}' (number int NOT NULL)
+    TABLE public.numbers_${testdrive.seed} (number int NOT NULL)
 incorrect column specification: 1 columns were specified, upstream has 3: number, is_prime, name
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -62,7 +62,7 @@ incorrect column specification: 1 columns were specified, upstream has 3: number
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}' (number int NOT NULL, is_prime bool null, name text, extra numeric NULL)
+    TABLE numbers_${testdrive.seed} (number int NOT NULL, is_prime bool null, name text, extra numeric NULL)
 incorrect column specification: 4 columns were specified, upstream has 3: number, is_prime, name
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -70,7 +70,7 @@ incorrect column specification: 4 columns were specified, upstream has 3: number
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}' (number int NOT NULL, is_prime int, name text)
+    TABLE numbers_${testdrive.seed} (number int NOT NULL, is_prime int, name text)
 incorrect column specification: specified column does not match upstream source, specified: is_prime int4 NULL, upstream: is_prime bool NULL
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -78,7 +78,7 @@ incorrect column specification: specified column does not match upstream source,
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}' (number int NULL, is_prime bool null, name text)
+    TABLE "numbers_${testdrive.seed}" (number int NULL, is_prime bool null, name text)
 incorrect column specification: specified column does not match upstream source, specified: number int4 NULL, upstream: number int4 NOT NULL
 
 ! CREATE MATERIALIZED SOURCE "numbers"
@@ -86,5 +86,15 @@ incorrect column specification: specified column does not match upstream source,
     HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
     PUBLICATION 'mz_source_${testdrive.seed}'
     NAMESPACE 'public'
-    TABLE 'numbers_${testdrive.seed}' (number int NOT NULL, is_prime bool, name text NOT NULL)
+    TABLE numbers_${testdrive.seed} (number int NOT NULL, is_prime bool, name text NOT NULL)
 incorrect column specification: specified column does not match upstream source, specified: name text NOT NULL, upstream: name text NULL
+
+## CREATE SOURCES
+
+! CREATE SOURCES
+    FROM POSTGRES
+    HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
+    PUBLICATION 'mz_source_${testdrive.seed}'
+    NAMESPACE 'public'
+    TABLES ("numbers_${testdrive.seed}" AS "numbers", numbers_${testdrive.seed} AS "second_numbers" (number int NOT NULL, is_prime bool NULL, name text NULL))
+CREATE SOURCES not yet supported
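
A fuller illustration of the syntax this patch parses, beyond the commit-message sketch. This is a hedged example, not part of the patch: the host, publication, and table names below are hypothetical. Note that every entry in the TABLES (...) list requires an AS alias (parse_postgres_table calls expect_keyword(AS) in the Multiple case), while the per-table column list stays optional:

    CREATE SOURCES FROM POSTGRES
    HOST 'host=example.com user=repl dbname=db'
    PUBLICATION 'mz_publication'
    NAMESPACE 'public'
    TABLES (orders AS "orders_import" (id int4 NOT NULL, total numeric),
            users AS "users_import");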