diff --git a/src/coord/src/coord.rs b/src/coord/src/coord.rs
index 0a595ab9f20b..d922606ffe76 100644
--- a/src/coord/src/coord.rs
+++ b/src/coord/src/coord.rs
@@ -757,6 +757,7 @@ impl Coordinator {
             | Statement::CreateSchema(_)
             | Statement::CreateSink(_)
             | Statement::CreateSource(_)
+            | Statement::CreateSources(_)
             | Statement::CreateTable(_)
             | Statement::CreateType(_)
             | Statement::CreateView(_)
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs
index 2ce711336211..7ee0f8cd2fc8 100644
--- a/src/sql-parser/src/ast/defs/ddl.rs
+++ b/src/sql-parser/src/ast/defs/ddl.rs
@@ -405,6 +405,71 @@ impl<T: AstInfo> AstDisplay for Connector<T> {
     }
 }
 impl_display_t!(Connector);
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum MultiConnector<T: AstInfo> {
+    Postgres {
+        /// The postgres connection string
+        conn: String,
+        /// The name of the publication to sync
+        publication: String,
+        /// The namespace the synced table belongs to
+        namespace: String,
+        /// The tables to sync
+        tables: Vec<PgTable<T>>,
+    },
+}
+
+impl<T: AstInfo> AstDisplay for MultiConnector<T> {
+    fn fmt(&self, f: &mut AstFormatter) {
+        match self {
+            MultiConnector::Postgres {
+                conn,
+                publication,
+                namespace,
+                tables,
+            } => {
+                f.write_str("POSTGRES HOST '");
+                f.write_str(&display::escape_single_quote_string(conn));
+                f.write_str("' PUBLICATION '");
+                f.write_str(&display::escape_single_quote_string(publication));
+                f.write_str("' NAMESPACE '");
+                f.write_str(&display::escape_single_quote_string(namespace));
+                f.write_str("' TABLES (");
+                f.write_node(&display::comma_separated(tables));
+                f.write_str(")");
+            }
+        }
+    }
+}
+impl_display_t!(MultiConnector);
+
+/// Information about upstream Postgres tables used for replication sources
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PgTable<T: AstInfo> {
+    /// The name of the table to sync
+    pub name: String,
+    /// The name for the table in Materialize
+    pub alias: T::ObjectName,
+    /// The expected column schema of the synced table
+    pub columns: Vec<ColumnDef<T>>,
+}
+
+impl<T: AstInfo> AstDisplay for PgTable<T> {
+    fn fmt(&self, f: &mut AstFormatter) {
+        f.write_str("'");
+        f.write_str(&display::escape_single_quote_string(&self.name));
+        f.write_str("'");
+        f.write_str(" AS ");
+        f.write_str(self.alias.to_ast_string());
+        if !self.columns.is_empty() {
+            f.write_str(" (");
+            f.write_node(&display::comma_separated(&self.columns));
+            f.write_str(")");
+        }
+    }
+}
+impl_display_t!(PgTable);
+
 /// The key sources specified in the S3 source's `OBJECTS FROM` clause.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum S3KeySource {
diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs
index b21aae88a2e9..1546106b67f7 100644
--- a/src/sql-parser/src/ast/defs/statement.rs
+++ b/src/sql-parser/src/ast/defs/statement.rs
@@ -20,8 +20,8 @@ use crate::ast::display::{self, AstDisplay, AstFormatter};
 use crate::ast::{
-    AstInfo, ColumnDef, Connector, DataType, Envelope, Expr, Format, Ident, Query, TableConstraint,
-    UnresolvedObjectName, Value,
+    AstInfo, ColumnDef, Connector, DataType, Envelope, Expr, Format, Ident, MultiConnector, Query,
+    TableConstraint, UnresolvedObjectName, Value,
 };
 
 /// A top-level statement (SELECT, INSERT, CREATE, etc.)
@@ -36,6 +36,7 @@ pub enum Statement<T: AstInfo> {
     CreateDatabase(CreateDatabaseStatement),
     CreateSchema(CreateSchemaStatement),
     CreateSource(CreateSourceStatement<T>),
+    CreateSources(CreateSourcesStatement<T>),
     CreateSink(CreateSinkStatement<T>),
     CreateView(CreateViewStatement<T>),
     CreateTable(CreateTableStatement<T>),
@@ -90,6 +91,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
             Statement::CreateDatabase(stmt) => f.write_node(stmt),
             Statement::CreateSchema(stmt) => f.write_node(stmt),
             Statement::CreateSource(stmt) => f.write_node(stmt),
+            Statement::CreateSources(stmt) => f.write_node(stmt),
             Statement::CreateSink(stmt) => f.write_node(stmt),
             Statement::CreateView(stmt) => f.write_node(stmt),
             Statement::CreateTable(stmt) => f.write_node(stmt),
@@ -397,6 +399,20 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
 }
 impl_display_t!(CreateSourceStatement);
 
+/// `CREATE SOURCES`
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct CreateSourcesStatement<T: AstInfo> {
+    pub connector: MultiConnector<T>,
+}
+
+impl<T: AstInfo> AstDisplay for CreateSourcesStatement<T> {
+    fn fmt(&self, f: &mut AstFormatter) {
+        f.write_str("CREATE SOURCES FROM ");
+        f.write_node(&self.connector);
+    }
+}
+impl_display_t!(CreateSourcesStatement);
+
 /// `CREATE SINK`
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct CreateSinkStatement<T: AstInfo> {
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index 92040d783c2f..affa11c7485e 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -1420,6 +1420,8 @@
         } else if self.parse_keyword(SOURCE) {
             self.prev_token();
             self.parse_create_source()
+        } else if self.parse_keyword(SOURCES) {
+            self.parse_create_sources()
         } else if self.parse_keyword(SINK) {
             self.parse_create_sink()
         } else if self.parse_keyword(DEFAULT) {
@@ -1659,6 +1661,15 @@
         }))
     }
 
+    fn parse_create_sources(&mut self) -> Result<Statement<Raw>, ParserError> {
+        self.expect_keyword(FROM)?;
+        let connector = self.parse_multi_connector()?;
+
+        Ok(Statement::CreateSources(CreateSourcesStatement {
+            connector,
+        }))
+    }
+
     fn parse_create_sink(&mut self) -> Result<Statement<Raw>, ParserError> {
         let if_not_exists = self.parse_if_not_exists()?;
         let name = self.parse_object_name()?;
@@ -1825,6 +1836,30 @@
         }
     }
 
+    fn parse_multi_connector(&mut self) -> Result<MultiConnector<Raw>, ParserError> {
+        match self.expect_one_of_keywords(&[POSTGRES])? {
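+            // Only POSTGRES is accepted here for now; additional multi-table
+            // connectors would add arms to this match.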
+            POSTGRES => {
+                self.expect_keyword(HOST)?;
+                let conn = self.parse_literal_string()?;
+                self.expect_keyword(PUBLICATION)?;
+                let publication = self.parse_literal_string()?;
+                self.expect_keyword(NAMESPACE)?;
+                let namespace = self.parse_literal_string()?;
+                self.expect_keyword(TABLES)?;
+                self.expect_token(&Token::LParen)?;
+                let tables = self.parse_postgres_tables()?;
+
+                Ok(MultiConnector::Postgres {
+                    conn,
+                    publication,
+                    namespace,
+                    tables,
+                })
+            }
+            _ => unreachable!(),
+        }
+    }
+
     fn parse_create_view(&mut self) -> Result<Statement<Raw>, ParserError> {
         let mut if_exists = if self.parse_keyword(OR) {
             self.expect_keyword(REPLACE)?;
@@ -2052,6 +2087,41 @@
         }))
     }
 
+    fn parse_postgres_tables(&mut self) -> Result<Vec<PgTable<Raw>>, ParserError> {
+        let mut tables = vec![];
+        loop {
+            let name = self.parse_literal_string()?;
+            self.expect_keyword(AS)?;
+            let alias = RawName::Name(self.parse_object_name()?);
+            let (columns, constraints) = self.parse_columns(Optional)?;
+            if !constraints.is_empty() {
+                return parser_err!(
+                    self,
+                    self.peek_prev_pos(),
+                    "Cannot specify constraints in Postgres table definition"
+                );
+            }
+            tables.push(PgTable {
+                name,
+                alias,
+                columns,
+            });
+
+            if self.consume_token(&Token::Comma) {
+                // Continue.
+            } else if self.consume_token(&Token::RParen) {
+                break;
+            } else {
+                return self.expected(
+                    self.peek_pos(),
+                    "',' or ')' after table definition",
+                    self.peek_token(),
+                );
+            }
+        }
+        Ok(tables)
+    }
+
     fn parse_columns(
         &mut self,
         optional: IsOptional,
diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl
index 422ee87ce35e..5e14750c595f 100644
--- a/src/sql-parser/tests/testdata/ddl
+++ b/src/sql-parser/tests/testdata/ddl
@@ -524,6 +524,41 @@ error: Expected NOT, found EXISTS
 CREATE SOURCE IF EXISTS foo FROM FILE 'bar' USING SCHEMA ''
                  ^
 
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
+----
+error: Expected literal string, found right parenthesis
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
+                                                                                                                                             ^
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
+----
+error: Expected literal string, found SELECT
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
+                                                                                                                                             ^
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
+----
+error: Expected literal string, found identifier
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
+                                                                                                                                             ^
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' as "public.psychic" (pokedex_id int NOT NULL, evolution int))
+----
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4))
+=>
+CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", tables: [PgTable { name: "psychic", alias: Name(UnresolvedObjectName([Ident("public.psychic")])), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }] } })
+
+parse-statement
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('random.psychic' as "public.psychic" (pokedex_id int NOT NULL, evolution int), 'another_table' as "another_one")
+----
+CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('random.psychic' AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4), 'another_table' AS another_one)
+=>
+CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", tables: [PgTable { name: "random.psychic", alias: Name(UnresolvedObjectName([Ident("public.psychic")])), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, PgTable { name: "another_table", alias: Name(UnresolvedObjectName([Ident("another_one")])), columns: [] }] } })
+
 parse-statement
 CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES
 ----
diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs
index cbcfe8676e2a..062a8effccf0 100644
--- a/src/sql/src/plan/statement.rs
+++ b/src/sql/src/plan/statement.rs
@@ -109,6 +109,7 @@ pub fn describe(
         Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
         Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
         Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
+        Statement::CreateSources(stmt) => ddl::describe_create_sources(&scx, stmt)?,
         Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
         Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
         Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
@@ -193,6 +194,7 @@ pub fn plan(
         Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
         Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
         Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
+        Statement::CreateSources(stmt) => ddl::plan_create_sources(scx, stmt),
         Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt, params),
         Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
         Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index d14ed786f643..ff4d64f55139 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -55,10 +55,10 @@ use crate::ast::{
     AlterIndexOptionsList, AlterIndexOptionsStatement,
     AlterObjectRenameStatement, AvroSchema, ColumnOption, Compression, Connector,
     CreateDatabaseStatement, CreateIndexStatement, CreateRoleOption, CreateRoleStatement,
     CreateSchemaStatement, CreateSinkStatement,
-    CreateSourceStatement, CreateTableStatement, CreateTypeAs, CreateTypeStatement,
-    CreateViewStatement, DataType, DropDatabaseStatement, DropObjectsStatement, Envelope, Expr,
-    Format, Ident, IfExistsBehavior, ObjectType, Raw, SqlOption, Statement, UnresolvedObjectName,
-    Value, WithOption,
+    CreateSourceStatement, CreateSourcesStatement, CreateTableStatement, CreateTypeAs,
+    CreateTypeStatement, CreateViewStatement, DataType, DropDatabaseStatement,
+    DropObjectsStatement, Envelope, Expr, Format, Ident, IfExistsBehavior, ObjectType, Raw,
+    SqlOption, Statement, UnresolvedObjectName, Value, WithOption,
 };
 use crate::catalog::{CatalogItem, CatalogItemType};
 use crate::kafka_util;
@@ -227,6 +227,13 @@ pub fn describe_create_source(
     Ok(StatementDesc::new(None))
 }
 
+pub fn describe_create_sources(
+    _: &StatementContext,
+    _: CreateSourcesStatement<Raw>,
+) -> Result<StatementDesc, anyhow::Error> {
+    Ok(StatementDesc::new(None))
+}
+
 // Flatten one Debezium entry ("before" or "after")
 // into its corresponding data fields, plus an extra column for the diff.
 fn plan_dbz_flatten_one(
@@ -1027,6 +1034,13 @@ pub fn plan_create_source(
     })
 }
 
+pub fn plan_create_sources(
+    _scx: &StatementContext,
+    _stmt: CreateSourcesStatement<Raw>,
+) -> Result<Plan, anyhow::Error> {
+    unsupported!("CREATE SOURCES");
+}
+
 pub fn describe_create_view(
     _: &StatementContext,
     _: CreateViewStatement<Raw>,
diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs
index f38b867d20f4..530fc31de7b9 100644
--- a/src/sql/src/pure.rs
+++ b/src/sql/src/pure.rs
@@ -23,8 +23,8 @@ use tokio::time::Duration;
 use repr::strconv;
 use sql_parser::ast::display::AstDisplay;
 use sql_parser::ast::{
-    AvroSchema, ColumnOption, ColumnOptionDef, Connector, CreateSourceStatement, CsrSeed, Format,
-    Ident, Raw, Statement,
+    AvroSchema, ColumnDef, ColumnOption, ColumnOptionDef, Connector, CreateSourceStatement,
+    CreateSourcesStatement, CsrSeed, Format, Ident, MultiConnector, Raw, Statement,
 };
 use sql_parser::parser::parse_columns;
 
@@ -111,60 +111,7 @@ pub async fn purify(mut stmt: Statement<Raw>) -> Result<Statement<Raw>, anyhow::Error> {
             table,
             columns,
             ..
-        } => {
-            let fetched_columns = postgres_util::fetch_columns(conn, namespace, table)
-                .await?
-                .iter()
-                .map(|c| c.to_ast_string())
-                .collect::<Vec<_>>()
-                .join(", ");
-            let (upstream_columns, _constraints) =
-                parse_columns(&format!("({})", fetched_columns))?;
-            if columns.is_empty() {
-                *columns = upstream_columns;
-            } else {
-                if columns != &upstream_columns {
-                    if columns.len() != upstream_columns.len() {
-                        bail!(
-                            "incorrect column specification: {} columns were specified, upstream has {}: {}",
-                            columns.len(),
-                            upstream_columns.len(),
-                            upstream_columns
-                                .iter()
-                                .map(|u| u.name.to_string())
-                                .collect::<Vec<_>>()
-                                .join(", ")
-                        )
-                    }
-                    for (c, u) in columns.into_iter().zip(upstream_columns) {
-                        // By default, Postgres columns are nullable. This means that both
-                        // of the following column definitions are nullable:
-                        //     example_col bool
-                        //     example_col bool NULL
-                        // Fetched upstream column information, on the other hand, will always
-                        // include NULL if a column is nullable. In order to compare the user-provided
-                        // columns with the fetched columns, we need to append a NULL constraint
-                        // to any user-provided columns that are implicitly NULL.
-                        if !c.options.contains(&ColumnOptionDef {
-                            name: None,
-                            option: ColumnOption::NotNull,
-                        }) && !c.options.contains(&ColumnOptionDef {
-                            name: None,
-                            option: ColumnOption::Null,
-                        }) {
-                            c.options.push(ColumnOptionDef {
-                                name: None,
-                                option: ColumnOption::Null,
-                            });
-                        }
-                        if c != &u {
-                            bail!("incorrect column specification: specified column does not match upstream source, specified: {}, upstream: {}", c, u);
-                        }
-                    }
-                }
-            }
-            ()
-        }
+        } => purify_postgres_table(conn, namespace, table, columns).await?,
         Connector::PubNub { .. } => (),
     }
 
@@ -173,9 +120,82 @@ pub async fn purify(mut stmt: Statement<Raw>) -> Result<Statement<Raw>, anyhow::Error> {
             purify_format(format, connector, col_names, None, &config_options).await?;
         }
     }
+    if let Statement::CreateSources(CreateSourcesStatement { connector }) = &mut stmt {
+        match connector {
+            MultiConnector::Postgres {
+                conn,
+                namespace,
+                tables,
+                ..
+            } => {
+                for table in tables {
+                    purify_postgres_table(conn, namespace, &table.name, &mut table.columns).await?
+                }
+            }
+        }
+    }
     Ok(stmt)
 }
 
+async fn purify_postgres_table(
+    conn: &str,
+    namespace: &str,
+    table: &str,
+    columns: &mut Vec<ColumnDef<Raw>>,
+) -> Result<(), anyhow::Error> {
+    let fetched_columns = postgres_util::fetch_columns(conn, namespace, table)
+        .await?
+        .iter()
+        .map(|c| c.to_ast_string())
+        .collect::<Vec<_>>()
+        .join(", ");
+    let (upstream_columns, _constraints) = parse_columns(&format!("({})", fetched_columns))?;
+    if columns.is_empty() {
+        *columns = upstream_columns;
+    } else {
+        if columns != &upstream_columns {
+            if columns.len() != upstream_columns.len() {
+                bail!(
+                    "incorrect column specification: {} columns were specified, upstream has {}: {}",
+                    columns.len(),
+                    upstream_columns.len(),
+                    upstream_columns
+                        .iter()
+                        .map(|u| u.name.to_string())
+                        .collect::<Vec<_>>()
+                        .join(", ")
+                )
+            }
+            for (c, u) in columns.into_iter().zip(upstream_columns) {
+                // By default, Postgres columns are nullable. This means that both
+                // of the following column definitions are nullable:
+                //     example_col bool
+                //     example_col bool NULL
+                // Fetched upstream column information, on the other hand, will always
+                // include NULL if a column is nullable. In order to compare the user-provided
+                // columns with the fetched columns, we need to append a NULL constraint
+                // to any user-provided columns that are implicitly NULL.
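+                // The `c != &u` comparison below uses ColumnDef's PartialEq, so
+                // the column options must match structurally as well.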
+                if !c.options.contains(&ColumnOptionDef {
+                    name: None,
+                    option: ColumnOption::NotNull,
+                }) && !c.options.contains(&ColumnOptionDef {
+                    name: None,
+                    option: ColumnOption::Null,
+                }) {
+                    c.options.push(ColumnOptionDef {
+                        name: None,
+                        option: ColumnOption::Null,
+                    });
+                }
+                if c != &u {
+                    bail!("incorrect column specification: specified column does not match upstream source, specified: {}, upstream: {}", c, u);
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
 async fn purify_format(
     format: &mut Option<Format<Raw>>,
     connector: &mut Connector<Raw>,
diff --git a/test/pg-cdc/pg-cdc.td b/test/pg-cdc/pg-cdc.td
index fb9f5f98bc40..069e4a96a277 100644
--- a/test/pg-cdc/pg-cdc.td
+++ b/test/pg-cdc/pg-cdc.td
@@ -10,10 +10,14 @@
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 CREATE TABLE numbers_${testdrive.seed} (number int PRIMARY KEY, is_prime bool, name text);
 ALTER TABLE numbers_${testdrive.seed} REPLICA IDENTITY FULL;
+
+CREATE SCHEMA other_schema_${testdrive.seed};
+CREATE TABLE other_schema_${testdrive.seed}.numbers_${testdrive.seed} (number int PRIMARY KEY, is_prime bool NOT NULL, name text NOT NULL);
+
 CREATE PUBLICATION mz_source_${testdrive.seed} FOR ALL TABLES;
 
 INSERT INTO numbers_${testdrive.seed} VALUES (5, true, 'five');
 
-## CREATE with correct information should pass purification and fail afterwards.
+## CREATE SOURCE with correct information should pass purification and fail afterwards.
 
 ! CREATE MATERIALIZED SOURCE "numbers"
   FROM POSTGRES
@@ -47,7 +51,7 @@ Postgres sources not yet supported
   TABLE 'numbers_${testdrive.seed}'(number int NOT NULL, is_prime bool, name text)
 Postgres sources not yet supported
 
-## CREATE with incorrect information should fail purification.
+## CREATE SOURCE with incorrect information should fail purification.
 
 ! CREATE MATERIALIZED SOURCE "numbers"
   FROM POSTGRES
@@ -88,3 +92,39 @@ incorrect column specification: specified column does not match upstream source,
   NAMESPACE 'public'
   TABLE 'numbers_${testdrive.seed}' (number int NOT NULL, is_prime bool, name text NOT NULL)
 incorrect column specification: specified column does not match upstream source, specified: name text NOT NULL, upstream: name text NULL
+
+## CREATE SOURCES with correct information should pass purification and fail afterwards.
+
+! CREATE SOURCES
+  FROM POSTGRES
+  HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
+  PUBLICATION 'mz_source_${testdrive.seed}'
+  NAMESPACE 'public'
+  TABLES ('numbers_${testdrive.seed}' AS "numbers")
+CREATE SOURCES not yet supported
+
+! CREATE SOURCES
+  FROM POSTGRES
+  HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
+  PUBLICATION 'mz_source_${testdrive.seed}'
+  NAMESPACE 'public'
+  TABLES ('numbers_${testdrive.seed}' AS "numbers", 'numbers_${testdrive.seed}' AS "second_numbers" (number int NOT NULL, is_prime bool NULL, name text NULL))
+CREATE SOURCES not yet supported
+
+## CREATE SOURCES with incorrect information should fail purification.
+
+! CREATE SOURCES
+  FROM POSTGRES
+  HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
+  PUBLICATION 'mz_source_${testdrive.seed}'
+  NAMESPACE 'public'
+  TABLES ('numbers_${testdrive.seed}' AS "numbers" (number int NULL))
+incorrect column specification: 1 columns were specified, upstream has 3: number, is_prime, name
+
+! CREATE SOURCES
+  FROM POSTGRES
+  HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
+  PUBLICATION 'mz_source_${testdrive.seed}'
+  NAMESPACE 'public'
+  TABLES ('numbers_${testdrive.seed}' AS "numbers" (number int NULL, is_prime bool NULL, name text))
+incorrect column specification: specified column does not match upstream source, specified: number int4 NULL, upstream: number int4 NOT NULL
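
A usage note, separate from the patch itself: the new statement round-trips
through the parser like any other AST node. Below is a minimal sketch in Rust,
assuming the sql-parser crate's existing `parse_statements` entry point and the
`AstDisplay` trait used above; the SQL literal is borrowed from the testdata:

    use sql_parser::ast::display::AstDisplay;
    use sql_parser::parser::parse_statements;

    fn main() {
        let sql = "CREATE SOURCES FROM POSTGRES \
                   HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' \
                   PUBLICATION 'red' NAMESPACE 'generation1' \
                   TABLES ('psychic' AS \"public.psychic\")";
        // CREATE SOURCES parses into
        // Statement::CreateSources(CreateSourcesStatement { connector }).
        let stmts = parse_statements(sql).expect("valid CREATE SOURCES syntax");
        // AstDisplay canonicalizes the statement (for example, a lowercase
        // `as` is printed as AS), matching tests/testdata/ddl above.
        println!("{}", stmts[0].to_ast_string());
    }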