diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index d8ca9dd30c813..1f46401dcb13c 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -29,7 +29,7 @@ from tqdm import tqdm from google.cloud import storage -from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA +from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA, PrecompiledObjectType from config import Config, PrecompiledExample from grpc_client import GRPCClient from helper import Example, get_statuses @@ -74,12 +74,13 @@ async def _get_outputs(self, examples: List[Example]): tasks = [client.get_log(example.pipeline_id) for example in examples] logs = await asyncio.gather(*tasks) - if len(examples) > 0 and (examples[0].sdk is SDK_PYTHON or examples[0].sdk is SDK_JAVA): + if len(examples) > 0 and (examples[0].sdk is SDK_PYTHON or + examples[0].sdk is SDK_JAVA): tasks = [client.get_graph(example.pipeline_id) for example in examples] graphs = await asyncio.gather(*tasks) for graph, example in zip(graphs, examples): - example.graph = graph + example.graph = graph for output, example in zip(outputs, examples): example.output = output @@ -117,38 +118,43 @@ def _write_to_local_fs(self, example: Example): Config.TEMP_FOLDER, example.pipeline_id, Sdk.Name(example.sdk), + PrecompiledObjectType.Name(example.type), example.tag.name) Path(path_to_object_folder).mkdir(parents=True, exist_ok=True) file_names = {} code_path = self._get_gcs_object_name( sdk=example.sdk, + type=example.type, base_folder_name=example.tag.name, file_name=example.tag.name) output_path = self._get_gcs_object_name( sdk=example.sdk, + type=example.type, base_folder_name=example.tag.name, file_name=example.tag.name, extension=PrecompiledExample.OUTPUT_EXTENSION) log_path = self._get_gcs_object_name( sdk=example.sdk, + type=example.type, base_folder_name=example.tag.name, file_name=example.tag.name, extension=PrecompiledExample.LOG_EXTENSION) graph_path = self._get_gcs_object_name( sdk=example.sdk, + type=example.type, base_folder_name=example.tag.name, file_name=example.tag.name, extension=PrecompiledExample.GRAPH_EXTENSION) meta_path = self._get_gcs_object_name( sdk=example.sdk, + type=example.type, base_folder_name=example.tag.name, file_name=PrecompiledExample.META_NAME, extension=PrecompiledExample.META_EXTENSION) file_names[code_path] = example.code file_names[output_path] = example.output meta = example.tag._asdict() - meta["type"] = example.type meta["link"] = example.link file_names[meta_path] = json.dumps(meta) file_names[log_path] = example.logs @@ -166,6 +172,7 @@ def _write_to_local_fs(self, example: Example): def _get_gcs_object_name( self, sdk: Sdk, + type: PrecompiledObjectType, base_folder_name: str, file_name: str, extension: str = None): @@ -174,6 +181,7 @@ def _get_gcs_object_name( Args: sdk: sdk of the example + type: type of the example file_name: name of the example base_folder_name: name of the folder where example is stored (eq. to example name) @@ -184,7 +192,10 @@ def _get_gcs_object_name( if extension is None: extension = Config.SDK_TO_EXTENSION[sdk] return os.path.join( - Sdk.Name(sdk), base_folder_name, f"{file_name}.{extension}") + Sdk.Name(sdk), + PrecompiledObjectType.Name(type), + base_folder_name, + f"{file_name}.{extension}") def _upload_blob(self, source_file: str, destination_blob_name: str): """ diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index c4fa47d673699..9ae3cc55e88da 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -36,6 +36,7 @@ class CIHelper: It is used to find and verify correctness if beam examples/katas/tests. """ + async def verify_examples(self, examples: List[Example]): """ Verify correctness of beam examples. diff --git a/playground/infrastructure/config.py b/playground/infrastructure/config.py index a8dc4b7059a86..7cc816ccb3488 100644 --- a/playground/infrastructure/config.py +++ b/playground/infrastructure/config.py @@ -23,7 +23,8 @@ from api.v1.api_pb2 import STATUS_VALIDATION_ERROR, STATUS_ERROR, \ STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR, \ - STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR, SDK_JAVA, SDK_GO, SDK_PYTHON, SDK_SCIO, Sdk + STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR, SDK_JAVA, SDK_GO, SDK_PYTHON, \ + SDK_SCIO, Sdk @dataclass(frozen=True) @@ -32,11 +33,19 @@ class Config: General configuration for CI/CD steps """ SERVER_ADDRESS = os.getenv("SERVER_ADDRESS", "localhost:8080") - EXTENSION_TO_SDK = {"java": SDK_JAVA, "go": SDK_GO, "py": SDK_PYTHON, "scala": SDK_SCIO} - SUPPORTED_SDK = (Sdk.Name(SDK_JAVA), Sdk.Name(SDK_GO), Sdk.Name(SDK_PYTHON), Sdk.Name(SDK_SCIO)) + EXTENSION_TO_SDK = { + "java": SDK_JAVA, "go": SDK_GO, "py": SDK_PYTHON, "scala": SDK_SCIO + } + SUPPORTED_SDK = ( + Sdk.Name(SDK_JAVA), + Sdk.Name(SDK_GO), + Sdk.Name(SDK_PYTHON), + Sdk.Name(SDK_SCIO)) BUCKET_NAME = "playground-precompiled-objects" TEMP_FOLDER = "temp" - SDK_TO_EXTENSION = {SDK_JAVA: "java", SDK_GO: "go", SDK_PYTHON: "py", SDK_SCIO: "scala"} + SDK_TO_EXTENSION = { + SDK_JAVA: "java", SDK_GO: "go", SDK_PYTHON: "py", SDK_SCIO: "scala" + } NO_STORE = "no-store" ERROR_STATUSES = [ STATUS_VALIDATION_ERROR, @@ -81,6 +90,7 @@ class PrecompiledExampleType: katas = "katas" test_ends = ("test", "it") + @dataclass(frozen=True) class OptionalTagFields: pipeline_options: str = "pipeline_options" diff --git a/playground/infrastructure/grpc_client.py b/playground/infrastructure/grpc_client.py index 0aac87b65edb4..ddd915236f841 100644 --- a/playground/infrastructure/grpc_client.py +++ b/playground/infrastructure/grpc_client.py @@ -145,7 +145,7 @@ async def get_graph(self, pipeline_uuid: str) -> str: try: response = await self._stub.GetGraph(request) return response.graph - except grpc.RpcError as e: + except grpc.RpcError: # Some examples doesn't have graph (katas) return "" diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py index 7dd36efaaf8c2..f90cd8796cb40 100644 --- a/playground/infrastructure/helper.py +++ b/playground/infrastructure/helper.py @@ -278,9 +278,12 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool: In case tag is not valid, False """ valid = True - required_tag_fields = {f.default for f in fields(TagFields) - if f.default not in - {o_f.default for o_f in fields(OptionalTagFields)}} + required_tag_fields = { + f.default + for f in fields(TagFields) + if f.default not in {o_f.default + for o_f in fields(OptionalTagFields)} + } # check that all fields exist and they have no empty value for field in required_tag_fields: if field not in tag: @@ -294,8 +297,7 @@ def _validate(tag: dict, supported_categories: List[str]) -> bool: valid = False if valid is True: value = tag.get(field) - if (value == "" or - value is None) and field != TagFields.pipeline_options: + if (value == "" or value is None) and field != TagFields.pipeline_options: logging.error( "tag's value is incorrect: %s\n%s field can not be empty.", tag, diff --git a/playground/infrastructure/test_cd_helper.py b/playground/infrastructure/test_cd_helper.py index d9d56faa29ce9..681851b544e8d 100644 --- a/playground/infrastructure/test_cd_helper.py +++ b/playground/infrastructure/test_cd_helper.py @@ -18,7 +18,8 @@ import pytest -from api.v1.api_pb2 import SDK_JAVA, STATUS_UNSPECIFIED +from api.v1.api_pb2 import SDK_JAVA, STATUS_UNSPECIFIED, \ + PRECOMPILED_OBJECT_TYPE_UNIT_TEST from cd_helper import CDHelper from config import Config from helper import Example, Tag @@ -47,12 +48,17 @@ def test__get_gcs_object_name(): """ Test getting the path where file will be stored at the bucket """ - expected_result = "SDK_JAVA/base_folder/file.java" - expected_result_with_extension = "SDK_JAVA/base_folder/file.output" + expected_path = "SDK_JAVA/PRECOMPILED_OBJECT_TYPE_UNIT_TEST/base_folder" + expected_result = "%s/%s" % (expected_path, "file.java") + expected_result_with_extension = "%s/%s" % (expected_path, "file.output") assert CDHelper()._get_gcs_object_name( - SDK_JAVA, "base_folder", "file") == expected_result + SDK_JAVA, PRECOMPILED_OBJECT_TYPE_UNIT_TEST, "base_folder", + "file") == expected_result assert CDHelper()._get_gcs_object_name( - SDK_JAVA, "base_folder", "file", + SDK_JAVA, + PRECOMPILED_OBJECT_TYPE_UNIT_TEST, + "base_folder", + "file", "output") == expected_result_with_extension @@ -81,11 +87,18 @@ def test__write_to_local_fs(delete_temp_folder): status=STATUS_UNSPECIFIED, tag=Tag(**object_meta), link="link") + result_filepath = "SDK_JAVA/PRECOMPILED_OBJECT_TYPE_UNSPECIFIED/name" expected_result = { - "SDK_JAVA/name/name.java": "temp/pipeline_id/SDK_JAVA/name/name.java", - "SDK_JAVA/name/name.output": "temp/pipeline_id/SDK_JAVA/name/name.output", - "SDK_JAVA/name/name.log": "temp/pipeline_id/SDK_JAVA/name/name.log", - "SDK_JAVA/name/meta.info": "temp/pipeline_id/SDK_JAVA/name/meta.info" + "%s/%s" % (result_filepath, "name.java"): "%s/%s/%s" % + ("temp/pipeline_id", result_filepath, "name.java"), + "%s/%s" % (result_filepath, "name.output"): "%s/%s/%s" % + ("temp/pipeline_id", result_filepath, "name.output"), + "%s/%s" % (result_filepath, "name.log"): "%s/%s/%s" % + ("temp/pipeline_id", result_filepath, "name.log"), + "%s/%s" % (result_filepath, "name.graph"): "%s/%s/%s" % + ("temp/pipeline_id", result_filepath, "name.graph"), + "%s/%s" % (result_filepath, "meta.info"): "%s/%s/%s" % + ("temp/pipeline_id", result_filepath, "meta.info"), } assert CDHelper()._write_to_local_fs(example) == expected_result diff --git a/playground/infrastructure/test_ci_helper.py b/playground/infrastructure/test_ci_helper.py index 2f9aff96a9ca2..df9b32da6f29c 100644 --- a/playground/infrastructure/test_ci_helper.py +++ b/playground/infrastructure/test_ci_helper.py @@ -20,12 +20,11 @@ @pytest.mark.asyncio -@mock.patch("ci_helper.CIHelper._verify_examples_status") +@mock.patch("ci_helper.CIHelper._verify_examples") @mock.patch("ci_helper.get_statuses") -async def test_verify_examples( - mock_get_statuses, mock_verify_examples_statuses): +async def test_verify_examples(mock_get_statuses, mock_verify_examples): helper = CIHelper() await helper.verify_examples([]) mock_get_statuses.assert_called_once_with([]) - mock_verify_examples_statuses.assert_called_once_with([]) + mock_verify_examples.assert_called_once_with([])