diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index ecd7e0373..288ed9327 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -18,6 +18,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/a3m" "github.com/artefactual-sdps/enduro/internal/am" + "github.com/artefactual-sdps/enduro/internal/config" "github.com/artefactual-sdps/enduro/internal/fsutil" "github.com/artefactual-sdps/enduro/internal/package_" "github.com/artefactual-sdps/enduro/internal/temporal" @@ -26,18 +27,23 @@ import ( ) type ProcessingWorkflow struct { - logger logr.Logger - pkgsvc package_.Service - wsvc watcher.Service - taskQueue string + logger logr.Logger + cfg config.Configuration + pkgsvc package_.Service + wsvc watcher.Service } -func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, taskQueue string) *ProcessingWorkflow { +func NewProcessingWorkflow( + logger logr.Logger, + cfg config.Configuration, + pkgsvc package_.Service, + wsvc watcher.Service, +) *ProcessingWorkflow { return &ProcessingWorkflow{ - logger: logger, - pkgsvc: pkgsvc, - wsvc: wsvc, - taskQueue: taskQueue, + logger: logger, + cfg: cfg, + pkgsvc: pkgsvc, + wsvc: wsvc, } } @@ -176,7 +182,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, - TaskQueue: w.taskQueue, + TaskQueue: w.cfg.Preservation.TaskQueue, }) for attempt := 1; attempt <= maxAttempts; attempt++ { sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{ @@ -317,8 +323,8 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // For the a3m workflow bundle the transfer to a directory shared with // the a3m container. var transferDir string - if w.taskQueue == temporal.A3mWorkerTaskQueue { - transferDir = "/home/a3m/.local/share/a3m/share" + if w.cfg.Preservation.TaskQueue == temporal.A3mWorkerTaskQueue { + transferDir = w.cfg.A3m.ShareDir } activityOpts := withActivityOptsForLongLivedRequest(sessCtx) @@ -356,7 +362,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Do preservation activities. { var err error - if w.taskQueue == temporal.AmWorkerTaskQueue { + if w.cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue { err = w.transferAM(sessCtx, tinfo) } else { err = w.transferA3m(sessCtx, tinfo) @@ -379,7 +385,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context } // Stop here for the Archivematica workflow. - if w.taskQueue == temporal.AmWorkerTaskQueue { + if w.cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue { return nil } diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 5699cf35b..4e7f373b9 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -18,8 +18,10 @@ import ( "github.com/artefactual-sdps/enduro/internal/a3m" "github.com/artefactual-sdps/enduro/internal/am" + "github.com/artefactual-sdps/enduro/internal/config" "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" + "github.com/artefactual-sdps/enduro/internal/pres" sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake" "github.com/artefactual-sdps/enduro/internal/temporal" watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake" @@ -32,6 +34,9 @@ type ProcessingWorkflowTestSuite struct { env *temporalsdk_testsuite.TestWorkflowEnvironment + // Each test creates it's own temporary transfer directory. + transferDir string + // Each test registers the workflow with a different name to avoid dups. workflow *ProcessingWorkflow } @@ -47,10 +52,15 @@ func TestTransferInfo_Name(t *testing.T) { func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { s.env = s.NewTestWorkflowEnvironment() s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true}) + s.transferDir = s.T().TempDir() clock := clockwork.NewFakeClock() ctrl := gomock.NewController(s.T()) logger := logr.Discard() + cfg := config.Configuration{ + Preservation: pres.Config{TaskQueue: taskQueue}, + A3m: a3m.Config{ShareDir: s.transferDir}, + } pkgsvc := packagefake.NewMockService(ctrl) wsvc := watcherfake.NewMockService(ctrl) sftpc := sftp_fake.NewMockClient(ctrl) @@ -87,7 +97,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { s.env.RegisterActivityWithOptions( am.NewPollTransferActivity( logger, - &am.Config{}, + &cfg.AM, clock, amclienttest.NewMockTransferService(ctrl), amclienttest.NewMockJobsService(ctrl), @@ -98,7 +108,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { s.env.RegisterActivityWithOptions( am.NewPollIngestActivity( logger, - &am.Config{}, + &cfg.AM, clock, amclienttest.NewMockIngestService(ctrl), amclienttest.NewMockJobsService(ctrl), @@ -107,7 +117,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { temporalsdk_activity.RegisterOptions{Name: am.PollIngestActivityName}, ) - s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue) + s.workflow = NewProcessingWorkflow(logger, cfg, pkgsvc, wsvc) } func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -154,7 +164,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { s.env.OnActivity(activities.BundleActivityName, sessionCtx, &activities.BundleActivityParams{ WatcherName: watcherName, - TransferDir: "/home/a3m/.local/share/a3m/share", + TransferDir: s.transferDir, Key: key, TempFile: "/tmp/enduro123456/" + key, }, @@ -229,7 +239,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { s.env.OnActivity(activities.BundleActivityName, sessionCtx, &activities.BundleActivityParams{ WatcherName: watcherName, - TransferDir: "/home/a3m/.local/share/a3m/share", + TransferDir: s.transferDir, Key: key, TempFile: "/tmp/enduro123456/" + key, }, @@ -412,7 +422,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { s.env.OnActivity(activities.BundleActivityName, sessionCtx, &activities.BundleActivityParams{ WatcherName: watcherName, - TransferDir: "/home/a3m/.local/share/a3m/share", + TransferDir: s.transferDir, Key: key, TempFile: "/tmp/enduro123456/" + key, }, diff --git a/main.go b/main.go index ccd641de9..a1fe346d8 100644 --- a/main.go +++ b/main.go @@ -333,7 +333,7 @@ func main() { os.Exit(1) } - w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, cfg, pkgsvc, wsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName}) w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}) w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})