Permalink
Browse files

Add config for Spark log level, move TF log level to environment

  • Loading branch information...
deliahu committed Jan 31, 2019
1 parent 62ebe72 commit db62228e6098cb801acfd98deff25d4984cb430e
@@ -7,6 +7,9 @@ Transfer data at scale from data warehouses like S3 into the Cortex cluster. Onc
```yaml
- kind: environment # (required)
name: <string> # environment name (required)
log_level:
tensorflow: <string> # TensorFlow log level (DEBUG, INFO, WARN, ERROR, or FATAL) (default: INFO)
spark: <string> # Spark log level (ALL, TRACE, DEBUG, INFO, WARN, ERROR, or FATAL) (default: WARN)
data:
<data_config>
```
@@ -28,7 +28,10 @@ fi
# entrypoint.sh doesn't do any of the following for executor (it does for driver):
. $SPARK_HOME/bin/load-spark-env.sh # Load spark-env.sh
export SPARK_EXTRA_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) # Add hadoop to classpath
# Point log4j at the custom config dir so the user-specified level can be applied below.
export SPARK_JAVA_OPT_99="-Dlog4j.configuration=file://${SPARK_HOME}/conf-custom/log4j.properties" # Add log config (99 is used to avoid conflicts)

# Set user-specified spark logging level
sed -i -e "s/log4j.rootCategory=INFO, console/log4j.rootCategory=${CORTEX_SPARK_VERBOSITY}, console/g" $SPARK_HOME/conf-custom/log4j.properties

echo ""
echo "Starting"
@@ -27,8 +27,9 @@ import (
// Environments is a list of Environment configs.
type Environments []*Environment

type Environment struct {
Name string `json:"name" yaml:"name"`
Data Data `json:"-" yaml:"-"`
Name string `json:"name" yaml:"name"`
LogLevel *LogLevel `json:"log_level" yaml:"log_level"`
Data Data `json:"-" yaml:"-"`
}

var environmentValidation = &cr.StructValidation{
@@ -40,6 +41,10 @@ var environmentValidation = &cr.StructValidation{
AlphaNumericDashUnderscore: true,
},
},
&cr.StructFieldValidation{
StructField: "LogLevel",
StructValidation: logLevelValidation,
},
&cr.StructFieldValidation{
StructField: "Data",
Key: "data",
@@ -49,6 +54,30 @@ var environmentValidation = &cr.StructValidation{
},
}

// LogLevel holds the per-framework logging verbosity for an environment.
type LogLevel struct {
	// Tensorflow log level; allowed values are validated by logLevelValidation.
	Tensorflow string `json:"tensorflow" yaml:"tensorflow"`
	// Spark log level; allowed values are validated by logLevelValidation.
	Spark string `json:"spark" yaml:"spark"`
}

// logLevelValidation validates the log_level section of an environment
// config, restricting each framework's level to its supported set and
// supplying defaults (TensorFlow: INFO, Spark: WARN).
var logLevelValidation = &cr.StructValidation{
	StructFieldValidations: []*cr.StructFieldValidation{
		{
			StructField: "Tensorflow",
			StringValidation: &cr.StringValidation{
				Default:       "INFO",
				AllowedValues: []string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL"},
			},
		},
		{
			StructField: "Spark",
			StringValidation: &cr.StringValidation{
				Default:       "WARN",
				AllowedValues: []string{"ALL", "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"},
			},
		},
	},
}

type Data interface {
GetIngestedFeatures() []string
GetExternalPath() string
@@ -40,7 +40,6 @@ type Model struct {
DataPartitionRatio *ModelDataPartitionRatio `json:"data_partition_ratio" yaml:"data_partition_ratio"`
Training *ModelTraining `json:"training" yaml:"training"`
Evaluation *ModelEvaluation `json:"evaluation" yaml:"evaluation"`
Misc *ModelMisc `json:"misc" yaml:"misc"`
Compute *TFCompute `json:"compute" yaml:"compute"`
Tags Tags `json:"tags" yaml:"tags"`
}
@@ -123,10 +122,6 @@ var modelValidation = &cr.StructValidation{
StructField: "Evaluation",
StructValidation: modelEvaluationValidation,
},
&cr.StructFieldValidation{
StructField: "Misc",
StructValidation: modelMiscValidation,
},
tfComputeFieldValidation,
tagsFieldValidation,
typeFieldValidation,
@@ -311,22 +306,6 @@ var modelEvaluationValidation = &cr.StructValidation{
},
}

// ModelMisc holds miscellaneous per-model settings.
// NOTE(review): this appears superseded by the environment-level
// LogLevel config — confirm before adding new fields here.
type ModelMisc struct {
	// Verbosity is the TensorFlow log level for this model.
	Verbosity string `json:"verbosity" yaml:"verbosity"`
}

// modelMiscValidation validates the misc section of a model config,
// restricting Verbosity to the TensorFlow log levels (default: INFO).
var modelMiscValidation = &cr.StructValidation{
	StructFieldValidations: []*cr.StructFieldValidation{
		{
			StructField: "Verbosity",
			StringValidation: &cr.StringValidation{
				Default:       "INFO",
				AllowedValues: []string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL"},
			},
		},
	},
}

func (models Models) Validate() error {
for _, model := range models {
if err := model.Validate(); err != nil {
@@ -63,7 +63,6 @@ func getModels(
buf.WriteString(s.Obj(modelConfig.DataPartitionRatio))
buf.WriteString(s.Obj(modelConfig.Training))
buf.WriteString(s.Obj(modelConfig.Evaluation))
buf.WriteString(s.Obj(modelConfig.Misc))
buf.WriteString(features.IDWithTags(modelConfig.AllFeatureNames())) // A change in tags can invalidate the model

for _, aggregate := range modelConfig.Aggregates {
@@ -143,6 +143,9 @@ func SparkSpec(workloadID string, ctx *context.Context, workloadType string, spa
Key: "AWS_SECRET_ACCESS_KEY",
},
},
EnvVars: map[string]string{
"CORTEX_SPARK_VERBOSITY": ctx.Environment.LogLevel.Spark,
},
},
PodName: &workloadID,
ServiceAccount: util.StrPtr("spark"),
@@ -167,6 +170,9 @@ func SparkSpec(workloadID string, ctx *context.Context, workloadType string, spa
Key: "AWS_SECRET_ACCESS_KEY",
},
},
EnvVars: map[string]string{
"CORTEX_SPARK_VERBOSITY": ctx.Environment.LogLevel.Spark,
},
},
Instances: &sparkCompute.Executors,
},
@@ -346,7 +346,6 @@ def model_config(self, model_name):
"aggregates",
"training",
"evaluation",
"misc",
"tags",
]
util.keep_dict_keys(model_config, config_keys)
@@ -294,6 +294,7 @@ def start(args):
ctx = Context(s3_path=args.context, cache_dir=args.cache_dir, workload_id=args.workload_id)
api = ctx.apis_id_map[args.api]
model = ctx.models[api["model_name"]]
tf_lib.set_logging_verbosity(ctx.environment["log_level"]["tensorflow"])

local_cache["ctx"] = ctx
local_cache["api"] = api
@@ -117,7 +117,7 @@ def train(model_name, model_impl, ctx, model_dir):
util.mkdir_p(model_dir)
util.rm_dir(model_dir)

# Log level now comes from the environment config (the per-model "misc"
# section was removed, so reading model["misc"] would raise a KeyError).
tf_lib.set_logging_verbosity(ctx.environment["log_level"]["tensorflow"])

run_config = tf.estimator.RunConfig(
tf_random_seed=model["training"]["tf_random_seed"],

0 comments on commit db62228

Please sign in to comment.