From 91120598f96d496a4bcbd3383771c8d169889eb5 Mon Sep 17 00:00:00 2001
From: Ivan Zhang
Date: Wed, 22 May 2019 12:34:09 -0400
Subject: [PATCH 1/5] refactor away metadata_key

---
 pkg/consts/consts.go                            |  1 +
 pkg/operator/api/context/context.go             | 10 ++--------
 pkg/operator/context/aggregates.go              |  2 --
 pkg/operator/context/aggregators.go             |  1 -
 pkg/operator/context/apis.go                    |  3 ---
 pkg/operator/context/constants.go               |  1 -
 pkg/operator/context/context.go                 |  9 +++++++--
 pkg/operator/context/models.go                  |  2 --
 pkg/operator/context/python_packages.go         |  2 --
 pkg/operator/context/raw_columns.go             |  5 -----
 pkg/operator/context/transformed_columns.go     |  3 ---
 pkg/operator/context/transformers.go            |  1 -
 pkg/workloads/lib/context.py                    | 12 +++++++++---
 pkg/workloads/spark_job/spark_job.py            |  3 +--
 pkg/workloads/spark_job/spark_util.py           |  3 ---
 .../spark_job/test/integration/iris_context.py  |  2 --
 .../spark_job/test/integration/iris_test.py     |  4 ++--
 pkg/workloads/tf_train/train_util.py            |  2 +-
 18 files changed, 23 insertions(+), 43 deletions(-)

diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go
index e0835c7c7d..85543ae63a 100644
--- a/pkg/consts/consts.go
+++ b/pkg/consts/consts.go
@@ -54,6 +54,7 @@ var (
     LogPrefixesDir        = "log_prefixes"
     RawColumnsDir         = "raw_columns"
     TransformedColumnsDir = "transformed_columns"
+    MetadataDir           = "metadata"

     TelemetryURL = "https://telemetry.cortexlabs.dev"
 )
diff --git a/pkg/operator/api/context/context.go b/pkg/operator/api/context/context.go
index 594df54b87..125077cac1 100644
--- a/pkg/operator/api/context/context.go
+++ b/pkg/operator/api/context/context.go
@@ -30,6 +30,7 @@ type Context struct {
     CortexConfig   *config.CortexConfig `json:"cortex_config"`
     DatasetVersion string               `json:"dataset_version"`
     Root           string               `json:"root"`
+    MetadataRoot   string               `json:"metadata_root"`
     RawDataset     RawDataset           `json:"raw_dataset"`
     StatusPrefix   string               `json:"status_prefix"`
     App            *App                 `json:"app"`
@@ -46,8 +47,7 @@ type Context struct {
 }

 type RawDataset struct {
-    Key         string `json:"key"`
-    MetadataKey string `json:"metadata_key"`
+    Key string `json:"key"`
 }

 type Resource interface {
@@ -55,7 +55,6 @@
     GetID() string
     GetIDWithTags() string
     GetResourceFields() *ResourceFields
-    GetMetadataKey() string
 }

 type ComputedResource interface {
@@ -73,7 +72,6 @@
     ID           string        `json:"id"`
     IDWithTags   string        `json:"id_with_tags"`
     ResourceType resource.Type `json:"resource_type"`
-    MetadataKey  string        `json:"metadata_key"`
 }

 type ComputedResourceFields struct {
@@ -93,10 +91,6 @@ func (r *ResourceFields) GetResourceFields() *ResourceFields {
     return r
 }

-func (r *ResourceFields) GetMetadataKey() string {
-    return r.MetadataKey
-}
-
 func (r *ComputedResourceFields) GetWorkloadID() string {
     return r.WorkloadID
 }
diff --git a/pkg/operator/context/aggregates.go b/pkg/operator/context/aggregates.go
index d4ea14374e..2cd7cb4b58 100644
--- a/pkg/operator/context/aggregates.go
+++ b/pkg/operator/context/aggregates.go
@@ -86,7 +86,6 @@ func getAggregates(
         id,
     )
     aggregateKey := aggregateRootKey + ".msgpack"
-    aggregateMetadataKey := aggregateRootKey + "_metadata.json"

     aggregates[aggregateConfig.Name] = &context.Aggregate{
         ComputedResourceFields: &context.ComputedResourceFields{
@@ -94,7 +93,6 @@ func getAggregates(
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.AggregateType,
-                MetadataKey:  aggregateMetadataKey,
             },
         },
         Aggregate: aggregateConfig,
diff --git a/pkg/operator/context/aggregators.go b/pkg/operator/context/aggregators.go
index 0129bb658f..2094e2e302 100644
--- a/pkg/operator/context/aggregators.go
+++ b/pkg/operator/context/aggregators.go
@@ -106,7 +106,6 @@ func newAggregator(
             ID:           id,
             IDWithTags:   id,
             ResourceType: resource.AggregatorType,
-            MetadataKey:  filepath.Join(consts.AggregatorsDir, id+"_metadata.json"),
         },
         Aggregator: &aggregatorConfig,
         Namespace:  namespace,
diff --git a/pkg/operator/context/apis.go b/pkg/operator/context/apis.go
index 9f0ef58bae..1ff5045575 100644
--- a/pkg/operator/context/apis.go
+++ b/pkg/operator/context/apis.go
@@ -18,9 +18,7 @@ package context

 import (
     "bytes"
-    "path/filepath"

-    "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/hash"
     "github.com/cortexlabs/cortex/pkg/operator/api/context"
     "github.com/cortexlabs/cortex/pkg/operator/api/resource"
@@ -50,7 +48,6 @@ func getAPIs(config *userconfig.Config,
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.APIType,
-                MetadataKey:  filepath.Join(consts.APIsDir, id+"_metadata.json"),
             },
         },
         API: apiConfig,
diff --git a/pkg/operator/context/constants.go b/pkg/operator/context/constants.go
index ecc1836d8b..bdb2edcba1 100644
--- a/pkg/operator/context/constants.go
+++ b/pkg/operator/context/constants.go
@@ -59,7 +59,6 @@ func newConstant(constantConfig userconfig.Constant) (*context.Constant, error)
             ID:           id,
             IDWithTags:   idWithTags,
             ResourceType: resource.ConstantType,
-            MetadataKey:  filepath.Join(consts.ConstantsDir, id+"_metadata.json"),
         },
         Constant: &constantConfig,
         Key:      filepath.Join(consts.ConstantsDir, id+".msgpack"),
diff --git a/pkg/operator/context/context.go b/pkg/operator/context/context.go
index 1d9d0d87ce..562b062ea3 100644
--- a/pkg/operator/context/context.go
+++ b/pkg/operator/context/context.go
@@ -117,9 +117,14 @@ func New(
         ctx.DatasetVersion,
         ctx.Environment.ID,
     )
+
+    ctx.MetadataRoot = filepath.Join(
+        ctx.Root,
+        consts.MetadataDir,
+    )
+
     ctx.RawDataset = context.RawDataset{
-        Key:         filepath.Join(ctx.Root, consts.RawDataDir, "raw.parquet"),
-        MetadataKey: filepath.Join(ctx.Root, consts.RawDataDir, "metadata.json"),
+        Key: filepath.Join(ctx.Root, consts.RawDataDir, "raw.parquet"),
     }

     ctx.StatusPrefix = StatusPrefix(ctx.App.Name)
diff --git a/pkg/operator/context/models.go b/pkg/operator/context/models.go
index a1c900e50a..d0323c5cf7 100644
--- a/pkg/operator/context/models.go
+++ b/pkg/operator/context/models.go
@@ -96,7 +96,6 @@ func getModels(
                 ID:           modelID,
                 IDWithTags:   modelID,
                 ResourceType: resource.ModelType,
-                MetadataKey:  filepath.Join(root, consts.ModelsDir, modelID+"_metadata.json"),
             },
         },
         Model: modelConfig,
@@ -114,7 +113,6 @@
                 ID:           datasetID,
                 IDWithTags:   datasetIDWithTags,
                 ResourceType: resource.TrainingDatasetType,
-                MetadataKey:  filepath.Join(datasetRoot, "metadata.json"),
             },
         },
         ModelName: modelConfig.Name,
diff --git a/pkg/operator/context/python_packages.go b/pkg/operator/context/python_packages.go
index 8cf89da0a5..e87a1753fe 100644
--- a/pkg/operator/context/python_packages.go
+++ b/pkg/operator/context/python_packages.go
@@ -64,7 +64,6 @@ func loadPythonPackages(files map[string][]byte, datasetVersion string) (context
             ResourceFields: &context.ResourceFields{
                 ID:           id,
                 ResourceType: resource.PythonPackageType,
-                MetadataKey:  filepath.Join(consts.PythonPackagesDir, id, "metadata.json"),
             },
         },
         SrcKey: filepath.Join(consts.PythonPackagesDir, id, "src.txt"),
@@ -103,7 +102,6 @@ func loadPythonPackages(files map[string][]byte, datasetVersion string) (context
             ResourceFields: &context.ResourceFields{
                 ID:           id,
                 ResourceType: resource.PythonPackageType,
-                MetadataKey:  filepath.Join(consts.PythonPackagesDir, id, "metadata.json"),
             },
         },
         SrcKey: filepath.Join(consts.PythonPackagesDir, id, "src.zip"),
diff --git a/pkg/operator/context/raw_columns.go b/pkg/operator/context/raw_columns.go
index a3102cff45..4ccf432aef 100644
--- a/pkg/operator/context/raw_columns.go
+++ b/pkg/operator/context/raw_columns.go
@@ -18,9 +18,7 @@ package context

 import (
     "bytes"
-    "path/filepath"

-    "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/configreader"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
     "github.com/cortexlabs/cortex/pkg/lib/hash"
@@ -59,7 +57,6 @@ func getRawColumns(
                     ID:           id,
                     IDWithTags:   idWithTags,
                     ResourceType: resource.RawColumnType,
-                    MetadataKey:  filepath.Join(consts.RawColumnsDir, id+"_metadata.json"),
                 },
             },
             RawIntColumn: typedColumnConfig,
@@ -77,7 +74,6 @@
                     ID:           id,
                     IDWithTags:   idWithTags,
                     ResourceType: resource.RawColumnType,
-                    MetadataKey:  filepath.Join(consts.RawColumnsDir, id+"_metadata.json"),
                 },
             },
             RawFloatColumn: typedColumnConfig,
@@ -93,7 +89,6 @@
                     ID:           id,
                     IDWithTags:   idWithTags,
                     ResourceType: resource.RawColumnType,
-                    MetadataKey:  filepath.Join(consts.RawColumnsDir, id+"_metadata.json"),
                 },
             },
             RawStringColumn: typedColumnConfig,
diff --git a/pkg/operator/context/transformed_columns.go b/pkg/operator/context/transformed_columns.go
index bcc94cea9c..21b7dad144 100644
--- a/pkg/operator/context/transformed_columns.go
+++ b/pkg/operator/context/transformed_columns.go
@@ -18,9 +18,7 @@ package context

 import (
     "bytes"
-    "path/filepath"

-    "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
     "github.com/cortexlabs/cortex/pkg/lib/hash"
     s "github.com/cortexlabs/cortex/pkg/lib/strings"
@@ -82,7 +80,6 @@ func getTransformedColumns(
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.TransformedColumnType,
-                MetadataKey:  filepath.Join(consts.TransformedColumnsDir, id+"_metadata.json"),
             },
         },
         TransformedColumn: transformedColumnConfig,
diff --git a/pkg/operator/context/transformers.go b/pkg/operator/context/transformers.go
index a335ddcd52..859e0fe75f 100644
--- a/pkg/operator/context/transformers.go
+++ b/pkg/operator/context/transformers.go
@@ -103,7 +103,6 @@ func newTransformer(
             ID:           id,
             IDWithTags:   id,
             ResourceType: resource.TransformerType,
-            MetadataKey:  filepath.Join(consts.TransformersDir, id+"_metadata.json"),
         },
         Transformer: &transConfig,
         Namespace:   namespace,
diff --git a/pkg/workloads/lib/context.py b/pkg/workloads/lib/context.py
index d8286c1c63..51c545e27f 100644
--- a/pkg/workloads/lib/context.py
+++ b/pkg/workloads/lib/context.py
@@ -468,14 +468,20 @@ def upload_resource_status_end(self, exit_code, *resources):
     def resource_status_key(self, resource):
         return os.path.join(self.status_prefix, resource["id"], resource["workload_id"])

-    def write_metadata(self, resource_id, metadata_key, metadata):
+    def get_metadata_url(self, resource_id):
+        return os.path.join(self.ctx["metadata_root"], resource_id+".json")
+
+
+    def write_metadata(self, resource_id, metadata):
+        metadata_key = self.get_metadata_url(resource_id)
         if resource_id in self._metadatas and self._metadatas[resource_id] == metadata:
             return

         self._metadatas[resource_id] = metadata
         self.storage.put_json(metadata, metadata_key)

-    def get_metadata(self, resource_id, metadata_key, use_cache=True):
+    def get_metadata(self, resource_id, use_cache=True):
+        metadata_key = self.get_metadata_url(resource_id)
         if use_cache and resource_id in self._metadatas:
             return self._metadatas[resource_id]

@@ -487,7 +493,7 @@ def get_inferred_column_type(self, column_name):
         column = self.columns[column_name]
         column_type = self.columns[column_name].get("type", "unknown")
         if column_type == "unknown":
-            column_type = self.get_metadata(column["id"], column["metadata_key"])["type"]
+            column_type = self.get_metadata(column["id"])["type"]
             self.columns[column_name]["type"] = column_type

         return column_type
diff --git a/pkg/workloads/spark_job/spark_job.py b/pkg/workloads/spark_job/spark_job.py
index 53247f090d..1eb0e00290 100644
--- a/pkg/workloads/spark_job/spark_job.py
+++ b/pkg/workloads/spark_job/spark_job.py
@@ -91,7 +91,7 @@ def parse_args(args):

 def validate_dataset(ctx, raw_df, cols_to_validate):
-    total_row_count = ctx.get_metadata(ctx.raw_dataset["key"], ctx.raw_dataset["metadata_key"])[
+    total_row_count = ctx.get_metadata(ctx.raw_dataset["key"])[
         "dataset_size"
     ]
     conditions_dict = spark_util.value_check_data(ctx, raw_df, cols_to_validate)
@@ -164,7 +164,6 @@ def ingest_raw_dataset(spark, ctx, cols_to_validate, should_ingest):
         written_count = write_raw_dataset(ingest_df, ctx, spark)
         ctx.write_metadata(
             ctx.raw_dataset["key"],
-            ctx.raw_dataset["metadata_key"],
             {"dataset_size": written_count},
         )
         if written_count != full_dataset_size:
diff --git a/pkg/workloads/spark_job/spark_util.py b/pkg/workloads/spark_job/spark_util.py
index 558eb8b99a..26082a9d20 100644
--- a/pkg/workloads/spark_job/spark_util.py
+++ b/pkg/workloads/spark_job/spark_util.py
@@ -121,7 +121,6 @@ def write_training_data(model_name, df, ctx, spark):

     ctx.write_metadata(
         training_dataset["id"],
-        training_dataset["metadata_key"],
         {"training_size": train_df_acc.value, "eval_size": eval_df_acc.value},
     )

@@ -528,7 +527,6 @@ def validate_transformer(column_name, test_df, ctx, spark):

             ctx.write_metadata(
                 transformed_column["id"],
-                transformed_column["metadata_key"],
                 {"type": inferred_python_type},
             )

@@ -595,7 +593,6 @@ def validate_transformer(column_name, test_df, ctx, spark):
                 inferred_spark_type = transform_spark_df.select(column_name).schema[0].dataType
                 ctx.write_metadata(
                     transformed_column["id"],
-                    transformed_column["metadata_key"],
                     {"type": inferred_spark_type},
                 )

diff --git a/pkg/workloads/spark_job/test/integration/iris_context.py b/pkg/workloads/spark_job/test/integration/iris_context.py
index 5379b90887..2eb3a0dd31 100644
--- a/pkg/workloads/spark_job/test/integration/iris_context.py
+++ b/pkg/workloads/spark_job/test/integration/iris_context.py
@@ -40,7 +40,6 @@ def get(input_data_path):
     raw_ctx = {
         "raw_dataset": {
             "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet",
-            "metadata_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/metadata.json",
         },
         "aggregates": {
             "class_index": {
@@ -469,7 +468,6 @@ def get(input_data_path):
                 "prediction_key": "",
                 "workload_id": "aokhfrzyw6ju730nbwli",
                 "dataset": {
-                    "metadata_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/metadata.json",
                     "train_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/train.tfrecord",
                     "workload_id": "jjd3l0fi4fhwqtgmpatg",
                     "eval_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/eval.tfrecord",
diff --git a/pkg/workloads/spark_job/test/integration/iris_test.py b/pkg/workloads/spark_job/test/integration/iris_test.py
index 35eca5e371..ea4c616375 100644
--- a/pkg/workloads/spark_job/test/integration/iris_test.py
+++ b/pkg/workloads/spark_job/test/integration/iris_test.py
@@ -78,7 +78,7 @@ def test_simple_end_to_end(spark):
     assert raw_df.count() == 15
     assert (
-        ctx.get_metadata(ctx.raw_dataset["key"], ctx.raw_dataset["metadata_key"])["dataset_size"]
+        ctx.get_metadata(ctx.raw_dataset["key"])["dataset_size"]
         == 15
     )
     for raw_column_id in cols_to_validate:
@@ -120,7 +120,7 @@ def test_simple_end_to_end(spark):
         status["exist_code"] = "succeeded"

     dataset = raw_ctx["models"]["dnn"]["dataset"]
-    metadata = ctx.get_metadata(dataset["id"], dataset["metadata_key"])
+    metadata = ctx.get_metadata(dataset["id"])
     assert metadata["training_size"] + metadata["eval_size"] == 15
     assert local_storage_path.joinpath(dataset["train_key"], "_SUCCESS").exists()
     assert local_storage_path.joinpath(dataset["eval_key"], "_SUCCESS").exists()
diff --git a/pkg/workloads/tf_train/train_util.py b/pkg/workloads/tf_train/train_util.py
index d41a3b4995..9e36a08ae6 100644
--- a/pkg/workloads/tf_train/train_util.py
+++ b/pkg/workloads/tf_train/train_util.py
@@ -149,7 +149,7 @@ def train(model_name, model_impl, ctx, model_dir):
     exporter = tf.estimator.FinalExporter("estimator", serving_input_fn, as_text=False)

     train_num_steps = model["training"]["num_steps"]
-    dataset_metadata = ctx.get_metadata(model["dataset"]["id"], model["dataset"]["metadata_key"])
+    dataset_metadata = ctx.get_metadata(model["dataset"]["id"])
     if model["training"]["num_epochs"]:
         train_num_steps = (
             math.ceil(dataset_metadata["training_size"] / float(model["training"]["batch_size"]))

From a943282ec48c3632288ec9dc002d2bf7f728bdbd Mon Sep 17 00:00:00 2001
From: Ivan Zhang
Date: Wed, 22 May 2019 12:35:30 -0400
Subject: [PATCH 2/5] format

---
 pkg/workloads/lib/context.py                           |  3 +--
 pkg/workloads/spark_job/spark_job.py                   |  9 ++-------
 pkg/workloads/spark_job/spark_util.py                  | 10 ++--------
 .../spark_job/test/integration/iris_context.py         |  2 +-
 pkg/workloads/spark_job/test/integration/iris_test.py  |  5 +----
 5 files changed, 7 insertions(+), 22 deletions(-)

diff --git a/pkg/workloads/lib/context.py b/pkg/workloads/lib/context.py
index 51c545e27f..b9cb01e033 100644
--- a/pkg/workloads/lib/context.py
+++ b/pkg/workloads/lib/context.py
@@ -469,8 +469,7 @@ def resource_status_key(self, resource):
         return os.path.join(self.status_prefix, resource["id"], resource["workload_id"])

     def get_metadata_url(self, resource_id):
-        return os.path.join(self.ctx["metadata_root"], resource_id+".json")
-
+        return os.path.join(self.ctx["metadata_root"], resource_id + ".json")

     def write_metadata(self, resource_id, metadata):
         metadata_key = self.get_metadata_url(resource_id)
diff --git a/pkg/workloads/spark_job/spark_job.py b/pkg/workloads/spark_job/spark_job.py
index 1eb0e00290..253bb2b093 100644
--- a/pkg/workloads/spark_job/spark_job.py
+++ b/pkg/workloads/spark_job/spark_job.py
@@ -91,9 +91,7 @@ def parse_args(args):

 def validate_dataset(ctx, raw_df, cols_to_validate):
-    total_row_count = ctx.get_metadata(ctx.raw_dataset["key"])[
-        "dataset_size"
-    ]
+    total_row_count = ctx.get_metadata(ctx.raw_dataset["key"])["dataset_size"]
     conditions_dict = spark_util.value_check_data(ctx, raw_df, cols_to_validate)

     if len(conditions_dict) > 0:
@@ -162,10 +160,7 @@ def ingest_raw_dataset(spark, ctx, cols_to_validate, should_ingest):
             ingest_df = limit_dataset(full_dataset_size, ingest_df, ctx.environment["limit"])

         written_count = write_raw_dataset(ingest_df, ctx, spark)
-        ctx.write_metadata(
-            ctx.raw_dataset["key"],
-            {"dataset_size": written_count},
-        )
+        ctx.write_metadata(ctx.raw_dataset["key"], {"dataset_size": written_count})
         if written_count != full_dataset_size:
             logger.info(
                 "{} rows read, {} rows dropped, {} rows ingested".format(
diff --git a/pkg/workloads/spark_job/spark_util.py b/pkg/workloads/spark_job/spark_util.py
index 26082a9d20..92ce98a554 100644
--- a/pkg/workloads/spark_job/spark_util.py
+++ b/pkg/workloads/spark_job/spark_util.py
@@ -525,10 +525,7 @@ def validate_transformer(column_name, test_df, ctx, spark):
                     + inferred_python_type,
                 )

-            ctx.write_metadata(
-                transformed_column["id"],
-                {"type": inferred_python_type},
-            )
+            ctx.write_metadata(transformed_column["id"], {"type": inferred_python_type})

             transform_python_collect = execute_transform_python(
                 column_name, test_df, ctx, spark, validate=True
@@ -591,10 +588,7 @@ def validate_transformer(column_name, test_df, ctx, spark):
             if transformer["output_type"] == "unknown":
                 inferred_spark_type = transform_spark_df.select(column_name).schema[0].dataType
-                ctx.write_metadata(
-                    transformed_column["id"],
-                    {"type": inferred_spark_type},
-                )
+                ctx.write_metadata(transformed_column["id"], {"type": inferred_spark_type})

             # perform the necessary upcast/downcast for the column e.g INT -> LONG or DOUBLE -> FLOAT
             transform_spark_df = transform_spark_df.withColumn(
diff --git a/pkg/workloads/spark_job/test/integration/iris_context.py b/pkg/workloads/spark_job/test/integration/iris_context.py
index 2eb3a0dd31..d7b2514d15 100644
--- a/pkg/workloads/spark_job/test/integration/iris_context.py
+++ b/pkg/workloads/spark_job/test/integration/iris_context.py
@@ -39,7 +39,7 @@ def get(input_data_path):
     raw_ctx = {
         "raw_dataset": {
-            "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet",
+            "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet"
         },
         "aggregates": {
             "class_index": {
diff --git a/pkg/workloads/spark_job/test/integration/iris_test.py b/pkg/workloads/spark_job/test/integration/iris_test.py
index ea4c616375..7fcff3d3c4 100644
--- a/pkg/workloads/spark_job/test/integration/iris_test.py
+++ b/pkg/workloads/spark_job/test/integration/iris_test.py
@@ -77,10 +77,7 @@ def test_simple_end_to_end(spark):
     raw_df = spark_job.ingest_raw_dataset(spark, ctx, cols_to_validate, should_ingest)

     assert raw_df.count() == 15
-    assert (
-        ctx.get_metadata(ctx.raw_dataset["key"])["dataset_size"]
-        == 15
-    )
+    assert ctx.get_metadata(ctx.raw_dataset["key"])["dataset_size"] == 15
     for raw_column_id in cols_to_validate:
         path = os.path.join(raw_ctx["status_prefix"], raw_column_id, workload_id)
         status = storage.get_json(str(path))

From d1ca3bfa43819954a34b675539463bbaec3bcd83 Mon Sep 17 00:00:00 2001
From: Ivan Zhang
Date: Wed, 22 May 2019 13:59:05 -0400
Subject: [PATCH 3/5] fix test and clean up code

---
 pkg/workloads/lib/context.py                              | 6 ++----
 pkg/workloads/spark_job/test/integration/iris_context.py  | 1 +
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/pkg/workloads/lib/context.py b/pkg/workloads/lib/context.py
index b9cb01e033..50ed2c6c7f 100644
--- a/pkg/workloads/lib/context.py
+++ b/pkg/workloads/lib/context.py
@@ -472,19 +472,17 @@ def get_metadata_url(self, resource_id):
         return os.path.join(self.ctx["metadata_root"], resource_id + ".json")

     def write_metadata(self, resource_id, metadata):
-        metadata_key = self.get_metadata_url(resource_id)
         if resource_id in self._metadatas and self._metadatas[resource_id] == metadata:
             return

         self._metadatas[resource_id] = metadata
-        self.storage.put_json(metadata, metadata_key)
+        self.storage.put_json(metadata, self.get_metadata_url(resource_id))

     def get_metadata(self, resource_id, use_cache=True):
-        metadata_key = self.get_metadata_url(resource_id)
         if use_cache and resource_id in self._metadatas:
             return self._metadatas[resource_id]

-        metadata = self.storage.get_json(metadata_key, allow_missing=True)
+        metadata = self.storage.get_json(self.get_metadata_url(resource_id), allow_missing=True)
         self._metadatas[resource_id] = metadata
         return metadata

diff --git a/pkg/workloads/spark_job/test/integration/iris_context.py b/pkg/workloads/spark_job/test/integration/iris_context.py
index d7b2514d15..6de14ffe32 100644
--- a/pkg/workloads/spark_job/test/integration/iris_context.py
+++ b/pkg/workloads/spark_job/test/integration/iris_context.py
@@ -521,6 +521,7 @@ def get(input_data_path):
             "id": "da5e65b994ba4ebb069bdc19cf73da64aee79e5d83f466038dc75b3ef04fa63",
         },
         "root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554",
+        "metadata_root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/metadata",
         "aggregators": {
             "cortex.mean": {
                 "id": "a68b354ddadc2e14348698e03af74db72cba92d7acb162e3163629e3e343373",

From 04697f1f2c4f34f652f1577e0becf984deed84a3 Mon Sep 17 00:00:00 2001
From: Ivan Zhang
Date: Wed, 22 May 2019 14:03:51 -0400
Subject: [PATCH 4/5] clean up aggregate

---
 pkg/operator/context/aggregates.go | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pkg/operator/context/aggregates.go b/pkg/operator/context/aggregates.go
index 2cd7cb4b58..b80b86cd26 100644
--- a/pkg/operator/context/aggregates.go
+++ b/pkg/operator/context/aggregates.go
@@ -80,12 +80,11 @@ func getAggregates(
         buf.WriteString(aggregateConfig.Tags.ID())
         idWithTags := hash.Bytes(buf.Bytes())

-        aggregateRootKey := filepath.Join(
+        aggregateKey := filepath.Join(
             root,
             consts.AggregatesDir,
             id,
-        )
-        aggregateKey := aggregateRootKey + ".msgpack"
+        ) + ".msgpack"

         aggregates[aggregateConfig.Name] = &context.Aggregate{
             ComputedResourceFields: &context.ComputedResourceFields{
@@ -97,7 +96,7 @@ func getAggregates(
             },
             Aggregate: aggregateConfig,
             Type:      aggregator.OutputType,
-            Key:       aggregateKey,
+            Key:       aggregateKey + ".msgpack",
         }
     }

From 059239a727cca95e4e281b075e97bb2ab618029e Mon Sep 17 00:00:00 2001
From: Ivan Zhang
Date: Wed, 22 May 2019 14:51:22 -0400
Subject: [PATCH 5/5] remove extra msgpack

---
 pkg/operator/context/aggregates.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/operator/context/aggregates.go b/pkg/operator/context/aggregates.go
index b80b86cd26..318718a61c 100644
--- a/pkg/operator/context/aggregates.go
+++ b/pkg/operator/context/aggregates.go
@@ -96,7 +96,7 @@ func getAggregates(
             },
             Aggregate: aggregateConfig,
             Type:      aggregator.OutputType,
-            Key:       aggregateKey + ".msgpack",
+            Key:       aggregateKey,
         }
     }
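Note on the resulting metadata layout: after this series, a resource's metadata location is no longer stored on the resource itself (the old metadata_key field); it is derived from a single metadata_root (<dataset root>/metadata) plus the resource ID, and reads and writes in the Python context go through a small in-memory cache. The sketch below illustrates only that access pattern; MetadataStore and LocalJSONStorage are hypothetical stand-ins, not the actual Cortex classes (the real logic lives in pkg/workloads/lib/context.py and uses the operator-provided storage client).

import json
import os


class LocalJSONStorage:
    # Toy stand-in for the storage client; writes JSON files under a base directory.
    def __init__(self, base_dir):
        self.base_dir = base_dir

    def put_json(self, obj, key):
        path = os.path.join(self.base_dir, key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            json.dump(obj, f)

    def get_json(self, key, allow_missing=False):
        path = os.path.join(self.base_dir, key)
        if not os.path.exists(path):
            if allow_missing:
                return None
            raise FileNotFoundError(path)
        with open(path) as f:
            return json.load(f)


class MetadataStore:
    # Derived-path pattern: the metadata location is computed from one root plus
    # the resource ID, so no per-resource metadata_key has to be carried around.
    def __init__(self, storage, metadata_root):
        self.storage = storage
        self.metadata_root = metadata_root
        self._cache = {}  # resource_id -> metadata dict

    def metadata_url(self, resource_id):
        return os.path.join(self.metadata_root, resource_id + ".json")

    def write_metadata(self, resource_id, metadata):
        # Skip the write when the cached value is already identical.
        if resource_id in self._cache and self._cache[resource_id] == metadata:
            return
        self._cache[resource_id] = metadata
        self.storage.put_json(metadata, self.metadata_url(resource_id))

    def get_metadata(self, resource_id, use_cache=True):
        if use_cache and resource_id in self._cache:
            return self._cache[resource_id]
        metadata = self.storage.get_json(self.metadata_url(resource_id), allow_missing=True)
        self._cache[resource_id] = metadata
        return metadata


if __name__ == "__main__":
    storage = LocalJSONStorage("/tmp/metadata-demo")          # hypothetical local backend
    store = MetadataStore(storage, "apps/iris/data/v1/metadata")  # hypothetical metadata root
    store.write_metadata("abc123", {"dataset_size": 15})
    print(store.get_metadata("abc123"))  # {'dataset_size': 15}

One consequence of deriving the path is visible in spark_job.py and the iris tests: callers now pass only the resource identifier (e.g. ctx.raw_dataset["key"] or dataset["id"]), and the raw-dataset metadata now lives under the shared metadata/ prefix instead of next to data_raw/raw.parquet.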