sql,sql-parser: parse CREATE SOURCES
Parses CREATE SOURCES (CREATE SOURCE .., ..) statements, which will
be implemented only for Postgres CDC sources.
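
For illustration, a minimal sketch of driving the new parse path from Rust. The `sql_parser::parser::parse_statements` entry point and module paths below are assumptions about the crate's public API rather than part of this change; the SQL mirrors the forms exercised in the parser testdata further down.

    use sql_parser::ast::Statement;
    // Assumed entry point; the exact module path and signature may differ.
    use sql_parser::parser::parse_statements;

    fn main() {
        // Two Postgres CDC source definitions grouped into one CREATE SOURCES statement.
        let sql = "CREATE SOURCES (\
            CREATE SOURCE t1 FROM POSTGRES HOST 'host=pg dbname=db' \
                PUBLICATION 'pub' NAMESPACE 'public' TABLE 't1', \
            CREATE SOURCE t2 FROM POSTGRES HOST 'host=pg dbname=db' \
                PUBLICATION 'pub' NAMESPACE 'public' TABLE 't2')";

        let stmts = parse_statements(sql).expect("CREATE SOURCES should parse");
        match &stmts[0] {
            // The new variant carries one CreateSourceStatement per inner definition.
            Statement::CreateSources(create_sources) => {
                assert_eq!(create_sources.sources.len(), 2)
            }
            _ => panic!("expected Statement::CreateSources"),
        }
    }
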
JLDLaughlin committed Apr 1, 2021
1 parent 375d3a5 commit 3d8c5c0
Showing 7 changed files with 140 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/coord/src/coord.rs
@@ -755,6 +755,7 @@ impl Coordinator {
| Statement::CreateSchema(_)
| Statement::CreateSink(_)
| Statement::CreateSource(_)
| Statement::CreateSources(_)
| Statement::CreateTable(_)
| Statement::CreateType(_)
| Statement::CreateView(_)
23 changes: 23 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
@@ -36,6 +36,7 @@ pub enum Statement<T: AstInfo> {
CreateDatabase(CreateDatabaseStatement),
CreateSchema(CreateSchemaStatement),
CreateSource(CreateSourceStatement<T>),
CreateSources(CreateSourcesStatement<T>),
CreateSink(CreateSinkStatement<T>),
CreateView(CreateViewStatement<T>),
CreateTable(CreateTableStatement<T>),
@@ -90,6 +91,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
Statement::CreateDatabase(stmt) => f.write_node(stmt),
Statement::CreateSchema(stmt) => f.write_node(stmt),
Statement::CreateSource(stmt) => f.write_node(stmt),
Statement::CreateSources(stmt) => f.write_node(stmt),
Statement::CreateSink(stmt) => f.write_node(stmt),
Statement::CreateView(stmt) => f.write_node(stmt),
Statement::CreateTable(stmt) => f.write_node(stmt),
@@ -397,6 +399,27 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
}
impl_display_t!(CreateSourceStatement);

/// `CREATE SOURCES`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSourcesStatement<T: AstInfo> {
pub sources: Vec<CreateSourceStatement<T>>,
}

impl<T: AstInfo> AstDisplay for CreateSourcesStatement<T> {
fn fmt(&self, f: &mut AstFormatter) {
f.write_str("CREATE SOURCES (");
f.write_str(
&self
.sources
.iter()
.map(|s| s.to_ast_string())
.collect::<Vec<String>>()
.join(", "),
);
f.write_str(")");
}
}

/// `CREATE SINK`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSinkStatement<T: AstInfo> {
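
A minimal, dependency-free sketch of the formatting behavior implemented by the CreateSourcesStatement display impl above: each inner statement is rendered to its AST string and the pieces are joined with ", " inside the surrounding parentheses. `render_sources` is a hypothetical stand-in for illustration, not part of the crate.

    // Hypothetical stand-in for the AstDisplay logic above: render each child,
    // then join the results with ", " inside "CREATE SOURCES ( ... )".
    fn render_sources(rendered_children: &[&str]) -> String {
        format!("CREATE SOURCES ({})", rendered_children.join(", "))
    }

    fn main() {
        let out = render_sources(&[
            "CREATE SOURCE a FROM FILE 'a' FORMAT BYTES",
            "CREATE SOURCE b FROM FILE 'b' FORMAT BYTES",
        ]);
        assert_eq!(
            out,
            "CREATE SOURCES (CREATE SOURCE a FROM FILE 'a' FORMAT BYTES, CREATE SOURCE b FROM FILE 'b' FORMAT BYTES)"
        );
    }
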
35 changes: 35 additions & 0 deletions src/sql-parser/src/parser.rs
@@ -1420,6 +1420,9 @@ impl<'a> Parser<'a> {
} else if self.parse_keyword(SOURCE) {
self.prev_token();
self.parse_create_source()
} else if self.parse_keyword(SOURCES) {
self.prev_token();
self.parse_create_sources()
} else if self.parse_keyword(SINK) {
self.parse_create_sink()
} else if self.parse_keyword(DEFAULT) {
@@ -1659,6 +1662,38 @@
}))
}

fn parse_create_sources(&mut self) -> Result<Statement<Raw>, ParserError> {
self.expect_keyword(SOURCES)?;
self.expect_token(&Token::LParen)?;
let mut sources = vec![];

loop {
self.expect_keyword(CREATE)?;
match self.parse_create_source() {
Ok(Statement::CreateSource(stmt)) => sources.push(stmt),
_ => {
return self.expected(
self.peek_pos(),
"CREATE SOURCE statement",
self.peek_token(),
)
}
}
if self.consume_token(&Token::Comma) {
// continue
} else if self.consume_token(&Token::RParen) {
break;
} else {
return self.expected(
self.peek_pos(),
"',' or ')' after source definition",
self.peek_token(),
);
}
}
Ok(Statement::CreateSources(CreateSourcesStatement { sources }))
}

fn parse_create_sink(&mut self) -> Result<Statement<Raw>, ParserError> {
let if_not_exists = self.parse_if_not_exists()?;
let name = self.parse_object_name()?;
35 changes: 35 additions & 0 deletions src/sql-parser/tests/testdata/ddl
@@ -524,6 +524,41 @@ error: Expected NOT, found EXISTS
CREATE SOURCE IF EXISTS foo FROM FILE 'bar' USING SCHEMA ''
^

parse-statement
CREATE SOURCES ()
----
error: Expected CREATE, found right parenthesis
CREATE SOURCES ()
^

parse-statement
CREATE SOURCES (SELECT 1,)
----
error: Expected CREATE, found SELECT
CREATE SOURCES (SELECT 1,)
^

parse-statement
CREATE SOURCES (CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES)
----
error: Expected CREATE SOURCE statement, found SINK
CREATE SOURCES (CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES)
^

parse-statement
CREATE SOURCES (CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int NOT NULL, evolution int))
----
CREATE SOURCES (CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int4 NOT NULL, evolution int4))
=>
CreateSources(CreateSourcesStatement { sources: [CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: "psychic", columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false }] })

parse-statement
CREATE SOURCES (CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int NOT NULL, evolution int), CREATE SOURCE different FROM POSTGRES HOST 'host=hosty user=usery password=passwordy dbname=dbnamey' PUBLICATION 'different_pub' NAMESPACE 'namespacey' TABLE 'different_table', CREATE SOURCE foo FROM FILE 'bar' FORMAT AVRO USING SCHEMA 'baz');
----
CREATE SOURCES (CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int4 NOT NULL, evolution int4), CREATE SOURCE different FROM POSTGRES HOST 'host=hosty user=usery password=passwordy dbname=dbnamey' PUBLICATION 'different_pub' NAMESPACE 'namespacey' TABLE 'different_table' (), CREATE SOURCE foo FROM FILE 'bar' FORMAT AVRO USING SCHEMA 'baz')
=>
CreateSources(CreateSourcesStatement { sources: [CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: "psychic", columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false }, CreateSourceStatement { name: UnresolvedObjectName([Ident("different")]), col_names: [], connector: Postgres { conn: "host=hosty user=usery password=passwordy dbname=dbnamey", publication: "different_pub", namespace: "namespacey", table: "different_table", columns: [] }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false }, CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [], format: Some(Avro(Schema { schema: Inline("baz"), with_options: [] })), envelope: None, if_not_exists: false, materialized: false }] })

parse-statement
CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES
----
2 changes: 2 additions & 0 deletions src/sql/src/plan/statement.rs
@@ -109,6 +109,7 @@ pub fn describe(
Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
Statement::CreateSources(stmt) => ddl::describe_create_sources(&scx, stmt)?,
Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
@@ -193,6 +194,7 @@ pub fn plan(
Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
Statement::CreateSources(stmt) => ddl::plan_create_sources(scx, stmt),
Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt, params),
Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
28 changes: 24 additions & 4 deletions src/sql/src/plan/statement/ddl.rs
@@ -55,10 +55,10 @@ use crate::ast::{
AlterIndexOptionsList, AlterIndexOptionsStatement, AlterObjectRenameStatement, AvroSchema,
ColumnOption, Compression, Connector, CreateDatabaseStatement, CreateIndexStatement,
CreateRoleOption, CreateRoleStatement, CreateSchemaStatement, CreateSinkStatement,
CreateSourceStatement, CreateTableStatement, CreateTypeAs, CreateTypeStatement,
CreateViewStatement, DataType, DropDatabaseStatement, DropObjectsStatement, Envelope, Expr,
Format, Ident, IfExistsBehavior, ObjectType, Raw, SqlOption, Statement, UnresolvedObjectName,
Value, WithOption,
CreateSourceStatement, CreateSourcesStatement, CreateTableStatement, CreateTypeAs,
CreateTypeStatement, CreateViewStatement, DataType, DropDatabaseStatement,
DropObjectsStatement, Envelope, Expr, Format, Ident, IfExistsBehavior, ObjectType, Raw,
SqlOption, Statement, UnresolvedObjectName, Value, WithOption,
};
use crate::catalog::{CatalogItem, CatalogItemType};
use crate::kafka_util;
@@ -227,6 +227,13 @@ pub fn describe_create_source(
Ok(StatementDesc::new(None))
}

pub fn describe_create_sources(
_: &StatementContext,
_: CreateSourcesStatement<Raw>,
) -> Result<StatementDesc, anyhow::Error> {
Ok(StatementDesc::new(None))
}

// Flatten one Debezium entry ("before" or "after")
// into its corresponding data fields, plus an extra column for the diff.
fn plan_dbz_flatten_one(
@@ -1027,6 +1034,19 @@ pub fn plan_create_source(
})
}

pub fn plan_create_sources(
_scx: &StatementContext,
stmt: CreateSourcesStatement<Raw>,
) -> Result<Plan, anyhow::Error> {
if stmt.sources.iter().any(|s| match &s.connector {
Connector::Postgres { .. } => false,
_ => true,
}) {
unsupported!("CREATE SOURCES for non-Postgres sources")
}
unsupported!("CREATE SOURCES")
}

pub fn describe_create_view(
_: &StatementContext,
_: CreateViewStatement<Raw>,
20 changes: 20 additions & 0 deletions test/pg-cdc/pg-cdc.td
@@ -88,3 +88,23 @@ incorrect column specification: specified column does not match upstream source,
NAMESPACE 'public'
TABLE 'numbers_${testdrive.seed}' (number int NOT NULL, is_prime bool, name text NOT NULL)
incorrect column specification: specified column does not match upstream source, specified: name text NOT NULL, upstream: name text NULL


## CREATE SOURCES

! CREATE SOURCES (
CREATE MATERIALIZED SOURCE "numbers"
FROM POSTGRES
HOST 'host=postgres port=5432 user=postgres password=postgres dbname=postgres'
PUBLICATION 'mz_source_${testdrive.seed}'
NAMESPACE 'public'
TABLE 'numbers_${testdrive.seed}'
)
CREATE SOURCES not yet supported

! CREATE SOURCES (
CREATE MATERIALIZED SOURCE static_csv
FROM FILE 'fake/static.csv'
FORMAT CSV WITH 3 COLUMNS
)
CREATE SOURCES for non-Postgres sources
