Skip to content

Commit

Permalink
sql,sql-parser: parse CREATE SOURCES FROM POSTGRES
Browse files Browse the repository at this point in the history
Parses statements like:
	CREATE SOURCES FROM POSTGRES
	  HOST        '...'
	  PUBLICATION '...'
	  NAMESPACE   '...'
	  TABLES (...)
where each of the upstream sources shares a HOST, PUBLICATION, and
NAMESPACE, but has its own table name, alias, and optional column
definitions. Leaves planning and sequencing unimplemented.
  • Loading branch information
JLDLaughlin committed Apr 7, 2021
1 parent 74228e4 commit 3b7f96e
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 64 deletions.
1 change: 1 addition & 0 deletions src/coord/src/coord.rs
Expand Up @@ -757,6 +757,7 @@ impl Coordinator {
| Statement::CreateSchema(_)
| Statement::CreateSink(_)
| Statement::CreateSource(_)
| Statement::CreateSources(_)
| Statement::CreateTable(_)
| Statement::CreateType(_)
| Statement::CreateView(_)
Expand Down
63 changes: 63 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Expand Up @@ -405,6 +405,69 @@ impl<T: AstInfo> AstDisplay for Connector<T> {
}
impl_display_t!(Connector);

/// A connector for a `CREATE SOURCES` statement.
///
/// Unlike [`Connector`], which describes a single source, a `MultiConnector`
/// describes a whole family of sources that share one set of upstream
/// connection parameters but each sync a different table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MultiConnector<T: AstInfo> {
    /// Sources replicated from a PostgreSQL publication.
    Postgres {
        /// The postgres connection string
        conn: String,
        /// The name of the publication to sync
        publication: String,
        /// The namespace the synced table belongs to
        namespace: String,
        /// The tables to sync
        tables: Vec<PgTable<T>>,
    },
}

impl<T: AstInfo> AstDisplay for MultiConnector<T> {
    /// Renders the connector in the same surface syntax the parser accepts:
    /// `POSTGRES HOST '...' PUBLICATION '...' NAMESPACE '...' TABLES (...)`.
    fn fmt(&self, f: &mut AstFormatter) {
        // `Postgres` is currently the only variant, so an irrefutable `let`
        // destructuring suffices in place of a `match`.
        let MultiConnector::Postgres {
            conn,
            publication,
            namespace,
            tables,
        } = self;
        f.write_str("POSTGRES HOST '");
        f.write_str(&display::escape_single_quote_string(conn));
        f.write_str("' PUBLICATION '");
        f.write_str(&display::escape_single_quote_string(publication));
        f.write_str("' NAMESPACE '");
        f.write_str(&display::escape_single_quote_string(namespace));
        f.write_str("' TABLES (");
        f.write_node(&display::comma_separated(tables));
        f.write_str(")");
    }
}
impl_display_t!(MultiConnector);

/// Information about upstream Postgres tables used for replication sources
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PgTable<T: AstInfo> {
/// The name of the table to sync
pub name: String,
/// The name for the table in Materialize
pub alias: T::ObjectName,
/// The expected column schema of the synced table
pub columns: Vec<ColumnDef<T>>,
}

impl<T: AstInfo> AstDisplay for PgTable<T> {
fn fmt(&self, f: &mut AstFormatter) {
f.write_str(&self.name);
f.write_str(" AS ");
f.write_str(self.alias.to_ast_string());
if !self.columns.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(&self.columns));
f.write_str(")");
}
}
}
impl_display_t!(PgTable);

/// The key sources specified in the S3 source's `OBJECTS FROM` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum S3KeySource {
Expand Down
20 changes: 18 additions & 2 deletions src/sql-parser/src/ast/defs/statement.rs
Expand Up @@ -20,8 +20,8 @@

use crate::ast::display::{self, AstDisplay, AstFormatter};
use crate::ast::{
AstInfo, ColumnDef, Connector, DataType, Envelope, Expr, Format, Ident, Query, TableConstraint,
UnresolvedObjectName, Value,
AstInfo, ColumnDef, Connector, DataType, Envelope, Expr, Format, Ident, MultiConnector, Query,
TableConstraint, UnresolvedObjectName, Value,
};

/// A top-level statement (SELECT, INSERT, CREATE, etc.)
Expand All @@ -36,6 +36,7 @@ pub enum Statement<T: AstInfo> {
CreateDatabase(CreateDatabaseStatement),
CreateSchema(CreateSchemaStatement),
CreateSource(CreateSourceStatement<T>),
CreateSources(CreateSourcesStatement<T>),
CreateSink(CreateSinkStatement<T>),
CreateView(CreateViewStatement<T>),
CreateTable(CreateTableStatement<T>),
Expand Down Expand Up @@ -90,6 +91,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
Statement::CreateDatabase(stmt) => f.write_node(stmt),
Statement::CreateSchema(stmt) => f.write_node(stmt),
Statement::CreateSource(stmt) => f.write_node(stmt),
Statement::CreateSources(stmt) => f.write_node(stmt),
Statement::CreateSink(stmt) => f.write_node(stmt),
Statement::CreateView(stmt) => f.write_node(stmt),
Statement::CreateTable(stmt) => f.write_node(stmt),
Expand Down Expand Up @@ -397,6 +399,20 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
}
impl_display_t!(CreateSourceStatement);

/// `CREATE SOURCES`
///
/// Creates several sources at once from a single upstream system. Unlike
/// `CREATE SOURCE`, the statement itself carries no name; each table listed
/// in the connector supplies its own Materialize-side alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSourcesStatement<T: AstInfo> {
    /// The shared connection parameters plus the per-table definitions.
    pub connector: MultiConnector<T>,
}

impl<T: AstInfo> AstDisplay for CreateSourcesStatement<T> {
    fn fmt(&self, f: &mut AstFormatter) {
        // The connector renders everything after `FROM`, including the
        // `TABLES (...)` list.
        f.write_str("CREATE SOURCES FROM ");
        f.write_node(&self.connector);
    }
}
impl_display_t!(CreateSourcesStatement);

/// `CREATE SINK`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSinkStatement<T: AstInfo> {
Expand Down
70 changes: 70 additions & 0 deletions src/sql-parser/src/parser.rs
Expand Up @@ -1420,6 +1420,8 @@ impl<'a> Parser<'a> {
} else if self.parse_keyword(SOURCE) {
self.prev_token();
self.parse_create_source()
} else if self.parse_keyword(SOURCES) {
self.parse_create_sources()
} else if self.parse_keyword(SINK) {
self.parse_create_sink()
} else if self.parse_keyword(DEFAULT) {
Expand Down Expand Up @@ -1659,6 +1661,15 @@ impl<'a> Parser<'a> {
}))
}

/// Parses the remainder of a `CREATE SOURCES` statement; the `CREATE SOURCES`
/// keywords have already been consumed, leaving `FROM <connector>`.
fn parse_create_sources(&mut self) -> Result<Statement<Raw>, ParserError> {
    self.expect_keyword(FROM)?;
    Ok(Statement::CreateSources(CreateSourcesStatement {
        connector: self.parse_multi_connector()?,
    }))
}

fn parse_create_sink(&mut self) -> Result<Statement<Raw>, ParserError> {
let if_not_exists = self.parse_if_not_exists()?;
let name = self.parse_object_name()?;
Expand Down Expand Up @@ -1825,6 +1836,30 @@ impl<'a> Parser<'a> {
}
}

/// Parses the connector clause of `CREATE SOURCES FROM ...`.
///
/// Currently only `POSTGRES HOST '...' PUBLICATION '...' NAMESPACE '...'
/// TABLES (...)` is supported; the clauses must appear in exactly this
/// order, and each of HOST/PUBLICATION/NAMESPACE takes a string literal.
fn parse_multi_connector(&mut self) -> Result<MultiConnector<Raw>, ParserError> {
    // `expect_one_of_keywords` (rather than `expect_keyword`) keeps the
    // dispatch shape ready for additional connector kinds.
    match self.expect_one_of_keywords(&[POSTGRES])? {
        POSTGRES => {
            self.expect_keyword(HOST)?;
            let conn = self.parse_literal_string()?;
            self.expect_keyword(PUBLICATION)?;
            let publication = self.parse_literal_string()?;
            self.expect_keyword(NAMESPACE)?;
            let namespace = self.parse_literal_string()?;
            self.expect_keyword(TABLES)?;
            // Only the opening paren is consumed here; the table list
            // parser consumes through the matching `)`.
            self.expect_token(&Token::LParen)?;
            let tables = self.parse_postgres_tables()?;

            Ok(MultiConnector::Postgres {
                conn,
                publication,
                namespace,
                tables,
            })
        }
        // `expect_one_of_keywords` already rejected anything else.
        _ => unreachable!(),
    }
}

fn parse_create_view(&mut self) -> Result<Statement<Raw>, ParserError> {
let mut if_exists = if self.parse_keyword(OR) {
self.expect_keyword(REPLACE)?;
Expand Down Expand Up @@ -2052,6 +2087,41 @@ impl<'a> Parser<'a> {
}))
}

/// Parses the comma-separated entries of the `TABLES (...)` clause of
/// `CREATE SOURCES FROM POSTGRES`; the opening `(` has already been
/// consumed. Each entry has the form `name AS alias [(columns...)]`.
/// Consumes through the closing `)`.
fn parse_postgres_tables(&mut self) -> Result<Vec<PgTable<Raw>>, ParserError> {
    let mut tables = vec![];
    loop {
        // The upstream table name is an object name (possibly quoted,
        // possibly schema-qualified), not a string literal — the tests
        // accept `psychic AS ...` unquoted and expect "Expected
        // identifier" when the list is empty.
        let name = self.parse_object_name()?;
        self.expect_keyword(AS)?;
        let alias = RawName::Name(self.parse_object_name()?);
        // The column list is optional; constraints are rejected because
        // they carry no meaning for a replicated upstream table.
        let (columns, constraints) = self.parse_columns(Optional)?;
        if !constraints.is_empty() {
            return parser_err!(
                self,
                self.peek_prev_pos(),
                "Cannot specify constraints in Postgres table definition"
            );
        }
        tables.push(PgTable {
            name,
            alias,
            columns,
        });

        if self.consume_token(&Token::Comma) {
            // Continue with the next table definition.
        } else if self.consume_token(&Token::RParen) {
            break;
        } else {
            return self.expected(
                self.peek_pos(),
                "',' or ')' after table definition",
                self.peek_token(),
            );
        }
    }
    Ok(tables)
}

fn parse_columns(
&mut self,
optional: IsOptional,
Expand Down
35 changes: 35 additions & 0 deletions src/sql-parser/tests/testdata/ddl
Expand Up @@ -524,6 +524,41 @@ error: Expected NOT, found EXISTS
CREATE SOURCE IF EXISTS foo FROM FILE 'bar' USING SCHEMA ''
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
----
error: Expected identifier, found right parenthesis
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
----
error: Expected AS, found number
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
----
error: Expected AS, found left parenthesis
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (psychic as "public.psychic" (pokedex_id int NOT NULL, evolution int))
----
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (psychic AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4))
=>
CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", tables: [PgTable { name: UnresolvedObjectName([Ident("psychic")]), alias: Name(UnresolvedObjectName([Ident("public.psychic")])), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }] } })

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("random.psychic" as "public.psychic" (pokedex_id int NOT NULL, evolution int), "another_table" as "another_one")
----
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("random.psychic" AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4), another_table AS another_one)
=>
CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", tables: [PgTable { name: UnresolvedObjectName([Ident("random.psychic")]), alias: Name(UnresolvedObjectName([Ident("public.psychic")])), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, PgTable { name: UnresolvedObjectName([Ident("another_table")]), alias: Name(UnresolvedObjectName([Ident("another_one")])), columns: [] }] } })

parse-statement
CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES
----
Expand Down
2 changes: 2 additions & 0 deletions src/sql/src/plan/statement.rs
Expand Up @@ -109,6 +109,7 @@ pub fn describe(
Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
Statement::CreateSources(stmt) => ddl::describe_create_sources(&scx, stmt)?,
Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
Expand Down Expand Up @@ -193,6 +194,7 @@ pub fn plan(
Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
Statement::CreateSources(stmt) => ddl::plan_create_sources(scx, stmt),
Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt, params),
Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
Expand Down
22 changes: 18 additions & 4 deletions src/sql/src/plan/statement/ddl.rs
Expand Up @@ -55,10 +55,10 @@ use crate::ast::{
AlterIndexOptionsList, AlterIndexOptionsStatement, AlterObjectRenameStatement, AvroSchema,
ColumnOption, Compression, Connector, CreateDatabaseStatement, CreateIndexStatement,
CreateRoleOption, CreateRoleStatement, CreateSchemaStatement, CreateSinkStatement,
CreateSourceStatement, CreateTableStatement, CreateTypeAs, CreateTypeStatement,
CreateViewStatement, DataType, DropDatabaseStatement, DropObjectsStatement, Envelope, Expr,
Format, Ident, IfExistsBehavior, ObjectType, Raw, SqlOption, Statement, UnresolvedObjectName,
Value, WithOption,
CreateSourceStatement, CreateSourcesStatement, CreateTableStatement, CreateTypeAs,
CreateTypeStatement, CreateViewStatement, DataType, DropDatabaseStatement,
DropObjectsStatement, Envelope, Expr, Format, Ident, IfExistsBehavior, ObjectType, Raw,
SqlOption, Statement, UnresolvedObjectName, Value, WithOption,
};
use crate::catalog::{CatalogItem, CatalogItemType};
use crate::kafka_util;
Expand Down Expand Up @@ -227,6 +227,13 @@ pub fn describe_create_source(
Ok(StatementDesc::new(None))
}

/// Describes the result of a `CREATE SOURCES` statement. Like other DDL,
/// it returns no rows, so the description carries no relation type.
pub fn describe_create_sources(
    _: &StatementContext,
    _: CreateSourcesStatement<Raw>,
) -> Result<StatementDesc, anyhow::Error> {
    Ok(StatementDesc::new(None))
}

// Flatten one Debezium entry ("before" or "after")
// into its corresponding data fields, plus an extra column for the diff.
fn plan_dbz_flatten_one(
Expand Down Expand Up @@ -1027,6 +1034,13 @@ pub fn plan_create_source(
})
}

/// Plans a `CREATE SOURCES` statement.
///
/// Parsing support landed before planning/sequencing (see the commit
/// message), so this currently always reports the statement as unsupported.
pub fn plan_create_sources(
    _scx: &StatementContext,
    _stmt: CreateSourcesStatement<Raw>,
) -> Result<Plan, anyhow::Error> {
    unsupported!("CREATE SOURCES");
}

pub fn describe_create_view(
_: &StatementContext,
_: CreateViewStatement<Raw>,
Expand Down

0 comments on commit 3b7f96e

Please sign in to comment.