Skip to content

Commit

Permalink
Merge pull request apache#16073 from [BEAM-13267][Playground] Impleme…
Browse files Browse the repository at this point in the history
…nt get_statuses method

* [Playground][BEAM-12941][Bugfix] Fix workflows for playground applications (#83)

* Update workflows for playground

* Attempt to fix tests

* Remove continue on error to catch errors

* Fix linter problem for backend dockerfile

* Update folder to run backend go linter

* Moved flutter test to execution via gradle tasks

* Revert "[Playground][BEAM-12941][Bugfix] Fix workflows for playground applications (#83)" (#88)

This reverts commit b73f5f7.

* [BEAM-13267][Playground]
implemented a method to send examples to the backend to verify them

Co-authored-by: Sergey Kalinin <91209855+snkalinin@users.noreply.github.com>
Co-authored-by: Ilya <ilya.kozyrev@akvelon.com>
Co-authored-by: daria.malkova <daria.malkova@akvelon.com>
Co-authored-by: Pavel Avilov <pavel.avilov@akvelon.com>
Co-authored-by: Aydar Farrakhov <stranniknm@gmail.com>
  • Loading branch information
6 people committed Nov 30, 2021
1 parent 3b9d1d5 commit 85b9778
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 16 deletions.
1 change: 1 addition & 0 deletions playground/infrastructure/ci_cd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os

Expand Down
2 changes: 1 addition & 1 deletion playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def verify_examples(self, examples: List[Example]):
2. Group code of examples by their SDK.
3. Run processing for all examples to verify examples' code.
"""
get_statuses(examples)
await get_statuses(examples)
await self._verify_examples_status(examples)

async def _verify_examples_status(self, examples: List[Example]):
Expand Down
1 change: 1 addition & 0 deletions playground/infrastructure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Config:
SUPPORTED_SDK = {'java': SDK_JAVA, 'go': SDK_GO, 'py': SDK_PYTHON}
BEAM_PLAYGROUND_TITLE = "Beam-playground:\n"
BEAM_PLAYGROUND = "Beam-playground"
PAUSE_DELAY = 10


@dataclass(frozen=True)
Expand Down
46 changes: 34 additions & 12 deletions playground/infrastructure/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import os
import yaml
Expand All @@ -21,8 +22,10 @@
from typing import List
from yaml import YAMLError
from config import Config, TagFields
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk, STATUS_VALIDATING, STATUS_PREPARING, \
STATUS_COMPILING, STATUS_EXECUTING
from collections import namedtuple
from grpc_client import GRPCClient

Tag = namedtuple("Tag", [TagFields.NAME, TagFields.DESCRIPTION, TagFields.MULTIFILE, TagFields.CATEGORIES])

Expand Down Expand Up @@ -75,21 +78,18 @@ def find_examples(work_dir: str, supported_categories: List[str]) -> List[Exampl
return examples


def get_statuses(examples: List[Example]):
async def get_statuses(examples: List[Example]):
"""
Receive statuses for examples and update example.status and example.pipeline_id
Use client to send requests to the backend:
1. Start code processing.
2. Ping the backend while status is STATUS_VALIDATING/STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING
Update example.pipeline_id with resulting pipelineId.
Update example.status with resulting status.
Receive status and update example.status and example.pipeline_id for each example
Args:
examples: beam examples for processing and updating statuses.
examples: beam examples for processing and updating statuses and pipeline_id values.
"""
# TODO [BEAM-13267] Implement
pass
tasks = []
client = GRPCClient()
for example in examples:
tasks.append(_update_example_status(example, client))
await asyncio.gather(*tasks)


def get_tag(filepath):
Expand Down Expand Up @@ -272,3 +272,25 @@ def _get_sdk(filename: str) -> Sdk:
return Config.SUPPORTED_SDK[extension]
else:
raise ValueError(extension + " is not supported")


async def _update_example_status(example: Example, client: GRPCClient):
"""
Receive status for examples and update example.status and pipeline_id
Use client to send requests to the backend:
1. Start code processing.
2. Ping the backend while status is STATUS_VALIDATING/STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING
Update example.status with resulting status.
Args:
example: beam example for processing and updating status and pipeline_id.
client: client to send requests to the server.
"""
pipeline_id = await client.run_code(example.code, example.sdk)
example.pipeline_id = pipeline_id
status = await client.check_status(pipeline_id)
while status in [STATUS_VALIDATING, STATUS_PREPARING, STATUS_COMPILING, STATUS_EXECUTING]:
await asyncio.sleep(Config.PAUSE_DELAY)
status = await client.check_status(pipeline_id)
example.status = status
41 changes: 38 additions & 3 deletions playground/infrastructure/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import pytest

from unittest.mock import mock_open
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, SDK_JAVA, SDK_PYTHON, SDK_GO
from helper import find_examples, Example, _get_example, _get_name, _get_sdk, get_tag, _validate, Tag, \
get_supported_categories, _check_file
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, SDK_JAVA, SDK_PYTHON, SDK_GO, STATUS_VALIDATING, \
STATUS_FINISHED
from grpc_client import GRPCClient
from helper import find_examples, Example, _get_example, _get_name, _get_sdk, get_tag, _validate, Tag, get_statuses, \
_update_example_status, get_supported_categories, _check_file


@mock.patch('helper._check_file')
Expand Down Expand Up @@ -49,6 +51,21 @@ def test_find_examples_with_invalid_tag(mock_os_walk, mock_check_file):
mock_check_file.assert_called_once_with([], "file.java", "/root/file.java", [])


@pytest.mark.asyncio
@mock.patch('helper.GRPCClient')
@mock.patch('helper._update_example_status')
async def test_get_statuses(mock_update_example_status, mock_grpc_client):
example = Example("file", "pipeline_id", SDK_UNSPECIFIED, "root/file.extension", "code", "output",
STATUS_UNSPECIFIED, {"name": "Name"})
client = None

mock_grpc_client.return_value = client

await get_statuses([example])

mock_update_example_status.assert_called_once_with(example, client)


@mock.patch('builtins.open', mock_open(read_data="...\n# Beam-playground:\n# name: Name\n\nimport ..."))
def test_get_tag_when_tag_is_exists():
result = get_tag("")
Expand Down Expand Up @@ -183,3 +200,21 @@ def test__get_sdk_with_supported_extension():
def test__get_sdk_with_unsupported_extension():
with pytest.raises(ValueError, match="extension is not supported"):
_get_sdk("filename.extension")


@pytest.mark.asyncio
@mock.patch('grpc_client.GRPCClient.check_status')
@mock.patch('grpc_client.GRPCClient.run_code')
async def test__update_example_status(mock_grpc_client_run_code, mock_grpc_client_check_status):
example = Example("file", "pipeline_id", SDK_UNSPECIFIED, "root/file.extension", "code", "output",
STATUS_UNSPECIFIED, {"name": "Name"})

mock_grpc_client_run_code.return_value = "pipeline_id"
mock_grpc_client_check_status.side_effect = [STATUS_VALIDATING, STATUS_FINISHED]

await _update_example_status(example, GRPCClient())

assert example.pipeline_id == "pipeline_id"
assert example.status == STATUS_FINISHED
mock_grpc_client_run_code.assert_called_once_with(example.code, example.sdk)
mock_grpc_client_check_status.assert_has_calls([mock.call("pipeline_id")])

0 comments on commit 85b9778

Please sign in to comment.