Skip to content
Merged
1 change: 1 addition & 0 deletions playground/backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ default value and there is no need to set them up to launch locally:
- `SDK_CONFIG` - is the sdk configuration file path, e.g. default example for corresponding sdk. It will be saved to cloud datastore during application startup (default value = `../sdks.yaml`)
- `DATASTORE_EMULATOR_HOST` - is the datastore emulator address. If it is given in the environment, the application will connect to the datastore emulator.
- `PROPERTY_PATH` - is the application properties path (default value = `.`)
- `CACHE_REQUEST_TIMEOUT` - is the timeout to request data from cache (default value = `5 sec`)

### Application properties

Expand Down
6 changes: 3 additions & 3 deletions playground/backend/cmd/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (controller *playgroundController) Cancel(ctx context.Context, info *pb.Can
// - If there is no catalog in the cache, gets the catalog from the Datastore and saves it to the cache
// - If SDK or category is specified in the request, gets the catalog from the cache and filters it by SDK and category
func (controller *playgroundController) GetPrecompiledObjects(ctx context.Context, info *pb.GetPrecompiledObjectsRequest) (*pb.GetPrecompiledObjectsResponse, error) {
catalog, err := controller.cacheComponent.GetCatalogFromCacheOrDatastore(ctx)
catalog, err := controller.cacheComponent.GetCatalogFromCacheOrDatastore(ctx, controller.env.ApplicationEnvs.CacheRequestTimeout())
if err != nil {
return nil, errors.InternalError(errorTitleGetCatalog, userCloudConnectionErrMsg)
}
Expand All @@ -299,7 +299,7 @@ func (controller *playgroundController) GetPrecompiledObject(ctx context.Context
if err != nil {
return nil, errors.InvalidArgumentError(errorTitleGetExample, userBadCloudPathErrMsg)
}
sdks, err := controller.cacheComponent.GetSdkCatalogFromCacheOrDatastore(ctx)
sdks, err := controller.cacheComponent.GetSdkCatalogFromCacheOrDatastore(ctx, controller.env.ApplicationEnvs.CacheRequestTimeout())
if err != nil {
return nil, errors.InternalError(errorTitleGetExample, err.Error())
}
Expand Down Expand Up @@ -398,7 +398,7 @@ func (controller *playgroundController) GetDefaultPrecompiledObject(ctx context.
logger.Errorf("GetDefaultPrecompiledObject(): unimplemented sdk: %s\n", info.Sdk)
return nil, errors.InvalidArgumentError("Error during preparing", "Sdk is not implemented yet: %s", info.Sdk.String())
}
precompiledObject, err := controller.cacheComponent.GetDefaultPrecompiledObjectFromCacheOrDatastore(ctx, info.Sdk)
precompiledObject, err := controller.cacheComponent.GetDefaultPrecompiledObjectFromCacheOrDatastore(ctx, info.Sdk, controller.env.ApplicationEnvs.CacheRequestTimeout())
if err != nil {
logger.Errorf("GetDefaultPrecompiledObject(): error during getting catalog: %s", err.Error())
return nil, errors.InternalError("Error during getting Precompiled Objects", "Error with cloud connection")
Expand Down
128 changes: 79 additions & 49 deletions playground/backend/internal/components/cache_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components
import (
"context"
"fmt"
"time"

pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/cache"
Expand All @@ -37,73 +38,102 @@ func NewService(cache cache.Cache, db db.Database) *CacheComponent {

// GetSdkCatalogFromCacheOrDatastore returns the sdk catalog from the cache
// - If there is no sdk catalog in the cache, gets the sdk catalog from the Cloud Datastore and saves it to the cache
func (cp *CacheComponent) GetSdkCatalogFromCacheOrDatastore(ctx context.Context) ([]*entity.SDKEntity, error) {
sdks, err := cp.cache.GetSdkCatalog(ctx)
func (cp *CacheComponent) GetSdkCatalogFromCacheOrDatastore(ctx context.Context, cacheRequestTimeout time.Duration) ([]*entity.SDKEntity, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make timeout a parameter of a CacheComponent, what you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not flexible to configure this parameter this way, i'd like to stay it as an env variable

cctx, cancel := context.WithTimeout(ctx, cacheRequestTimeout)
defer cancel()
sdks, err := cp.cache.GetSdkCatalog(cctx)
if err != nil {
logger.Errorf("error during getting the sdk catalog from the cache, err: %s", err.Error())
sdks, err = cp.db.GetSDKs(ctx)
if err != nil {
logger.Errorf("error during getting the sdk catalog from the cloud datastore, err: %s", err.Error())
return nil, err
}
if err = cp.cache.SetSdkCatalog(ctx, sdks); err != nil {
logger.Errorf("error during setting the sdk catalog to the cache, err: %s", err.Error())
return nil, err
}
return cp.getSdks(ctx, cacheRequestTimeout)
} else {
return sdks, nil
}
}

func (cp *CacheComponent) getSdks(ctx context.Context, cacheRequestTimeout time.Duration) ([]*entity.SDKEntity, error) {
sdks, err := cp.db.GetSDKs(ctx)
if err != nil {
logger.Errorf("error during getting the sdk catalog from the cloud datastore, err: %s", err.Error())
return nil, err
}
cctx, cancel := context.WithTimeout(ctx, cacheRequestTimeout)
defer cancel()
if err = cp.cache.SetSdkCatalog(cctx, sdks); err != nil {
logger.Errorf("error during setting the sdk catalog to the cache, err: %s", err.Error())
}
return sdks, nil
}

// GetCatalogFromCacheOrDatastore returns the example catalog from cache
// - If there is no catalog in the cache, gets the catalog from the Cloud Datastore and saves it to the cache
func (cp *CacheComponent) GetCatalogFromCacheOrDatastore(ctx context.Context) ([]*pb.Categories, error) {
catalog, err := cp.cache.GetCatalog(ctx)
func (cp *CacheComponent) GetCatalogFromCacheOrDatastore(ctx context.Context, cacheRequestTimeout time.Duration) ([]*pb.Categories, error) {
cctx, cancel := context.WithTimeout(ctx, cacheRequestTimeout)
defer cancel()
catalog, err := cp.cache.GetCatalog(cctx)
if err != nil {
logger.Errorf("error during getting the catalog from the cache, err: %s", err.Error())
sdkCatalog, err := cp.GetSdkCatalogFromCacheOrDatastore(ctx)
if err != nil {
logger.Errorf("error during getting the sdk catalog from the cache or datastore, err: %s", err.Error())
return nil, err
}
catalog, err = cp.db.GetCatalog(ctx, sdkCatalog)
if err != nil {
return nil, err
}
if len(catalog) == 0 {
logger.Warn("example catalog is empty")
return catalog, nil
}
if err = cp.cache.SetCatalog(ctx, catalog); err != nil {
logger.Errorf("SetCatalog(): cache error: %s", err.Error())
return nil, err
}
return cp.getCatalog(ctx, cacheRequestTimeout)
} else {
return catalog, nil
}
}

func (cp *CacheComponent) getCatalog(ctx context.Context, cacheRequestTimeout time.Duration) ([]*pb.Categories, error) {
sdkCatalog, err := cp.GetSdkCatalogFromCacheOrDatastore(ctx, cacheRequestTimeout)
if err != nil {
return nil, err
}
catalog, err := cp.db.GetCatalog(ctx, sdkCatalog)
if err != nil {
return nil, err
}
if len(catalog) == 0 {
logger.Warn("example catalog is empty")
return catalog, nil
}
cctx, cancel := context.WithTimeout(ctx, cacheRequestTimeout)
defer cancel()
if err = cp.cache.SetCatalog(cctx, catalog); err != nil {
logger.Errorf("SetCatalog(): cache error: %s", err.Error())
}
return catalog, nil
}

// GetDefaultPrecompiledObjectFromCacheOrDatastore returns the default example from cache by sdk
// - If there is no a default example in the cache, gets the default example from the Cloud Datastore and saves it to the cache
func (cp *CacheComponent) GetDefaultPrecompiledObjectFromCacheOrDatastore(ctx context.Context, sdk pb.Sdk) (*pb.PrecompiledObject, error) {
defaultExample, err := cp.cache.GetDefaultPrecompiledObject(ctx, sdk)
func (cp *CacheComponent) GetDefaultPrecompiledObjectFromCacheOrDatastore(ctx context.Context, sdk pb.Sdk, cacheRequestTimeout time.Duration) (*pb.PrecompiledObject, error) {
cctx, cancel := context.WithTimeout(ctx, cacheRequestTimeout)
defer cancel()
defaultExample, err := cp.cache.GetDefaultPrecompiledObject(cctx, sdk)
if err != nil {
logger.Errorf("error during getting a default precompiled object, err: %s", err.Error())
sdks, err := cp.GetSdkCatalogFromCacheOrDatastore(ctx)
if err != nil {
logger.Errorf("error during getting sdk catalog from the cache or the cloud datastore, err: %s", err.Error())
return nil, err
}
defaultExamples, err := cp.db.GetDefaultExamples(ctx, sdks)
for sdk, defaultExample := range defaultExamples {
if err := cp.cache.SetDefaultPrecompiledObject(ctx, sdk, defaultExample); err != nil {
logger.Errorf("error during setting a default example to the cache: %s", err.Error())
return nil, err
}
}
defaultExample, ok := defaultExamples[sdk]
if !ok {
return nil, fmt.Errorf("no default example found for this sdk: %s", sdk)
}
logger.Errorf("error during getting the default precompiled object from the cache, err: %s", err.Error())
return cp.getDefaultExample(ctx, sdk, cacheRequestTimeout)
} else {
return defaultExample, nil
}
}

func (cp *CacheComponent) getDefaultExample(ctx context.Context, sdk pb.Sdk, cacheRequestTimeout time.Duration) (*pb.PrecompiledObject, error) {
sdks, err := cp.GetSdkCatalogFromCacheOrDatastore(ctx, cacheRequestTimeout)
if err != nil {
logger.Errorf("error during getting sdk catalog from the cache or the cloud datastore, err: %s", err.Error())
return nil, err
}
defaultExamples, err := cp.db.GetDefaultExamples(ctx, sdks)
if err != nil {
logger.Errorf("error during getting default examples from the cloud datastore, err: %s", err.Error())
return nil, err
}
cctx, cancel := context.WithTimeout(ctx, cacheRequestTimeout)
defer cancel()
for sdk, defaultExample := range defaultExamples {
if err := cp.cache.SetDefaultPrecompiledObject(cctx, sdk, defaultExample); err != nil {
logger.Errorf("error during setting a default example to the cache: %s", err.Error())
}
}
defaultExample, ok := defaultExamples[sdk]
if !ok {
return nil, fmt.Errorf("no default example found for this sdk: %s", sdk)
}
return defaultExample, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"reflect"
"testing"
"time"

pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/cache"
Expand All @@ -36,6 +37,7 @@ var datastoreDb *db.Datastore
var ctx context.Context
var cacheComponent *CacheComponent
var cacheService cache.Cache
var defaultCacheRequestTimeout = 10 * time.Second

func TestMain(m *testing.M) {
setup()
Expand Down Expand Up @@ -89,7 +91,7 @@ func TestCacheComponent_GetSdkCatalogFromCacheOrDatastore(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.prepare()
result, err := cacheComponent.GetSdkCatalogFromCacheOrDatastore(ctx)
result, err := cacheComponent.GetSdkCatalogFromCacheOrDatastore(ctx, defaultCacheRequestTimeout)
if (err != nil) != tt.wantErr {
t.Error("GetSdkCatalogFromCacheOrDatastore() unexpected error")
return
Expand Down Expand Up @@ -145,7 +147,7 @@ func TestCacheComponent_GetCatalogFromCacheOrDatastore(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.prepare()
result, err := cacheComponent.GetCatalogFromCacheOrDatastore(ctx)
result, err := cacheComponent.GetCatalogFromCacheOrDatastore(ctx, defaultCacheRequestTimeout)
if (err != nil) != tt.wantErr {
t.Error("GetCatalogFromCacheOrDatastore() unexpected error")
return
Expand Down Expand Up @@ -202,7 +204,7 @@ func TestCacheComponent_GetDefaultPrecompiledObjectFromCacheOrDatastore(t *testi
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.prepare()
result, err := cacheComponent.GetDefaultPrecompiledObjectFromCacheOrDatastore(ctx, pb.Sdk_SDK_JAVA)
result, err := cacheComponent.GetDefaultPrecompiledObjectFromCacheOrDatastore(ctx, pb.Sdk_SDK_JAVA, defaultCacheRequestTimeout)
if (err != nil) != tt.wantErr {
t.Error("GetDefaultPrecompiledObjectFromCacheOrDatastore() unexpected error")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var testable *DatastoreMapper
var datastoreMapperCtx = context.Background()

func TestMain(m *testing.M) {
appEnv := environment.NewApplicationEnvs("/app", "", "", "", "", "../../../.", nil, 0)
appEnv := environment.NewApplicationEnvs("/app", "", "", "", "", "../../../.", nil, 0, 0)
appEnv.SetSchemaVersion("MOCK_SCHEMA")
props, _ := environment.NewProperties(appEnv.PropertyPath())
testable = NewDatastoreMapper(datastoreMapperCtx, appEnv, props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func setup() {
if err != nil {
panic(err)
}
appEnvs = environment.NewApplicationEnvs("/app", "", "", "", "../../../../../sdks-emulator.yaml", "../../../../.", nil, 0)
appEnvs = environment.NewApplicationEnvs("/app", "", "", "", "../../../../../sdks-emulator.yaml", "../../../../.", nil, 0, 0)
props, err = environment.NewProperties(appEnvs.PropertyPath())
if err != nil {
panic(err)
Expand Down
11 changes: 10 additions & 1 deletion playground/backend/internal/environment/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,16 @@ type ApplicationEnvs struct {

// propertyPath is the application properties path
propertyPath string

// cacheRequestTimeout is timeout to request data from cache
cacheRequestTimeout time.Duration
}

// NewApplicationEnvs constructor for ApplicationEnvs
func NewApplicationEnvs(
workingDir, launchSite, projectId, pipelinesFolder, sdkConfigPath, propertyPath string,
cacheEnvs *CacheEnvs,
pipelineExecuteTimeout time.Duration,
pipelineExecuteTimeout, cacheRequestTimeout time.Duration,
) *ApplicationEnvs {
return &ApplicationEnvs{
workingDir: workingDir,
Expand All @@ -125,6 +128,7 @@ func NewApplicationEnvs(
pipelinesFolder: pipelinesFolder,
sdkConfigPath: sdkConfigPath,
propertyPath: propertyPath,
cacheRequestTimeout: cacheRequestTimeout,
}
}

Expand Down Expand Up @@ -177,3 +181,8 @@ func (ae *ApplicationEnvs) PropertyPath() string {
func (ae *ApplicationEnvs) SetSchemaVersion(schemaVersion string) {
ae.schemaVersion = schemaVersion
}

// CacheRequestTimeout returns timeout to request data from cache
func (ae *ApplicationEnvs) CacheRequestTimeout() time.Duration {
return ae.cacheRequestTimeout
}
36 changes: 18 additions & 18 deletions playground/backend/internal/environment/environment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ const (
defaultSDKConfigPath = "../sdks.yaml"
propertyPathKey = "PROPERTY_PATH"
defaultPropertyPath = "."
cacheRequestTimeoutKey = "CACHE_REQUEST_TIMEOUT"
defaultCacheRequestTimeout = time.Second * 5
)

// Environment operates with environment structures: NetworkEnvs, BeamEnvs, ApplicationEnvs
Expand Down Expand Up @@ -99,33 +101,19 @@ func NewEnvironment(networkEnvs NetworkEnvs, beamEnvs BeamEnvs, appEnvs Applicat
// - cache address: localhost:6379
// If os environment variables don't contain a value for app working dir - returns error.
func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs, error) {
pipelineExecuteTimeout := defaultPipelineExecuteTimeout
cacheExpirationTime := defaultCacheKeyExpirationTime
pipelineExecuteTimeout := getEnvAsDuration(pipelineExecuteTimeoutKey, defaultPipelineExecuteTimeout, "couldn't convert provided pipeline execute timeout. Using default %s\n")
cacheExpirationTime := getEnvAsDuration(cacheKeyExpirationTimeKey, defaultCacheKeyExpirationTime, "couldn't convert provided cache expiration time. Using default %s\n")
cacheType := getEnv(cacheTypeKey, defaultCacheType)
cacheAddress := getEnv(cacheAddressKey, defaultCacheAddress)
launchSite := getEnv(launchSiteKey, defaultLaunchSite)
projectId := os.Getenv(projectIdKey)
pipelinesFolder := getEnv(pipelinesFolderKey, defaultPipelinesFolder)
sdkConfigPath := getEnv(SDKConfigPathKey, defaultSDKConfigPath)
propertyPath := getEnv(propertyPathKey, defaultPropertyPath)

if value, present := os.LookupEnv(cacheKeyExpirationTimeKey); present {
if converted, err := time.ParseDuration(value); err == nil {
cacheExpirationTime = converted
} else {
log.Printf("couldn't convert provided cache expiration time. Using default %s\n", defaultCacheKeyExpirationTime)
}
}
if value, present := os.LookupEnv(pipelineExecuteTimeoutKey); present {
if converted, err := time.ParseDuration(value); err == nil {
pipelineExecuteTimeout = converted
} else {
log.Printf("couldn't convert provided pipeline execute timeout. Using default %s\n", defaultPipelineExecuteTimeout)
}
}
cacheRequestTimeout := getEnvAsDuration(cacheRequestTimeoutKey, defaultCacheRequestTimeout, "couldn't convert provided cache request timeout. Using default %s\n")

if value, present := os.LookupEnv(workingDirKey); present {
return NewApplicationEnvs(value, launchSite, projectId, pipelinesFolder, sdkConfigPath, propertyPath, NewCacheEnvs(cacheType, cacheAddress, cacheExpirationTime), pipelineExecuteTimeout), nil
return NewApplicationEnvs(value, launchSite, projectId, pipelinesFolder, sdkConfigPath, propertyPath, NewCacheEnvs(cacheType, cacheAddress, cacheExpirationTime), pipelineExecuteTimeout, cacheRequestTimeout), nil
}
return nil, errors.New("APP_WORK_DIR env should be provided with os.env")
}
Expand Down Expand Up @@ -260,3 +248,15 @@ func getEnvAsInt(key string, defaultValue int) int {
}
return defaultValue
}

// getEnvAsDuration returns an environment variable or default value as duration
func getEnvAsDuration(key string, defaultValue time.Duration, errMsg string) time.Duration {
if value, present := os.LookupEnv(key); present {
if converted, err := time.ParseDuration(value); err == nil {
return converted
} else {
log.Printf(errMsg, defaultValue)
}
}
return defaultValue
}
Loading