Skip to content

Commit

Permalink
sql,sql-parser: parse CREATE SOURCES FROM POSTGRES
Browse files Browse the repository at this point in the history
Parses statements like:
	CREATE SOURCES FROM POSTGRES
	  HOST        '...'
	  PUBLICATION '...'
	  NAMESPACE   '...'
	  TABLES (...)
where each of the upstream sources shares a HOST, PUBLICATION, and
NAMESPACE, but has its own table name, alias, and optional column
definitions. Leaves planning and sequencing unimplemented.
  • Loading branch information
JLDLaughlin committed Apr 2, 2021
1 parent 375d3a5 commit aa35f6a
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 123 deletions.
1 change: 1 addition & 0 deletions src/coord/src/coord.rs
Expand Up @@ -755,6 +755,7 @@ impl Coordinator {
| Statement::CreateSchema(_)
| Statement::CreateSink(_)
| Statement::CreateSource(_)
| Statement::CreateSources(_)
| Statement::CreateTable(_)
| Statement::CreateType(_)
| Statement::CreateView(_)
Expand Down
54 changes: 44 additions & 10 deletions src/sql-parser/src/ast/defs/ddl.rs
Expand Up @@ -306,10 +306,12 @@ pub enum Connector<T: AstInfo> {
publication: String,
/// The namespace the synced table belongs to
namespace: String,
/// The name of the table to sync
table: String,
/// The expected column schema of the synced table
columns: Vec<ColumnDef<T>>,
/// The table to sync, if only a single table
/// Will be set for `CREATE SOURCE`, but not `CREATE SOURCES`
table: Option<PgTable<T>>,
/// The tables to sync, if multiple tables
/// Will be set for `CREATE SOURCES`, but not `CREATE SOURCE`
tables: Option<Vec<PgTable<T>>>,
},
PubNub {
/// PubNub's subscribe key
Expand Down Expand Up @@ -376,19 +378,23 @@ impl<T: AstInfo> AstDisplay for Connector<T> {
publication,
namespace,
table,
columns,
tables,
} => {
f.write_str("POSTGRES HOST '");
f.write_str(&display::escape_single_quote_string(conn));
f.write_str("' PUBLICATION '");
f.write_str(&display::escape_single_quote_string(publication));
f.write_str("' NAMESPACE '");
f.write_str(&display::escape_single_quote_string(namespace));
f.write_str("' TABLE '");
f.write_str(&display::escape_single_quote_string(table));
f.write_str("' (");
f.write_node(&display::comma_separated(columns));
f.write_str(")");
if let Some(table) = table {
f.write_str("' TABLE ");
f.write_str(&table.to_ast_string());
}
if let Some(tables) = tables {
f.write_str("' TABLES (");
f.write_node(&display::comma_separated(tables));
f.write_str(")");
}
}
Connector::PubNub {
subscribe_key,
Expand All @@ -405,6 +411,34 @@ impl<T: AstInfo> AstDisplay for Connector<T> {
}
impl_display_t!(Connector);

/// Information about upstream Postgres tables used for replication sources
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PgTable<T: AstInfo> {
    /// The name of the table to sync
    pub name: UnresolvedObjectName,
    /// The name for the table in Materialize. This field will
    /// be set for `CREATE SOURCES`, but not for `CREATE SOURCE`.
    pub alias: Option<T::ObjectName>,
    /// The expected column schema of the synced table
    pub columns: Vec<ColumnDef<T>>,
}

/// Renders a table reference as `name [AS alias] [(col, ...)]`.
impl<T: AstInfo> AstDisplay for PgTable<T> {
    fn fmt(&self, f: &mut AstFormatter) {
        // The upstream table name always comes first.
        f.write_node(&self.name);
        // Materialize-side alias, when one was given.
        if let Some(alias) = self.alias.as_ref() {
            f.write_str(" AS ");
            f.write_str(&alias.to_ast_string());
        }
        // Column definitions are parenthesized and comma-separated;
        // an empty list prints nothing at all (no `()`).
        let has_columns = !self.columns.is_empty();
        if has_columns {
            f.write_str(" (");
            f.write_node(&display::comma_separated(&self.columns));
            f.write_str(")");
        }
    }
}
impl_display_t!(PgTable);

/// The key sources specified in the S3 source's `OBJECTS FROM` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum S3KeySource {
Expand Down
8 changes: 8 additions & 0 deletions src/sql-parser/src/ast/defs/name.rs
Expand Up @@ -117,6 +117,14 @@ impl UnresolvedObjectName {
assert!(n.len() <= 3 && n.len() > 0);
UnresolvedObjectName(n.iter().map(|n| (*n).into()).collect::<Vec<_>>())
}

/// Returns the item name of an `UnresolvedObjectName`.
pub fn item(&self) -> &str {
if self.0.is_empty() {
return "";
}
self.0[self.0.len() - 1].as_str()
}
}

impl AstDisplay for UnresolvedObjectName {
Expand Down
16 changes: 16 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Expand Up @@ -36,6 +36,7 @@ pub enum Statement<T: AstInfo> {
CreateDatabase(CreateDatabaseStatement),
CreateSchema(CreateSchemaStatement),
CreateSource(CreateSourceStatement<T>),
CreateSources(CreateSourcesStatement<T>),
CreateSink(CreateSinkStatement<T>),
CreateView(CreateViewStatement<T>),
CreateTable(CreateTableStatement<T>),
Expand Down Expand Up @@ -90,6 +91,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
Statement::CreateDatabase(stmt) => f.write_node(stmt),
Statement::CreateSchema(stmt) => f.write_node(stmt),
Statement::CreateSource(stmt) => f.write_node(stmt),
Statement::CreateSources(stmt) => f.write_node(stmt),
Statement::CreateSink(stmt) => f.write_node(stmt),
Statement::CreateView(stmt) => f.write_node(stmt),
Statement::CreateTable(stmt) => f.write_node(stmt),
Expand Down Expand Up @@ -397,6 +399,20 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
}
impl_display_t!(CreateSourceStatement);

/// `CREATE SOURCES`
///
/// Creates several sources in one statement. The only accepted form is
/// `CREATE SOURCES FROM POSTGRES ...`, where the per-table list lives
/// inside the connector.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSourcesStatement<T: AstInfo> {
    /// The connector shared by all of the created sources.
    pub connector: Connector<T>,
}

impl<T: AstInfo> AstDisplay for CreateSourcesStatement<T> {
    fn fmt(&self, f: &mut AstFormatter) {
        f.write_str("CREATE SOURCES FROM ");
        f.write_node(&self.connector);
    }
}
impl_display_t!(CreateSourcesStatement);

/// `CREATE SINK`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSinkStatement<T: AstInfo> {
Expand Down
112 changes: 84 additions & 28 deletions src/sql-parser/src/parser.rs
Expand Up @@ -100,6 +100,11 @@ enum IsLateral {
}
use IsLateral::*;

/// Distinguishes whether a Postgres connector is being parsed for a
/// single-table `CREATE SOURCE` or a multi-table `CREATE SOURCES`.
enum PostgresSourceTables {
    /// `CREATE SOURCE ... TABLE <table>`
    Single,
    /// `CREATE SOURCES ... TABLES (<table>, ...)`
    Multiple,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ParserError {
/// The error message.
Expand Down Expand Up @@ -1420,6 +1425,8 @@ impl<'a> Parser<'a> {
} else if self.parse_keyword(SOURCE) {
self.prev_token();
self.parse_create_source()
} else if self.parse_keyword(SOURCES) {
self.parse_create_sources()
} else if self.parse_keyword(SINK) {
self.parse_create_sink()
} else if self.parse_keyword(DEFAULT) {
Expand Down Expand Up @@ -1659,6 +1666,16 @@ impl<'a> Parser<'a> {
}))
}

    /// Parses the remainder of a `CREATE SOURCES` statement; the
    /// `SOURCES` keyword has already been consumed by the caller.
    fn parse_create_sources(&mut self) -> Result<Statement<Raw>, ParserError> {
        // Only `CREATE SOURCES FROM POSTGRES ...` is supported.
        self.expect_keyword(FROM)?;
        self.expect_keyword(POSTGRES)?;
        let connector = self.parse_postgres_connector(PostgresSourceTables::Multiple)?;

        Ok(Statement::CreateSources(CreateSourcesStatement {
            connector,
        }))
    }

fn parse_create_sink(&mut self) -> Result<Statement<Raw>, ParserError> {
let if_not_exists = self.parse_if_not_exists()?;
let name = self.parse_object_name()?;
Expand Down Expand Up @@ -1722,34 +1739,7 @@ impl<'a> Parser<'a> {
channel,
})
}
POSTGRES => {
self.expect_keyword(HOST)?;
let conn = self.parse_literal_string()?;
self.expect_keyword(PUBLICATION)?;
let publication = self.parse_literal_string()?;
self.expect_keyword(NAMESPACE)?;
let namespace = self.parse_literal_string()?;
self.expect_keyword(TABLE)?;
let table = self.parse_literal_string()?;

let (columns, constraints) = self.parse_columns(Optional)?;

if !constraints.is_empty() {
return parser_err!(
self,
self.peek_prev_pos(),
"Cannot specify constraints in Postgres table definition"
);
}

Ok(Connector::Postgres {
conn,
publication,
namespace,
table,
columns,
})
}
POSTGRES => self.parse_postgres_connector(PostgresSourceTables::Single),
FILE => {
let path = self.parse_literal_string()?;
let compression = if self.parse_keyword(COMPRESSION) {
Expand Down Expand Up @@ -1825,6 +1815,72 @@ impl<'a> Parser<'a> {
}
}

    /// Parses the Postgres connector clause shared by `CREATE SOURCE`
    /// and `CREATE SOURCES`:
    ///
    ///   HOST '...' PUBLICATION '...' NAMESPACE '...'
    ///
    /// followed by either `TABLE <table>` (`Single`) or
    /// `TABLES (<table>, ...)` (`Multiple`), per `typ`.
    fn parse_postgres_connector(
        &mut self,
        typ: PostgresSourceTables,
    ) -> Result<Connector<Raw>, ParserError> {
        self.expect_keyword(HOST)?;
        let conn = self.parse_literal_string()?;
        self.expect_keyword(PUBLICATION)?;
        let publication = self.parse_literal_string()?;
        self.expect_keyword(NAMESPACE)?;
        let namespace = self.parse_literal_string()?;
        // Exactly one of `table`/`tables` is populated, depending on
        // which statement form is being parsed.
        let (table, tables) = match typ {
            PostgresSourceTables::Single => {
                self.expect_keyword(TABLE)?;
                (
                    Some(self.parse_postgres_table(PostgresSourceTables::Single)?),
                    None,
                )
            }
            PostgresSourceTables::Multiple => {
                self.expect_keyword(TABLES)?;
                self.expect_token(&Token::LParen)?;
                let tables = self.parse_comma_separated(|parser| {
                    parser.parse_postgres_table(PostgresSourceTables::Multiple)
                })?;
                self.expect_token(&Token::RParen)?;
                (None, Some(tables))
            }
        };
        Ok(Connector::Postgres {
            conn,
            publication,
            namespace,
            table,
            tables,
        })
    }

    /// Parses one upstream Postgres table reference:
    /// `<name> [AS <alias>] [(<column defs>)]`.
    ///
    /// An alias is required for `CREATE SOURCES` (`Multiple`) and not
    /// parsed at all for `CREATE SOURCE` (`Single`). Table constraints
    /// are rejected; only plain column definitions are allowed.
    fn parse_postgres_table(
        &mut self,
        typ: PostgresSourceTables,
    ) -> Result<PgTable<Raw>, ParserError> {
        let name = self.parse_object_name()?;
        let alias = match typ {
            // `CREATE SOURCE` names the source itself, so there is no
            // per-table alias to take here.
            PostgresSourceTables::Single => None,
            // Each table in `CREATE SOURCES` must say what its
            // Materialize-side name is.
            PostgresSourceTables::Multiple => {
                self.expect_keyword(AS)?;
                Some(RawName::Name(self.parse_object_name()?))
            }
        };
        let (columns, constraints) = self.parse_columns(Optional)?;

        // `parse_columns` also accepts table constraints, but those
        // have no meaning in a Postgres source table definition.
        if !constraints.is_empty() {
            return parser_err!(
                self,
                self.peek_prev_pos(),
                "Cannot specify constraints in Postgres table definition"
            );
        }

        Ok(PgTable {
            name,
            alias,
            columns,
        })
    }

fn parse_create_view(&mut self) -> Result<Statement<Raw>, ParserError> {
let mut if_exists = if self.parse_keyword(OR) {
self.expect_keyword(REPLACE)?;
Expand Down
41 changes: 38 additions & 3 deletions src/sql-parser/tests/testdata/ddl
Expand Up @@ -497,11 +497,11 @@ CREATE SOURCE source FROM KAFKA BROKER 'broker' TOPIC 'topic' WITH (start_offset
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("source")]), col_names: [], connector: Kafka { broker: "broker", topic: "topic", key: None }, with_options: [Value { name: Ident("start_offset"), value: Array([Number("2"), Number("40000000")]) }], format: Some(Avro(Schema { schema: File("path"), with_options: [] })), envelope: Upsert(Some(Text)), if_not_exists: false, materialized: false })

parse-statement
CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int NOT NULL, evolution int);
CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE "psychic" (pokedex_id int NOT NULL, evolution int);
----
CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE 'psychic' (pokedex_id int4 NOT NULL, evolution int4)
CREATE SOURCE psychic FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLE psychic (pokedex_id int4 NOT NULL, evolution int4)
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: "psychic", columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("psychic")]), col_names: [], connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: Some(PgTable { name: UnresolvedObjectName([Ident("psychic")]), alias: None, columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }), tables: None }, with_options: [], format: None, envelope: None, if_not_exists: false, materialized: false })

parse-statement
CREATE SOURCE psychic FROM PUBNUB SUBSCRIBE KEY 'subscribe_key' CHANNEL 'channel';
Expand All @@ -524,6 +524,41 @@ error: Expected NOT, found EXISTS
CREATE SOURCE IF EXISTS foo FROM FILE 'bar' USING SCHEMA ''
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
----
error: Expected identifier, found right parenthesis
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ();
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
----
error: Expected AS, found number
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (SELECT 1);
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
----
error: Expected AS, found left parenthesis
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("public.psychic" (pokedex_id int NOT NULL, evolution int))
^

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (psychic as "public.psychic" (pokedex_id int NOT NULL, evolution int))
----
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES (psychic AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4))
=>
CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: None, tables: Some([PgTable { name: UnresolvedObjectName([Ident("psychic")]), alias: Some(Name(UnresolvedObjectName([Ident("public.psychic")]))), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }]) } })

parse-statement
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("random.psychic" as "public.psychic" (pokedex_id int NOT NULL, evolution int), "another_table" as "another_one")
----
CREATE SOURCES FROM POSTGRES HOST 'host=kanto user=ash password=teamrocket dbname=pokemon' PUBLICATION 'red' NAMESPACE 'generation1' TABLES ("random.psychic" AS "public.psychic" (pokedex_id int4 NOT NULL, evolution int4), another_table AS another_one)
=>
CreateSources(CreateSourcesStatement { connector: Postgres { conn: "host=kanto user=ash password=teamrocket dbname=pokemon", publication: "red", namespace: "generation1", table: None, tables: Some([PgTable { name: UnresolvedObjectName([Ident("random.psychic")]), alias: Some(Name(UnresolvedObjectName([Ident("public.psychic")]))), columns: [ColumnDef { name: Ident("pokedex_id"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [ColumnOptionDef { name: None, option: NotNull }] }, ColumnDef { name: Ident("evolution"), data_type: Other { name: Name(UnresolvedObjectName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }] }, PgTable { name: UnresolvedObjectName([Ident("another_table")]), alias: Some(Name(UnresolvedObjectName([Ident("another_one")]))), columns: [] }]) } })

parse-statement
CREATE SINK foo FROM bar INTO FILE 'baz' FORMAT BYTES
----
Expand Down
19 changes: 16 additions & 3 deletions src/sql/src/normalize.rs
Expand Up @@ -23,6 +23,7 @@ use aws_util::aws;
use repr::ColumnName;
use sql_parser::ast::display::AstDisplay;
use sql_parser::ast::visit_mut::{self, VisitMut};
use sql_parser::ast::PgTable;
use sql_parser::ast::{
AstInfo, Connector, CreateIndexStatement, CreateSinkStatement, CreateSourceStatement,
CreateTableStatement, CreateTypeStatement, CreateViewStatement, Function, FunctionArgs, Ident,
Expand Down Expand Up @@ -266,10 +267,22 @@ pub fn create_statement(
*name = allocate_name(name)?;
*if_not_exists = false;
*materialized = false;
if let Connector::Postgres { columns, .. } = connector {
if let Connector::Postgres { table, tables, .. } = connector {
let visit_table_columns =
|normalizer: &mut QueryNormalizer, table: &mut PgTable<Aug>| {
for column in table.columns.iter_mut() {
normalizer.visit_column_def_mut(column)
}
};

let mut normalizer = QueryNormalizer::new(scx);
for c in columns {
normalizer.visit_column_def_mut(c);
if let Some(table) = table {
visit_table_columns(&mut normalizer, table)
}
if let Some(tables) = tables {
for table in tables {
visit_table_columns(&mut normalizer, table)
}
}
}
}
Expand Down

0 comments on commit aa35f6a

Please sign in to comment.