Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner/spannertest): add support for adding and dropping Foreign Keys #6608

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions spanner/spannertest/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ type table struct {

// Information about the table columns.
// They are reordered on table creation so the primary key columns come first.
cols []colInfo
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
rdw *spansql.RowDeletionPolicy // RowDeletionPolicy of this table (may be nil)
cols []colInfo
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
constraints []constraintInfo // constraints information of this table
rdw *spansql.RowDeletionPolicy // RowDeletionPolicy of this table (may be nil)

// Rows are stored in primary key order.
rows []row
Expand All @@ -76,6 +77,12 @@ type colInfo struct {
Alias spansql.PathExp // an alternate name for this column (result sets only)
}

// constraintInfo represents information about a constraint in a table
type constraintInfo struct {
Name spansql.ID
Constraint spansql.Constraint
}

// commitTimestampSentinel is a sentinel value for TIMESTAMP fields with allow_commit_timestamp=true.
// It is accepted, but never stored.
var commitTimestampSentinel = &struct{}{}
Expand Down Expand Up @@ -303,6 +310,11 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return status.Newf(codes.InvalidArgument, "primary key column %q not in table", col)
}
}
for _, constraint := range stmt.Constraints {
if st := t.addConstraint(constraint); st.Code() != codes.OK {
return st
}
}
t.rdw = stmt.RowDeletionPolicy
d.tables[stmt.Name] = t
return nil
Expand Down Expand Up @@ -377,6 +389,17 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return st
}
return nil
case spansql.AddConstraint:
// We do not validate if the referenced table and column exists.
if st := t.addConstraint(alt.Constraint); st.Code() != codes.OK {
return st
}
return nil
case spansql.DropConstraint:
if st := t.dropConstraint(alt); st.Code() != codes.OK {
return st
}
return nil
}
}

Expand Down Expand Up @@ -736,6 +759,24 @@ func (t *table) addColumn(cd spansql.ColumnDef, newTable bool) *status.Status {
return nil
}

func (t *table) addConstraint(alt spansql.TableConstraint) *status.Status {
t.mu.Lock()
defer t.mu.Unlock()

for _, constraint := range t.constraints {
if constraint.Name == alt.Name {
return status.Newf(codes.AlreadyExists, "constraint name %s already exists", alt.Name)
}
}

t.constraints = append(t.constraints, constraintInfo{
Name: alt.Name,
Constraint: alt.Constraint,
})

return nil
}

func (t *table) dropColumn(name spansql.ID) *status.Status {
// Only permit dropping non-key columns that aren't part of a secondary index.
// We don't support indexes, so only check that it isn't part of the primary key.
Expand Down Expand Up @@ -881,6 +922,25 @@ func (t *table) dropRowDeletionPolicy(ard spansql.DropRowDeletionPolicy) *status
return nil
}

func (t *table) dropConstraint(alt spansql.DropConstraint) *status.Status {
t.mu.Lock()
defer t.mu.Unlock()

var ci int = -1 // index of the constraint in t.constraints: constraint index
for i, constraint := range t.constraints {
if constraint.Name == alt.Name {
ci = i
}
}

if ci == -1 {
return status.Newf(codes.InvalidArgument, "unknown constraint name %q", alt.Name)
}

t.constraints = append(t.constraints[:ci], t.constraints[ci+1:]...)
return nil
}

func (t *table) insertRow(rowNum int, r row) {
t.rows = append(t.rows, nil)
copy(t.rows[rowNum+1:], t.rows[rowNum:])
Expand Down
43 changes: 43 additions & 0 deletions spanner/spannertest/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,3 +671,46 @@ func TestKeyRange(t *testing.T) {
}
}
}

func TestForeignKeyAddAndAlterConstraint(t *testing.T) {
sql := `CREATE TABLE Orders (
OrderID INT64 NOT NULL,
CustomerID INT64 NOT NULL,
Quantity INT64 NOT NULL,
ProductID INT64 NOT NULL,
CONSTRAINT FK_CustomerOrder FOREIGN KEY (CustomerID) REFERENCES Customers (CustomerID)
) PRIMARY KEY (OrderID);
ALTER TABLE Orders DROP CONSTRAINT FK_CustomerOrder;`
var db database

ddl, err := spansql.ParseDDL("filename", sql)
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}
for _, stmt := range ddl.List {
if st := db.ApplyDDL(stmt); st.Code() != codes.OK {
t.Fatalf("ApplyDDL failed: %v", st)
}
}

altersql := `CREATE TABLE Orders (
OrderID INT64 NOT NULL,
CustomerID INT64 NOT NULL,
Quantity INT64 NOT NULL,
ProductID INT64 NOT NULL,
CONSTRAINT FK_ProductOrder FOREIGN KEY (ProductID) REFERENCES Product (ProductID)
) PRIMARY KEY (OrderID);
ALTER TABLE Orders ADD CONSTRAINT FK_CustomerOrder FOREIGN KEY (CustomerID) REFERENCES Customers (CustomerID);
ALTER TABLE Orders DROP CONSTRAINT FK_ProductOrder;`
var db1 database

ddl1, err1 := spansql.ParseDDL("filename", altersql)
if err1 != nil {
t.Fatalf("%s: Bad DDL", err1)
}
for _, stmt := range ddl1.List {
if st := db1.ApplyDDL(stmt); st.Code() != codes.OK {
t.Fatalf("ApplyDDL failed: %v", st)
}
}
}