Skip to content

Commit

Permalink
feat(spanner/spansql): add support for change stream value_capture_ty…
Browse files Browse the repository at this point in the history
…pe option (#7201)

Co-authored-by: rahul2393 <irahul@google.com>
  • Loading branch information
maxmzkr and rahul2393 committed Jan 11, 2023
1 parent c2f7b34 commit 27b3398
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 25 deletions.
8 changes: 4 additions & 4 deletions spanner/spannertest/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,26 +731,26 @@ func TestAddBackQuoteForHypen(t *testing.T) {

func TestCreateAndManageChangeStream(t *testing.T) {
// Testing Create Change Stream
ddl, err := spansql.ParseDDL("filename", "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period = '36h' )")
ddl, err := spansql.ParseDDL("filename", "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS (retention_period = '36h')")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got := ddl.List[0].SQL()
want := "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period='36h' )"
want := "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS (retention_period='36h')"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}

// Testing Alter Change Stream Options
ddl, err = spansql.ParseDDL("filename", "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period = '20h' )")
ddl, err = spansql.ParseDDL("filename", "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS (retention_period = '20h')")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got = ddl.List[0].SQL()
want = "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period='20h' )"
want = "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS (retention_period='20h')"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
Expand Down
49 changes: 34 additions & 15 deletions spanner/spansql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2156,24 +2156,43 @@ func (p *parser) parseChangeStreamOptions() (ChangeStreamOptions, *parseError) {
}

var cso ChangeStreamOptions
if p.eat("retention_period", "=") {
tok := p.next()
if tok.err != nil {
return ChangeStreamOptions{}, tok.err
}
retentionPeriod := new(string)
if tok.value == "null" {
*retentionPeriod = ""
} else {
for {
if p.eat("retention_period", "=") {
tok := p.next()
if tok.err != nil {
return ChangeStreamOptions{}, tok.err
}
retentionPeriod := new(string)
if tok.value == "null" {
*retentionPeriod = ""
} else {
if tok.typ != stringToken {
return ChangeStreamOptions{}, p.errorf("invalid retention_period: %v", tok.value)
}
*retentionPeriod = tok.string
}
cso.RetentionPeriod = retentionPeriod
} else if p.eat("value_capture_type", "=") {
tok := p.next()
if tok.err != nil {
return ChangeStreamOptions{}, tok.err
}
valueCaptureType := new(string)
if tok.typ != stringToken {
return ChangeStreamOptions{}, p.errorf("invalid retention_period: %v", tok.value)
return ChangeStreamOptions{}, p.errorf("invalid value_capture_type: %v", tok.value)
}
*retentionPeriod = tok.string
*valueCaptureType = tok.string
cso.ValueCaptureType = valueCaptureType
} else {
tok := p.next()
return ChangeStreamOptions{}, p.errorf("unknown change stream option: %v", tok.value)
}
if p.sniff(")") {
break
}
if !p.eat(",") {
return ChangeStreamOptions{}, p.errorf("missing ',' in options list")
}
cso.RetentionPeriod = retentionPeriod
} else {
tok := p.next()
return ChangeStreamOptions{}, p.errorf("unknown change stream option: %v", tok.value)
}

if err := p.expect(")"); err != nil {
Expand Down
26 changes: 21 additions & 5 deletions spanner/spansql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,8 @@ func (cs CreateChangeStream) SQL() string {
}
}
}
if cs.Options.RetentionPeriod != nil {
str += " OPTIONS( "
str += fmt.Sprintf("retention_period='%s'", *cs.Options.RetentionPeriod)
str += " )"
if cs.Options != (ChangeStreamOptions{}) {
str += " " + cs.Options.SQL()
}

return str
Expand All @@ -149,7 +147,25 @@ func (acs AlterChangeStream) SQL() string {
}

func (ao AlterChangeStreamOptions) SQL() string {
return "OPTIONS( " + fmt.Sprintf("retention_period='%s'", *ao.Options.RetentionPeriod) + " )"
return ao.Options.SQL()
}

func (cso ChangeStreamOptions) SQL() string {
str := "OPTIONS ("
hasOpt := false
if cso.RetentionPeriod != nil {
hasOpt = true
str += fmt.Sprintf("retention_period='%s'", *cso.RetentionPeriod)
}
if cso.ValueCaptureType != nil {
if hasOpt {
str += ", "
}
hasOpt = true
str += fmt.Sprintf("value_capture_type='%s'", *cso.ValueCaptureType)
}
str += ")"
return str
}

func (at AlterTable) SQL() string {
Expand Down
38 changes: 38 additions & 0 deletions spanner/spansql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,44 @@ func TestSQL(t *testing.T) {
"ALTER DATABASE dbname SET OPTIONS (optimizer_version=null, version_retention_period=null, enable_key_visualizer=null, default_leader=null)",
reparseDDL,
},
{
&CreateChangeStream{
Name: "csname",
WatchAllTables: true,
Options: ChangeStreamOptions{
ValueCaptureType: func(s string) *string { return &s }("NEW_VALUES"),
},
Position: line(1),
},
"CREATE CHANGE STREAM csname FOR ALL OPTIONS (value_capture_type='NEW_VALUES')",
reparseDDL,
},
{
&CreateChangeStream{
Name: "csname",
WatchAllTables: true,
Options: ChangeStreamOptions{
RetentionPeriod: func(s string) *string { return &s }("7d"),
ValueCaptureType: func(s string) *string { return &s }("NEW_VALUES"),
},
Position: line(1),
},
"CREATE CHANGE STREAM csname FOR ALL OPTIONS (retention_period='7d', value_capture_type='NEW_VALUES')",
reparseDDL,
},
{
&AlterChangeStream{
Name: "csname",
Alteration: AlterChangeStreamOptions{
Options: ChangeStreamOptions{
ValueCaptureType: func(s string) *string { return &s }("NEW_VALUES"),
},
},
Position: line(1),
},
"ALTER CHANGE STREAM csname SET OPTIONS (value_capture_type='NEW_VALUES')",
reparseDDL,
},
{
&Insert{
Table: "Singers",
Expand Down
3 changes: 2 additions & 1 deletion spanner/spansql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,5 +1114,6 @@ func (wd WatchDef) Pos() Position { return wd.Position }
func (wd *WatchDef) clearOffset() { wd.Position.Offset = 0 }

type ChangeStreamOptions struct {
RetentionPeriod *string
RetentionPeriod *string
ValueCaptureType *string
}

0 comments on commit 27b3398

Please sign in to comment.