Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ tasks.rat {
"playground/backend/internal/api/v1/api.pb.go",
"playground/backend/internal/api/v1/api_grpc.pb.go",

// Playground infrastructure autogenerated GRPC API stubs and mocks
"playground/infrastructure/api/v1/api_pb2.py",
"playground/infrastructure/api/v1/api_pb2.pyi",
"playground/infrastructure/api/v1/api_pb2_grpc.py",

// test p8 file for SnowflakeIO
"sdks/java/io/snowflake/src/test/resources/invalid_test_rsa_key.p8",
"sdks/java/io/snowflake/src/test/resources/valid_encrypted_test_rsa_key.p8",
Expand Down
4 changes: 3 additions & 1 deletion learning/katas/python/IO/TextIO/ReadFromText/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
# name: ReadFromText
# description: Task from katas to read from text files.
# multifile: true
# context_line: 29
# files:
# - name: countries.txt
# context_line: 33
# categories:
# - IO
# complexity: BASIC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
# name: EarlyTriggers
# description: Task from katas to count events using early triggers
# multifile: true
# context_line: 46
# files:
# - name: generate_event.py
# context_line: 36
# categories:
# - Streaming
# complexity: MEDIUM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
# name: EventTimeTriggers
# description: Task from katas to count events with event time triggers
# multifile: true
# context_line: 46
# files:
# - name: generate_event.py
# context_line: 36
# categories:
# - Streaming
# complexity: MEDIUM
Expand All @@ -41,15 +43,15 @@


class CountEvents(beam.PTransform):
def expand(self, events):
return (events
| beam.WindowInto(FixedWindows(5),
trigger=AfterWatermark(),
accumulation_mode=AccumulationMode.DISCARDING,
allowed_lateness=Duration(seconds=0))
| beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())
def expand(self, events):
return (events
| beam.WindowInto(FixedWindows(5),
trigger=AfterWatermark(),
accumulation_mode=AccumulationMode.DISCARDING,
allowed_lateness=Duration(seconds=0))
| beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())

with beam.Pipeline() as p:
(p | GenerateEvent.sample_data()
(p | GenerateEvent.sample_data()
| CountEvents()
| LogElements(with_window=True))
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
# name: WindowAccumulationMode
# description: Task from katas to count events using ACCUMULATING as accumulation mode
# multifile: true
# context_line: 51
# files:
# - name: generate_event.py
# context_line: 36
# categories:
# - Streaming
# complexity: ADVANCED
Expand Down
4 changes: 3 additions & 1 deletion playground/api/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ message RunCodeRequest {
// The pipeline options as they would be passed to the program (e.g. "--option1 value1 --option2 value2")
string pipeline_options = 3;
repeated Dataset datasets = 4;
repeated SnippetFile files = 5;
}

// RunCodeResponse contains information of the pipeline uuid.
Expand Down Expand Up @@ -190,7 +191,7 @@ message PrecompiledObject{
repeated string tags = 12;
repeated Dataset datasets = 13;

// Link to the example in the Beam repository
// Link to the example in the Beam repository
string url_vcs = 14;
string url_notebook = 15;
}
Expand Down Expand Up @@ -254,6 +255,7 @@ message GetPrecompiledObjectResponse{
// GetPrecompiledObjectResponse represents the source code of the PrecompiledObject.
message GetPrecompiledObjectCodeResponse{
string code = 1;
repeated SnippetFile files = 2;
}

// GetPrecompiledObjectOutputResponse represents the result of the executed code.
Expand Down
52 changes: 46 additions & 6 deletions playground/backend/cmd/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"beam.apache.org/playground/backend/internal/code_processing"
"beam.apache.org/playground/backend/internal/components"
"beam.apache.org/playground/backend/internal/db"
"beam.apache.org/playground/backend/internal/db/entity"
"beam.apache.org/playground/backend/internal/db/mapper"
"beam.apache.org/playground/backend/internal/emulators"
"beam.apache.org/playground/backend/internal/environment"
Expand All @@ -47,9 +48,10 @@ const (
errorTitleGetDefaultExample = "Error during getting default example"
errorTitleRunCode = "Error during run code"

userBadCloudPathErrMsg = "Invalid cloud path parameter"
userCloudConnectionErrMsg = "Cloud connection error"
resourceNotFoundErrMsg = "Resource is not found"
userBadCloudPathErrMsg = "Invalid cloud path parameter"
userCloudConnectionErrMsg = "Cloud connection error"
resourceNotFoundErrMsg = "Resource is not found"
resourceInconsistentErrMsg = "Resource is not consistent"
)

// playgroundController processes `gRPC' requests from clients.
Expand Down Expand Up @@ -97,8 +99,30 @@ func (controller *playgroundController) RunCode(ctx context.Context, info *pb.Ru
kafkaMockCluster = kafkaMockClusters[0]
prepareParams = prepareParamsVal
}
sources := make([]entity.FileEntity, 0)
if len(info.Files) > 0 {
for _, file := range info.Files {
sources = append(sources, entity.FileEntity{
Name: file.Name,
Content: file.Content,
IsMain: file.IsMain,
CntxLine: 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eantyshev Am I right that we are setting CntxLine to 1 because it is not used in runners and we do not care about it?

})
}
} else {
fileName, err := utils.GetFileName("", info.Code, info.Sdk)
if err != nil {
return nil, cerrors.InternalError(errorTitleRunCode, "Failed to get default filename")
}
sources = append(sources, entity.FileEntity{
Name: fileName,
Content: info.Code,
IsMain: true,
CntxLine: 1,
})
}

lc, err := life_cycle.Setup(info.Sdk, info.Code, pipelineId, controller.env.ApplicationEnvs.WorkingDir(), controller.env.ApplicationEnvs.PipelinesFolder(), controller.env.BeamSdkEnvs.PreparedModDir(), kafkaMockCluster)
lc, err := life_cycle.Setup(info.Sdk, sources, pipelineId, controller.env.ApplicationEnvs.WorkingDir(), controller.env.ApplicationEnvs.PipelinesFolder(), controller.env.BeamSdkEnvs.PreparedModDir(), kafkaMockCluster)
if err != nil {
logger.Errorf("RunCode(): error during setup file system: %s\n", err.Error())
return nil, cerrors.InternalError("Error during preparing", "Error during setup file system for the code processing: %s", err.Error())
Expand Down Expand Up @@ -337,7 +361,7 @@ func (controller *playgroundController) GetPrecompiledObjectCode(ctx context.Con
if err != nil {
return nil, cerrors.InvalidArgumentError(errorTitleGetExampleCode, userBadCloudPathErrMsg)
}
codeString, err := controller.db.GetExampleCode(ctx, exampleId)
files, err := controller.db.GetExampleCode(ctx, exampleId)
if err != nil {
switch err {
case datastore.ErrNoSuchEntity:
Expand All @@ -346,7 +370,23 @@ func (controller *playgroundController) GetPrecompiledObjectCode(ctx context.Con
return nil, cerrors.InternalError(errorTitleGetExampleCode, userCloudConnectionErrMsg)
}
}
response := pb.GetPrecompiledObjectCodeResponse{Code: codeString}
if len(files) == 0 {
return nil, cerrors.NotFoundError(errorTitleGetExampleCode, resourceNotFoundErrMsg)
}
response := pb.GetPrecompiledObjectCodeResponse{}
for _, file := range files {
response.Files = append(response.Files, &pb.SnippetFile{
Name: file.Name,
Content: file.Content,
IsMain: file.IsMain,
})
if file.IsMain {
response.Code = file.Content
}
}
if len(response.Files) == 0 || response.Code == "" {
return nil, cerrors.InternalError(errorTitleGetExampleCode, resourceInconsistentErrMsg)
}
return &response, nil
}

Expand Down
48 changes: 41 additions & 7 deletions playground/backend/cmd/server/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,21 @@ func TestPlaygroundController_RunCode(t *testing.T) {
},
wantErr: false,
},
{
name: "RunCode multifile",
args: args{
ctx: context.Background(),
request: &pb.RunCodeRequest{
Code: "MOCK_CODE",
Sdk: pb.Sdk_SDK_JAVA,
Files: []*pb.SnippetFile{
{Name: "main.java", Content: "MOCK_CODE", IsMain: true},
{Name: "import.java", Content: "import content", IsMain: false},
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -1189,16 +1204,36 @@ func TestPlaygroundController_GetPrecompiledObjectCode(t *testing.T) {
name string
args args
wantErr bool
wantResponse string
wantResponse *pb.GetPrecompiledObjectCodeResponse
}{
{
name: "Getting the code of the specific example in the usual case",
name: "Getting the code of single-file example",
args: args{
ctx: ctx,
info: &pb.GetPrecompiledObjectCodeRequest{CloudPath: "SDK_JAVA/PRECOMPILED_OBJECT_TYPE_EXAMPLE/MOCK_DEFAULT_EXAMPLE"},
},
wantErr: false,
wantResponse: "MOCK_CONTENT",
wantErr: false,
wantResponse: &pb.GetPrecompiledObjectCodeResponse{
Code: "MOCK_CONTENT_0",
Files: []*pb.SnippetFile{
{Name: "MOCK_NAME_0", Content: "MOCK_CONTENT_0", IsMain: true},
},
},
},
{
name: "Getting the code of multifile example",
args: args{
ctx: ctx,
info: &pb.GetPrecompiledObjectCodeRequest{CloudPath: "SDK_JAVA/PRECOMPILED_OBJECT_TYPE_EXAMPLE/MOCK_MULTIFILE"},
},
wantErr: false,
wantResponse: &pb.GetPrecompiledObjectCodeResponse{
Code: "MOCK_CONTENT_0",
Files: []*pb.SnippetFile{
{Name: "MOCK_NAME_0", Content: "MOCK_CONTENT_0", IsMain: true},
{Name: "MOCK_NAME_1", Content: "MOCK_CONTENT_1", IsMain: false},
},
},
},
}

Expand All @@ -1209,9 +1244,8 @@ func TestPlaygroundController_GetPrecompiledObjectCode(t *testing.T) {
t.Errorf("PlaygroundController_GetPrecompiledObjectCode() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got.Code != tt.wantResponse {
t.Errorf("PlaygroundController_GetPrecompiledObjectCode() unexpected result")
}
assert.Equal(t, tt.wantResponse.Code, got.Code)
assert.Equal(t, tt.wantResponse.Files, got.Files)
})
}
}
Expand Down
Loading