Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13267][Playground] Implement get_statuses method #16073

Merged
merged 16 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions playground/infrastructure/ci_cd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os

Expand Down
2 changes: 1 addition & 1 deletion playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def verify_examples(self, examples: List[Example]):
2. Group code of examples by their SDK.
3. Run processing for all examples to verify examples' code.
"""
get_statuses(examples)
await get_statuses(examples)
await self._verify_examples_status(examples)

async def _verify_examples_status(self, examples: List[Example]):
Expand Down
1 change: 1 addition & 0 deletions playground/infrastructure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Config:
SUPPORTED_SDK = {'java': SDK_JAVA, 'go': SDK_GO, 'py': SDK_PYTHON}
BEAM_PLAYGROUND_TITLE = "Beam-playground:\n"
BEAM_PLAYGROUND = "Beam-playground"
PAUSE_DELAY = 10


@dataclass(frozen=True)
Expand Down
46 changes: 34 additions & 12 deletions playground/infrastructure/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import os
import yaml
Expand All @@ -21,8 +22,10 @@
from typing import List
from yaml import YAMLError
from config import Config, TagFields
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk, STATUS_VALIDATING, STATUS_PREPARING, \
STATUS_COMPILING, STATUS_EXECUTING
from collections import namedtuple
from grpc_client import GRPCClient

Tag = namedtuple("Tag", [TagFields.NAME, TagFields.DESCRIPTION, TagFields.MULTIFILE, TagFields.CATEGORIES])

Expand Down Expand Up @@ -75,21 +78,18 @@ def find_examples(work_dir: str, supported_categories: List[str]) -> List[Exampl
return examples


def get_statuses(examples: List[Example]):
async def get_statuses(examples: List[Example]):
"""
Receive statuses for examples and update example.status and example.pipeline_id

Use client to send requests to the backend:
1. Start code processing.
2. Ping the backend while status is STATUS_VALIDATING/STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING
Update example.pipeline_id with resulting pipelineId.
Update example.status with resulting status.
Receive status and update example.status and example.pipeline_id for each example

Args:
examples: beam examples for processing and updating statuses.
examples: beam examples for processing and updating statuses and pipeline_id values.
"""
# TODO [BEAM-13267] Implement
pass
tasks = []
client = GRPCClient()
for example in examples:
tasks.append(_update_example_status(example, client))
await asyncio.gather(*tasks)


def get_tag(filepath):
Expand Down Expand Up @@ -272,3 +272,25 @@ def _get_sdk(filename: str) -> Sdk:
return Config.SUPPORTED_SDK[extension]
else:
raise ValueError(extension + " is not supported")


async def _update_example_status(example: Example, client: GRPCClient):
"""
Receive status for examples and update example.status and pipeline_id

Use client to send requests to the backend:
1. Start code processing.
2. Ping the backend while status is STATUS_VALIDATING/STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING
Update example.status with resulting status.

Args:
example: beam example for processing and updating status and pipeline_id.
client: client to send requests to the server.
"""
pipeline_id = await client.run_code(example.code, example.sdk)
example.pipeline_id = pipeline_id
status = await client.check_status(pipeline_id)
while status in [STATUS_VALIDATING, STATUS_PREPARING, STATUS_COMPILING, STATUS_EXECUTING]:
await asyncio.sleep(Config.PAUSE_DELAY)
status = await client.check_status(pipeline_id)
example.status = status
41 changes: 38 additions & 3 deletions playground/infrastructure/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import pytest

from unittest.mock import mock_open
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, SDK_JAVA, SDK_PYTHON, SDK_GO
from helper import find_examples, Example, _get_example, _get_name, _get_sdk, get_tag, _validate, Tag, \
get_supported_categories, _check_file
from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, SDK_JAVA, SDK_PYTHON, SDK_GO, STATUS_VALIDATING, \
STATUS_FINISHED
from grpc_client import GRPCClient
from helper import find_examples, Example, _get_example, _get_name, _get_sdk, get_tag, _validate, Tag, get_statuses, \
_update_example_status, get_supported_categories, _check_file


@mock.patch('helper._check_file')
Expand Down Expand Up @@ -49,6 +51,21 @@ def test_find_examples_with_invalid_tag(mock_os_walk, mock_check_file):
mock_check_file.assert_called_once_with([], "file.java", "/root/file.java", [])


@pytest.mark.asyncio
@mock.patch('helper.GRPCClient')
@mock.patch('helper._update_example_status')
async def test_get_statuses(mock_update_example_status, mock_grpc_client):
example = Example("file", "pipeline_id", SDK_UNSPECIFIED, "root/file.extension", "code", "output",
STATUS_UNSPECIFIED, {"name": "Name"})
client = None

mock_grpc_client.return_value = client

await get_statuses([example])

mock_update_example_status.assert_called_once_with(example, client)


@mock.patch('builtins.open', mock_open(read_data="...\n# Beam-playground:\n# name: Name\n\nimport ..."))
def test_get_tag_when_tag_is_exists():
result = get_tag("")
Expand Down Expand Up @@ -183,3 +200,21 @@ def test__get_sdk_with_supported_extension():
def test__get_sdk_with_unsupported_extension():
with pytest.raises(ValueError, match="extension is not supported"):
_get_sdk("filename.extension")


@pytest.mark.asyncio
@mock.patch('grpc_client.GRPCClient.check_status')
@mock.patch('grpc_client.GRPCClient.run_code')
async def test__update_example_status(mock_grpc_client_run_code, mock_grpc_client_check_status):
example = Example("file", "pipeline_id", SDK_UNSPECIFIED, "root/file.extension", "code", "output",
STATUS_UNSPECIFIED, {"name": "Name"})

mock_grpc_client_run_code.return_value = "pipeline_id"
mock_grpc_client_check_status.side_effect = [STATUS_VALIDATING, STATUS_FINISHED]

await _update_example_status(example, GRPCClient())

assert example.pipeline_id == "pipeline_id"
assert example.status == STATUS_FINISHED
mock_grpc_client_run_code.assert_called_once_with(example.code, example.sdk)
mock_grpc_client_check_status.assert_has_calls([mock.call("pipeline_id")])