diff --git a/go.mod b/go.mod index 643b185..e0b127c 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect ) -replace github.com/goccy/go-zetasqlite => github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.7 +replace github.com/goccy/go-zetasqlite => github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.8 replace github.com/mattn/go-sqlite3 => github.com/Recidiviz/go-sqlite3 v0.0.0-20240220230115-bffb5ad78048 diff --git a/go.sum b/go.sum index 1e86c31..a730da4 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,8 @@ github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvK github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/Recidiviz/go-sqlite3 v0.0.0-20240220230115-bffb5ad78048 h1:G8qFbNf/6IWYup4//DcrwsMYvAl80qZk9hEb6Z+UfKc= github.com/Recidiviz/go-sqlite3 v0.0.0-20240220230115-bffb5ad78048/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.7 h1:wvjkJOGE9xCk4WtzNedjHOPuudqmqn9yz3Son8SPVRQ= -github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.7/go.mod h1:KVfVr9Lp7/4FH0Eeiunu1Dh274lxKJvWwpQWEkoRkuA= +github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.8 h1:OFNdqgtpVfUhcU5wIq0Uipxm7eZ+eiolpVGhrEFt1yA= +github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.8/go.mod h1:KVfVr9Lp7/4FH0Eeiunu1Dh274lxKJvWwpQWEkoRkuA= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= diff --git a/internal/contentdata/repository.go b/internal/contentdata/repository.go index e1c576e..7b24023 100644 --- a/internal/contentdata/repository.go +++ b/internal/contentdata/repository.go @@ -5,11 +5,10 @@ import ( "database/sql" "fmt" "github.com/goccy/go-zetasqlite" - "reflect" - "strings" - "go.uber.org/zap" bigqueryv2 "google.golang.org/api/bigquery/v2" + "reflect" + "strings" "github.com/goccy/bigquery-emulator/internal/connection" "github.com/goccy/bigquery-emulator/internal/logger" diff --git a/internal/metadata/dataset.go b/internal/metadata/dataset.go index b8ddd23..dc98a9a 100644 --- a/internal/metadata/dataset.go +++ b/internal/metadata/dataset.go @@ -13,47 +13,11 @@ import ( var ErrDuplicatedTable = errors.New("table is already created") type Dataset struct { - ID string - ProjectID string - tables []*Table - tableMap map[string]*Table - models []*Model - modelMap map[string]*Model - routines []*Routine - routineMap map[string]*Routine - mu sync.RWMutex - content *bigqueryv2.Dataset - repo *Repository -} - -func (d *Dataset) TableIDs() []string { - d.mu.RLock() - defer d.mu.RUnlock() - tableIDs := make([]string, 0, len(d.tables)) - for _, table := range d.tables { - tableIDs = append(tableIDs, table.ID) - } - return tableIDs -} - -func (d *Dataset) ModelIDs() []string { - d.mu.RLock() - defer d.mu.RUnlock() - modelIDs := make([]string, 0, len(d.models)) - for _, model := range d.models { - modelIDs = append(modelIDs, model.ID) - } - return modelIDs -} - -func (d *Dataset) RoutineIDs() []string { - d.mu.RLock() - defer d.mu.RUnlock() - routineIDs := make([]string, 0, len(d.routines)) - for _, routine := range d.routines { - routineIDs = append(routineIDs, routine.ID) - } - return routineIDs + ID string + ProjectID string + mu sync.RWMutex + content 
*bigqueryv2.Dataset + repo *Repository } func (d *Dataset) Content() *bigqueryv2.Dataset { @@ -101,51 +65,47 @@ func (d *Dataset) UpdateContent(newContent *bigqueryv2.Dataset) { } func (d *Dataset) Insert(ctx context.Context, tx *sql.Tx) error { + d.mu.Lock() + defer d.mu.Unlock() return d.repo.AddDataset(ctx, tx, d) } func (d *Dataset) Delete(ctx context.Context, tx *sql.Tx) error { + d.mu.Lock() + defer d.mu.Unlock() return d.repo.DeleteDataset(ctx, tx, d) } func (d *Dataset) DeleteModel(ctx context.Context, tx *sql.Tx, id string) error { d.mu.Lock() defer d.mu.Unlock() - model, exists := d.modelMap[id] - if !exists { - return fmt.Errorf("model '%s' is not found in dataset '%s'", id, d.ID) - } - if err := model.Delete(ctx, tx); err != nil { + model, err := d.repo.FindModel(ctx, d.ProjectID, d.ID, id) + if err != nil { return err } - newModels := make([]*Model, 0, len(d.models)) - for _, model := range d.models { - if model.ID == id { - continue - } - newModels = append(newModels, model) + if model != nil { + return fmt.Errorf("model '%s' is not found in dataset '%s'", id, d.ID) } - d.models = newModels - delete(d.modelMap, id) - if err := d.repo.UpdateDataset(ctx, tx, d); err != nil { + if err := model.Delete(ctx, tx); err != nil { return err } return nil } func (d *Dataset) AddTable(ctx context.Context, tx *sql.Tx, table *Table) error { - d.mu.Lock() - if _, exists := d.tableMap[table.ID]; exists { - d.mu.Unlock() + exists, err := d.repo.TableExists(ctx, tx, d.ProjectID, d.ID, table.ID) + if err != nil { + return err + } + if exists { return fmt.Errorf("table %s: %w", table.ID, ErrDuplicatedTable) } + + d.mu.Lock() + defer d.mu.Unlock() if err := table.Insert(ctx, tx); err != nil { - d.mu.Unlock() return err } - d.tables = append(d.tables, table) - d.tableMap[table.ID] = table - d.mu.Unlock() if err := d.repo.UpdateDataset(ctx, tx, d); err != nil { return err @@ -153,40 +113,40 @@ func (d *Dataset) AddTable(ctx context.Context, tx *sql.Tx, table *Table) error return nil } -func (d *Dataset) Table(id string) *Table { - d.mu.RLock() - defer d.mu.RUnlock() - return d.tableMap[id] +func (d *Dataset) Table(ctx context.Context, id string) (*Table, error) { + d.mu.Lock() + defer d.mu.Unlock() + return d.repo.FindTable(ctx, d.ProjectID, d.ID, id) } -func (d *Dataset) Model(id string) *Model { - d.mu.RLock() - defer d.mu.RUnlock() - return d.modelMap[id] +func (d *Dataset) Model(ctx context.Context, id string) (*Model, error) { + d.mu.Lock() + defer d.mu.Unlock() + return d.repo.FindModel(ctx, d.ProjectID, d.ID, id) } -func (d *Dataset) Routine(id string) *Routine { - d.mu.RLock() - defer d.mu.RUnlock() - return d.routineMap[id] +func (d *Dataset) Routine(ctx context.Context, id string) (*Routine, error) { + d.mu.Lock() + defer d.mu.Unlock() + return d.repo.FindRoutine(ctx, d.ProjectID, d.ID, id) } -func (d *Dataset) Tables() []*Table { - d.mu.RLock() - defer d.mu.RUnlock() - return d.tables +func (d *Dataset) Tables(ctx context.Context) ([]*Table, error) { + d.mu.Lock() + defer d.mu.Unlock() + return d.repo.FindTablesInDatasets(ctx, d.ProjectID, d.ID) } -func (d *Dataset) Models() []*Model { - d.mu.RLock() - defer d.mu.RUnlock() - return d.models +func (d *Dataset) Models(ctx context.Context) ([]*Model, error) { + d.mu.Lock() + defer d.mu.Unlock() + return d.repo.FindModelsInDataset(ctx, d.ProjectID, d.ID) } -func (d *Dataset) Routines() []*Routine { - d.mu.RLock() - defer d.mu.RUnlock() - return d.routines +func (d *Dataset) Routines(ctx context.Context) ([]*Routine, error) { + d.mu.Lock() + 
defer d.mu.Unlock() + return d.repo.FindRoutinesInDataset(ctx, d.ProjectID, d.ID) } func NewDataset( @@ -194,33 +154,11 @@ func NewDataset( projectID string, datasetID string, content *bigqueryv2.Dataset, - tables []*Table, - models []*Model, - routines []*Routine) *Dataset { - - tableMap := map[string]*Table{} - for _, table := range tables { - tableMap[table.ID] = table - } - modelMap := map[string]*Model{} - for _, model := range models { - modelMap[model.ID] = model - } - routineMap := map[string]*Routine{} - for _, routine := range routines { - routineMap[routine.ID] = routine - } - +) *Dataset { return &Dataset{ - ID: datasetID, - ProjectID: projectID, - tables: tables, - tableMap: tableMap, - models: models, - modelMap: modelMap, - routines: routines, - routineMap: routineMap, - content: content, - repo: repo, + ID: datasetID, + ProjectID: projectID, + content: content, + repo: repo, } } diff --git a/internal/metadata/project.go b/internal/metadata/project.go index cecca25..e8b9e56 100644 --- a/internal/metadata/project.go +++ b/internal/metadata/project.go @@ -8,100 +8,95 @@ import ( ) type Project struct { - ID string - datasets []*Dataset - datasetMap map[string]*Dataset - jobs []*Job - jobMap map[string]*Job - mu sync.RWMutex - repo *Repository + ID string + mu sync.RWMutex + repo *Repository } -func (p *Project) DatasetIDs() []string { - ids := make([]string, len(p.datasets)) - for i := 0; i < len(p.datasets); i++ { - ids[i] = p.datasets[i].ID +func (p *Project) DatasetIDs(ctx context.Context) ([]string, error) { + datasets, err := p.FetchDatasets(ctx) + if err != nil { + return nil, err } - return ids -} - -func (p *Project) JobIDs() []string { - ids := make([]string, len(p.jobs)) - for i := 0; i < len(p.jobs); i++ { - ids[i] = p.jobs[i].ID + ids := make([]string, len(datasets)) + for i := 0; i < len(datasets); i++ { + ids[i] = datasets[i].ID } - return ids + return ids, nil } -func (p *Project) Job(id string) *Job { +func (p *Project) Job(ctx context.Context, id string) (*Job, error) { p.mu.RLock() defer p.mu.RUnlock() - return p.jobMap[id] + job, err := p.repo.FindJob(ctx, p.ID, id) + if err != nil { + return nil, err + } + return job, nil } -func (p *Project) Jobs() []*Job { +func (p *Project) Dataset(ctx context.Context, id string) (*Dataset, error) { p.mu.RLock() defer p.mu.RUnlock() - return p.jobs + dataset, err := p.repo.FindDataset(ctx, p.ID, id) + if err != nil { + return nil, err + } + return dataset, nil } -func (p *Project) Dataset(id string) *Dataset { +func (p *Project) FetchDatasets(ctx context.Context) ([]*Dataset, error) { p.mu.RLock() defer p.mu.RUnlock() - return p.datasetMap[id] + datasets, err := p.repo.FindDatasetsInProject(ctx, p.ID) + if err != nil { + return nil, err + } + return datasets, nil } -func (p *Project) Datasets() []*Dataset { +func (p *Project) FetchJobs(ctx context.Context) ([]*Job, error) { p.mu.RLock() defer p.mu.RUnlock() - return p.datasets + datasets, err := p.repo.FindJobsInProject(ctx, p.ID) + if err != nil { + return nil, err + } + return datasets, nil } func (p *Project) Insert(ctx context.Context, tx *sql.Tx) error { + p.mu.Lock() + defer p.mu.Unlock() return p.repo.AddProject(ctx, tx, p) } func (p *Project) Delete(ctx context.Context, tx *sql.Tx) error { + p.mu.Lock() + defer p.mu.Unlock() return p.repo.DeleteProject(ctx, tx, p) } func (p *Project) AddDataset(ctx context.Context, tx *sql.Tx, dataset *Dataset) error { p.mu.Lock() defer p.mu.Unlock() - if _, exists := p.datasetMap[dataset.ID]; exists { - return 
fmt.Errorf("dataset %s is already created", dataset.ID) - } if err := dataset.Insert(ctx, tx); err != nil { return err } - p.datasets = append(p.datasets, dataset) - p.datasetMap[dataset.ID] = dataset - if err := p.repo.UpdateProject(ctx, tx, p); err != nil { - return err - } return nil } func (p *Project) DeleteDataset(ctx context.Context, tx *sql.Tx, id string) error { - p.mu.Lock() - defer p.mu.Unlock() - dataset, exists := p.datasetMap[id] - if !exists { - return fmt.Errorf("dataset '%s' is not found in project '%s'", id, p.ID) - } - if err := dataset.Delete(ctx, tx); err != nil { + dataset, err := p.Dataset(ctx, id) + if err != nil { return err } - newDatasets := make([]*Dataset, 0, len(p.datasets)) - for _, dataset := range p.datasets { - if dataset.ID == id { - continue - } - newDatasets = append(newDatasets, dataset) + if dataset == nil { + return fmt.Errorf("dataset '%s' is not found in project '%s'", id, p.ID) } - p.datasets = newDatasets - delete(p.datasetMap, id) - if err := p.repo.UpdateProject(ctx, tx, p); err != nil { + p.mu.Lock() + defer p.mu.Unlock() + if err := dataset.Delete(ctx, tx); err != nil { return err } return nil @@ -110,60 +105,31 @@ func (p *Project) DeleteDataset(ctx context.Context, tx *sql.Tx, id string) erro func (p *Project) AddJob(ctx context.Context, tx *sql.Tx, job *Job) error { p.mu.Lock() defer p.mu.Unlock() - if _, exists := p.jobMap[job.ID]; exists { - return fmt.Errorf("job %s is already created", job.ID) - } if err := job.Insert(ctx, tx); err != nil { return err } - p.jobs = append(p.jobs, job) - p.jobMap[job.ID] = job - if err := p.repo.UpdateProject(ctx, tx, p); err != nil { - return err - } return nil } func (p *Project) DeleteJob(ctx context.Context, tx *sql.Tx, id string) error { - p.mu.Lock() - defer p.mu.Unlock() - job, exists := p.jobMap[id] - if !exists { - return fmt.Errorf("job '%s' is not found in project '%s'", id, p.ID) - } - if err := job.Delete(ctx, tx); err != nil { + job, err := p.Job(ctx, id) + if err != nil { return err } - newJobs := make([]*Job, 0, len(p.jobs)) - for _, job := range p.jobs { - if job.ID == id { - continue - } - newJobs = append(newJobs, job) + if job == nil { + return fmt.Errorf("job '%s' is not found in project '%s'", id, p.ID) } - p.jobs = newJobs - delete(p.jobMap, id) - if err := p.repo.UpdateProject(ctx, tx, p); err != nil { + p.mu.Lock() + defer p.mu.Unlock() + if err := job.Delete(ctx, tx); err != nil { return err } return nil } -func NewProject(repo *Repository, id string, datasets []*Dataset, jobs []*Job) *Project { - datasetMap := map[string]*Dataset{} - for _, dataset := range datasets { - datasetMap[dataset.ID] = dataset - } - jobMap := map[string]*Job{} - for _, job := range jobs { - jobMap[job.ID] = job - } +func NewProject(repo *Repository, id string) *Project { return &Project{ - ID: id, - datasets: datasets, - jobs: jobs, - datasetMap: datasetMap, - jobMap: jobMap, - repo: repo, + ID: id, + repo: repo, } } diff --git a/internal/metadata/repository.go b/internal/metadata/repository.go index 2d31796..4a79ebe 100644 --- a/internal/metadata/repository.go +++ b/internal/metadata/repository.go @@ -3,10 +3,10 @@ package metadata import ( "context" "database/sql" + "encoding/json" "errors" "fmt" - - "github.com/goccy/go-json" + "github.com/goccy/bigquery-emulator/internal" "github.com/goccy/go-zetasqlite" bigqueryv2 "google.golang.org/api/bigquery/v2" @@ -18,8 +18,6 @@ var schemata = []string{ ` CREATE TABLE IF NOT EXISTS projects ( id STRING NOT NULL, - datasetIDs ARRAY, - jobIDs ARRAY, PRIMARY KEY 
(id) )`, ` @@ -35,14 +33,11 @@ CREATE TABLE IF NOT EXISTS jobs ( CREATE TABLE IF NOT EXISTS datasets ( id STRING NOT NULL, projectID STRING NOT NULL, - tableIDs ARRAY, - modelIDs ARRAY, - routineIDs ARRAY, metadata STRING, PRIMARY KEY (projectID, id) -)`, - ` -CREATE TABLE IF NOT EXISTS tables ( +) +`, + `CREATE TABLE IF NOT EXISTS tables ( id STRING NOT NULL, projectID STRING NOT NULL, datasetID STRING NOT NULL, @@ -68,7 +63,62 @@ CREATE TABLE IF NOT EXISTS routines ( } type Repository struct { - db *sql.DB + db *sql.DB + queries *internal.PreparedStatementRepository +} + +const ( + StmtFindProject internal.Statement = `SELECT id FROM projects WHERE id = @id` + StmtInsertProject internal.Statement = `INSERT INTO projects (id) VALUES (@id)` + StmtDeleteProject internal.Statement = `DELETE FROM projects WHERE id = @id` + StmtFindJobsInProject internal.Statement = `SELECT id, projectID, metadata, result, error FROM jobs WHERE projectID = @projectid` + StmtDeleteTable internal.Statement = `DELETE FROM tables WHERE projectID = @projectID AND datasetID = @datasetID AND id = @tableID` + StmtInsertTable internal.Statement = `INSERT INTO tables (id, projectID, datasetID, metadata) VALUES (@id, @projectID, @datasetID, @metadata)` + StmtFindTable internal.Statement = `SELECT id, metadata FROM tables WHERE projectID = @projectID AND datasetID = @datasetID AND id = @tableID` + StmtUpdateTable internal.Statement = `UPDATE tables SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id` + StmtTableExists internal.Statement = `SELECT TRUE FROM tables WHERE projectID = @projectID AND datasetID = @datasetID AND id = @tableID` + StmtFindTablesInDataset internal.Statement = `SELECT id, datasetID, metadata FROM tables WHERE projectID = @projectID AND datasetID = @datasetID` + StmtFindModelsInDataset internal.Statement = `SELECT id, datasetID, metadata FROM models WHERE projectID = @projectID AND datasetID = @datasetID` + StmtUpdateModel internal.Statement = `UPDATE models SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id` + StmtInsertModel internal.Statement = `INSERT INTO models (id, projectID, datasetID, metadata) VALUES (@id, @projectID, @datasetID, @metadata)` + StmtDeleteModel internal.Statement = `DELETE FROM models WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id` + StmtFindRoutinesInDataset internal.Statement = `SELECT id, datasetID, metadata FROM routines WHERE projectID = @projectID AND datasetID = @datasetID` + StmtUpdateRoutine internal.Statement = `UPDATE routines SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id` + StmtInsertRoutine internal.Statement = `INSERT INTO routines (id, projectID, datasetID, metadata) VALUES (@id, @projectID, @datasetID, @metadata)` + StmtDeleteRoutine internal.Statement = `DELETE FROM routines WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id` + StmtFindJob internal.Statement = `SELECT id, metadata, result, error FROM jobs WHERE projectID = @projectID AND id = @jobID` + StmtInsertJob internal.Statement = `INSERT INTO jobs (id, projectID, metadata, result, error) VALUES (@id, @projectID, @metadata, @result, @error)` + StmtUpdateJob internal.Statement = `UPDATE jobs SET metadata = @metadata, result = @result, error = @error WHERE projectID = @projectID AND id = @id` + StmtDeleteJob internal.Statement = `DELETE FROM jobs WHERE projectID = @projectID AND id = @jobID` + StmtFindDataset internal.Statement = 
`SELECT id, projectID, metadata FROM datasets WHERE projectID = @projectID AND id = @datasetID` + StmtInsertDataset internal.Statement = `INSERT INTO datasets (id, projectID, metadata) VALUES (@id, @projectID, @metadata)` + StmtDeleteDataset internal.Statement = `DELETE FROM datasets WHERE projectID = @projectID AND id = @id` + StmtDatasetExists internal.Statement = `SELECT TRUE FROM datasets WHERE projectID = @projectID AND id = @datasetID` + StmtUpdateDataset internal.Statement = `UPDATE datasets SET metadata = @metadata WHERE projectID = @projectID AND id = @datasetID` +) + +var preparedStatements = []internal.Statement{ + StmtFindProject, + StmtInsertProject, + StmtDeleteProject, + StmtFindJobsInProject, + StmtInsertDataset, + StmtDeleteTable, + StmtInsertTable, + StmtFindTable, + StmtUpdateTable, + StmtTableExists, + StmtFindTablesInDataset, + StmtFindModelsInDataset, + StmtFindRoutinesInDataset, + StmtFindJob, + StmtInsertJob, + StmtUpdateJob, + StmtDeleteJob, + StmtFindDataset, + StmtDeleteDataset, + StmtDatasetExists, + StmtUpdateDataset, } func NewRepository(db *sql.DB) (*Repository, error) { @@ -76,14 +126,29 @@ func NewRepository(db *sql.DB) (*Repository, error) { if err != nil { return nil, err } - defer tx.Commit() for _, ddl := range schemata { if _, err := tx.ExecContext(context.Background(), ddl); err != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + return nil, rollbackErr + } return nil, err } } + err = tx.Commit() + if err != nil { + return nil, err + } + preparedQueryRepository := internal.NewPreparedStatementRepository( + db, + preparedStatements, + ) + if err != nil { + return nil, err + } return &Repository{ - db: db, + db: db, + queries: preparedQueryRepository, }, nil } @@ -105,19 +170,20 @@ func (r *Repository) getConnection(ctx context.Context) (*sql.Conn, error) { return conn, nil } -func (r *Repository) ProjectFromData(data *types.Project) *Project { +func (r *Repository) ProjectFromData(data *types.Project) (*Project, []*Dataset, []*Job) { datasets := make([]*Dataset, 0, len(data.Datasets)) for _, ds := range data.Datasets { - datasets = append(datasets, r.DatasetFromData(data.ID, ds)) + dataset, _, _, _ := r.DatasetFromData(data.ID, ds) + datasets = append(datasets, dataset) } jobs := make([]*Job, 0, len(data.Jobs)) for _, j := range data.Jobs { jobs = append(jobs, r.JobFromData(data.ID, j)) } - return NewProject(r, data.ID, datasets, jobs) + return NewProject(r, data.ID), datasets, jobs } -func (r *Repository) DatasetFromData(projectID string, data *types.Dataset) *Dataset { +func (r *Repository) DatasetFromData(projectID string, data *types.Dataset) (*Dataset, []*Table, []*Model, []*Routine) { tables := make([]*Table, 0, len(data.Tables)) for _, table := range data.Tables { tables = append(tables, r.TableFromData(projectID, data.ID, table)) @@ -130,7 +196,7 @@ func (r *Repository) DatasetFromData(projectID string, data *types.Dataset) *Dat for _, routine := range data.Routines { routines = append(routines, r.RoutineFromData(projectID, data.ID, routine)) } - return NewDataset(r, projectID, data.ID, nil, tables, models, routines) + return NewDataset(r, projectID, data.ID, nil), tables, models, routines } func (r *Repository) JobFromData(projectID string, data *types.Job) *Job { @@ -174,45 +240,34 @@ func (r *Repository) FindProject(ctx context.Context, id string) (*Project, erro return nil, err } defer tx.Commit() - projects, err := r.findProjects(ctx, tx, []string{id}) - if err != nil { - return nil, err - } - if len(projects) != 1 { - 
return nil, nil - } - if projects[0].ID != id { - return nil, nil - } - return projects[0], nil + return r.FindProjectWithConn(ctx, tx, id) } func (r *Repository) findProjects(ctx context.Context, tx *sql.Tx, ids []string) ([]*Project, error) { - rows, err := tx.QueryContext(ctx, "SELECT id, datasetIDs, jobIDs FROM projects WHERE id IN UNNEST(@ids)", ids) + stmt, err := r.queries.Get(ctx, tx, StmtFindProject) if err != nil { - return nil, fmt.Errorf("failed to get projects: %w", err) + return nil, err } - defer rows.Close() + projects := []*Project{} - for rows.Next() { - var ( - projectID string - datasetIDs []interface{} - jobIDs []interface{} - ) - if err := rows.Scan(&projectID, &datasetIDs, &jobIDs); err != nil { - return nil, err - } - datasets, err := r.findDatasets(ctx, tx, projectID, r.convertToStrings(datasetIDs)) + for _, id := range ids { + var projectID string + err := stmt.QueryRowContext( + ctx, + sql.Named("id", id), + ).Scan(&projectID) if err != nil { - return nil, err + if errors.Is(err, sql.ErrNoRows) { + continue + } + return nil, fmt.Errorf("failed to get projects: %w", err) } - jobs, err := r.findJobs(ctx, tx, projectID, r.convertToStrings(jobIDs)) if err != nil { return nil, err } - projects = append(projects, NewProject(r, projectID, datasets, jobs)) + projects = append(projects, NewProject(r, projectID)) } + return projects, nil } @@ -228,33 +283,28 @@ func (r *Repository) FindAllProjects(ctx context.Context) ([]*Project, error) { return nil, err } defer tx.Commit() - - rows, err := tx.QueryContext(ctx, "SELECT id, datasetIDs, jobIDs FROM projects") + rows, err := tx.QueryContext(ctx, "SELECT id FROM projects") if err != nil { return nil, err } defer rows.Close() - projects := []*Project{} + projectIDs := []string{} for rows.Next() { var ( - projectID string - datasetIDs []interface{} - jobIDs []interface{} + projectID string ) - if err := rows.Scan(&projectID, &datasetIDs, &jobIDs); err != nil { - return nil, err - } - datasets, err := r.findDatasets(ctx, tx, projectID, r.convertToStrings(datasetIDs)) - if err != nil { - return nil, err - } - jobs, err := r.findJobs(ctx, tx, projectID, r.convertToStrings(jobIDs)) - if err != nil { + if err := rows.Scan(&projectID); err != nil { return nil, err } - projects = append(projects, NewProject(r, projectID, datasets, jobs)) + projectIDs = append(projectIDs, projectID) + } + + projects := []*Project{} + for _, projectID := range projectIDs { + projects = append(projects, NewProject(r, projectID)) } + return projects, nil } @@ -270,23 +320,13 @@ func (r *Repository) AddProjectIfNotExists(ctx context.Context, tx *sql.Tx, proj } func (r *Repository) AddProject(ctx context.Context, tx *sql.Tx, project *Project) error { - if _, err := tx.Exec( - "INSERT projects (id, datasetIDs, jobIDs) VALUES (@id, @datasetIDs, @jobIDs)", - sql.Named("id", project.ID), - sql.Named("datasetIDs", project.DatasetIDs()), - sql.Named("jobIDs", project.JobIDs()), - ); err != nil { + stmt, err := r.queries.Get(ctx, tx, StmtInsertProject) + if err != nil { return err } - return nil -} - -func (r *Repository) UpdateProject(ctx context.Context, tx *sql.Tx, project *Project) error { - if _, err := tx.Exec( - "UPDATE projects SET datasetIDs = @datasetIDs, jobIDs = @jobIDs WHERE id = @id", + if _, err := stmt.ExecContext( + ctx, sql.Named("id", project.ID), - sql.Named("datasetIDs", project.DatasetIDs()), - sql.Named("jobIDs", project.JobIDs()), ); err != nil { return err } @@ -294,7 +334,12 @@ func (r *Repository) UpdateProject(ctx context.Context, tx 
*sql.Tx, project *Pro } func (r *Repository) DeleteProject(ctx context.Context, tx *sql.Tx, project *Project) error { - if _, err := tx.Exec("DELETE FROM projects WHERE id = @id", project.ID); err != nil { + stmt, err := r.queries.Get(ctx, tx, StmtDeleteProject) + if err != nil { + return err + } + + if _, err := stmt.ExecContext(ctx, sql.Named("id", project.ID)); err != nil { return err } return nil @@ -325,11 +370,76 @@ func (r *Repository) FindJob(ctx context.Context, projectID, jobID string) (*Job } func (r *Repository) findJobs(ctx context.Context, tx *sql.Tx, projectID string, jobIDs []string) ([]*Job, error) { - rows, err := tx.QueryContext( + jobs := []*Job{} + stmt, err := r.queries.Get(ctx, tx, StmtFindJob) + if err != nil { + return nil, err + } + + for _, id := range jobIDs { + var ( + jobID string + metadata string + result string + jobErr string + ) + + err := stmt.QueryRowContext( + ctx, + sql.Named("projectID", projectID), + sql.Named("jobID", id), + ).Scan(&jobID, &metadata, &result, &jobErr) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + continue + } + return nil, err + } + + var content bigqueryv2.Job + if len(metadata) > 0 { + if err := json.Unmarshal([]byte(metadata), &content); err != nil { + return nil, fmt.Errorf("failed to decode metadata content %s: %w", metadata, err) + } + } + var response internaltypes.QueryResponse + if len(result) > 0 { + if err := json.Unmarshal([]byte(result), &response); err != nil { + return nil, fmt.Errorf("failed to decode job response %s: %w", result, err) + } + } + var resErr error + if jobErr != "" { + resErr = errors.New(jobErr) + } + jobs = append( + jobs, + NewJob(r, projectID, jobID, &content, &response, resErr), + ) + } + + return jobs, nil +} + +func (r *Repository) FindJobsInProject(ctx context.Context, pID string) ([]*Job, error) { + conn, err := r.getConnection(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Commit() + + stmt, err := r.queries.Get(ctx, tx, StmtFindJobsInProject) + if err != nil { + return nil, err + } + rows, err := stmt.QueryContext( ctx, - "SELECT id, projectID, metadata, result, error FROM jobs WHERE projectID = @projectID AND id IN UNNEST(@jobIDs)", - sql.Named("projectID", projectID), - sql.Named("jobIDs", jobIDs), + sql.Named("projectID", pID), ) if err != nil { return nil, err @@ -384,8 +494,13 @@ func (r *Repository) AddJob(ctx context.Context, tx *sql.Tx, job *Job) error { if job.err != nil { jobErr = job.err.Error() } - if _, err := tx.Exec( - "INSERT jobs (id, projectID, metadata, result, error) VALUES (@id, @projectID, @metadata, @result, @error)", + stmt, err := r.queries.Get(ctx, tx, StmtInsertJob) + if err != nil { + return err + } + + if _, err := stmt.ExecContext( + ctx, sql.Named("id", job.ID), sql.Named("projectID", job.ProjectID), sql.Named("metadata", string(metadata)), @@ -410,8 +525,12 @@ func (r *Repository) UpdateJob(ctx context.Context, tx *sql.Tx, job *Job) error if job.err != nil { jobErr = job.err.Error() } - if _, err := tx.Exec( - "UPDATE jobs SET metadata = @metadata, result = @result, error = @error WHERE projectID = @projectID AND id = @id", + stmt, err := r.queries.Get(ctx, tx, StmtUpdateJob) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, sql.Named("id", job.ID), sql.Named("projectID", job.ProjectID), sql.Named("metadata", string(metadata)), @@ -424,16 +543,84 @@ func (r *Repository) UpdateJob(ctx context.Context, tx *sql.Tx, job 
*Job) error } func (r *Repository) DeleteJob(ctx context.Context, tx *sql.Tx, job *Job) error { - if _, err := tx.Exec( - "DELETE FROM jobs WHERE projectID = @projectID AND id = @id", - job.ProjectID, - job.ID, + stmt, err := r.queries.Get(ctx, tx, StmtDeleteJob) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, + sql.Named("projectID", job.ProjectID), + sql.Named("jobID", job.ID), ); err != nil { return err } return nil } +func (r *Repository) FindDatasetsInProject(ctx context.Context, projectID string) ([]*Dataset, error) { + conn, err := r.getConnection(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Commit() + + rows, err := tx.QueryContext(ctx, + "SELECT id, metadata FROM datasets WHERE projectID = @projectID", + sql.Named("projectID", projectID), + ) + if err != nil { + return nil, err + } + defer rows.Close() + + datasets := []*Dataset{} + for rows.Next() { + var ( + datasetID string + metadata string + ) + if err := rows.Scan(&datasetID, &metadata); err != nil { + return nil, err + } + var content bigqueryv2.Dataset + if err := json.Unmarshal([]byte(metadata), &content); err != nil { + return nil, err + } + datasets = append( + datasets, + NewDataset(r, projectID, datasetID, &content), + ) + } + return datasets, nil +} + +func (r *Repository) TableExists(ctx context.Context, tx *sql.Tx, projectID, datasetID, tableID string) (bool, error) { + var exists bool + + stmt, err := r.queries.Get(ctx, tx, StmtTableExists) + if err != nil { + return false, err + } + err = stmt.QueryRowContext( + ctx, + sql.Named("projectID", projectID), + sql.Named("datasetID", datasetID), + sql.Named("tableID", tableID), + ).Scan(&exists) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, err + } + return true, nil +} + func (r *Repository) FindDataset(ctx context.Context, projectID, datasetID string) (*Dataset, error) { conn, err := r.getConnection(ctx) if err != nil { @@ -445,6 +632,10 @@ func (r *Repository) FindDataset(ctx context.Context, projectID, datasetID strin return nil, err } defer tx.Commit() + return r.FindDatasetWithConnection(ctx, tx, projectID, datasetID) +} + +func (r *Repository) FindDatasetWithConnection(ctx context.Context, tx *sql.Tx, projectID string, datasetID string) (*Dataset, error) { datasets, err := r.findDatasets(ctx, tx, projectID, []string{datasetID}) if err != nil { return nil, err @@ -459,50 +650,38 @@ func (r *Repository) FindDataset(ctx context.Context, projectID, datasetID strin } func (r *Repository) findDatasets(ctx context.Context, tx *sql.Tx, projectID string, datasetIDs []string) ([]*Dataset, error) { - rows, err := tx.QueryContext(ctx, - "SELECT id, projectID, tableIDs, modelIDs, routineIDs, metadata FROM datasets WHERE projectID = @projectID AND id IN UNNEST(@datasetIDs)", - sql.Named("projectID", projectID), - sql.Named("datasetIDs", datasetIDs), - ) + datasets := []*Dataset{} + stmt, err := r.queries.Get(ctx, tx, StmtFindDataset) if err != nil { return nil, err } - defer rows.Close() - - datasets := []*Dataset{} - for rows.Next() { + for _, datasetID := range datasetIDs { var ( - datasetID string - projectID string - tableIDs []interface{} - modelIDs []interface{} - routineIDs []interface{} - metadata string + resultDatasetID string + resultProjectID string + resultMetadata string ) - if err := rows.Scan(&datasetID, &projectID, &tableIDs, &modelIDs, &routineIDs, &metadata); err != nil { - 
return nil, err - } - tables, err := r.findTables(ctx, tx, projectID, datasetID, r.convertToStrings(tableIDs)) - if err != nil { - return nil, err - } - models, err := r.findModels(ctx, tx, projectID, datasetID, r.convertToStrings(modelIDs)) - if err != nil { - return nil, err - } - routines, err := r.findRoutines(ctx, tx, projectID, datasetID, r.convertToStrings(routineIDs)) + err := stmt.QueryRowContext(ctx, + sql.Named("projectID", projectID), + sql.Named("datasetID", datasetID), + ).Scan(&resultDatasetID, &resultProjectID, &resultMetadata) if err != nil { + if errors.Is(err, sql.ErrNoRows) { + continue + } return nil, err } + var content bigqueryv2.Dataset - if err := json.Unmarshal([]byte(metadata), &content); err != nil { + if err := json.Unmarshal([]byte(resultMetadata), &content); err != nil { return nil, err } datasets = append( datasets, - NewDataset(r, projectID, datasetID, &content, tables, models, routines), + NewDataset(r, projectID, datasetID, &content), ) } + return datasets, nil } @@ -511,13 +690,13 @@ func (r *Repository) AddDataset(ctx context.Context, tx *sql.Tx, dataset *Datase if err != nil { return err } - if _, err := tx.Exec( - "INSERT datasets (id, projectID, tableIDs, modelIDs, routineIDs, metadata) VALUES (@id, @projectID, @tableIDs, @modelIDs, @routineIDs, @metadata)", + stmt, err := r.queries.Get(ctx, tx, StmtInsertDataset) + if err != nil { + return err + } + if _, err := stmt.ExecContext(ctx, sql.Named("id", dataset.ID), sql.Named("projectID", dataset.ProjectID), - sql.Named("tableIDs", dataset.TableIDs()), - sql.Named("modelIDs", dataset.ModelIDs()), - sql.Named("routineIDs", dataset.RoutineIDs()), sql.Named("metadata", string(metadata)), ); err != nil { return err @@ -530,13 +709,14 @@ func (r *Repository) UpdateDataset(ctx context.Context, tx *sql.Tx, dataset *Dat if err != nil { return err } - if _, err := tx.Exec( - "UPDATE datasets SET tableIDs = @tableIDs, modelIDs = @modelIDs, routineIDs = @routineIDs, metadata = @metadata WHERE projectID = @projectID AND id = @id", - sql.Named("id", dataset.ID), + stmt, err := r.queries.Get(ctx, tx, StmtUpdateDataset) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, + sql.Named("dataestID", dataset.ID), sql.Named("projectID", dataset.ProjectID), - sql.Named("tableIDs", dataset.TableIDs()), - sql.Named("modelIDs", dataset.ModelIDs()), - sql.Named("routineIDs", dataset.RoutineIDs()), sql.Named("metadata", string(metadata)), ); err != nil { return err @@ -545,8 +725,12 @@ func (r *Repository) UpdateDataset(ctx context.Context, tx *sql.Tx, dataset *Dat } func (r *Repository) DeleteDataset(ctx context.Context, tx *sql.Tx, dataset *Dataset) error { - if _, err := tx.Exec( - "DELETE FROM datasets WHERE projectID = @projectID AND id = @id", + stmt, err := r.queries.Get(ctx, tx, StmtDeleteDataset) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, dataset.ProjectID, dataset.ID, ); err != nil { @@ -566,6 +750,10 @@ func (r *Repository) FindTable(ctx context.Context, projectID, datasetID, tableI return nil, err } defer tx.Commit() + return r.FindTableWithConnection(ctx, tx, projectID, datasetID, tableID) +} + +func (r *Repository) FindTableWithConnection(ctx context.Context, tx *sql.Tx, projectID, datasetID, tableID string) (*Table, error) { tables, err := r.findTables(ctx, tx, projectID, datasetID, []string{tableID}) if err != nil { return nil, err @@ -579,30 +767,113 @@ func (r *Repository) FindTable(ctx context.Context, projectID, datasetID, tableI return tables[0], nil } 
-func (r *Repository) findTables(ctx context.Context, tx *sql.Tx, projectID, datasetID string, tableIDs []string) ([]*Table, error) { - rows, err := tx.QueryContext( - ctx, - "SELECT id, metadata FROM tables WHERE projectID = @projectID AND datasetID = @datasetID AND id IN UNNEST(@tableIDs)", - sql.Named("projectID", projectID), - sql.Named("datasetID", datasetID), - sql.Named("tableIDs", tableIDs), - ) +func (r *Repository) FindTablesInDatasets(ctx context.Context, projectID, datasetID string) ([]*Table, error) { + conn, err := r.getConnection(ctx) if err != nil { return nil, err } - defer rows.Close() + defer conn.Close() + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Commit() + return r.FindTablesInDatasetsWithConnection(ctx, tx, projectID, datasetID) +} + +func (r *Repository) FindTablesInDatasetsWithConnection(ctx context.Context, tx *sql.Tx, projectID, datasetID string) ([]*Table, error) { + tablesByDataset, err := r.findTablesInDatasets(ctx, tx, projectID, []string{datasetID}) + if err != nil { + return nil, err + } + tables, exists := tablesByDataset[datasetID] + if !exists { + return nil, fmt.Errorf("could not find any tables in dataset [%s]", datasetID) + } + return tables, nil +} + +func (r *Repository) findTablesInDatasets(ctx context.Context, tx *sql.Tx, projectID string, datasetIDs []string) (map[string][]*Table, error) { + tables := map[string][]*Table{} + + for _, datasetID := range datasetIDs { + tables[datasetID] = []*Table{} + } + for _, datasetID := range datasetIDs { + stmt, err := r.queries.Get(ctx, tx, StmtFindTablesInDataset) + if err != nil { + return nil, err + } + rows, err := stmt.QueryContext( + ctx, + sql.Named("projectID", projectID), + sql.Named("datasetID", datasetID), + ) + if err != nil { + rows.Close() + return nil, err + } + + for rows.Next() { + var ( + tableID string + datasetID string + metadata string + ) + if err := rows.Scan(&tableID, &datasetID, &metadata); err != nil { + return nil, err + } + var content map[string]interface{} + if err := json.Unmarshal([]byte(metadata), &content); err != nil { + return nil, err + } + + if _, ok := tables[datasetID]; !ok { + continue + } + + tables[datasetID] = append( + tables[datasetID], + NewTable(r, projectID, datasetID, tableID, content), + ) + } + err = rows.Close() + if err != nil { + return nil, err + } + } + return tables, nil +} +func (r *Repository) findTables(ctx context.Context, tx *sql.Tx, projectID, datasetID string, tableIDs []string) ([]*Table, error) { tables := []*Table{} - for rows.Next() { + + stmt, err := r.queries.Get(ctx, tx, StmtFindTable) + if err != nil { + return nil, err + } + + for _, tableID := range tableIDs { var ( - tableID string - metadata string + resultTableID string + resultMetadata string ) - if err := rows.Scan(&tableID, &metadata); err != nil { + + err := stmt.QueryRowContext( + ctx, + sql.Named("projectID", projectID), + sql.Named("datasetID", datasetID), + sql.Named("tableID", tableID), + ).Scan(&resultTableID, &resultMetadata) + if err != nil { + if err == sql.ErrNoRows { + continue + } return nil, err } + var content map[string]interface{} - if err := json.Unmarshal([]byte(metadata), &content); err != nil { + if err := json.Unmarshal([]byte(resultMetadata), &content); err != nil { return nil, err } tables = append( @@ -618,8 +889,12 @@ func (r *Repository) AddTable(ctx context.Context, tx *sql.Tx, table *Table) err if err != nil { return err } - if _, err := tx.Exec( - "INSERT tables (id, projectID, datasetID, metadata) VALUES 
(@id, @projectID, @datasetID, @metadata)", + stmt, err := r.queries.Get(ctx, tx, StmtInsertTable) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, sql.Named("id", table.ID), sql.Named("projectID", table.ProjectID), sql.Named("datasetID", table.DatasetID), @@ -635,8 +910,12 @@ func (r *Repository) UpdateTable(ctx context.Context, tx *sql.Tx, table *Table) if err != nil { return err } - if _, err := tx.Exec( - "UPDATE tables SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id", + stmt, err := r.queries.Get(ctx, tx, StmtUpdateTable) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, sql.Named("id", table.ID), sql.Named("projectID", table.ProjectID), sql.Named("datasetID", table.DatasetID), @@ -648,17 +927,92 @@ func (r *Repository) UpdateTable(ctx context.Context, tx *sql.Tx, table *Table) } func (r *Repository) DeleteTable(ctx context.Context, tx *sql.Tx, table *Table) error { - if _, err := tx.Exec( - "DELETE FROM tables WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id", - table.ProjectID, - table.DatasetID, - table.ID, + stmt, err := r.queries.Get(ctx, tx, StmtDeleteTable) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, + sql.Named("projectID", table.ProjectID), + sql.Named("datasetID", table.DatasetID), + sql.Named("tableID", table.ID), ); err != nil { return err } return nil } +func (r *Repository) FindModelsInDataset(ctx context.Context, projectID, datasetID string) ([]*Model, error) { + conn, err := r.getConnection(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Commit() + modelsByDataset, err := r.findModelsInDatasets(ctx, tx, projectID, []string{datasetID}) + if err != nil { + return nil, err + } + models, exists := modelsByDataset[datasetID] + if !exists { + return nil, fmt.Errorf("could not find any models in dataset [%s]", datasetID) + } + return models, nil +} + +func (r *Repository) findModelsInDatasets(ctx context.Context, tx *sql.Tx, projectID string, datasetIDs []string) (map[string][]*Model, error) { + models := map[string][]*Model{} + + stmt, err := r.queries.Get(ctx, tx, StmtFindModelsInDataset) + if err != nil { + return nil, err + } + + for _, datasetID := range datasetIDs { + models[datasetID] = []*Model{} + + rows, err := stmt.QueryContext( + ctx, + sql.Named("projectID", projectID), + sql.Named("datasetID", datasetID), + ) + if err != nil { + rows.Close() + return nil, err + } + + for rows.Next() { + var ( + modelID string + datasetID string + metadata string + ) + if err := rows.Scan(&modelID, &datasetID, &metadata); err != nil { + return nil, err + } + var content map[string]interface{} + if err := json.Unmarshal([]byte(metadata), &content); err != nil { + return nil, err + } + + models[datasetID] = append( + models[datasetID], + NewModel(r, projectID, datasetID, modelID, content), + ) + } + + err = rows.Close() + if err != nil { + return nil, err + } + } + return models, nil +} + func (r *Repository) FindModel(ctx context.Context, projectID, datasetID, modelID string) (*Model, error) { conn, err := r.getConnection(ctx) if err != nil { @@ -717,13 +1071,88 @@ func (r *Repository) findModels(ctx context.Context, tx *sql.Tx, projectID, data return models, nil } +func (r *Repository) FindRoutinesInDataset(ctx context.Context, projectID, datasetID string) ([]*Routine, error) { + conn, err := r.getConnection(ctx) + if err != nil { + 
return nil, err + } + defer conn.Close() + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Commit() + routinesByDataset, err := r.findRoutinesInDatasets(ctx, tx, projectID, []string{datasetID}) + if err != nil { + return nil, err + } + routines, exists := routinesByDataset[datasetID] + if !exists { + return nil, fmt.Errorf("could not find any routines in dataset [%s]", datasetID) + } + return routines, nil +} + +func (r *Repository) findRoutinesInDatasets(ctx context.Context, tx *sql.Tx, projectID string, datasetIDs []string) (map[string][]*Routine, error) { + routines := map[string][]*Routine{} + + stmt, err := r.queries.Get(ctx, tx, StmtFindRoutinesInDataset) + if err != nil { + return nil, err + } + for _, datasetID := range datasetIDs { + rows, err := stmt.QueryContext( + ctx, + sql.Named("projectID", projectID), + sql.Named("datasetID", datasetID), + ) + if err != nil { + rows.Close() + return nil, err + } + + for _, datasetID := range datasetIDs { + routines[datasetID] = []*Routine{} + } + for rows.Next() { + var ( + routineID string + datasetID string + metadata string + ) + if err := rows.Scan(&routineID, &datasetID, &metadata); err != nil { + return nil, err + } + var content map[string]interface{} + if err := json.Unmarshal([]byte(metadata), &content); err != nil { + return nil, err + } + + routines[datasetID] = append( + routines[datasetID], + NewRoutine(r, projectID, datasetID, routineID, content), + ) + } + err = rows.Close() + if err != nil { + return nil, err + } + } + return routines, nil +} + func (r *Repository) AddModel(ctx context.Context, tx *sql.Tx, model *Model) error { metadata, err := json.Marshal(model.metadata) if err != nil { return err } - if _, err := tx.Exec( - "INSERT models (id, projectID, datasetID, metadata) VALUES (@id, @projectID, @datasetID, @metadata)", + + stmt, err := r.queries.Get(ctx, tx, StmtInsertModel) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, sql.Named("id", model.ID), sql.Named("projectID", model.ProjectID), sql.Named("datasetID", model.DatasetID), @@ -739,8 +1168,13 @@ func (r *Repository) UpdateModel(ctx context.Context, tx *sql.Tx, model *Model) if err != nil { return err } - if _, err := tx.Exec( - "UPDATE models SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id", + + stmt, err := r.queries.Get(ctx, tx, StmtUpdateModel) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, sql.Named("id", model.ID), sql.Named("projectID", model.ProjectID), sql.Named("datasetID", model.DatasetID), @@ -752,11 +1186,15 @@ func (r *Repository) UpdateModel(ctx context.Context, tx *sql.Tx, model *Model) } func (r *Repository) DeleteModel(ctx context.Context, tx *sql.Tx, model *Model) error { - if _, err := tx.Exec( - "DELETE FROM models WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id", - model.ProjectID, - model.DatasetID, - model.ID, + stmt, err := r.queries.Get(ctx, tx, StmtDeleteModel) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, + sql.Named("projectID", model.ProjectID), + sql.Named("datasetID", model.DatasetID), + sql.Named("id", model.ID), ); err != nil { return err } @@ -827,8 +1265,9 @@ func (r *Repository) AddRoutine(ctx context.Context, tx *sql.Tx, routine *Routin if err != nil { return err } - if _, err := tx.Exec( - "INSERT routines (id, projectID, datasetID, metadata) VALUES (@id, @projectID, @datasetID, @metadata)", + stmt, err := r.queries.Get(ctx, tx, 
StmtInsertRoutine) + if _, err := stmt.ExecContext( + ctx, sql.Named("id", routine.ID), sql.Named("projectID", routine.ProjectID), sql.Named("datasetID", routine.DatasetID), @@ -844,8 +1283,12 @@ func (r *Repository) UpdateRoutine(ctx context.Context, tx *sql.Tx, routine *Rou if err != nil { return err } - if _, err := tx.Exec( - "UPDATE routines SET metadata = @metadata WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id", + stmt, err := r.queries.Get(ctx, tx, StmtUpdateRoutine) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, sql.Named("id", routine.ID), sql.Named("projectID", routine.ProjectID), sql.Named("datasetID", routine.DatasetID), @@ -857,11 +1300,15 @@ func (r *Repository) UpdateRoutine(ctx context.Context, tx *sql.Tx, routine *Rou } func (r *Repository) DeleteRoutine(ctx context.Context, tx *sql.Tx, routine *Routine) error { - if _, err := tx.Exec( - "DELETE FROM routines WHERE projectID = @projectID AND datasetID = @datasetID AND id = @id", - routine.ProjectID, - routine.DatasetID, - routine.ID, + stmt, err := r.queries.Get(ctx, tx, StmtDeleteRoutine) + if err != nil { + return err + } + if _, err := stmt.ExecContext( + ctx, + sql.Named("projectID", routine.ProjectID), + sql.Named("datasetID", routine.DatasetID), + sql.Named("id", routine.ID), ); err != nil { return err } diff --git a/internal/prepared_statements.go b/internal/prepared_statements.go new file mode 100644 index 0000000..66989b4 --- /dev/null +++ b/internal/prepared_statements.go @@ -0,0 +1,41 @@ +package internal + +import ( + "context" + "database/sql" + "fmt" + "github.com/goccy/go-zetasqlite" +) + +type Statement string + +type PreparedStatementBuilder func(ctx context.Context, tx *sql.Tx) (*sql.Stmt, error) + +type PreparedStatementRepository struct { + preparedQueries map[Statement]*sql.Stmt +} + +func NewPreparedStatementRepository(db *sql.DB, queries []Statement) *PreparedStatementRepository { + var preparedQueries = map[Statement]*sql.Stmt{} + ctx := zetasqlite.WithQueryFormattingDisabled(context.Background()) + for _, query := range queries { + stmt, err := db.PrepareContext(ctx, string(query)) + if err != nil { + return nil + } + preparedQueries[query] = stmt + } + + return &PreparedStatementRepository{ + preparedQueries: preparedQueries, + } +} + +func (r *PreparedStatementRepository) Get(ctx context.Context, tx *sql.Tx, name Statement) (*sql.Stmt, error) { + ctx = zetasqlite.WithQueryFormattingDisabled(ctx) + if stmt, ok := r.preparedQueries[name]; ok { + return tx.StmtContext(ctx, stmt), nil + } + + return nil, fmt.Errorf("could not find prepared statement: %s", name) +} diff --git a/server/handler.go b/server/handler.go index 43ebcef..7bee3eb 100644 --- a/server/handler.go +++ b/server/handler.go @@ -327,7 +327,11 @@ func (h *uploadContentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) return } jobID := uploadID[0] - job := project.Job(jobID) + job, err := project.Job(ctx, jobID) + if err != nil { + errorResponse(ctx, w, errJobInternalError(err.Error())) + return + } if err := h.Handle(ctx, &uploadContentRequest{ server: server, project: project, @@ -401,8 +405,14 @@ func (h *uploadContentHandler) normalizeColumnNameForJSONData(columnMap map[stri func (h *uploadContentHandler) Handle(ctx context.Context, r *uploadContentRequest) error { load := r.job.Content().Configuration.Load tableRef := load.DestinationTable - dataset := r.project.Dataset(tableRef.DatasetId) - table := dataset.Table(tableRef.TableId) + dataset, err := r.project.Dataset(ctx, 
tableRef.DatasetId) + if err != nil { + return err + } + table, err := dataset.Table(ctx, tableRef.TableId) + if err != nil { + return err + } if table == nil { if load.CreateDisposition == "CREATE_NEVER" { return fmt.Errorf("`%s` is not found", tableRef.TableId) @@ -418,7 +428,10 @@ func (h *uploadContentHandler) Handle(ctx context.Context, r *uploadContentReque }); err != nil { return err } - table = dataset.Table(tableRef.TableId) + table, err = dataset.Table(ctx, tableRef.TableId) + if err != nil { + return err + } } tableContent, err := table.Content() @@ -632,16 +645,23 @@ func (h *datasetsDeleteHandler) Handle(ctx context.Context, r *datasetsDeleteReq return fmt.Errorf("failed to start transaction: %w", err) } defer tx.RollbackIfNotCommitted() - if err := r.project.DeleteDataset(ctx, tx.Tx(), r.dataset.ID); err != nil { + if err := r.dataset.Delete(ctx, tx.Tx()); err != nil { return fmt.Errorf("failed to delete dataset: %w", err) } if r.deleteContents { - for _, table := range r.dataset.Tables() { + tables, err := r.server.metaRepo.FindTablesInDatasetsWithConnection(ctx, tx.Tx(), r.dataset.ProjectID, r.dataset.ID) + if err != nil { + return fmt.Errorf("failed to find tables in dataset: %w", err) + } + tableIDs := make([]string, len(tables)) + for i, table := range tables { + tableIDs[i] = table.ID if err := table.Delete(ctx, tx.Tx()); err != nil { return err } } - if err := r.server.contentRepo.DeleteTables(ctx, tx, r.project.ID, r.dataset.ID, r.dataset.TableIDs()); err != nil { + + if err := r.server.contentRepo.DeleteTables(ctx, tx, r.project.ID, r.dataset.ID, tableIDs); err != nil { return fmt.Errorf("failed to delete tables: %w", err) } } @@ -736,9 +756,6 @@ func (h *datasetsInsertHandler) Handle(ctx context.Context, r *datasetsInsertReq r.project.ID, datasetID, r.dataset, - nil, - nil, - nil, ), ); err != nil { return nil, err @@ -783,7 +800,11 @@ type datasetsListRequest struct { func (h *datasetsListHandler) Handle(ctx context.Context, r *datasetsListRequest) (*bigqueryv2.DatasetList, error) { datasetsRes := []*bigqueryv2.DatasetListDatasets{} - for _, dataset := range r.project.Datasets() { + datasets, err := r.server.metaRepo.FindDatasetsInProject(ctx, r.project.ID) + if err != nil { + return nil, err + } + for _, dataset := range datasets { content := dataset.Content() datasetsRes = append(datasetsRes, &bigqueryv2.DatasetListDatasets{ DatasetReference: &bigqueryv2.DatasetReference{ @@ -937,7 +958,7 @@ func (h *jobsDeleteHandler) Handle(ctx context.Context, r *jobsDeleteRequest) er return fmt.Errorf("failed to start transaction: %w", err) } defer tx.RollbackIfNotCommitted() - if err := r.project.DeleteJob(ctx, tx.Tx(), r.job.ID); err != nil { + if err := r.job.Delete(ctx, tx.Tx()); err != nil { return fmt.Errorf("failed to delete job: %w", err) } if err := tx.Commit(); err != nil { @@ -1449,11 +1470,14 @@ func (h *jobsInsertHandler) Handle(ctx context.Context, r *jobsInsertRequest) (* if err != nil { return nil, err } - destinationDataset := r.project.Dataset(tableRef.DatasetId) - if destinationDataset == nil { - return nil, fmt.Errorf("failed to find destination dataset: %s", tableRef.DatasetId) + destinationDataset, err := r.server.metaRepo.FindDatasetWithConnection(ctx, tx.Tx(), tableRef.ProjectId, tableRef.DatasetId) + if err != nil { + return nil, fmt.Errorf("failed to find destination dataset: %s, err: %s", tableRef.DatasetId, err) + } + destinationTable, err := r.server.metaRepo.FindTableWithConnection(ctx, tx.Tx(), destinationDataset.ProjectID, 
destinationDataset.ID, tableRef.TableId) + if err != nil { + return nil, fmt.Errorf("failed to query for destination table: %w", err) } - destinationTable := destinationDataset.Table(tableRef.TableId) destinationTableExists := destinationTable != nil if !destinationTableExists { _, err := createTableMetadata(ctx, tx, r.server, r.project, destinationDataset, tableDef.ToBigqueryV2(r.project.ID, tableRef.DatasetId)) @@ -1559,7 +1583,10 @@ func addTableMetadata(ctx context.Context, server *Server, spec *zetasqlite.Tabl if err != nil { return err } - dataset := project.Dataset(datasetID) + dataset, err := project.Dataset(ctx, datasetID) + if err != nil { + return err + } if dataset == nil { return fmt.Errorf("dataset %s is not found", datasetID) } @@ -1607,11 +1634,17 @@ func deleteTableMetadata(ctx context.Context, server *Server, spec *zetasqlite.T if err != nil { return err } - dataset := project.Dataset(datasetID) + dataset, err := project.Dataset(ctx, datasetID) + if err != nil { + return err + } if dataset == nil { return fmt.Errorf("dataset %s is not found", datasetID) } - table := dataset.Table(tableID) + table, err := dataset.Table(ctx, tableID) + if err != nil { + return err + } conn, err := server.connMgr.Connection(ctx, projectID, datasetID) if err != nil { return err @@ -1653,9 +1686,6 @@ func (h *jobsInsertHandler) addQueryResultToDynamicDestinationTable(ctx context. DatasetId: datasetID, }, }, - []*metadata.Table{table}, - nil, - nil, ) if err := r.project.AddDataset(ctx, tx.Tx(), dataset); err != nil { return err @@ -1693,10 +1723,14 @@ type jobsListRequest struct { } func (h *jobsListHandler) Handle(ctx context.Context, r *jobsListRequest) (*bigqueryv2.JobList, error) { - jobs := []*bigqueryv2.JobListJobs{} - for _, job := range r.project.Jobs() { + jobsList := []*bigqueryv2.JobListJobs{} + jobs, err := r.project.FetchJobs(ctx) + if err != nil { + return nil, err + } + for _, job := range jobs { content := job.Content() - jobs = append(jobs, &bigqueryv2.JobListJobs{ + jobsList = append(jobsList, &bigqueryv2.JobListJobs{ Id: content.Id, JobReference: content.JobReference, Kind: content.Kind, @@ -1705,7 +1739,7 @@ func (h *jobsListHandler) Handle(ctx context.Context, r *jobsListRequest) (*bigq UserEmail: content.UserEmail, }) } - return &bigqueryv2.JobList{Jobs: jobs}, nil + return &bigqueryv2.JobList{Jobs: jobsList}, nil } func (h *jobsQueryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -1900,13 +1934,17 @@ type modelsListRequest struct { } func (h *modelsListHandler) Handle(ctx context.Context, r *modelsListRequest) (*bigqueryv2.ListModelsResponse, error) { - models := []*bigqueryv2.Model{} - for _, m := range r.dataset.Models() { + response := []*bigqueryv2.Model{} + models, err := r.dataset.Models(ctx) + if err != nil { + return nil, err + } + for _, m := range models { _ = m - models = append(models, &bigqueryv2.Model{}) + response = append(response, &bigqueryv2.Model{}) } return &bigqueryv2.ListModelsResponse{ - Models: models, + Models: response, }, nil } @@ -2129,13 +2167,17 @@ type routinesListRequest struct { } func (h *routinesListHandler) Handle(ctx context.Context, r *routinesListRequest) (*bigqueryv2.ListRoutinesResponse, error) { - var routineList []*bigqueryv2.Routine - for _, routine := range r.dataset.Routines() { + var response []*bigqueryv2.Routine + routines, err := r.dataset.Routines(ctx) + if err != nil { + return nil, err + } + for _, routine := range routines { _ = routine - routineList = append(routineList, &bigqueryv2.Routine{}) + 
response = append(response, &bigqueryv2.Routine{}) } return &bigqueryv2.ListRoutinesResponse{ - Routines: routineList, + Routines: response, }, fmt.Errorf("unsupported bigquery.routines.list") } @@ -2549,14 +2591,16 @@ func (h *tablesGetIamPolicyHandler) Handle(ctx context.Context, r *tablesGetIamP func (h *tablesInsertHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - server := serverFromContext(ctx) - project := projectFromContext(ctx) - dataset := datasetFromContext(ctx) + var table bigqueryv2.Table if err := json.NewDecoder(r.Body).Decode(&table); err != nil { errorResponse(ctx, w, errInvalid(err.Error())) return } + + server := serverFromContext(ctx) + project := projectFromContext(ctx) + dataset := datasetFromContext(ctx) res, err := h.Handle(ctx, &tablesInsertRequest{ server: server, project: project, @@ -2686,28 +2730,32 @@ type tablesListRequest struct { } func (h *tablesListHandler) Handle(ctx context.Context, r *tablesListRequest) (*bigqueryv2.TableList, error) { - var tables []*bigqueryv2.TableListTables - for _, tableID := range r.dataset.TableIDs() { - table, err := r.dataset.Table(tableID).Content() + var response []*bigqueryv2.TableListTables + tables, err := r.dataset.Tables(ctx) + if err != nil { + return nil, fmt.Errorf("failed to find tables in dataset: %w", err) + } + for _, table := range tables { + content, err := table.Content() if err != nil { - return nil, fmt.Errorf("failed to get table metadata from %s: %w", tableID, err) + return nil, fmt.Errorf("failed to get table metadata from %s: %w", table.ID, err) } - tables = append(tables, &bigqueryv2.TableListTables{ - Clustering: table.Clustering, - CreationTime: table.CreationTime, - ExpirationTime: table.ExpirationTime, - FriendlyName: table.FriendlyName, - Id: table.Id, - Kind: table.Kind, - Labels: table.Labels, - RangePartitioning: table.RangePartitioning, - TableReference: table.TableReference, - TimePartitioning: table.TimePartitioning, - Type: table.Type, + response = append(response, &bigqueryv2.TableListTables{ + Clustering: content.Clustering, + CreationTime: content.CreationTime, + ExpirationTime: content.ExpirationTime, + FriendlyName: content.FriendlyName, + Id: content.Id, + Kind: content.Kind, + Labels: content.Labels, + RangePartitioning: content.RangePartitioning, + TableReference: content.TableReference, + TimePartitioning: content.TimePartitioning, + Type: content.Type, }) } return &bigqueryv2.TableList{ - Tables: tables, + Tables: response, TotalItems: int64(len(tables)), }, nil } diff --git a/server/loader.go b/server/loader.go index 633076e..44a0c90 100644 --- a/server/loader.go +++ b/server/loader.go @@ -34,26 +34,17 @@ func (s *Server) addProject(ctx context.Context, project *types.Project) error { } } } - p := s.metaRepo.ProjectFromData(project) - found, err := s.metaRepo.FindProjectWithConn(ctx, tx.Tx(), p.ID) - if err != nil { + p, _, _ := s.metaRepo.ProjectFromData(project) + if err := s.metaRepo.AddProjectIfNotExists(ctx, tx.Tx(), p); err != nil { return err } - if found != nil { - if err := s.metaRepo.UpdateProject(ctx, tx.Tx(), p); err != nil { - return err - } - } else { - if err := s.metaRepo.AddProjectIfNotExists(ctx, tx.Tx(), p); err != nil { - return err - } - } - for _, dataset := range p.Datasets() { - if err := dataset.Insert(ctx, tx.Tx()); err != nil { + for _, d := range project.Datasets { + dataset, tables, _, _ := s.metaRepo.DatasetFromData(p.ID, d) + if err := s.metaRepo.AddDataset(ctx, tx.Tx(), dataset); err != nil { return err } - for _, 
-		for _, table := range dataset.Tables() {
-			if err := table.Insert(ctx, tx.Tx()); err != nil {
+		for _, table := range tables {
+			if err := s.metaRepo.AddTable(ctx, tx.Tx(), table); err != nil {
 				return err
 			}
 		}
diff --git a/server/middleware.go b/server/middleware.go
index fa5fe29..8e2f29d 100644
--- a/server/middleware.go
+++ b/server/middleware.go
@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"runtime"
 	"sync"
+	"time"
 
 	"github.com/gorilla/mux"
 	"go.uber.org/zap"
@@ -64,7 +65,11 @@ func accessLogMiddleware() func(http.Handler) http.Handler {
 				fmt.Sprintf("%s %s", r.Method, r.URL.Path),
 				zap.String("query", r.URL.RawQuery),
 			)
+			start := time.Now()
 			next.ServeHTTP(w, r)
+			logger.Logger(r.Context()).Info(
+				fmt.Sprintf("%s %s took %v", r.Method, r.URL.Path, time.Since(start)),
+			)
 		})
 	}
 }
@@ -195,7 +200,11 @@ func withDatasetMiddleware() func(http.Handler) http.Handler {
 			datasetID, exists := datasetIDFromParams(params)
 			if exists {
 				project := projectFromContext(ctx)
-				dataset := project.Dataset(datasetID)
+				dataset, err := project.Dataset(ctx, datasetID)
+				if err != nil {
+					errorResponse(ctx, w, errNotFound(fmt.Sprintf("%s", err)))
+					return
+				}
 				if dataset == nil {
 					errorResponse(ctx, w, errNotFound(fmt.Sprintf("dataset %s is not found", datasetID)))
 					return
@@ -218,7 +227,11 @@ func withJobMiddleware() func(http.Handler) http.Handler {
 			jobID, exists := jobIDFromParams(params)
 			if exists {
 				project := projectFromContext(ctx)
-				job := project.Job(jobID)
+				job, err := project.Job(ctx, jobID)
+				if err != nil {
+					errorResponse(ctx, w, errInternalError(fmt.Sprintf("error finding job %s: %s", jobID, err)))
+					return
+				}
 				if job == nil {
 					errorResponse(ctx, w, errNotFound(fmt.Sprintf("job %s is not found", jobID)))
 					return
@@ -240,8 +253,14 @@ func withTableMiddleware() func(http.Handler) http.Handler {
 			params := mux.Vars(r)
 			tableID, exists := tableIDFromParams(params)
 			if exists {
+				project := projectFromContext(ctx)
 				dataset := datasetFromContext(ctx)
-				table := dataset.Table(tableID)
+				server := serverFromContext(ctx)
+				table, err := server.metaRepo.FindTable(ctx, project.ID, dataset.ID, tableID)
+				if err != nil {
+					errorResponse(ctx, w, errInternalError(fmt.Sprintf("could not fetch table %s: %s", tableID, err)))
+					return
+				}
 				if table == nil {
 					errorResponse(ctx, w, errNotFound(fmt.Sprintf("table %s is not found", tableID)))
 					return
@@ -264,7 +283,11 @@ func withModelMiddleware() func(http.Handler) http.Handler {
 			modelID, exists := modelIDFromParams(params)
 			if exists {
 				dataset := datasetFromContext(ctx)
-				model := dataset.Model(modelID)
+				model, err := dataset.Model(ctx, modelID)
+				if err != nil {
+					errorResponse(ctx, w, errInternalError(fmt.Sprintf("failed to find model: %s", err)))
+					return
+				}
 				if model == nil {
 					errorResponse(ctx, w, errNotFound(fmt.Sprintf("model %s is not found", modelID)))
 					return
@@ -287,7 +310,11 @@ func withRoutineMiddleware() func(http.Handler) http.Handler {
 			routineID, exists := routineIDFromParams(params)
 			if exists {
 				dataset := datasetFromContext(ctx)
-				routine := dataset.Routine(routineID)
+				routine, err := dataset.Routine(ctx, routineID)
+				if err != nil {
+					errorResponse(ctx, w, errInternalError(fmt.Sprintf("failed to find routine: %s", err)))
+					return
+				}
 				if routine == nil {
 					errorResponse(ctx, w, errNotFound(fmt.Sprintf("routine %s is not found", routineID)))
 					return
diff --git a/server/server.go b/server/server.go
index 5194902..2ea9af4 100644
--- a/server/server.go
+++ b/server/server.go
@@ -48,6 +48,8 @@ func New(storage Storage) (*Server, error) {
 		}
 	}
 	db, err := sql.Open("zetasqlite", string(storage))
 	if err != nil {
 		return nil, err
 	}
+	db.SetConnMaxIdleTime(-1)
+	db.SetConnMaxLifetime(1<<63 - 1)
@@ -129,7 +131,7 @@ func (s *Server) SetProject(id string) error {
 	if err := s.metaRepo.AddProjectIfNotExists(
 		ctx,
 		tx.Tx(),
-		metadata.NewProject(s.metaRepo, id, nil, nil),
+		metadata.NewProject(s.metaRepo, id),
 	); err != nil {
 		return err
 	}
diff --git a/server/storage_handler.go b/server/storage_handler.go
index 9fa9458..cf8e1b7 100644
--- a/server/storage_handler.go
+++ b/server/storage_handler.go
@@ -805,11 +805,17 @@ func getTableMetadata(ctx context.Context, server *Server, projectID, datasetID,
 	if err != nil {
 		return nil, err
 	}
-	dataset := project.Dataset(datasetID)
+	dataset, err := project.Dataset(ctx, datasetID)
+	if err != nil {
+		return nil, err
+	}
 	if dataset == nil {
 		return nil, fmt.Errorf("dataset %s is not found in project %s", datasetID, projectID)
 	}
-	table := dataset.Table(tableID)
+	table, err := dataset.Table(ctx, tableID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to find table %s: %w", tableID, err)
+	}
 	if table == nil {
 		return nil, fmt.Errorf("table %s is not found in dataset %s", tableID, datasetID)
 	}