Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13398][Playground] Split LifeCycle to DTO and business logic #16374

Merged
merged 7 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions playground/backend/internal/code_processing/code_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl

go cancelCheck(pipelineLifeCycleCtx, pipelineId, cancelChannel, cacheService)

executorBuilder, err := builder.SetupExecutorBuilder(lc, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
executorBuilder, err := builder.SetupExecutorBuilder(lc.Dto, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
if err != nil {
_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
return
Expand Down Expand Up @@ -122,7 +122,7 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
validateIsUnitTest, _ := validationResults.Load(validators.UnitTestValidatorName)
isUnitTest := validateIsUnitTest.(bool)

// This condition is used for cases when the playground doesn't compile source files. For the Python code and the Go Unit Tests
// This condition is used for cases when the playground doesn't compile source files. For the Python code and the Go Unit Tests
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) {
if err := processCompileSuccess(pipelineLifeCycleCtx, []byte(""), pipelineId, cacheService); err != nil {
return
Expand All @@ -131,7 +131,7 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
// Compile
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
executor = executorBuilder.WithCompiler().
WithFileName(builder.GetFileNameFromFolder(lc.GetAbsoluteSourceFolderPath())).Build() // Need changed name for unit tests
WithFileName(builder.GetFileNameFromFolder(lc.Dto.GetAbsoluteSourceFileFolderPath())).Build() // Need changed name for unit tests
}
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := executor.Compile(pipelineLifeCycleCtx)
Expand All @@ -155,7 +155,7 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl

// Run
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, pipelineLifeCycleCtx, executorBuilder, appEnv.WorkingDir())
executor, err = setJavaExecutableFile(lc.Dto, pipelineId, cacheService, pipelineLifeCycleCtx, executorBuilder, appEnv.WorkingDir())
if err != nil {
return
}
Expand All @@ -164,11 +164,11 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
runCmd := getExecuteCmd(&validationResults, &executor, pipelineLifeCycleCtx)
var runError bytes.Buffer
runOutput := streaming.RunOutputWriter{Ctx: pipelineLifeCycleCtx, CacheService: cacheService, PipelineId: pipelineId}
go readLogFile(pipelineLifeCycleCtx, ctx, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
go readLogFile(pipelineLifeCycleCtx, ctx, cacheService, lc.Dto.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)

if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For go SDK all logs are placed to stdErr.
file, err := os.Create(lc.GetAbsoluteLogFilePath())
file, err := os.Create(lc.Dto.GetAbsoluteLogFilePath())
if err != nil {
// If some error with creating a log file do the same as with other SDK.
logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
Expand All @@ -194,7 +194,7 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
// Run step is finished, but code contains some error (divide by 0 for example)
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For Go SDK stdErr was redirected to the log file.
errData, err := os.ReadFile(lc.GetAbsoluteLogFilePath())
errData, err := os.ReadFile(lc.Dto.GetAbsoluteLogFilePath())
if err != nil {
logger.Errorf("%s: error during read errors from log file (go sdk): %s", pipelineId, err.Error())
}
Expand All @@ -219,8 +219,8 @@ func getExecuteCmd(valRes *sync.Map, executor *executors.Executor, ctxWithTimeou
}

// setJavaExecutableFile sets executable file name to runner (JAVA class name is known after compilation step)
func setJavaExecutableFile(lc *fs_tool.LifeCycle, id uuid.UUID, service cache.Cache, ctx context.Context, executorBuilder *executors.ExecutorBuilder, dir string) (executors.Executor, error) {
className, err := lc.ExecutableName(id, dir)
func setJavaExecutableFile(lcDto fs_tool.LifeCycleDTO, id uuid.UUID, service cache.Cache, ctx context.Context, executorBuilder *executors.ExecutorBuilder, dir string) (executors.Executor, error) {
className, err := lcDto.ExecutableName(id, dir)
if err != nil {
if err = processSetupError(err, id, service, ctx); err != nil {
return executorBuilder.Build(), err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func Test_Process(t *testing.T) {
t.Fatalf("error during prepare folders: %s", err.Error())
}
if tt.createExecFile {
_, _ = lc.CreateSourceCodeFile(tt.code)
_ = lc.CreateSourceCodeFile(tt.code)
}
if err = utils.SetToCache(tt.args.ctx, cacheService, tt.args.pipelineId, cache.Canceled, false); err != nil {
t.Fatal("error during set cancel flag to cache")
Expand All @@ -269,7 +269,7 @@ func Test_Process(t *testing.T) {

compileOutput, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.CompileOutput)
if tt.expectedCompileOutput != nil && strings.Contains(tt.expectedCompileOutput.(string), "%s") {
tt.expectedCompileOutput = fmt.Sprintf(tt.expectedCompileOutput.(string), lc.GetAbsoluteSourceFilePath())
tt.expectedCompileOutput = fmt.Sprintf(tt.expectedCompileOutput.(string), lc.Dto.GetAbsoluteSourceFilePath())
}
if !reflect.DeepEqual(compileOutput, tt.expectedCompileOutput) {
t.Errorf("processCode() set compileOutput: %s, but expectes: %s", compileOutput, tt.expectedCompileOutput)
Expand Down Expand Up @@ -535,7 +535,7 @@ func TestGetLastIndex(t *testing.T) {
func Test_setJavaExecutableFile(t *testing.T) {
pipelineId := uuid.New()
lc, _ := fs_tool.NewLifeCycle(pb.Sdk_SDK_JAVA, pipelineId, os.Getenv("APP_WORK_DIR"))
lc.ExecutableName = fakeExecutableName
lc.Dto.ExecutableName = fakeExecutableName
executorBuilder := executors.NewExecutorBuilder().WithRunner().WithCommand("fake cmd").ExecutorBuilder
type args struct {
lc *fs_tool.LifeCycle
Expand Down Expand Up @@ -572,7 +572,7 @@ func Test_setJavaExecutableFile(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := setJavaExecutableFile(tt.args.lc, tt.args.id, tt.args.service, tt.args.ctx, tt.args.executorBuilder, tt.args.dir)
got, err := setJavaExecutableFile(tt.args.lc.Dto, tt.args.id, tt.args.service, tt.args.ctx, tt.args.executorBuilder, tt.args.dir)
if (err != nil) != tt.wantErr {
t.Errorf("setJavaExecutableFile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down Expand Up @@ -692,7 +692,7 @@ func prepareFiles(b *testing.B, pipelineId uuid.UUID, code string, sdk pb.Sdk) *
if err != nil {
b.Fatalf("error during prepare folders: %s", err.Error())
}
_, err = lc.CreateSourceCodeFile(code)
err = lc.CreateSourceCodeFile(code)
if err != nil {
b.Fatalf("error during prepare source code file: %s", err.Error())
}
Expand Down
109 changes: 62 additions & 47 deletions playground/backend/internal/fs_tool/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,65 @@ type Extension struct {
ExecutableFileExtension string
}

// LifeCycle is used for preparing folders and files to process code for one request.
// For each SDK folders (Folder) and extensions (Extension) should be set correctly.
type LifeCycle struct {
folderGlobs []string //folders that should be created to process code
// LifeCycleDTO contains methods to receive files/folders paths
type LifeCycleDTO struct {
PipelineId uuid.UUID
Folder Folder
Extension Extension
ExecutableName func(uuid.UUID, string) (string, error)
pipelineId uuid.UUID
}

// GetSourceFileName returns name of the source file like {pipelineId}.{sourceFileExtension}.
func (l *LifeCycleDTO) GetSourceFileName() string {
return l.PipelineId.String() + l.Extension.SourceFileExtension
}

// GetAbsoluteSourceFileFolderPath returns absolute filepath to source folder (/path/to/workingDir/executable_files/{pipelineId}/src).
func (l *LifeCycleDTO) GetAbsoluteSourceFileFolderPath() string {
absolutePath, _ := filepath.Abs(l.Folder.SourceFileFolder)
return absolutePath
}

// GetAbsoluteSourceFilePath returns absolute filepath to source file (/path/to/workingDir/executable_files/{pipelineId}/src/{pipelineId}.{sourceFileExtension}).
func (l *LifeCycleDTO) GetAbsoluteSourceFilePath() string {
absolutePath, _ := filepath.Abs(filepath.Join(l.GetAbsoluteSourceFileFolderPath(), l.GetSourceFileName()))
return absolutePath
}

// GetExecutableFileName returns name of the executable file like {pipelineId}.{executableFileExtension}.
func (l *LifeCycleDTO) GetExecutableFileName() string {
return l.PipelineId.String() + l.Extension.ExecutableFileExtension
}

// GetAbsoluteExecutableFileFolderPath returns absolute filepath to executable folder (/path/to/workingDir/executable_files/{pipelineId}/bin).
func (l *LifeCycleDTO) GetAbsoluteExecutableFileFolderPath() string {
absolutePath, _ := filepath.Abs(l.Folder.ExecutableFileFolder)
return absolutePath
}

// GetAbsoluteExecutableFilePath returns absolute filepath to executable file (/path/to/workingDir/executable_files/{pipelineId}/bin/{pipelineId}.{executableFileExtension}).
func (l *LifeCycleDTO) GetAbsoluteExecutableFilePath() string {
absolutePath, _ := filepath.Abs(filepath.Join(l.GetAbsoluteExecutableFileFolderPath(), l.GetExecutableFileName()))
return absolutePath
}

// GetAbsoluteBaseFolderPath returns absolute path to executable folder (/path/to/workingDir/executable_files/{pipelineId}).
func (l *LifeCycleDTO) GetAbsoluteBaseFolderPath() string {
absolutePath, _ := filepath.Abs(l.Folder.BaseFolder)
return absolutePath
}

// GetAbsoluteLogFilePath returns absolute path to the logs file (/path/to/workingDir/executable_files/{pipelineId}/logs.log)
func (l *LifeCycleDTO) GetAbsoluteLogFilePath() string {
filePath := filepath.Join(l.GetAbsoluteBaseFolderPath(), logFileName)
absoluteFilePath, _ := filepath.Abs(filePath)
return absoluteFilePath
}

// LifeCycle is used for preparing folders and files to process code for one code processing request.
type LifeCycle struct {
folderGlobs []string //folders that should be created to process code
Dto LifeCycleDTO
}

// NewLifeCycle returns a corresponding LifeCycle depending on the given SDK.
Expand Down Expand Up @@ -93,26 +144,17 @@ func (l *LifeCycle) DeleteFolders() error {
}

// CreateSourceCodeFile creates an executable file (i.e. file.{sourceFileExtension}).
func (l *LifeCycle) CreateSourceCodeFile(code string) (string, error) {
if _, err := os.Stat(l.Folder.SourceFileFolder); os.IsNotExist(err) {
return "", err
func (l *LifeCycle) CreateSourceCodeFile(code string) error {
if _, err := os.Stat(l.Dto.GetAbsoluteSourceFileFolderPath()); os.IsNotExist(err) {
return err
}

fileName := l.pipelineId.String() + l.Extension.SourceFileExtension
filePath := filepath.Join(l.Folder.SourceFileFolder, fileName)
filePath := l.Dto.GetAbsoluteSourceFilePath()
err := os.WriteFile(filePath, []byte(code), fileMode)
if err != nil {
return "", err
return err
}
return fileName, nil
}

// GetAbsoluteSourceFilePath returns absolute filepath to executable file (/path/to/workingDir/executable_files/{pipelineId}/src/{pipelineId}.{sourceFileExtension}).
func (l *LifeCycle) GetAbsoluteSourceFilePath() string {
fileName := l.pipelineId.String() + l.Extension.SourceFileExtension
filePath := filepath.Join(l.Folder.SourceFileFolder, fileName)
absoluteFilePath, _ := filepath.Abs(filePath)
return absoluteFilePath
return nil
}

// CopyFile copies a file with fileName from sourceDir to destinationDir.
Expand Down Expand Up @@ -145,30 +187,3 @@ func (l *LifeCycle) CopyFile(fileName, sourceDir, destinationDir string) error {
}
return nil
}

// GetAbsoluteExecutableFilePath returns absolute filepath to compiled file (/path/to/workingDir/executable_files/{pipelineId}/bin/{pipelineId}.{executableExtension}).
func (l *LifeCycle) GetAbsoluteExecutableFilePath() string {
fileName := l.pipelineId.String() + l.Extension.ExecutableFileExtension
filePath := filepath.Join(l.Folder.ExecutableFileFolder, fileName)
absoluteFilePath, _ := filepath.Abs(filePath)
return absoluteFilePath
}

// GetAbsoluteBaseFolderPath returns absolute path to executable folder (/path/to/workingDir/executable_files/{pipelineId}).
func (l *LifeCycle) GetAbsoluteBaseFolderPath() string {
absoluteFilePath, _ := filepath.Abs(l.Folder.BaseFolder)
return absoluteFilePath
}

// GetAbsoluteLogFilePath returns absolute path to the logs file (/path/to/workingDir/executable_files/{pipelineId}/logs.log)
func (l *LifeCycle) GetAbsoluteLogFilePath() string {
filePath := filepath.Join(l.Folder.BaseFolder, logFileName)
absoluteFilePath, _ := filepath.Abs(filePath)
return absoluteFilePath
}

// GetAbsoluteSourceFolderPath returns absolute path to executable folder (/path/to/workingDir/executable_files/{pipelineId}/src).
func (l *LifeCycle) GetAbsoluteSourceFolderPath() string {
absoluteFilePath, _ := filepath.Abs(l.Folder.SourceFileFolder)
return absoluteFilePath
}
Loading