Skip to content

Commit

Permalink
Merge pull request #18 from Recidiviz/dan/candidate-duplicate-dataset
Browse files Browse the repository at this point in the history
Return a duplicate error when creating a dataset that already exists; return a resourceInUse error when deleting a dataset that still contains tables (unless deleteContents is requested)
  • Loading branch information
ohaibbq authored Jun 8, 2024
2 parents 609dd3b + 6799d23 commit af0d0ea
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 12 deletions.
4 changes: 2 additions & 2 deletions internal/metadata/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func (d *Dataset) Insert(ctx context.Context, tx *sql.Tx) error {
return d.repo.AddDataset(ctx, tx, d)
}

// Delete removes this dataset through the repository while holding d.mu for
// the duration of the call.
//
// inUseOk is forwarded unchanged to Repository.DeleteDataset, which returns
// an error wrapping ErrDatasetInUse when the dataset still has tables and
// inUseOk is false.
func (d *Dataset) Delete(ctx context.Context, tx *sql.Tx) error {
func (d *Dataset) Delete(ctx context.Context, tx *sql.Tx, inUseOk bool) error {
d.mu.Lock()
defer d.mu.Unlock()
return d.repo.DeleteDataset(ctx, tx, d)
return d.repo.DeleteDataset(ctx, tx, d, inUseOk)
}

func (d *Dataset) DeleteModel(ctx context.Context, tx *sql.Tx, id string) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/metadata/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (p *Project) DeleteDataset(ctx context.Context, tx *sql.Tx, id string) erro
}
p.mu.Lock()
defer p.mu.Unlock()
if err := dataset.Delete(ctx, tx); err != nil {
if err := dataset.Delete(ctx, tx, false); err != nil {
return err
}
return nil
Expand Down
42 changes: 41 additions & 1 deletion internal/metadata/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"fmt"
"github.com/goccy/bigquery-emulator/internal"
"github.com/goccy/go-zetasqlite"
"github.com/mattn/go-sqlite3"
bigqueryv2 "google.golang.org/api/bigquery/v2"

internaltypes "github.com/goccy/bigquery-emulator/internal/types"
"github.com/goccy/bigquery-emulator/types"
)

// Sentinel errors for dataset operations. Callers receive them wrapped with
// the dataset ID, so match them with errors.Is rather than ==.
var (
	// ErrDuplicatedDataset is wrapped by Repository.AddDataset when the
	// insert fails with a SQLite constraint violation.
	ErrDuplicatedDataset = errors.New("dataset is already created")

	// ErrDatasetInUse is wrapped by Repository.DeleteDataset when the
	// dataset still has tables and the caller did not allow deleting a
	// dataset that is in use.
	ErrDatasetInUse = errors.New("dataset is in use, empty the dataset before deleting it")
)

var schemata = []string{
`
CREATE TABLE IF NOT EXISTS projects (
Expand Down Expand Up @@ -77,6 +81,7 @@ const (
StmtFindTable internal.Statement = `SELECT id, metadata FROM tables WHERE projectID = @projectID AND datasetID = @datasetID AND id = @tableID`
StmtUpdateTable internal.Statement = `UPDATE tables SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id`
StmtTableExists internal.Statement = `SELECT TRUE FROM tables WHERE projectID = @projectID AND datasetID = @datasetID AND id = @tableID`
StmtTablesExistInDataset internal.Statement = `SELECT TRUE FROM tables WHERE projectID = @projectID AND datasetID = @datasetID LIMIT 1`
StmtFindTablesInDataset internal.Statement = `SELECT id, datasetID, metadata FROM tables WHERE projectID = @projectID AND datasetID = @datasetID`
StmtFindModelsInDataset internal.Statement = `SELECT id, datasetID, metadata FROM models WHERE projectID = @projectID AND datasetID = @datasetID`
StmtUpdateModel internal.Statement = `UPDATE models SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id`
Expand Down Expand Up @@ -108,6 +113,7 @@ var preparedStatements = []internal.Statement{
StmtFindTable,
StmtUpdateTable,
StmtTableExists,
StmtTablesExistInDataset,
StmtFindTablesInDataset,
StmtFindModelsInDataset,
StmtFindRoutinesInDataset,
Expand Down Expand Up @@ -699,6 +705,12 @@ func (r *Repository) AddDataset(ctx context.Context, tx *sql.Tx, dataset *Datase
sql.Named("projectID", dataset.ProjectID),
sql.Named("metadata", string(metadata)),
); err != nil {
var sqliteError sqlite3.Error
if errors.As(errors.Unwrap(err), &sqliteError) {
if sqliteError.Code == sqlite3.ErrConstraint {
return fmt.Errorf("dataset %s: %w", dataset.ID, ErrDuplicatedDataset)
}
}
return err
}
return nil
Expand All @@ -724,7 +736,35 @@ func (r *Repository) UpdateDataset(ctx context.Context, tx *sql.Tx, dataset *Dat
return nil
}

func (r *Repository) DeleteDataset(ctx context.Context, tx *sql.Tx, dataset *Dataset) error {
// TablesExistInDataset reports whether at least one row of the tables
// metadata matches the given dataset's project ID and dataset ID.
//
// It runs StmtTablesExistInDataset inside tx. A query that yields no rows is
// not treated as a failure: it means the dataset has no tables, so the result
// is (false, nil). Any other lookup or scan error is returned unchanged
// together with false.
func (r *Repository) TablesExistInDataset(ctx context.Context, tx *sql.Tx, dataset *Dataset) (bool, error) {
	stmt, err := r.queries.Get(ctx, tx, StmtTablesExistInDataset)
	if err != nil {
		return false, err
	}

	row := stmt.QueryRowContext(
		ctx,
		sql.Named("projectID", dataset.ProjectID),
		sql.Named("datasetID", dataset.ID),
	)

	var exists bool
	err = row.Scan(&exists)
	switch {
	case err == nil:
		return exists, nil
	case errors.Is(err, sql.ErrNoRows):
		// The statement selects a single TRUE row when a table exists, so
		// an empty result set means the dataset is empty.
		return false, nil
	default:
		return false, err
	}
}

func (r *Repository) DeleteDataset(ctx context.Context, tx *sql.Tx, dataset *Dataset, inUseOk bool) error {
inUse, err := r.TablesExistInDataset(ctx, tx, dataset)
if err != nil {
return err
}
if inUse && !inUseOk {
return fmt.Errorf("dataset %s: %w", dataset.ID, ErrDatasetInUse)
}
stmt, err := r.queries.Get(ctx, tx, StmtDeleteDataset)
if err != nil {
return err
Expand Down
13 changes: 12 additions & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,11 @@ func (h *datasetsDeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
dataset: dataset,
deleteContents: isDeleteContents(r),
}); err != nil {
if errors.Is(err, metadata.ErrDatasetInUse) {
errorResponse(ctx, w, errResourceInUse(err.Error()))
return
}

errorResponse(ctx, w, errInternalError(err.Error()))
return
}
Expand All @@ -645,7 +650,7 @@ func (h *datasetsDeleteHandler) Handle(ctx context.Context, r *datasetsDeleteReq
return fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.RollbackIfNotCommitted()
if err := r.dataset.Delete(ctx, tx.Tx()); err != nil {
if err := r.dataset.Delete(ctx, tx.Tx(), r.deleteContents); err != nil {
return fmt.Errorf("failed to delete dataset: %w", err)
}
if r.deleteContents {
Expand Down Expand Up @@ -717,7 +722,13 @@ func (h *datasetsInsertHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
project: project,
dataset: &dataset,
})

if err != nil {
if errors.Is(err, metadata.ErrDuplicatedDataset) {
errorResponse(ctx, w, errDuplicate(err.Error()))
return
}

errorResponse(ctx, w, errInternalError(err.Error()))
return
}
Expand Down
106 changes: 99 additions & 7 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,30 @@ import (
"google.golang.org/api/option"
)

// buildClient creates a BigQuery client for project.ID that talks to the
// given test server's URL with authentication disabled.
//
// On failure it returns a nil client and the error from bigquery.NewClient.
// The caller owns the returned client.
func buildClient(ctx context.Context, project *types.Project, server *server.TestServer) (*bigquery.Client, error) {
	opts := []option.ClientOption{
		option.WithEndpoint(server.URL),
		option.WithoutAuthentication(),
	}

	client, err := bigquery.NewClient(ctx, project.ID, opts...)
	if err != nil {
		return nil, err
	}
	return client, nil
}

func TestSimpleQuery(t *testing.T) {
ctx := context.Background()

bqServer, err := server.New(server.TempStorage)
if err != nil {
t.Fatal(err)
}
if err := bqServer.Load(server.StructSource(types.NewProject("test"))); err != nil {
project := types.NewProject("test")
if err := bqServer.Load(server.StructSource(project)); err != nil {
t.Fatal(err)
}
testServer := bqServer.TestServer()
Expand All @@ -46,12 +62,7 @@ func TestSimpleQuery(t *testing.T) {
bqServer.Stop(ctx)
}()

client, err := bigquery.NewClient(
ctx,
"test",
option.WithEndpoint(testServer.URL),
option.WithoutAuthentication(),
)
client, err := buildClient(ctx, project, testServer)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1094,6 +1105,87 @@ func TestDuplicateTableWithSchema(t *testing.T) {
}
}

// TestDuplicateDataset verifies that creating a dataset whose ID was already
// loaded into the emulator fails with an error whose message ends in
// "duplicate".
func TestDuplicateDataset(t *testing.T) {
	const (
		projectName = "test"
		datasetName = "dataset1"
	)

	ctx := context.Background()

	bqServer, err := server.New(server.TempStorage)
	if err != nil {
		t.Fatal(err)
	}
	// Preload the dataset so the Create call below collides with it.
	project := types.NewProject(projectName, types.NewDataset(datasetName))
	if err := bqServer.Load(server.StructSource(project)); err != nil {
		t.Fatal(err)
	}

	testServer := bqServer.TestServer()
	defer func() {
		testServer.Close()
		bqServer.Stop(ctx)
	}()

	client, err := bigquery.NewClient(
		ctx,
		projectName,
		option.WithEndpoint(testServer.URL),
		option.WithoutAuthentication(),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	dataset := client.Dataset(datasetName)
	err = dataset.Create(ctx, nil)
	if err == nil || !strings.HasSuffix(err.Error(), "duplicate") {
		// Use %v rather than %s: err may be nil on this path, and %s would
		// render a nil error as "%!s(<nil>)" in the failure message.
		t.Fatalf("expected duplicate error; got %v", err)
	}
}

// TestDeleteDatasetInUseJob verifies that deleting a dataset that still
// contains a table, without asking for its contents to be deleted, fails with
// an error whose message ends in "resourceInUse".
func TestDeleteDatasetInUseJob(t *testing.T) {
	const (
		projectName = "test"
		datasetName = "dataset1"
	)
	ctx := context.Background()

	bqServer, err := server.New(server.TempStorage)
	if err != nil {
		t.Fatal(err)
	}

	// The dataset is loaded with one table so that it counts as in use.
	project := types.NewProject(projectName, types.NewDataset(datasetName,
		types.NewTable("table1", []*types.Column{
			types.NewColumn("id", types.STRING),
		}, nil),
	))

	if err := bqServer.Load(server.StructSource(project)); err != nil {
		t.Fatal(err)
	}

	testServer := bqServer.TestServer()
	defer func() {
		testServer.Close()
		bqServer.Stop(ctx)
	}()

	client, err := buildClient(ctx, project, testServer)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	dataset := client.Dataset(datasetName)
	err = dataset.Delete(ctx)
	if err == nil || !strings.HasSuffix(err.Error(), "resourceInUse") {
		// Use %v rather than %s: err may be nil on this path, and %s would
		// render a nil error as "%!s(<nil>)" in the failure message.
		t.Fatalf("expected resource in use error; got %v", err)
	}
}

func TestDataFromStruct(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit af0d0ea

Please sign in to comment.