Skip to content

Commit

Permalink
sql,sql-parser: parse CREATE SOURCES FROM POSTGRES
Browse files Browse the repository at this point in the history
Parses statements like:
	CREATE SOURCES FROM POSTGRES
	  HOST        '...'
	  PUBLICATION '...'
	  NAMESPACE   '...'
	  TABLES (...)
where each of the upstream sources shares a HOST, PUBLICATION, and
NAMESPACE, but has its own table name, alias, and optional column
definitions. Leaves planning and sequencing unimplemented.
  • Loading branch information
JLDLaughlin committed Apr 2, 2021
1 parent 375d3a5 commit 4cbdae5
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 97 deletions.
1 change: 1 addition & 0 deletions src/coord/src/coord.rs
Expand Up @@ -755,6 +755,7 @@ impl Coordinator {
| Statement::CreateSchema(_)
| Statement::CreateSink(_)
| Statement::CreateSource(_)
| Statement::CreateSources(_)
| Statement::CreateTable(_)
| Statement::CreateType(_)
| Statement::CreateView(_)
Expand Down
56 changes: 46 additions & 10 deletions src/sql-parser/src/ast/defs/ddl.rs
Expand Up @@ -306,10 +306,12 @@ pub enum Connector<T: AstInfo> {
publication: String,
/// The namespace the synced table belongs to
namespace: String,
/// The name of the table to sync
table: String,
/// The expected column schema of the synced table
columns: Vec<ColumnDef<T>>,
/// The table to sync, if only a single table
/// Will be set for `CREATE SOURCE`, but not `CREATE SOURCES`
table: Option<PgTable<T>>,
/// The tables to sync, if multiple tables
/// Will be set for `CREATE SOURCES`, but not `CREATE SOURCE`
tables: Option<Vec<PgTable<T>>>,
},
PubNub {
/// PubNub's subscribe key
Expand Down Expand Up @@ -376,19 +378,23 @@ impl<T: AstInfo> AstDisplay for Connector<T> {
publication,
namespace,
table,
columns,
tables,
} => {
f.write_str("POSTGRES HOST '");
f.write_str(&display::escape_single_quote_string(conn));
f.write_str("' PUBLICATION '");
f.write_str(&display::escape_single_quote_string(publication));
f.write_str("' NAMESPACE '");
f.write_str(&display::escape_single_quote_string(namespace));
f.write_str("' TABLE '");
f.write_str(&display::escape_single_quote_string(table));
f.write_str("' (");
f.write_node(&display::comma_separated(columns));
f.write_str(")");
if let Some(table) = table {
f.write_str("' TABLE ");
f.write_str(&table.to_ast_string());
}
if let Some(tables) = tables {
f.write_str("' TABLES (");
f.write_node(&display::comma_separated(tables));
f.write_str(")");
}
}
Connector::PubNub {
subscribe_key,
Expand All @@ -405,6 +411,36 @@ impl<T: AstInfo> AstDisplay for Connector<T> {
}
impl_display_t!(Connector);

/// Information about upstream Postgres tables used for replication sources
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PgTable<T: AstInfo> {
    /// The name of the table to sync
    pub name: String,
    /// The name for the table in Materialize. This field will
    /// be set for `CREATE SOURCES`, but not for `CREATE SOURCE`.
    pub alias: Option<T::ObjectName>,
    /// The expected column schema of the synced table
    pub columns: Vec<ColumnDef<T>>,
}

impl<T: AstInfo> AstDisplay for PgTable<T> {
    /// Renders as `'<name>' [AS <alias>] [(<column defs>)]`; the optional
    /// pieces are omitted entirely when absent rather than left empty.
    fn fmt(&self, f: &mut AstFormatter) {
        // The upstream table name is always a single-quoted string literal.
        f.write_str("'");
        f.write_str(&display::escape_single_quote_string(&self.name));
        f.write_str("'");
        if let Some(alias) = self.alias.as_ref() {
            f.write_str(" AS ");
            f.write_str(&alias.to_ast_string());
        }
        // Only emit a parenthesized column list when columns were declared.
        match self.columns.as_slice() {
            [] => {}
            cols => {
                f.write_str(" (");
                f.write_node(&display::comma_separated(cols));
                f.write_str(")");
            }
        }
    }
}
impl_display_t!(PgTable);

/// The key sources specified in the S3 source's `OBJECTS FROM` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum S3KeySource {
Expand Down
16 changes: 16 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Expand Up @@ -36,6 +36,7 @@ pub enum Statement<T: AstInfo> {
CreateDatabase(CreateDatabaseStatement),
CreateSchema(CreateSchemaStatement),
CreateSource(CreateSourceStatement<T>),
CreateSources(CreateSourcesStatement<T>),
CreateSink(CreateSinkStatement<T>),
CreateView(CreateViewStatement<T>),
CreateTable(CreateTableStatement<T>),
Expand Down Expand Up @@ -90,6 +91,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
Statement::CreateDatabase(stmt) => f.write_node(stmt),
Statement::CreateSchema(stmt) => f.write_node(stmt),
Statement::CreateSource(stmt) => f.write_node(stmt),
Statement::CreateSources(stmt) => f.write_node(stmt),
Statement::CreateSink(stmt) => f.write_node(stmt),
Statement::CreateView(stmt) => f.write_node(stmt),
Statement::CreateTable(stmt) => f.write_node(stmt),
Expand Down Expand Up @@ -397,6 +399,20 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
}
impl_display_t!(CreateSourceStatement);

/// `CREATE SOURCES`
///
/// Creates multiple sources in one statement. Currently only produced for
/// `CREATE SOURCES FROM POSTGRES ... TABLES (...)`; planning and sequencing
/// are not yet implemented.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSourcesStatement<T: AstInfo> {
    // Expected to be `Connector::Postgres` with `tables: Some(..)`;
    // the parser only constructs it that way (see parse_create_sources).
    pub connector: Connector<T>,
}

impl<T: AstInfo> AstDisplay for CreateSourcesStatement<T> {
    fn fmt(&self, f: &mut AstFormatter) {
        f.write_str("CREATE SOURCES FROM ");
        f.write_node(&self.connector);
    }
}
impl_display_t!(CreateSourcesStatement);

/// `CREATE SINK`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSinkStatement<T: AstInfo> {
Expand Down
123 changes: 95 additions & 28 deletions src/sql-parser/src/parser.rs
Expand Up @@ -100,6 +100,11 @@ enum IsLateral {
}
use IsLateral::*;

/// Distinguishes the two shapes of Postgres source statements so the shared
/// connector/table parsing code can branch on which clause to expect.
enum PostgresSourceTables {
    /// `CREATE SOURCE ... TABLE '<name>'` — a single table, no alias.
    Single,
    /// `CREATE SOURCES ... TABLES ('<name>' AS <alias>, ...)` — a
    /// parenthesized list where each table carries a mandatory alias.
    Multiple,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ParserError {
/// The error message.
Expand Down Expand Up @@ -1420,6 +1425,8 @@ impl<'a> Parser<'a> {
} else if self.parse_keyword(SOURCE) {
self.prev_token();
self.parse_create_source()
} else if self.parse_keyword(SOURCES) {
self.parse_create_sources()
} else if self.parse_keyword(SINK) {
self.parse_create_sink()
} else if self.parse_keyword(DEFAULT) {
Expand Down Expand Up @@ -1659,6 +1666,16 @@ impl<'a> Parser<'a> {
}))
}

/// Parses the tail of `CREATE SOURCES FROM POSTGRES ...`; the `SOURCES`
/// keyword has already been consumed by the caller.
fn parse_create_sources(&mut self) -> Result<Statement<Raw>, ParserError> {
    // `CREATE SOURCES` currently supports only the Postgres connector.
    self.expect_keyword(FROM)?;
    self.expect_keyword(POSTGRES)?;
    Ok(Statement::CreateSources(CreateSourcesStatement {
        connector: self.parse_postgres_connector(PostgresSourceTables::Multiple)?,
    }))
}

fn parse_create_sink(&mut self) -> Result<Statement<Raw>, ParserError> {
let if_not_exists = self.parse_if_not_exists()?;
let name = self.parse_object_name()?;
Expand Down Expand Up @@ -1722,34 +1739,7 @@ impl<'a> Parser<'a> {
channel,
})
}
POSTGRES => {
self.expect_keyword(HOST)?;
let conn = self.parse_literal_string()?;
self.expect_keyword(PUBLICATION)?;
let publication = self.parse_literal_string()?;
self.expect_keyword(NAMESPACE)?;
let namespace = self.parse_literal_string()?;
self.expect_keyword(TABLE)?;
let table = self.parse_literal_string()?;

let (columns, constraints) = self.parse_columns(Optional)?;

if !constraints.is_empty() {
return parser_err!(
self,
self.peek_prev_pos(),
"Cannot specify constraints in Postgres table definition"
);
}

Ok(Connector::Postgres {
conn,
publication,
namespace,
table,
columns,
})
}
POSTGRES => self.parse_postgres_connector(PostgresSourceTables::Single),
FILE => {
let path = self.parse_literal_string()?;
let compression = if self.parse_keyword(COMPRESSION) {
Expand Down Expand Up @@ -1825,6 +1815,83 @@ impl<'a> Parser<'a> {
}
}

/// Parses a Postgres connector clause shared by `CREATE SOURCE` and
/// `CREATE SOURCES`:
///
/// ```text
/// HOST '<conn>' PUBLICATION '<pub>' NAMESPACE '<ns>'
///   TABLE '<name>' [...]            -- Single
///   TABLES ('<name>' AS <alias> [...], ...)  -- Multiple
/// ```
fn parse_postgres_connector(
    &mut self,
    typ: PostgresSourceTables,
) -> Result<Connector<Raw>, ParserError> {
    // The HOST/PUBLICATION/NAMESPACE prefix is common to both forms.
    self.expect_keyword(HOST)?;
    let conn = self.parse_literal_string()?;
    self.expect_keyword(PUBLICATION)?;
    let publication = self.parse_literal_string()?;
    self.expect_keyword(NAMESPACE)?;
    let namespace = self.parse_literal_string()?;

    // Exactly one of `table`/`tables` ends up populated, matching the
    // Connector::Postgres representation.
    let (table, tables) = match typ {
        PostgresSourceTables::Single => {
            self.expect_keyword(TABLE)?;
            let single = self.parse_postgres_table(PostgresSourceTables::Single)?;
            (Some(single), None)
        }
        PostgresSourceTables::Multiple => {
            self.expect_keyword(TABLES)?;
            self.expect_token(&Token::LParen)?;
            // Note: the list may not be empty — the first iteration parses a
            // table before any closing paren is accepted.
            let mut parsed = vec![];
            loop {
                parsed.push(self.parse_postgres_table(PostgresSourceTables::Multiple)?);
                if self.consume_token(&Token::RParen) {
                    break;
                }
                if !self.consume_token(&Token::Comma) {
                    return self.expected(
                        self.peek_pos(),
                        "',' or ')' after Postgres table definition",
                        self.peek_token(),
                    );
                }
            }
            (None, Some(parsed))
        }
    };

    Ok(Connector::Postgres {
        conn,
        publication,
        namespace,
        table,
        tables,
    })
}

/// Parses one upstream table spec: `'<name>' [AS <alias>] [(<column defs>)]`.
///
/// The alias is required in the `Multiple` (`CREATE SOURCES`) form and never
/// parsed in the `Single` form; column constraints are rejected because only
/// plain column definitions are meaningful for a replicated table.
fn parse_postgres_table(
    &mut self,
    typ: PostgresSourceTables,
) -> Result<PgTable<Raw>, ParserError> {
    // The upstream table name is a single-quoted string literal.
    let name = self.parse_literal_string()?;
    let alias = if let PostgresSourceTables::Multiple = typ {
        self.expect_keyword(AS)?;
        Some(RawName::Name(self.parse_object_name()?))
    } else {
        None
    };

    let (columns, constraints) = self.parse_columns(Optional)?;
    if !constraints.is_empty() {
        return parser_err!(
            self,
            self.peek_prev_pos(),
            "Cannot specify constraints in Postgres table definition"
        );
    }

    Ok(PgTable {
        name,
        alias,
        columns,
    })
}

fn parse_create_view(&mut self) -> Result<Statement<Raw>, ParserError> {
let mut if_exists = if self.parse_keyword(OR) {
self.expect_keyword(REPLACE)?;
Expand Down
37 changes: 36 additions & 1 deletion src/sql-parser/tests/testdata/ddl
Expand Up @@ -501,7 +501,7 @@ CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocke
----
CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int4 NOT NULL, evolution int4)
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: "psychic", columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: Some(PgTable { name: "psychic", alias: None, columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }), tables: None }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false })

parse-statement
CREATE SOURCE psychic FROM PUBNUB SUBSCRIBE KEY 'subscribe_key' CHANNEL 'channel';
Expand All @@ -524,6 +524,41 @@ error: Expected NOT, found EXISTS
CREATE SOURCE IF EXISTS foo FROM FILE 'bar' USING SCHEMA ''
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
----
error: Expected literal string, found right parenthesis
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
----
error: Expected literal string, found SELECT
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' (pokedex_id int NOT NULL, evolution int))
----
error: Expected AS, found left parenthesis
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' (pokedex_id int NOT NULL, evolution int))
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' as "public.psychic" (pokedex_id int NOT NULL, evolution int))
----
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4))
=>
CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: None, tables: Some([PgTable { name: "psychic", alias: Some(Name(UnresolvedObjectName([Ident("public.psychic")]))), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }]) } })

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' as "public.psychic" (pokedex_id int NOT NULL, evolution int), 'another_table' as "another_one")
----
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ('psychic' AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4), 'another_table' AS another_one)
=>
CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: None, tables: Some([PgTable { name: "psychic", alias: Some(Name(UnresolvedObjectName([Ident("public.psychic")]))), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, PgTable { name: "another_table", alias: Some(Name(UnresolvedObjectName([Ident("another_one")]))), columns: [] }]) } })

parse-statement
CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES
----
Expand Down
19 changes: 16 additions & 3 deletions src/sql/src/normalize.rs
Expand Up @@ -23,6 +23,7 @@ use aws_util::aws;
use repr::ColumnName;
use sql_parser::ast::display::AstDisplay;
use sql_parser::ast::visit_mut::{self, VisitMut};
use sql_parser::ast::PgTable;
use sql_parser::ast::{
AstInfo, Connector, CreateIndexStatement, CreateSinkStatement, CreateSourceStatement,
CreateTableStatement, CreateTypeStatement, CreateViewStatement, Function, FunctionArgs, Ident,
Expand Down Expand Up @@ -266,10 +267,22 @@ pub fn create_statement(
*name = allocate_name(name)?;
*if_not_exists = false;
*materialized = false;
if let Connector::Postgres { columns, .. } = connector {
if let Connector::Postgres { table, tables, .. } = connector {
let visit_table_columns =
|normalizer: &mut QueryNormalizer, table: &mut PgTable<Aug>| {
for column in table.columns.iter_mut() {
normalizer.visit_column_def_mut(column)
}
};

let mut normalizer = QueryNormalizer::new(scx);
for c in columns {
normalizer.visit_column_def_mut(c);
if let Some(table) = table {
visit_table_columns(&mut normalizer, table)
}
if let Some(tables) = tables {
for table in tables {
visit_table_columns(&mut normalizer, table)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/sql/src/plan/statement.rs
Expand Up @@ -109,6 +109,7 @@ pub fn describe(
Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
Statement::CreateSources(stmt) => ddl::describe_create_sources(&scx, stmt)?,
Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
Expand Down Expand Up @@ -193,6 +194,7 @@ pub fn plan(
Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
Statement::CreateSources(stmt) => ddl::plan_create_sources(scx, stmt),
Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt, params),
Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
Expand Down

0 comments on commit 4cbdae5

Please sign in to comment.