Skip to content

Commit

Permalink
Merge pull request #16699 from [BEAM-13789][Playground] Change logic …
Browse files Browse the repository at this point in the history
…of keeping examples to the bucket on CD side

* [BEAM-13789][Playground]
Change examples' folders structure
Fix by `yapf` and `pylint`

* [BEAM-13789][Playground]
Change examples' folders structure
Fix by `yapf` and `pylint`
  • Loading branch information
Aydar Zainutdinov committed Feb 4, 2022
1 parent d8ab299 commit 4b2343a
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 28 deletions.
21 changes: 16 additions & 5 deletions playground/infrastructure/cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from tqdm import tqdm
from google.cloud import storage

from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA, PrecompiledObjectType
from config import Config, PrecompiledExample
from grpc_client import GRPCClient
from helper import Example, get_statuses
Expand Down Expand Up @@ -74,12 +74,13 @@ async def _get_outputs(self, examples: List[Example]):
tasks = [client.get_log(example.pipeline_id) for example in examples]
logs = await asyncio.gather(*tasks)

if len(examples) > 0 and (examples[0].sdk is SDK_PYTHON or examples[0].sdk is SDK_JAVA):
if len(examples) > 0 and (examples[0].sdk is SDK_PYTHON or
examples[0].sdk is SDK_JAVA):
tasks = [client.get_graph(example.pipeline_id) for example in examples]
graphs = await asyncio.gather(*tasks)

for graph, example in zip(graphs, examples):
example.graph = graph
example.graph = graph

for output, example in zip(outputs, examples):
example.output = output
Expand Down Expand Up @@ -117,38 +118,43 @@ def _write_to_local_fs(self, example: Example):
Config.TEMP_FOLDER,
example.pipeline_id,
Sdk.Name(example.sdk),
PrecompiledObjectType.Name(example.type),
example.tag.name)
Path(path_to_object_folder).mkdir(parents=True, exist_ok=True)

file_names = {}
code_path = self._get_gcs_object_name(
sdk=example.sdk,
type=example.type,
base_folder_name=example.tag.name,
file_name=example.tag.name)
output_path = self._get_gcs_object_name(
sdk=example.sdk,
type=example.type,
base_folder_name=example.tag.name,
file_name=example.tag.name,
extension=PrecompiledExample.OUTPUT_EXTENSION)
log_path = self._get_gcs_object_name(
sdk=example.sdk,
type=example.type,
base_folder_name=example.tag.name,
file_name=example.tag.name,
extension=PrecompiledExample.LOG_EXTENSION)
graph_path = self._get_gcs_object_name(
sdk=example.sdk,
type=example.type,
base_folder_name=example.tag.name,
file_name=example.tag.name,
extension=PrecompiledExample.GRAPH_EXTENSION)
meta_path = self._get_gcs_object_name(
sdk=example.sdk,
type=example.type,
base_folder_name=example.tag.name,
file_name=PrecompiledExample.META_NAME,
extension=PrecompiledExample.META_EXTENSION)
file_names[code_path] = example.code
file_names[output_path] = example.output
meta = example.tag._asdict()
meta["type"] = example.type
meta["link"] = example.link
file_names[meta_path] = json.dumps(meta)
file_names[log_path] = example.logs
Expand All @@ -166,6 +172,7 @@ def _write_to_local_fs(self, example: Example):
def _get_gcs_object_name(
self,
sdk: Sdk,
type: PrecompiledObjectType,
base_folder_name: str,
file_name: str,
extension: str = None):
Expand All @@ -174,6 +181,7 @@ def _get_gcs_object_name(
Args:
sdk: sdk of the example
type: type of the example
file_name: name of the example
base_folder_name: name of the folder where example is stored
(eq. to example name)
Expand All @@ -184,7 +192,10 @@ def _get_gcs_object_name(
if extension is None:
extension = Config.SDK_TO_EXTENSION[sdk]
return os.path.join(
Sdk.Name(sdk), base_folder_name, f"{file_name}.{extension}")
Sdk.Name(sdk),
PrecompiledObjectType.Name(type),
base_folder_name,
f"{file_name}.{extension}")

def _upload_blob(self, source_file: str, destination_blob_name: str):
"""
Expand Down
1 change: 1 addition & 0 deletions playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class CIHelper:
It is used to find and verify correctness if beam examples/katas/tests.
"""

async def verify_examples(self, examples: List[Example]):
"""
Verify correctness of beam examples.
Expand Down
18 changes: 14 additions & 4 deletions playground/infrastructure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

from api.v1.api_pb2 import STATUS_VALIDATION_ERROR, STATUS_ERROR, \
STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR, \
STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR, SDK_JAVA, SDK_GO, SDK_PYTHON, SDK_SCIO, Sdk
STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR, SDK_JAVA, SDK_GO, SDK_PYTHON, \
SDK_SCIO, Sdk


@dataclass(frozen=True)
Expand All @@ -32,11 +33,19 @@ class Config:
General configuration for CI/CD steps
"""
SERVER_ADDRESS = os.getenv("SERVER_ADDRESS", "localhost:8080")
EXTENSION_TO_SDK = {"java": SDK_JAVA, "go": SDK_GO, "py": SDK_PYTHON, "scala": SDK_SCIO}
SUPPORTED_SDK = (Sdk.Name(SDK_JAVA), Sdk.Name(SDK_GO), Sdk.Name(SDK_PYTHON), Sdk.Name(SDK_SCIO))
EXTENSION_TO_SDK = {
"java": SDK_JAVA, "go": SDK_GO, "py": SDK_PYTHON, "scala": SDK_SCIO
}
SUPPORTED_SDK = (
Sdk.Name(SDK_JAVA),
Sdk.Name(SDK_GO),
Sdk.Name(SDK_PYTHON),
Sdk.Name(SDK_SCIO))
BUCKET_NAME = "playground-precompiled-objects"
TEMP_FOLDER = "temp"
SDK_TO_EXTENSION = {SDK_JAVA: "java", SDK_GO: "go", SDK_PYTHON: "py", SDK_SCIO: "scala"}
SDK_TO_EXTENSION = {
SDK_JAVA: "java", SDK_GO: "go", SDK_PYTHON: "py", SDK_SCIO: "scala"
}
NO_STORE = "no-store"
ERROR_STATUSES = [
STATUS_VALIDATION_ERROR,
Expand Down Expand Up @@ -81,6 +90,7 @@ class PrecompiledExampleType:
katas = "katas"
test_ends = ("test", "it")


@dataclass(frozen=True)
class OptionalTagFields:
pipeline_options: str = "pipeline_options"
Expand Down
2 changes: 1 addition & 1 deletion playground/infrastructure/grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def get_graph(self, pipeline_uuid: str) -> str:
try:
response = await self._stub.GetGraph(request)
return response.graph
except grpc.RpcError as e:
except grpc.RpcError:
# Some examples doesn't have graph (katas)
return ""

Expand Down
12 changes: 7 additions & 5 deletions playground/infrastructure/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,12 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool:
In case tag is not valid, False
"""
valid = True
required_tag_fields = {f.default for f in fields(TagFields)
if f.default not in
{o_f.default for o_f in fields(OptionalTagFields)}}
required_tag_fields = {
f.default
for f in fields(TagFields)
if f.default not in {o_f.default
for o_f in fields(OptionalTagFields)}
}
# check that all fields exist and they have no empty value
for field in required_tag_fields:
if field not in tag:
Expand All @@ -294,8 +297,7 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool:
valid = False
if valid is True:
value = tag.get(field)
if (value == "" or
value is None) and field != TagFields.pipeline_options:
if (value == "" or value is None) and field != TagFields.pipeline_options:
logging.error(
"tag's value is incorrect: %s\n%s field can not be empty.",
tag,
Expand Down
31 changes: 22 additions & 9 deletions playground/infrastructure/test_cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import pytest

from api.v1.api_pb2 import SDK_JAVA, STATUS_UNSPECIFIED
from api.v1.api_pb2 import SDK_JAVA, STATUS_UNSPECIFIED, \
PRECOMPILED_OBJECT_TYPE_UNIT_TEST
from cd_helper import CDHelper
from config import Config
from helper import Example, Tag
Expand Down Expand Up @@ -47,12 +48,17 @@ def test__get_gcs_object_name():
"""
Test getting the path where file will be stored at the bucket
"""
expected_result = "SDK_JAVA/base_folder/file.java"
expected_result_with_extension = "SDK_JAVA/base_folder/file.output"
expected_path = "SDK_JAVA/PRECOMPILED_OBJECT_TYPE_UNIT_TEST/base_folder"
expected_result = "%s/%s" % (expected_path, "file.java")
expected_result_with_extension = "%s/%s" % (expected_path, "file.output")
assert CDHelper()._get_gcs_object_name(
SDK_JAVA, "base_folder", "file") == expected_result
SDK_JAVA, PRECOMPILED_OBJECT_TYPE_UNIT_TEST, "base_folder",
"file") == expected_result
assert CDHelper()._get_gcs_object_name(
SDK_JAVA, "base_folder", "file",
SDK_JAVA,
PRECOMPILED_OBJECT_TYPE_UNIT_TEST,
"base_folder",
"file",
"output") == expected_result_with_extension


Expand Down Expand Up @@ -81,11 +87,18 @@ def test__write_to_local_fs(delete_temp_folder):
status=STATUS_UNSPECIFIED,
tag=Tag(**object_meta),
link="link")
result_filepath = "SDK_JAVA/PRECOMPILED_OBJECT_TYPE_UNSPECIFIED/name"
expected_result = {
"SDK_JAVA/name/name.java": "temp/pipeline_id/SDK_JAVA/name/name.java",
"SDK_JAVA/name/name.output": "temp/pipeline_id/SDK_JAVA/name/name.output",
"SDK_JAVA/name/name.log": "temp/pipeline_id/SDK_JAVA/name/name.log",
"SDK_JAVA/name/meta.info": "temp/pipeline_id/SDK_JAVA/name/meta.info"
"%s/%s" % (result_filepath, "name.java"): "%s/%s/%s" %
("temp/pipeline_id", result_filepath, "name.java"),
"%s/%s" % (result_filepath, "name.output"): "%s/%s/%s" %
("temp/pipeline_id", result_filepath, "name.output"),
"%s/%s" % (result_filepath, "name.log"): "%s/%s/%s" %
("temp/pipeline_id", result_filepath, "name.log"),
"%s/%s" % (result_filepath, "name.graph"): "%s/%s/%s" %
("temp/pipeline_id", result_filepath, "name.graph"),
"%s/%s" % (result_filepath, "meta.info"): "%s/%s/%s" %
("temp/pipeline_id", result_filepath, "meta.info"),
}
assert CDHelper()._write_to_local_fs(example) == expected_result

Expand Down
7 changes: 3 additions & 4 deletions playground/infrastructure/test_ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@


@pytest.mark.asyncio
@mock.patch("ci_helper.CIHelper._verify_examples_status")
@mock.patch("ci_helper.CIHelper._verify_examples")
@mock.patch("ci_helper.get_statuses")
async def test_verify_examples(
mock_get_statuses, mock_verify_examples_statuses):
async def test_verify_examples(mock_get_statuses, mock_verify_examples):
helper = CIHelper()
await helper.verify_examples([])

mock_get_statuses.assert_called_once_with([])
mock_verify_examples_statuses.assert_called_once_with([])
mock_verify_examples.assert_called_once_with([])

0 comments on commit 4b2343a

Please sign in to comment.