1 change: 1 addition & 0 deletions pkg/consts/consts.go
@@ -54,6 +54,7 @@ var (
     LogPrefixesDir        = "log_prefixes"
     RawColumnsDir         = "raw_columns"
     TransformedColumnsDir = "transformed_columns"
+    MetadataDir           = "metadata"
 
     TelemetryURL = "https://telemetry.cortexlabs.dev"
 )
10 changes: 2 additions & 8 deletions pkg/operator/api/context/context.go
@@ -30,6 +30,7 @@ type Context struct {
     CortexConfig   *config.CortexConfig `json:"cortex_config"`
     DatasetVersion string               `json:"dataset_version"`
     Root           string               `json:"root"`
+    MetadataRoot   string               `json:"metadata_root"`
     RawDataset     RawDataset           `json:"raw_dataset"`
     StatusPrefix   string               `json:"status_prefix"`
     App            *App                 `json:"app"`
@@ -46,16 +47,14 @@ type Context struct {
 }
 
 type RawDataset struct {
-    Key         string `json:"key"`
-    MetadataKey string `json:"metadata_key"`
+    Key string `json:"key"`
 }
 
 type Resource interface {
     userconfig.Resource
     GetID() string
     GetIDWithTags() string
     GetResourceFields() *ResourceFields
-    GetMetadataKey() string
 }
 
 type ComputedResource interface {
@@ -73,7 +72,6 @@ type ResourceFields struct {
     ID           string        `json:"id"`
     IDWithTags   string        `json:"id_with_tags"`
     ResourceType resource.Type `json:"resource_type"`
-    MetadataKey  string        `json:"metadata_key"`
 }
 
 type ComputedResourceFields struct {
@@ -93,10 +91,6 @@ func (r *ResourceFields) GetResourceFields() *ResourceFields {
     return r
 }
 
-func (r *ResourceFields) GetMetadataKey() string {
-    return r.MetadataKey
-}
-
 func (r *ComputedResourceFields) GetWorkloadID() string {
     return r.WorkloadID
 }
7 changes: 2 additions & 5 deletions pkg/operator/context/aggregates.go
@@ -80,21 +80,18 @@ func getAggregates(
         buf.WriteString(aggregateConfig.Tags.ID())
         idWithTags := hash.Bytes(buf.Bytes())
 
-        aggregateRootKey := filepath.Join(
+        aggregateKey := filepath.Join(
             root,
             consts.AggregatesDir,
             id,
-        )
-        aggregateKey := aggregateRootKey + ".msgpack"
-        aggregateMetadataKey := aggregateRootKey + "_metadata.json"
+        ) + ".msgpack"
 
         aggregates[aggregateConfig.Name] = &context.Aggregate{
             ComputedResourceFields: &context.ComputedResourceFields{
                 ResourceFields: &context.ResourceFields{
                     ID:           id,
                     IDWithTags:   idWithTags,
                     ResourceType: resource.AggregateType,
-                    MetadataKey:  aggregateMetadataKey,
                 },
             },
             Aggregate: aggregateConfig,
1 change: 0 additions & 1 deletion pkg/operator/context/aggregators.go
@@ -106,7 +106,6 @@ func newAggregator(
             ID:           id,
             IDWithTags:   id,
             ResourceType: resource.AggregatorType,
-            MetadataKey:  filepath.Join(consts.AggregatorsDir, id+"_metadata.json"),
         },
         Aggregator: &aggregatorConfig,
         Namespace:  namespace,
3 changes: 0 additions & 3 deletions pkg/operator/context/apis.go
@@ -18,9 +18,7 @@ package context
 
 import (
     "bytes"
-    "path/filepath"
 
-    "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/hash"
     "github.com/cortexlabs/cortex/pkg/operator/api/context"
     "github.com/cortexlabs/cortex/pkg/operator/api/resource"
@@ -50,7 +48,6 @@ func getAPIs(config *userconfig.Config,
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.APIType,
-                MetadataKey:  filepath.Join(consts.APIsDir, id+"_metadata.json"),
             },
         },
         API: apiConfig,
1 change: 0 additions & 1 deletion pkg/operator/context/constants.go
@@ -59,7 +59,6 @@ func newConstant(constantConfig userconfig.Constant) (*context.Constant, error)
             ID:           id,
             IDWithTags:   idWithTags,
             ResourceType: resource.ConstantType,
-            MetadataKey:  filepath.Join(consts.ConstantsDir, id+"_metadata.json"),
         },
         Constant: &constantConfig,
         Key:      filepath.Join(consts.ConstantsDir, id+".msgpack"),
9 changes: 7 additions & 2 deletions pkg/operator/context/context.go
@@ -117,9 +117,14 @@ func New(
         ctx.DatasetVersion,
         ctx.Environment.ID,
     )
+
+    ctx.MetadataRoot = filepath.Join(
+        ctx.Root,
+        consts.MetadataDir,
+    )
+
     ctx.RawDataset = context.RawDataset{
-        Key:         filepath.Join(ctx.Root, consts.RawDataDir, "raw.parquet"),
-        MetadataKey: filepath.Join(ctx.Root, consts.RawDataDir, "metadata.json"),
+        Key: filepath.Join(ctx.Root, consts.RawDataDir, "raw.parquet"),
     }
 
     ctx.StatusPrefix = StatusPrefix(ctx.App.Name)
2 changes: 0 additions & 2 deletions pkg/operator/context/models.go
@@ -96,7 +96,6 @@ func getModels(
                 ID:           modelID,
                 IDWithTags:   modelID,
                 ResourceType: resource.ModelType,
-                MetadataKey:  filepath.Join(root, consts.ModelsDir, modelID+"_metadata.json"),
             },
         },
         Model: modelConfig,
@@ -114,7+113,6 @@ func getModels(
                 ID:           datasetID,
                 IDWithTags:   datasetIDWithTags,
                 ResourceType: resource.TrainingDatasetType,
-                MetadataKey:  filepath.Join(datasetRoot, "metadata.json"),
             },
         },
         ModelName: modelConfig.Name,
2 changes: 0 additions & 2 deletions pkg/operator/context/python_packages.go
@@ -64,7 +64,6 @@ func loadPythonPackages(files map[string][]byte, datasetVersion string) (context
             ResourceFields: &context.ResourceFields{
                 ID:           id,
                 ResourceType: resource.PythonPackageType,
-                MetadataKey:  filepath.Join(consts.PythonPackagesDir, id, "metadata.json"),
             },
         },
         SrcKey: filepath.Join(consts.PythonPackagesDir, id, "src.txt"),
@@ -103,7 +102,6 @@ func loadPythonPackages(files map[string][]byte, datasetVersion string) (context
             ResourceFields: &context.ResourceFields{
                 ID:           id,
                 ResourceType: resource.PythonPackageType,
-                MetadataKey:  filepath.Join(consts.PythonPackagesDir, id, "metadata.json"),
             },
         },
         SrcKey: filepath.Join(consts.PythonPackagesDir, id, "src.zip"),
5 changes: 0 additions & 5 deletions pkg/operator/context/raw_columns.go
@@ -18,9 +18,7 @@ package context
 
 import (
     "bytes"
-    "path/filepath"
 
-    "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/configreader"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
     "github.com/cortexlabs/cortex/pkg/lib/hash"
@@ -59,7 +57,6 @@ func getRawColumns(
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.RawColumnType,
-                MetadataKey:  filepath.Join(consts.RawColumnsDir, id+"_metadata.json"),
             },
         },
         RawIntColumn: typedColumnConfig,
@@ -77,7+74,6 @@ func getRawColumns(
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.RawColumnType,
-                MetadataKey:  filepath.Join(consts.RawColumnsDir, id+"_metadata.json"),
             },
         },
         RawFloatColumn: typedColumnConfig,
@@ -93,7 +89,6 @@ func getRawColumns(
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.RawColumnType,
-                MetadataKey:  filepath.Join(consts.RawColumnsDir, id+"_metadata.json"),
             },
         },
         RawStringColumn: typedColumnConfig,
3 changes: 0 additions & 3 deletions pkg/operator/context/transformed_columns.go
@@ -18,9 +18,7 @@ package context
 
 import (
     "bytes"
-    "path/filepath"
 
-    "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
     "github.com/cortexlabs/cortex/pkg/lib/hash"
     s "github.com/cortexlabs/cortex/pkg/lib/strings"
@@ -82,7 +80,6 @@ func getTransformedColumns(
                 ID:           id,
                 IDWithTags:   idWithTags,
                 ResourceType: resource.TransformedColumnType,
-                MetadataKey:  filepath.Join(consts.TransformedColumnsDir, id+"_metadata.json"),
             },
         },
         TransformedColumn: transformedColumnConfig,
1 change: 0 additions & 1 deletion pkg/operator/context/transformers.go
@@ -103,7 +103,6 @@ func newTransformer(
             ID:           id,
             IDWithTags:   id,
             ResourceType: resource.TransformerType,
-            MetadataKey:  filepath.Join(consts.TransformersDir, id+"_metadata.json"),
         },
         Transformer: &transConfig,
         Namespace:   namespace,
13 changes: 8 additions & 5 deletions pkg/workloads/lib/context.py
@@ -468,26 +468,29 @@ def upload_resource_status_end(self, exit_code, *resources):
     def resource_status_key(self, resource):
         return os.path.join(self.status_prefix, resource["id"], resource["workload_id"])
 
-    def write_metadata(self, resource_id, metadata_key, metadata):
+    def get_metadata_url(self, resource_id):
+        return os.path.join(self.ctx["metadata_root"], resource_id + ".json")
+
+    def write_metadata(self, resource_id, metadata):
         if resource_id in self._metadatas and self._metadatas[resource_id] == metadata:
             return
 
         self._metadatas[resource_id] = metadata
-        self.storage.put_json(metadata, metadata_key)
+        self.storage.put_json(metadata, self.get_metadata_url(resource_id))
 
-    def get_metadata(self, resource_id, metadata_key, use_cache=True):
+    def get_metadata(self, resource_id, use_cache=True):
         if use_cache and resource_id in self._metadatas:
             return self._metadatas[resource_id]
 
-        metadata = self.storage.get_json(metadata_key, allow_missing=True)
+        metadata = self.storage.get_json(self.get_metadata_url(resource_id), allow_missing=True)
         self._metadatas[resource_id] = metadata
         return metadata
 
     def get_inferred_column_type(self, column_name):
         column = self.columns[column_name]
         column_type = self.columns[column_name].get("type", "unknown")
         if column_type == "unknown":
-            column_type = self.get_metadata(column["id"], column["metadata_key"])["type"]
+            column_type = self.get_metadata(column["id"])["type"]
             self.columns[column_name]["type"] = column_type
 
         return column_type
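Taken together, this change replaces per-resource metadata_key bookkeeping with a single derivation from the resource ID under metadata_root. A minimal standalone sketch of that derivation (a stand-in for lib/context.Context; paths and IDs are illustrative, not real values):

    import os

    class Ctx:
        # Minimal stand-in for lib/context.Context, only to show the key derivation.
        def __init__(self, metadata_root):
            self.ctx = {"metadata_root": metadata_root}

        def get_metadata_url(self, resource_id):
            # Every resource's metadata now lives at <metadata_root>/<resource_id>.json.
            return os.path.join(self.ctx["metadata_root"], resource_id + ".json")

    ctx = Ctx("apps/iris/data/<dataset_version>/<context_id>/metadata")
    print(ctx.get_metadata_url("da5e65b9"))
    # -> apps/iris/data/<dataset_version>/<context_id>/metadata/da5e65b9.json

Call sites accordingly shrink from ctx.get_metadata(resource["id"], resource["metadata_key"]) to ctx.get_metadata(resource["id"]), as the remaining files in this diff show.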
10 changes: 2 additions & 8 deletions pkg/workloads/spark_job/spark_job.py
@@ -91,9 +91,7 @@ def parse_args(args):
 
 
 def validate_dataset(ctx, raw_df, cols_to_validate):
-    total_row_count = ctx.get_metadata(ctx.raw_dataset["key"], ctx.raw_dataset["metadata_key"])[
-        "dataset_size"
-    ]
+    total_row_count = ctx.get_metadata(ctx.raw_dataset["key"])["dataset_size"]
     conditions_dict = spark_util.value_check_data(ctx, raw_df, cols_to_validate)
 
     if len(conditions_dict) > 0:
@@ -162,11 +160,7 @@ def ingest_raw_dataset(spark, ctx, cols_to_validate, should_ingest):
         ingest_df = limit_dataset(full_dataset_size, ingest_df, ctx.environment["limit"])
 
         written_count = write_raw_dataset(ingest_df, ctx, spark)
-        ctx.write_metadata(
-            ctx.raw_dataset["key"],
-            ctx.raw_dataset["metadata_key"],
-            {"dataset_size": written_count},
-        )
+        ctx.write_metadata(ctx.raw_dataset["key"], {"dataset_size": written_count})
         if written_count != full_dataset_size:
             logger.info(
                 "{} rows read, {} rows dropped, {} rows ingested".format(
13 changes: 2 additions & 11 deletions pkg/workloads/spark_job/spark_util.py
@@ -121,7 +121,6 @@ def write_training_data(model_name, df, ctx, spark):
 
     ctx.write_metadata(
         training_dataset["id"],
-        training_dataset["metadata_key"],
         {"training_size": train_df_acc.value, "eval_size": eval_df_acc.value},
     )
 
@@ -526,11 +525,7 @@ def validate_transformer(column_name, test_df, ctx, spark):
             + inferred_python_type,
         )
 
-    ctx.write_metadata(
-        transformed_column["id"],
-        transformed_column["metadata_key"],
-        {"type": inferred_python_type},
-    )
+    ctx.write_metadata(transformed_column["id"], {"type": inferred_python_type})
 
     transform_python_collect = execute_transform_python(
         column_name, test_df, ctx, spark, validate=True
@@ -593,11 +588,7 @@ def validate_transformer(column_name, test_df, ctx, spark):
 
         if transformer["output_type"] == "unknown":
             inferred_spark_type = transform_spark_df.select(column_name).schema[0].dataType
-            ctx.write_metadata(
-                transformed_column["id"],
-                transformed_column["metadata_key"],
-                {"type": inferred_spark_type},
-            )
+            ctx.write_metadata(transformed_column["id"], {"type": inferred_spark_type})
 
         # perform the necessary upcast/downcast for the column e.g INT -> LONG or DOUBLE -> FLOAT
         transform_spark_df = transform_spark_df.withColumn(
5 changes: 2 additions & 3 deletions pkg/workloads/spark_job/test/integration/iris_context.py
@@ -39,8 +39,7 @@ def get(input_data_path):
 
     raw_ctx = {
         "raw_dataset": {
-            "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet",
-            "metadata_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/metadata.json",
+            "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet"
         },
         "aggregates": {
             "class_index": {
@@ -469,7 +468,6 @@ def get(input_data_path):
             "prediction_key": "",
             "workload_id": "aokhfrzyw6ju730nbwli",
             "dataset": {
-                "metadata_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/metadata.json",
                 "train_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/train.tfrecord",
                 "workload_id": "jjd3l0fi4fhwqtgmpatg",
                 "eval_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/eval.tfrecord",
@@ -523,6 +521,7 @@ def get(input_data_path):
             "id": "da5e65b994ba4ebb069bdc19cf73da64aee79e5d83f466038dc75b3ef04fa63",
         },
         "root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554",
+        "metadata_root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/metadata",
         "aggregators": {
             "cortex.mean": {
                 "id": "a68b354ddadc2e14348698e03af74db72cba92d7acb162e3163629e3e343373",
7 changes: 2 additions & 5 deletions pkg/workloads/spark_job/test/integration/iris_test.py
@@ -77,10 +77,7 @@ def test_simple_end_to_end(spark):
     raw_df = spark_job.ingest_raw_dataset(spark, ctx, cols_to_validate, should_ingest)
 
     assert raw_df.count() == 15
-    assert (
-        ctx.get_metadata(ctx.raw_dataset["key"], ctx.raw_dataset["metadata_key"])["dataset_size"]
-        == 15
-    )
+    assert ctx.get_metadata(ctx.raw_dataset["key"])["dataset_size"] == 15
     for raw_column_id in cols_to_validate:
         path = os.path.join(raw_ctx["status_prefix"], raw_column_id, workload_id)
         status = storage.get_json(str(path))
@@ -120,7 +117,7 @@ def test_simple_end_to_end(spark):
         status["exist_code"] = "succeeded"
 
     dataset = raw_ctx["models"]["dnn"]["dataset"]
-    metadata = ctx.get_metadata(dataset["id"], dataset["metadata_key"])
+    metadata = ctx.get_metadata(dataset["id"])
     assert metadata["training_size"] + metadata["eval_size"] == 15
     assert local_storage_path.joinpath(dataset["train_key"], "_SUCCESS").exists()
     assert local_storage_path.joinpath(dataset["eval_key"], "_SUCCESS").exists()
2 changes: 1 addition & 1 deletion pkg/workloads/tf_train/train_util.py
@@ -149,7 +149,7 @@ def train(model_name, model_impl, ctx, model_dir):
     exporter = tf.estimator.FinalExporter("estimator", serving_input_fn, as_text=False)
 
     train_num_steps = model["training"]["num_steps"]
-    dataset_metadata = ctx.get_metadata(model["dataset"]["id"], model["dataset"]["metadata_key"])
+    dataset_metadata = ctx.get_metadata(model["dataset"]["id"])
     if model["training"]["num_epochs"]:
         train_num_steps = (
             math.ceil(dataset_metadata["training_size"] / float(model["training"]["batch_size"]))