Skip to content

Commit

Permalink
feat(spanner/spansql): add support for missing DDL syntax for ALTER C…
Browse files Browse the repository at this point in the history
…HANGE STREAM (#7429)
  • Loading branch information
toga4 committed Feb 20, 2023
1 parent cf85910 commit d34fe02
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 53 deletions.
112 changes: 74 additions & 38 deletions spanner/spansql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2081,39 +2081,15 @@ func (p *parser) parseCreateChangeStream() (*CreateChangeStream, *parseError) {
return nil, err
}

if err := p.expect("FOR"); err != nil {
return nil, err
}

cs := &CreateChangeStream{Name: csname, Position: pos}

if p.eat("ALL") {
cs.WatchAllTables = true
} else {
for {
tname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}
pos := p.Pos()
wd := WatchDef{Table: tname, Position: pos}

if p.sniff("(") {
columns, err := p.parseColumnNameList()
if err != nil {
return nil, err
}
wd.Columns = columns
} else {
wd.WatchAllCols = true
}

cs.Watch = append(cs.Watch, wd)
if p.eat(",") {
continue
}
break
if p.sniff("FOR") {
watch, watchAllTables, err := p.parseChangeStreamWatches()
if err != nil {
return nil, err
}
cs.Watch = watch
cs.WatchAllTables = watchAllTables
}

if p.sniff("OPTIONS") {
Expand Down Expand Up @@ -2145,19 +2121,79 @@ func (p *parser) parseAlterChangeStream() (*AlterChangeStream, *parseError) {
}

acs := &AlterChangeStream{Name: csname, Position: pos}
if err := p.expect("SET"); err != nil {
return nil, err

tok := p.next()
if tok.err != nil {
return nil, tok.err
}
// TODO: Support for altering watch
if p.sniff("OPTIONS") {
options, err := p.parseChangeStreamOptions()
if err != nil {
switch {
default:
return nil, p.errorf("got %q, expected SET or DROP", tok.value)
case tok.caseEqual("SET"):
if p.sniff("OPTIONS") {
options, err := p.parseChangeStreamOptions()
if err != nil {
return nil, err
}
acs.Alteration = AlterChangeStreamOptions{Options: options}
return acs, nil
}
if p.sniff("FOR") {
watch, watchAllTables, err := p.parseChangeStreamWatches()
if err != nil {
return nil, err
}
acs.Alteration = AlterWatch{Watch: watch, WatchAllTables: watchAllTables}
return acs, nil
}
return nil, p.errorf("got %q, expected FOR or OPTIONS", p.next())
case tok.caseEqual("DROP"):
if err := p.expect("FOR", "ALL"); err != nil {
return nil, err
}
acs.Alteration = AlterChangeStreamOptions{Options: options}
acs.Alteration = DropChangeStreamWatch{}
return acs, nil
}
return nil, p.errorf("got %q, expected OPTIONS", p.next())
}

func (p *parser) parseChangeStreamWatches() ([]WatchDef, bool, *parseError) {
debugf("parseChangeStreamWatches: %v", p)

if err := p.expect("FOR"); err != nil {
return nil, false, err
}

if p.eat("ALL") {
return nil, true, nil
}

watchDefs := []WatchDef{}
for {
tname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, false, err
}
pos := p.Pos()
wd := WatchDef{Table: tname, Position: pos}

if p.sniff("(") {
columns, err := p.parseColumnNameList()
if err != nil {
return nil, false, err
}
wd.Columns = columns
} else {
wd.WatchAllCols = true
}

watchDefs = append(watchDefs, wd)
if p.eat(",") {
continue
}
break
}

return watchDefs, false, nil
}

func (p *parser) parseChangeStreamOptions() (ChangeStreamOptions, *parseError) {
Expand Down
100 changes: 100 additions & 0 deletions spanner/spansql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,106 @@ func TestParseDDL(t *testing.T) {
},
},
},
{
`CREATE CHANGE STREAM csname;
CREATE CHANGE STREAM csname FOR ALL;
CREATE CHANGE STREAM csname FOR tname, tname2(cname);
CREATE CHANGE STREAM csname FOR ALL OPTIONS (retention_period = '36h', value_capture_type = 'NEW_VALUES');`,
&DDL{
Filename: "filename",
List: []DDLStmt{
&CreateChangeStream{
Name: "csname",
Position: line(1),
},
&CreateChangeStream{
Name: "csname",
WatchAllTables: true,
Position: line(2),
},
&CreateChangeStream{
Name: "csname",
Watch: []WatchDef{
{Table: "tname", WatchAllCols: true, Position: line(3)},
{Table: "tname2", Columns: []ID{ID("cname")}, Position: line(3)},
},
Position: line(3),
},
&CreateChangeStream{
Name: "csname",
WatchAllTables: true,
Position: line(4),
Options: ChangeStreamOptions{
RetentionPeriod: func(b string) *string { return &b }("36h"),
ValueCaptureType: func(b string) *string { return &b }("NEW_VALUES"),
},
},
},
},
},
{
`ALTER CHANGE STREAM csname SET FOR ALL;
ALTER CHANGE STREAM csname SET FOR tname, tname2(cname);
ALTER CHANGE STREAM csname DROP FOR ALL;
ALTER CHANGE STREAM csname SET OPTIONS (retention_period = '36h', value_capture_type = 'NEW_VALUES');`,
&DDL{
Filename: "filename",
List: []DDLStmt{
&AlterChangeStream{
Name: "csname",
Alteration: AlterWatch{
WatchAllTables: true,
},
Position: line(1),
},
&AlterChangeStream{
Name: "csname",
Alteration: AlterWatch{
Watch: []WatchDef{
{
Table: "tname",
WatchAllCols: true,
Position: Position{Line: 2, Offset: 78},
},
{
Table: "tname2",
Columns: []ID{"cname"},
Position: Position{Line: 2, Offset: 85},
},
},
},
Position: line(2),
},
&AlterChangeStream{
Name: "csname",
Alteration: DropChangeStreamWatch{},
Position: line(3),
},
&AlterChangeStream{
Name: "csname",
Alteration: AlterChangeStreamOptions{
Options: ChangeStreamOptions{
RetentionPeriod: func(b string) *string { return &b }("36h"),
ValueCaptureType: func(b string) *string { return &b }("NEW_VALUES"),
},
},
Position: line(4),
},
},
},
},
{
`DROP CHANGE STREAM csname`,
&DDL{
Filename: "filename",
List: []DDLStmt{
&DropChangeStream{
Name: "csname",
Position: line(1),
},
},
},
},
}
for _, test := range tests {
got, err := ParseDDL("filename", test.in)
Expand Down
49 changes: 36 additions & 13 deletions spanner/spansql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,7 @@ func (cs CreateChangeStream) SQL() string {
if i > 0 {
str += ", "
}
str += table.Table.SQL()
if !table.WatchAllCols {
str += "("
for i, c := range table.Columns {
if i > 0 {
str += ", "
}
str += c.SQL()
}
str += ")"
}
str += table.SQL()
}
}
if cs.Options != (ChangeStreamOptions{}) {
Expand All @@ -126,6 +116,21 @@ func (cs CreateChangeStream) SQL() string {
return str
}

func (w WatchDef) SQL() string {
str := w.Table.SQL()
if !w.WatchAllCols {
str += "("
for i, c := range w.Columns {
if i > 0 {
str += ", "
}
str += c.SQL()
}
str += ")"
}
return str
}

func (dt DropTable) SQL() string {
return "DROP TABLE " + dt.Name.SQL()
}
Expand All @@ -143,11 +148,29 @@ func (dc DropChangeStream) SQL() string {
}

func (acs AlterChangeStream) SQL() string {
return "ALTER CHANGE STREAM " + acs.Name.SQL() + " SET " + acs.Alteration.SQL()
return "ALTER CHANGE STREAM " + acs.Name.SQL() + " " + acs.Alteration.SQL()
}

func (scsw AlterWatch) SQL() string {
str := "SET FOR "
if scsw.WatchAllTables {
return str + "ALL"
}
for i, table := range scsw.Watch {
if i > 0 {
str += ", "
}
str += table.SQL()
}
return str
}

func (ao AlterChangeStreamOptions) SQL() string {
return ao.Options.SQL()
return "SET " + ao.Options.SQL()
}

func (dcsw DropChangeStreamWatch) SQL() string {
return "DROP FOR ALL"
}

func (cso ChangeStreamOptions) SQL() string {
Expand Down
57 changes: 56 additions & 1 deletion spanner/spansql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,26 @@ func TestSQL(t *testing.T) {
"ALTER DATABASE dbname SET OPTIONS (optimizer_version=null, optimizer_statistics_package=null, version_retention_period=null, enable_key_visualizer=null, default_leader=null)",
reparseDDL,
},
{
&CreateChangeStream{
Name: "csname",
Watch: []WatchDef{
{Table: "Ta", WatchAllCols: true, Position: line(1)},
{Table: "Tsub", Columns: []ID{ID("Hash")}, Position: line(1)},
},
Position: line(1),
},
"CREATE CHANGE STREAM csname FOR Ta, Tsub(`Hash`)",
reparseDDL,
},
{
&DropChangeStream{
Name: "csname",
Position: line(1),
},
"DROP CHANGE STREAM csname",
reparseDDL,
},
{
&CreateChangeStream{
Name: "csname",
Expand All @@ -440,17 +460,52 @@ func TestSQL(t *testing.T) {
"CREATE CHANGE STREAM csname FOR ALL OPTIONS (retention_period='7d', value_capture_type='NEW_VALUES')",
reparseDDL,
},
{
&AlterChangeStream{
Name: "csname",
Alteration: AlterWatch{
WatchAllTables: true,
},
Position: line(1),
},
"ALTER CHANGE STREAM csname SET FOR ALL",
reparseDDL,
},
{
&AlterChangeStream{
Name: "csname",
Alteration: AlterWatch{
Watch: []WatchDef{
{Table: "Ta", WatchAllCols: true, Position: Position{Line: 1, Offset: 35}},
{Table: "Tsub", Columns: []ID{ID("Hash")}, Position: Position{Line: 1, Offset: 39}},
},
},
Position: line(1),
},
"ALTER CHANGE STREAM csname SET FOR Ta, Tsub(`Hash`)",
reparseDDL,
},
{
&AlterChangeStream{
Name: "csname",
Alteration: AlterChangeStreamOptions{
Options: ChangeStreamOptions{
RetentionPeriod: func(s string) *string { return &s }("7d"),
ValueCaptureType: func(s string) *string { return &s }("NEW_VALUES"),
},
},
Position: line(1),
},
"ALTER CHANGE STREAM csname SET OPTIONS (value_capture_type='NEW_VALUES')",
"ALTER CHANGE STREAM csname SET OPTIONS (retention_period='7d', value_capture_type='NEW_VALUES')",
reparseDDL,
},
{
&AlterChangeStream{
Name: "csname",
Alteration: DropChangeStreamWatch{},
Position: line(1),
},
"ALTER CHANGE STREAM csname DROP FOR ALL",
reparseDDL,
},
{
Expand Down
Loading

0 comments on commit d34fe02

Please sign in to comment.