Skip to content

Commit

Permalink
Merge pull request apache#16031 from [BEAM-13296][Playground][Bugfix]…
Browse files Browse the repository at this point in the history
… remove redundant code from controller

* [Playground][BEAM-12941][Bugfix] Fix workflows for playground applications (#83)

* Update workflows for playground

* Attempt to fix tests

* Remove continue on error to catch errors

* Fix linter problem for backend dockerfile

* Update folder to run backend go linter

* Moved flutter test to execution via gradle tasks

* Revert "[Playground][BEAM-12941][Bugfix] Fix workflows for playground applications (#83)" (#88)

This reverts commit b73f5f7.

* Remove redundant methods from controller, move precompiled objects method to separate package

* fix RAT

Co-authored-by: Sergey Kalinin <91209855+snkalinin@users.noreply.github.com>
Co-authored-by: Ilya <ilya.kozyrev@akvelon.com>
Co-authored-by: Aydar Zainutdinov <aydar.zaynutdinov@akvelon.com>
Co-authored-by: Pavel Avilov <pavel.avilov@akvelon.com>
  • Loading branch information
5 people committed Nov 23, 2021
1 parent 27d4ad0 commit 8ad47d8
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 331 deletions.
335 changes: 4 additions & 331 deletions playground/backend/cmd/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,14 @@ func (controller *playgroundController) GetPrecompiledObjects(ctx context.Contex
bucket := cloud_bucket.New()
sdkToCategories, err := bucket.GetPrecompiledObjects(ctx, info.Sdk, info.Category)
if err != nil {
logger.Errorf("%s: GetPrecompiledObjects(): cloud storage error: %s", err.Error())
logger.Errorf("GetPrecompiledObjects(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("GetPrecompiledObjects(): ", err.Error())
}
response := pb.GetPrecompiledObjectsResponse{SdkCategories: make([]*pb.Categories, 0)}
for sdkName, categories := range *sdkToCategories {
sdkCategory := pb.Categories{Sdk: pb.Sdk(pb.Sdk_value[sdkName]), Categories: make([]*pb.Categories_Category, 0)}
for categoryName, precompiledObjects := range categories {
PutPrecompiledObjectsToCategory(categoryName, &precompiledObjects, &sdkCategory)
utils.PutPrecompiledObjectsToCategory(categoryName, &precompiledObjects, &sdkCategory)
}
response.SdkCategories = append(response.SdkCategories, &sdkCategory)
}
Expand All @@ -179,7 +179,7 @@ func (controller *playgroundController) GetPrecompiledObjectCode(ctx context.Con
cd := cloud_bucket.New()
codeString, err := cd.GetPrecompiledObject(ctx, info.GetCloudPath())
if err != nil {
logger.Errorf("%s: GetPrecompiledObject(): cloud storage error: %s", err.Error())
logger.Errorf("GetPrecompiledObject(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("GetPrecompiledObjects(): ", err.Error())
}
response := pb.GetPrecompiledObjectCodeResponse{Code: *codeString}
Expand All @@ -191,336 +191,9 @@ func (controller *playgroundController) GetPrecompiledObjectOutput(ctx context.C
cd := cloud_bucket.New()
output, err := cd.GetPrecompiledObjectOutput(ctx, info.GetCloudPath())
if err != nil {
logger.Errorf("%s: GetPrecompiledObjectOutput(): cloud storage error: %s", err.Error())
logger.Errorf("GetPrecompiledObjectOutput(): cloud storage error: %s", err.Error())
return nil, errors.InternalError("GetPrecompiledObjectOutput(): ", err.Error())
}
response := pb.GetRunOutputResponse{Output: *output}
return &response, nil
}

// PutPrecompiledObjectsToCategory adds categories with precompiled objects to protobuf object
func PutPrecompiledObjectsToCategory(categoryName string, precompiledObjects *cloud_bucket.PrecompiledObjects, sdkCategory *pb.Categories) {
category := pb.Categories_Category{
CategoryName: categoryName,
PrecompiledObjects: make([]*pb.PrecompiledObject, 0),
}
for _, object := range *precompiledObjects {
category.PrecompiledObjects = append(category.PrecompiledObjects, &pb.PrecompiledObject{
CloudPath: object.CloudPath,
Name: object.Name,
Description: object.Description,
Type: object.Type,
})
}
sdkCategory.Categories = append(sdkCategory.Categories, &category)
}

// setupLifeCycle creates fs_tool.LifeCycle and prepares files and folders needed to code processing
func setupLifeCycle(sdk pb.Sdk, code string, pipelineId uuid.UUID, workingDir string) (*fs_tool.LifeCycle, error) {
// create file system service
lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, workingDir)
if err != nil {
logger.Errorf("%s: RunCode(): NewLifeCycle(): %s\n", pipelineId, err.Error())
return nil, err
}

// create folders
err = lc.CreateFolders()
if err != nil {
logger.Errorf("%s: RunCode(): CreateFolders(): %s\n", pipelineId, err.Error())
return nil, err
}

// create file with code
_, err = lc.CreateExecutableFile(code)
if err != nil {
logger.Errorf("%s: RunCode(): CreateExecutableFile(): %s\n", pipelineId, err.Error())
return nil, err
}
return lc, nil
}

// setupCompileBuilder returns executors.CompileBuilder with validators and compiler based on sdk
func setupCompileBuilder(lc *fs_tool.LifeCycle, sdk pb.Sdk, executorConfig *environment.ExecutorConfig) *executors.CompileBuilder {
filePath := lc.GetAbsoluteExecutableFilePath()
val := setupValidators(sdk, filePath)

compileBuilder := executors.NewExecutorBuilder().
WithValidator().
WithSdkValidators(val).
WithCompiler()

switch sdk {
case pb.Sdk_SDK_JAVA:
workingDir := lc.GetAbsoluteExecutableFilesFolderPath()

compileBuilder = compileBuilder.
WithCommand(executorConfig.CompileCmd).
WithArgs(executorConfig.CompileArgs).
WithFileName(filePath).
WithWorkingDir(workingDir)
}
return compileBuilder
}

// setupRunBuilder returns executors.RunBuilder based on sdk
func setupRunBuilder(pipelineId uuid.UUID, lc *fs_tool.LifeCycle, sdk pb.Sdk, env *environment.Environment, compileBuilder *executors.CompileBuilder) (*executors.RunBuilder, error) {
runBuilder := compileBuilder.
WithRunner().
WithCommand(env.BeamSdkEnvs.ExecutorConfig.RunCmd).
WithArgs(env.BeamSdkEnvs.ExecutorConfig.RunArgs).
WithWorkingDir(lc.GetAbsoluteExecutableFilesFolderPath())

switch sdk {
case pb.Sdk_SDK_JAVA:
className, err := lc.ExecutableName(pipelineId, env.ApplicationEnvs.WorkingDir())
if err != nil {
logger.Errorf("%s: get executable file name: %s\n", pipelineId, err.Error())
return nil, err
}

runBuilder = runBuilder.
WithClassName(className)
}
return runBuilder, nil
}

// setupValidators returns slice of validators.Validator based on sdk
func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
var val *[]validators.Validator
switch sdk {
case pb.Sdk_SDK_JAVA:
val = validators.GetJavaValidators(filepath)
}
return val
}

// processCode validates, compiles and runs code by pipelineId.
// During each operation updates status of execution and saves it into cache:
// - In case of processing works more that timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
// - In case of code processing has been canceled saves playground.Status_STATUS_CANCELED as cache.Status into cache.
// - In case of validation step is failed saves playground.Status_STATUS_VALIDATION_ERROR as cache.Status into cache.
// - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
// - In case of compile step is completed with no errors saves compile output as cache.CompileOutput into cache.
// - In case of run step is failed saves playground.Status_STATUS_RUN_ERROR as cache.Status and run logs as cache.RunError into cache.
// - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
// At the end of this method deletes all created folders.
func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
ctxWithTimeout, finishCtxFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
defer func(lc *fs_tool.LifeCycle) {
finishCtxFunc()
cleanUp(pipelineId, lc)
}(lc)

errorChannel := make(chan error, 1)
dataChannel := make(chan interface{}, 1)
successChannel := make(chan bool, 1)
cancelChannel := make(chan bool, 1)

go cancelCheck(ctxWithTimeout, pipelineId, cancelChannel, cacheService)

// build executor for validate and compile steps
exec := compileBuilder.Build()

// validate
logger.Infof("%s: Validate() ...\n", pipelineId)
validateFunc := exec.Validate()
go validateFunc(successChannel, errorChannel)

if !processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, errorChannel, pb.Status_STATUS_VALIDATION_ERROR, pb.Status_STATUS_PREPARING) {
return
}

// prepare
logger.Infof("%s: Prepare() ...\n", pipelineId)
prepareFunc := exec.Prepare()
go prepareFunc(successChannel, errorChannel)

if !processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, errorChannel, pb.Status_STATUS_PREPARATION_ERROR, pb.Status_STATUS_COMPILING) {
return
}

// compile
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := exec.Compile(ctxWithTimeout)
go func(successCh chan bool, errCh chan error, dataCh chan interface{}) {
// TODO separate stderr from stdout
data, err := compileCmd.CombinedOutput()
dataCh <- data
if err != nil {
errCh <- err
successCh <- false
} else {
successCh <- true
}
}(successChannel, errorChannel, dataChannel)

if !processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, dataChannel, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING) {
return
}

runBuilder, err := setupRunBuilder(pipelineId, lc, sdk, env, compileBuilder)
if err != nil {
logger.Errorf("%s: error during setup runBuilder: %s\n", pipelineId, err.Error())
setToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
return
}

// build executor for run step
exec = runBuilder.Build()

// run
logger.Infof("%s: Run() ...\n", pipelineId)
runCmd := exec.Run(ctxWithTimeout)
go func(successCh chan bool, errCh chan error, dataCh chan interface{}) {
// TODO separate stderr from stdout
data, err := runCmd.CombinedOutput()
dataCh <- data
if err != nil {
errCh <- err
successChannel <- false
} else {
successChannel <- true
}
}(successChannel, errorChannel, dataChannel)

processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
}

// processStep processes each executor's step with cancel and timeout checks.
// If finishes by canceling, timeout or error - returns false.
// If finishes successfully returns true.
func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChannel, successChannel chan bool, dataChannel chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
select {
case <-ctx.Done():
finishByTimeout(ctx, pipelineId, cacheService)
return false
case <-cancelChannel:
processCancel(ctx, cacheService, pipelineId)
return false
case ok := <-successChannel:
var data []byte = nil
if dataChannel != nil {
temp := <-dataChannel
data = temp.([]byte)
}
if !ok {
err := <-errorChannel
processError(ctx, err, data, pipelineId, cacheService, errorCaseStatus)
return false
}
processSuccess(ctx, data, pipelineId, cacheService, successCaseStatus)
}
return true
}

// finishByTimeout is used in case of runCode method finished by timeout
func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)

// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
}

// cancelCheck checks cancel flag for code processing.
// If cancel flag doesn't exist in cache continue working.
// If context is done it means that code processing was finished (successfully/with error/timeout). Return.
// If cancel flag exists, and it is true it means that code processing was canceled. Set true to cancelChannel and return.
func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan bool, cacheService cache.Cache) {
ticker := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case _ = <-ticker.C:
cancel, err := cacheService.GetValue(ctx, pipelineId, cache.Canceled)
if err != nil {
continue
}
if cancel.(bool) {
cancelChannel <- true
}
return
}
}
}

// cleanUp removes all prepared folders for received LifeCycle
func cleanUp(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
logger.Infof("%s: DeleteFolders() ...\n", pipelineId)
if err := lc.DeleteFolders(); err != nil {
logger.Error("%s: DeleteFolders(): %s\n", pipelineId, err.Error())
}
logger.Infof("%s: DeleteFolders() complete\n", pipelineId)
logger.Infof("%s: complete\n", pipelineId)
}

// processError processes error received during processing code via setting a corresponding status and output to cache
func processError(ctx context.Context, err error, data []byte, pipelineId uuid.UUID, cacheService cache.Cache, status pb.Status) {
switch status {
case pb.Status_STATUS_VALIDATION_ERROR:
logger.Errorf("%s: Validate(): %s\n", pipelineId, err.Error())

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_VALIDATION_ERROR)
case pb.Status_STATUS_PREPARATION_ERROR:
logger.Errorf("%s: Prepare(): %s\n", pipelineId, err.Error())

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_PREPARATION_ERROR)
case pb.Status_STATUS_COMPILE_ERROR:
logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId, err.Error(), data)

setToCache(ctx, cacheService, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(data))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
case pb.Status_STATUS_RUN_ERROR:
logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), data)

setToCache(ctx, cacheService, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(data))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
}
}

// processSuccess processes case after successful code processing via setting a corresponding status and output to cache
func processSuccess(ctx context.Context, output []byte, pipelineId uuid.UUID, cacheService cache.Cache, status pb.Status) {
switch status {
case pb.Status_STATUS_PREPARING:
logger.Infof("%s: Validate() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_PREPARING)
case pb.Status_STATUS_COMPILING:
logger.Infof("%s: Prepare() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_COMPILING)
case pb.Status_STATUS_EXECUTING:
logger.Infof("%s: Compile() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.CompileOutput, string(output))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_EXECUTING)
case pb.Status_STATUS_FINISHED:
logger.Infof("%s: Run() finish\n", pipelineId)

setToCache(ctx, cacheService, pipelineId, cache.RunOutput, string(output))

setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
}
}

// processCancel process case when code processing was canceled
func processCancel(ctx context.Context, cacheService cache.Cache, pipelineId uuid.UUID) {
logger.Infof("%s: was canceled\n", pipelineId)

// set to cache pipelineId: cache.SubKey_Status: pb.Status_STATUS_CANCELED
setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)
}

// setToCache puts value to cache by key and subKey
func setToCache(ctx context.Context, cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, value interface{}) error {
err := cacheService.SetValue(ctx, key, subKey, value)
if err != nil {
logger.Errorf("%s: cache.SetValue: %s\n", key, err.Error())
}
return err
}
Loading

0 comments on commit 8ad47d8

Please sign in to comment.