From eaa5806abceb73b5199c505322773ad048a336ca Mon Sep 17 00:00:00 2001 From: vishal Date: Fri, 5 Apr 2019 16:50:15 -0400 Subject: [PATCH 1/5] Allow version mismatch in integration test --- .../spark_job/test/integration/iris_test.py | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pkg/workloads/spark_job/test/integration/iris_test.py b/pkg/workloads/spark_job/test/integration/iris_test.py index 86a72c6f0f..eeaa63742b 100644 --- a/pkg/workloads/spark_job/test/integration/iris_test.py +++ b/pkg/workloads/spark_job/test/integration/iris_test.py @@ -18,6 +18,7 @@ from lib.exceptions import UserException from lib import Context from test.integration.iris_context import raw_ctx +import consts import pytest from pyspark.sql.types import * @@ -27,6 +28,7 @@ from py4j.protocol import Py4JJavaError from pathlib import Path import os +import copy pytestmark = pytest.mark.usefixtures("spark") @@ -51,26 +53,31 @@ def test_simple_end_to_end(spark): + iris_ctx = copy.deepcopy(raw_ctx) local_storage_path = Path("/workspace/local_storage") local_storage_path.mkdir(parents=True, exist_ok=True) should_ingest = True - workload_id = raw_ctx["raw_columns"]["raw_float_columns"]["sepal_length"]["workload_id"] + + # accommodate hard-coded version in iris_context.py + iris_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION + + workload_id = iris_ctx["raw_columns"]["raw_float_columns"]["sepal_length"]["workload_id"] cols_to_validate = [] - for column_type in raw_ctx["raw_columns"].values(): + for column_type in iris_ctx["raw_columns"].values(): for raw_column in column_type.values(): cols_to_validate.append(raw_column["id"]) iris_data_string = "\n".join(",".join(str(val) for val in line) for line in iris_data) Path(os.path.join(str(local_storage_path), "iris.csv")).write_text(iris_data_string) - raw_ctx["environment_data"]["csv_data"]["path"] = os.path.join( + iris_ctx["environment_data"]["csv_data"]["path"] = os.path.join( str(local_storage_path), "iris.csv" ) ctx = Context( - raw_obj=raw_ctx, cache_dir="/workspace/cache", local_storage_path=str(local_storage_path) + raw_obj=iris_ctx, cache_dir="/workspace/cache", local_storage_path=str(local_storage_path) ) storage = ctx.storage @@ -79,44 +86,44 @@ def test_simple_end_to_end(spark): assert raw_df.count() == 15 assert storage.get_json(ctx.raw_dataset["metadata_key"])["dataset_size"] == 15 for raw_column_id in cols_to_validate: - path = os.path.join(raw_ctx["status_prefix"], raw_column_id, workload_id) + path = os.path.join(iris_ctx["status_prefix"], raw_column_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = raw_column_id status["exist_code"] = "succeeded" - cols_to_aggregate = [r["id"] for r in raw_ctx["aggregates"].values()] + cols_to_aggregate = [r["id"] for r in iris_ctx["aggregates"].values()] spark_job.run_custom_aggregators(spark, ctx, cols_to_aggregate, raw_df) for aggregate_id in cols_to_aggregate: - for aggregate_resource in raw_ctx["aggregates"].values(): + for aggregate_resource in iris_ctx["aggregates"].values(): if aggregate_resource["id"] == aggregate_id: assert local_storage_path.joinpath(aggregate_resource["key"]).exists() - path = os.path.join(raw_ctx["status_prefix"], aggregate_id, workload_id) + path = os.path.join(iris_ctx["status_prefix"], aggregate_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = aggregate_id status["exist_code"] = "succeeded" - cols_to_transform = [r["id"] for r in raw_ctx["transformed_columns"].values()] + 
cols_to_transform = [r["id"] for r in iris_ctx["transformed_columns"].values()] spark_job.validate_transformers(spark, ctx, cols_to_transform, raw_df) for transformed_id in cols_to_transform: - path = os.path.join(raw_ctx["status_prefix"], transformed_id, workload_id) + path = os.path.join(iris_ctx["status_prefix"], transformed_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = transformed_id status["exist_code"] = "succeeded" - training_datasets = [raw_ctx["models"]["dnn"]["dataset"]["id"]] + training_datasets = [iris_ctx["models"]["dnn"]["dataset"]["id"]] spark_job.create_training_datasets(spark, ctx, training_datasets, raw_df) for dataset_id in training_datasets: - path = os.path.join(raw_ctx["status_prefix"], transformed_id, workload_id) + path = os.path.join(iris_ctx["status_prefix"], transformed_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = transformed_id status["exist_code"] = "succeeded" - dataset = raw_ctx["models"]["dnn"]["dataset"] + dataset = iris_ctx["models"]["dnn"]["dataset"] metadata_key = storage.get_json(dataset["metadata_key"]) assert metadata_key["training_size"] + metadata_key["eval_size"] == 15 assert local_storage_path.joinpath(dataset["train_key"], "_SUCCESS").exists() From c9ea8387993c3350734d0494b23ed7816f39d309 Mon Sep 17 00:00:00 2001 From: vishal Date: Fri, 5 Apr 2019 16:54:32 -0400 Subject: [PATCH 2/5] Remove unnecessary variable rename --- .../spark_job/test/integration/iris_test.py | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/pkg/workloads/spark_job/test/integration/iris_test.py b/pkg/workloads/spark_job/test/integration/iris_test.py index eeaa63742b..1f99808e6e 100644 --- a/pkg/workloads/spark_job/test/integration/iris_test.py +++ b/pkg/workloads/spark_job/test/integration/iris_test.py @@ -28,7 +28,6 @@ from py4j.protocol import Py4JJavaError from pathlib import Path import os -import copy pytestmark = pytest.mark.usefixtures("spark") @@ -53,31 +52,30 @@ def test_simple_end_to_end(spark): - iris_ctx = copy.deepcopy(raw_ctx) local_storage_path = Path("/workspace/local_storage") local_storage_path.mkdir(parents=True, exist_ok=True) should_ingest = True # accommodate hard-coded version in iris_context.py - iris_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION + raw_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION - workload_id = iris_ctx["raw_columns"]["raw_float_columns"]["sepal_length"]["workload_id"] + workload_id = raw_ctx["raw_columns"]["raw_float_columns"]["sepal_length"]["workload_id"] cols_to_validate = [] - for column_type in iris_ctx["raw_columns"].values(): + for column_type in raw_ctx["raw_columns"].values(): for raw_column in column_type.values(): cols_to_validate.append(raw_column["id"]) iris_data_string = "\n".join(",".join(str(val) for val in line) for line in iris_data) Path(os.path.join(str(local_storage_path), "iris.csv")).write_text(iris_data_string) - iris_ctx["environment_data"]["csv_data"]["path"] = os.path.join( + raw_ctx["environment_data"]["csv_data"]["path"] = os.path.join( str(local_storage_path), "iris.csv" ) ctx = Context( - raw_obj=iris_ctx, cache_dir="/workspace/cache", local_storage_path=str(local_storage_path) + raw_obj=raw_ctx, cache_dir="/workspace/cache", local_storage_path=str(local_storage_path) ) storage = ctx.storage @@ -86,44 +84,44 @@ def test_simple_end_to_end(spark): assert raw_df.count() == 15 assert storage.get_json(ctx.raw_dataset["metadata_key"])["dataset_size"] == 15 for raw_column_id in 
cols_to_validate: - path = os.path.join(iris_ctx["status_prefix"], raw_column_id, workload_id) + path = os.path.join(raw_ctx["status_prefix"], raw_column_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = raw_column_id status["exist_code"] = "succeeded" - cols_to_aggregate = [r["id"] for r in iris_ctx["aggregates"].values()] + cols_to_aggregate = [r["id"] for r in raw_ctx["aggregates"].values()] spark_job.run_custom_aggregators(spark, ctx, cols_to_aggregate, raw_df) for aggregate_id in cols_to_aggregate: - for aggregate_resource in iris_ctx["aggregates"].values(): + for aggregate_resource in raw_ctx["aggregates"].values(): if aggregate_resource["id"] == aggregate_id: assert local_storage_path.joinpath(aggregate_resource["key"]).exists() - path = os.path.join(iris_ctx["status_prefix"], aggregate_id, workload_id) + path = os.path.join(raw_ctx["status_prefix"], aggregate_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = aggregate_id status["exist_code"] = "succeeded" - cols_to_transform = [r["id"] for r in iris_ctx["transformed_columns"].values()] + cols_to_transform = [r["id"] for r in raw_ctx["transformed_columns"].values()] spark_job.validate_transformers(spark, ctx, cols_to_transform, raw_df) for transformed_id in cols_to_transform: - path = os.path.join(iris_ctx["status_prefix"], transformed_id, workload_id) + path = os.path.join(raw_ctx["status_prefix"], transformed_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = transformed_id status["exist_code"] = "succeeded" - training_datasets = [iris_ctx["models"]["dnn"]["dataset"]["id"]] + training_datasets = [raw_ctx["models"]["dnn"]["dataset"]["id"]] spark_job.create_training_datasets(spark, ctx, training_datasets, raw_df) for dataset_id in training_datasets: - path = os.path.join(iris_ctx["status_prefix"], transformed_id, workload_id) + path = os.path.join(raw_ctx["status_prefix"], transformed_id, workload_id) status = storage.get_json(str(path)) status["resource_id"] = transformed_id status["exist_code"] = "succeeded" - dataset = iris_ctx["models"]["dnn"]["dataset"] + dataset = raw_ctx["models"]["dnn"]["dataset"] metadata_key = storage.get_json(dataset["metadata_key"]) assert metadata_key["training_size"] + metadata_key["eval_size"] == 15 assert local_storage_path.joinpath(dataset["train_key"], "_SUCCESS").exists() From 02e13c87c1cd663fd1eece4e7c674c335884d6bb Mon Sep 17 00:00:00 2001 From: vishal Date: Fri, 5 Apr 2019 17:37:16 -0400 Subject: [PATCH 3/5] Move cortex config version update to iris_context file --- .../spark_job/test/integration/iris_context.py | 12 +++++++++--- .../spark_job/test/integration/iris_test.py | 11 +++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/workloads/spark_job/test/integration/iris_context.py b/pkg/workloads/spark_job/test/integration/iris_context.py index d09d8e5235..a1a4bf383f 100644 --- a/pkg/workloads/spark_job/test/integration/iris_context.py +++ b/pkg/workloads/spark_job/test/integration/iris_context.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import consts + """ HOW TO GENERATE CONTEXT @@ -25,10 +27,14 @@ from lib.storage import S3 bucket, key = S3.deconstruct_s3_path('s3:///apps//contexts/.msgpack') S3(bucket, client_config={}).get_msgpack(key) - -5. 
Modify environment_data.csv_data.path to point to the correct input data file """ +def get_raw_ctx(input_data_path): + raw_ctx["environment_data"]["csv_data"]["path"] = input_data_path + raw_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION + + return raw_ctx + raw_ctx = { "raw_dataset": { "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet", @@ -511,7 +517,7 @@ "cortex_config": { "region": "us-west-2", "log_group": "cortex", - "api_version": "master", + "api_version": consts.CORTEX_VERSION, "id": "da5e65b994ba4ebb069bdc19cf73da64aee79e5d83f466038dc75b3ef04fa63", }, "root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554", diff --git a/pkg/workloads/spark_job/test/integration/iris_test.py b/pkg/workloads/spark_job/test/integration/iris_test.py index 1f99808e6e..d915929a17 100644 --- a/pkg/workloads/spark_job/test/integration/iris_test.py +++ b/pkg/workloads/spark_job/test/integration/iris_test.py @@ -17,8 +17,7 @@ from spark_job import spark_job from lib.exceptions import UserException from lib import Context -from test.integration.iris_context import raw_ctx -import consts +from test.integration.iris_context import get_raw_ctx import pytest from pyspark.sql.types import * @@ -55,9 +54,9 @@ def test_simple_end_to_end(spark): local_storage_path = Path("/workspace/local_storage") local_storage_path.mkdir(parents=True, exist_ok=True) should_ingest = True + input_data_path = os.path.join(str(local_storage_path), "iris.csv") - # accommodate hard-coded version in iris_context.py - raw_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION + raw_ctx = get_raw_ctx(input_data_path) workload_id = raw_ctx["raw_columns"]["raw_float_columns"]["sepal_length"]["workload_id"] @@ -70,10 +69,6 @@ def test_simple_end_to_end(spark): iris_data_string = "\n".join(",".join(str(val) for val in line) for line in iris_data) Path(os.path.join(str(local_storage_path), "iris.csv")).write_text(iris_data_string) - raw_ctx["environment_data"]["csv_data"]["path"] = os.path.join( - str(local_storage_path), "iris.csv" - ) - ctx = Context( raw_obj=raw_ctx, cache_dir="/workspace/cache", local_storage_path=str(local_storage_path) ) From e3ee1ff995c3b2183f1b93b0161accd88543f7eb Mon Sep 17 00:00:00 2001 From: vishal Date: Fri, 5 Apr 2019 18:24:03 -0400 Subject: [PATCH 4/5] Fix linting in iris_context.py --- pkg/workloads/spark_job/test/integration/iris_context.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/workloads/spark_job/test/integration/iris_context.py b/pkg/workloads/spark_job/test/integration/iris_context.py index a1a4bf383f..de4315b832 100644 --- a/pkg/workloads/spark_job/test/integration/iris_context.py +++ b/pkg/workloads/spark_job/test/integration/iris_context.py @@ -29,12 +29,14 @@ S3(bucket, client_config={}).get_msgpack(key) """ + def get_raw_ctx(input_data_path): raw_ctx["environment_data"]["csv_data"]["path"] = input_data_path raw_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION return raw_ctx + raw_ctx = { "raw_dataset": { "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet", From 3053982e55767d393c92b46155a6f8866df80508 Mon Sep 17 00:00:00 2001 From: vishal Date: Fri, 5 Apr 2019 18:41:56 -0400 Subject: [PATCH 5/5] Rename get_raw_ctx to get --- pkg/workloads/spark_job/test/integration/iris_context.py | 4 ++-- 
pkg/workloads/spark_job/test/integration/iris_test.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/workloads/spark_job/test/integration/iris_context.py b/pkg/workloads/spark_job/test/integration/iris_context.py index de4315b832..5379b90887 100644 --- a/pkg/workloads/spark_job/test/integration/iris_context.py +++ b/pkg/workloads/spark_job/test/integration/iris_context.py @@ -30,7 +30,7 @@ """ -def get_raw_ctx(input_data_path): +def get(input_data_path): raw_ctx["environment_data"]["csv_data"]["path"] = input_data_path raw_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION @@ -519,7 +519,7 @@ def get_raw_ctx(input_data_path): "cortex_config": { "region": "us-west-2", "log_group": "cortex", - "api_version": consts.CORTEX_VERSION, + "api_version": "master", "id": "da5e65b994ba4ebb069bdc19cf73da64aee79e5d83f466038dc75b3ef04fa63", }, "root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554", diff --git a/pkg/workloads/spark_job/test/integration/iris_test.py b/pkg/workloads/spark_job/test/integration/iris_test.py index d915929a17..73ff1e9ec1 100644 --- a/pkg/workloads/spark_job/test/integration/iris_test.py +++ b/pkg/workloads/spark_job/test/integration/iris_test.py @@ -17,7 +17,7 @@ from spark_job import spark_job from lib.exceptions import UserException from lib import Context -from test.integration.iris_context import get_raw_ctx +from test.integration import iris_context import pytest from pyspark.sql.types import * @@ -56,7 +56,7 @@ def test_simple_end_to_end(spark): should_ingest = True input_data_path = os.path.join(str(local_storage_path), "iris.csv") - raw_ctx = get_raw_ctx(input_data_path) + raw_ctx = iris_context.get(input_data_path) workload_id = raw_ctx["raw_columns"]["raw_float_columns"]["sepal_length"]["workload_id"]
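
Note: after the full series, the integration test no longer mutates the imported raw_ctx in place; it calls iris_context.get(input_data_path), which sets environment_data.csv_data.path and pins cortex_config.api_version to consts.CORTEX_VERSION. A minimal usage sketch for reference (the paths below are illustrative and simply mirror the ones used in iris_test.py; this is not part of the patches themselves):

    import os
    from pathlib import Path

    from test.integration import iris_context

    # Same local storage location the integration test writes iris.csv to.
    local_storage_path = Path("/workspace/local_storage")
    input_data_path = os.path.join(str(local_storage_path), "iris.csv")

    # get() fills in the CSV path and the Cortex API version, then returns the context dict.
    raw_ctx = iris_context.get(input_data_path)
    assert raw_ctx["environment_data"]["csv_data"]["path"] == input_data_path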