Skip to content

Commit

Permalink
feat(bigquery): support for tables primary and foreign keys (#8055)
Browse files Browse the repository at this point in the history
Add support for creating/updating tables with primary/foreign keys on BigQuery. 

Rollout not fully complete, so is not gonna pass integration tests.

Towards internal b/285349974
  • Loading branch information
alvarowolfx committed Jun 21, 2023
1 parent 42a59f5 commit 93d6a1a
Show file tree
Hide file tree
Showing 3 changed files with 396 additions and 0 deletions.
138 changes: 138 additions & 0 deletions bigquery/table.go
Expand Up @@ -147,6 +147,106 @@ type TableMetadata struct {
// - '': empty string. Default to case-sensitive behavior.
// More information: https://cloud.google.com/bigquery/docs/reference/standard-sql/collation-concepts
DefaultCollation string

// TableConstraints contains table primary and foreign keys constraints.
// Present only if the table has primary or foreign keys.
TableConstraints *TableConstraints
}

// TableConstraints defines the primary key and foreign key of a table.
type TableConstraints struct {
// PrimaryKey constraint on a table's columns.
// Present only if the table has a primary key.
// The primary key is not enforced.
PrimaryKey *PrimaryKey

// ForeignKeys represent a list of foreign keys constraints.
// Foreign keys are not enforced.
ForeignKeys []*ForeignKey
}

// PrimaryKey represents the primary key constraint on a table's columns.
type PrimaryKey struct {
// Columns that compose the primary key constraint.
Columns []string
}

func (pk *PrimaryKey) toBQ() *bq.TableConstraintsPrimaryKey {
return &bq.TableConstraintsPrimaryKey{
Columns: pk.Columns,
}
}

func bqToPrimaryKey(tc *bq.TableConstraints) *PrimaryKey {
if tc.PrimaryKey == nil {
return nil
}
return &PrimaryKey{
Columns: tc.PrimaryKey.Columns,
}
}

// ForeignKey represents a foreign key constraint on a table's columns.
type ForeignKey struct {
// Foreign key constraint name.
Name string

// Table that holds the primary key and is referenced by this foreign key.
ReferencedTable *Table

// Columns that compose the foreign key.
ColumnReferences []*ColumnReference
}

func (fk *ForeignKey) toBQ() *bq.TableConstraintsForeignKeys {
colRefs := []*bq.TableConstraintsForeignKeysColumnReferences{}
for _, colRef := range fk.ColumnReferences {
colRefs = append(colRefs, colRef.toBQ())
}
return &bq.TableConstraintsForeignKeys{
Name: fk.Name,
ReferencedTable: &bq.TableConstraintsForeignKeysReferencedTable{
DatasetId: fk.ReferencedTable.DatasetID,
ProjectId: fk.ReferencedTable.ProjectID,
TableId: fk.ReferencedTable.TableID,
},
ColumnReferences: colRefs,
}
}

func bqToForeignKeys(tc *bq.TableConstraints, c *Client) []*ForeignKey {
fks := []*ForeignKey{}
for _, fk := range tc.ForeignKeys {
colRefs := []*ColumnReference{}
for _, colRef := range fk.ColumnReferences {
colRefs = append(colRefs, &ColumnReference{
ReferencedColumn: colRef.ReferencedColumn,
ReferencingColumn: colRef.ReferencingColumn,
})
}
fks = append(fks, &ForeignKey{
Name: fk.Name,
ReferencedTable: c.DatasetInProject(fk.ReferencedTable.DatasetId, fk.ReferencedTable.ProjectId).Table(fk.ReferencedTable.TableId),
ColumnReferences: colRefs,
})
}
return fks
}

// ColumnReference represents the pair of the foreign key column and primary key column.
type ColumnReference struct {
// ReferencingColumn is the column in the current table that composes the foreign key.
ReferencingColumn string
// ReferencedColumn is the column in the primary key of the foreign table that
// is referenced by the ReferencingColumn.
ReferencedColumn string
}

func (colRef *ColumnReference) toBQ() *bq.TableConstraintsForeignKeysColumnReferences {
return &bq.TableConstraintsForeignKeysColumnReferences{
ReferencedColumn: colRef.ReferencedColumn,
ReferencingColumn: colRef.ReferencingColumn,
}
}

// TableCreateDisposition specifies the circumstances under which destination table will be created.
Expand Down Expand Up @@ -675,6 +775,19 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) {
return nil, errors.New("cannot set ETag on create")
}
t.DefaultCollation = string(tm.DefaultCollation)

if tm.TableConstraints != nil {
t.TableConstraints = &bq.TableConstraints{}
if tm.TableConstraints.PrimaryKey != nil {
t.TableConstraints.PrimaryKey = tm.TableConstraints.PrimaryKey.toBQ()
}
if len(tm.TableConstraints.ForeignKeys) > 0 {
t.TableConstraints.ForeignKeys = make([]*bq.TableConstraintsForeignKeys, len(tm.TableConstraints.ForeignKeys))
for i, fk := range tm.TableConstraints.ForeignKeys {
t.TableConstraints.ForeignKeys[i] = fk.toBQ()
}
}
}
return t, nil
}

Expand Down Expand Up @@ -788,6 +901,12 @@ func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) {
}
md.ExternalDataConfig = edc
}
if t.TableConstraints != nil {
md.TableConstraints = &TableConstraints{
PrimaryKey: bqToPrimaryKey(t.TableConstraints),
ForeignKeys: bqToForeignKeys(t.TableConstraints, c),
}
}
return md, nil
}

Expand Down Expand Up @@ -947,6 +1066,21 @@ func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
t.DefaultCollation = optional.ToString(tm.DefaultCollation)
forceSend("DefaultCollation")
}
if tm.TableConstraints != nil {
t.TableConstraints = &bq.TableConstraints{}
if tm.TableConstraints.PrimaryKey != nil {
t.TableConstraints.PrimaryKey = tm.TableConstraints.PrimaryKey.toBQ()
t.TableConstraints.PrimaryKey.ForceSendFields = append(t.TableConstraints.PrimaryKey.ForceSendFields, "Columns")
t.TableConstraints.ForceSendFields = append(t.TableConstraints.ForceSendFields, "PrimaryKey")
}
if tm.TableConstraints.ForeignKeys != nil {
t.TableConstraints.ForeignKeys = make([]*bq.TableConstraintsForeignKeys, len(tm.TableConstraints.ForeignKeys))
for i, fk := range tm.TableConstraints.ForeignKeys {
t.TableConstraints.ForeignKeys[i] = fk.toBQ()
}
t.TableConstraints.ForceSendFields = append(t.TableConstraints.ForceSendFields, "ForeignKeys")
}
}
labels, forces, nulls := tm.update()
t.Labels = labels
t.ForceSendFields = append(t.ForceSendFields, forces...)
Expand Down Expand Up @@ -1024,6 +1158,10 @@ type TableMetadataToUpdate struct {
// in the table.
DefaultCollation optional.String

// TableConstraints allows modification of table constraints
// such as primary and foreign keys.
TableConstraints *TableConstraints

labelUpdater
}

Expand Down
186 changes: 186 additions & 0 deletions bigquery/table_integration_test.go
Expand Up @@ -661,3 +661,189 @@ func TestIntegration_TableDefaultCollation(t *testing.T) {
}
}
}

func TestIntegration_TableConstraintsPK(t *testing.T) {
// Test Primary Keys for Table.Create and Table.Update
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
table := dataset.Table(tableIDs.New())
err := table.Create(context.Background(), &TableMetadata{
Schema: schema,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"name"},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer table.Delete(ctx)
md, err := table.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if md.TableConstraints.PrimaryKey.Columns[0] != "name" {
t.Fatalf("expected table primary key to contain column `name`, but found %q", md.TableConstraints.PrimaryKey.Columns)
}

md, err = table.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{}, // clean primary keys
},
}, "")
if err != nil {
t.Fatal(err)
}
if md.TableConstraints != nil {
t.Fatalf("expected table primary keys to be removed, but found %v", md.TableConstraints.PrimaryKey)
}

tableNoPK := dataset.Table(tableIDs.New())
err = tableNoPK.Create(context.Background(), &TableMetadata{
Schema: schema,
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableNoPK.Delete(ctx)
md, err = tableNoPK.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if md.TableConstraints != nil {
t.Fatalf("expected table to not have a PK, but found %v", md.TableConstraints.PrimaryKey.Columns)
}

md, err = tableNoPK.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"name"},
},
},
}, "")
if err != nil {
t.Fatal(err)
}
if md.TableConstraints.PrimaryKey == nil || md.TableConstraints.PrimaryKey.Columns[0] != "name" {
t.Fatalf("expected table primary key to contain column `name`, but found %v", md.TableConstraints.PrimaryKey)
}
}

func TestIntegration_TableConstraintsFK(t *testing.T) {
// Test Foreign keys for Table.Create and Table.Update
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
tableA := dataset.Table(tableIDs.New())
schemaA := []*FieldSchema{
{Name: "id", Type: IntegerFieldType},
{Name: "name", Type: StringFieldType},
}
err := tableA.Create(context.Background(), &TableMetadata{
Schema: schemaA,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableA.Delete(ctx)

tableB := dataset.Table(tableIDs.New())
schemaB := []*FieldSchema{
{Name: "id", Type: IntegerFieldType},
{Name: "name", Type: StringFieldType},
{Name: "parent", Type: IntegerFieldType},
}
err = tableB.Create(context.Background(), &TableMetadata{
Schema: schemaB,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
ForeignKeys: []*ForeignKey{
{
Name: "table_a_fk",
ReferencedTable: tableA,
ColumnReferences: []*ColumnReference{
{
ReferencingColumn: "parent",
ReferencedColumn: "id",
},
},
},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableB.Delete(ctx)
md, err := tableB.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if len(md.TableConstraints.ForeignKeys) >= 0 && md.TableConstraints.ForeignKeys[0].Name != "table_a_fk" {
t.Fatalf("expected table to contains fk `table_a_fk`, but found %v", md.TableConstraints.ForeignKeys)
}

md, err = tableB.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
ForeignKeys: []*ForeignKey{}, // clean foreign keys
},
}, "")
if err != nil {
t.Fatal(err)
}
if len(md.TableConstraints.ForeignKeys) > 0 {
t.Fatalf("expected table foreign keys to be removed, but found %v", md.TableConstraints.ForeignKeys)
}

tableNoFK := dataset.Table(tableIDs.New())
err = tableNoFK.Create(context.Background(), &TableMetadata{
Schema: schemaB,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableNoFK.Delete(ctx)
md, err = tableNoFK.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
ForeignKeys: []*ForeignKey{
{
Name: "table_a_fk",
ReferencedTable: tableA,
ColumnReferences: []*ColumnReference{
{
ReferencedColumn: "id",
ReferencingColumn: "parent",
},
},
},
},
},
}, "")
if err != nil {
t.Fatal(err)
}
if len(md.TableConstraints.ForeignKeys) == 0 || md.TableConstraints.ForeignKeys[0].Name != "table_a_fk" {
t.Fatalf("expected table to contains fk `table_a_fk`, but found %v", md.TableConstraints.ForeignKeys)
}
}

0 comments on commit 93d6a1a

Please sign in to comment.