add validation and new upi logger type #319

Merged: 28 commits merged on Mar 2, 2023

Contributor

@leonlnj leonlnj commented Feb 16, 2023

Description

This PR aims to:

  • Add validation to UPI routers to reject invalid component configurations
  • Fix the existing experiment response, which did not include the experiment and treatment names defined in the runner interface
  • Add a new ResultLogger type, UPI, and remove ConsoleLogger from the API schema
    • Add RouterLog publishing to the UPI router (the existing implementation that publishes to BQ/Kafka is removed)
      • Currently, the UPI router publishes only to Kafka

Task

  • API Server
  • Router
  • SDK
  • UI is not scoped in this PR

Key Modification

API Schema

api/api/specs/routers.yaml - added UPI logger type and removed console

Validation

api/turing/validation/validator.go - Standard ensembler and UPI logger validation for UPI Router

API Server

api/turing/cluster/servicebuilder/router.go - builds the env vars for the Router. The logic that determines the (predefined) Kafka topic lives here.

Patching Experiment response

engines/router/missionctl/experiment/experiment.go - the experiment and treatment names were never set here before.

Patching e2e

api/Makefile - fix cli to include env creds
api/config-dev-exp-engine.yaml - patch config to incorporate recent plugin changes + fix treatment config
api/e2e/test/config-local.yaml - add description of expected payload for cred config
api/e2e/test/router_test.go - patching to fit the correct treatment name and route name

UPI Logger

engines/router/missionctl/log/resultlog/upi_result_log.go - new UPIResultLogger struct and UPILogger interface. The Kafka and nop loggers gain a new method to satisfy the UPILogger interface. This file also holds the main logic for constructing a RouterLog, and the tests validate that its fields are set as expected.

engines/router/missionctl/log/resultlog/kafka.go - besides adding the new method for the UPILogger interface, the existing publishing logic is moved to writeToKafka, which takes a generic proto; how the message is published remains unchanged. Both write (for HTTP) and WriteUPIRouterLog (for UPI) call the same underlying method.

engines/router/missionctl/log/resultlog/resultlog.go - one of the key changes is to redefine TuringResultLogEntry (it's odd that it was defined as turing.TuringResultLogMessage and then given pointer-receiver methods). The proto is now a field and is passed as-is when required.

Before:
type TuringResultLogEntry turing.TuringResultLogMessage

After:
type TuringResultLogEntry struct {
	resultLogMessage turing.TuringResultLogMessage
}

With the previous definition, the type is not recognized as a proto, probably because a defined type does not inherit the methods of the type it is defined from (see Syntax: Type Definitions and Concept: Underlying Types).

The original proto can be assigned to a proto.Message, but a TuringResultLogEntry cannot:
[Screenshot 2023-02-23 at 4 07 40 PM]

Because the type was never recognized as a proto, json.Marshal is replaced with the protojson marshaller to accommodate this change. The output is intact (as per the unit tests); only the marshaller had to change to produce the same output.
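To illustrate the type-definition behaviour described above, here is a minimal sketch that avoids the real protobuf dependency. `Message`, `LogMessage`, `DefinedEntry`, and `WrappedEntry` are hypothetical stand-ins for `proto.Message`, `turing.TuringResultLogMessage`, and the old and new `TuringResultLogEntry` definitions; they are not the actual Turing types.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for proto.Message.
type Message interface {
	ProtoName() string
}

// LogMessage is a hypothetical stand-in for the generated
// turing.TuringResultLogMessage.
type LogMessage struct{ RouterVersion string }

// Generated proto code attaches its methods to the pointer type.
func (*LogMessage) ProtoName() string { return "LogMessage" }

// DefinedEntry mirrors the old definition: a new defined type that shares
// LogMessage's underlying struct but none of its methods.
type DefinedEntry LogMessage

// WrappedEntry mirrors the new definition: the proto is kept as a field.
type WrappedEntry struct {
	resultLogMessage LogMessage
}

// isMessage reports whether v satisfies the Message interface at run time.
func isMessage(v any) bool {
	_, ok := v.(Message)
	return ok
}

func main() {
	w := WrappedEntry{resultLogMessage: LogMessage{RouterVersion: "v1"}}

	fmt.Println(isMessage(&LogMessage{}))       // true
	fmt.Println(isMessage(&DefinedEntry{}))     // false: methods are not inherited
	fmt.Println(isMessage(&w.resultLogMessage)) // true: the field is still the proto
}
```

With the wrapper struct, the inner message can be handed to any function that expects the proto interface, which is what makes a proto-aware marshaller usable.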

engines/router/missionctl/server/application.go - constructs the logger and passes it to UPILogger.

engines/router/missionctl/server/upi/server.go - note that UPILogger has its own ResultLogger and does not refer to the global module as HTTP does. The logger is injected and the ResultLogger is part of missionControl. This structural change allows dependency injection of the logger implementation (no monkey patching needed) and allows the removal of logUtils and of the global reference to the ResultLog module (which was depended on without being explicitly referenced anywhere). Result logging now uses the injected ResultLogger. Also added an implementation to get the experiment response from the fiber context.

engines/router/missionctl/server/upi/logutils.go - deleted; its contents now belong to engines/router/missionctl/log/resultlog/upi_result_log.go, in the rightful log module.
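The dependency-injection idea can be sketched as follows. This is an illustration under stated assumptions, not the actual server code: `RouterLog`, `Server`, `NewServer`, `Predict`, and `recordingLogger` are hypothetical names, and only the `WriteUPIRouterLog` method name comes from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// RouterLog is a hypothetical stand-in for upiv1.RouterLog.
type RouterLog struct{ RequestID string }

// UPILogger is the only capability the server needs from a logger.
type UPILogger interface {
	WriteUPIRouterLog(*RouterLog) error
}

// Server receives its logger at construction time instead of reaching for
// a package-level variable.
type Server struct {
	resultLogger UPILogger
}

// NewServer injects the logger implementation (hypothetical constructor).
func NewServer(l UPILogger) (*Server, error) {
	if l == nil {
		return nil, errors.New("result logger must not be nil")
	}
	return &Server{resultLogger: l}, nil
}

// Predict stands in for the request handler; routing is omitted and only
// the logging call is shown.
func (s *Server) Predict(requestID string) error {
	return s.resultLogger.WriteUPIRouterLog(&RouterLog{RequestID: requestID})
}

// recordingLogger is a test double: no monkey patching of globals needed.
type recordingLogger struct{ logs []*RouterLog }

func (r *recordingLogger) WriteUPIRouterLog(l *RouterLog) error {
	r.logs = append(r.logs, l)
	return nil
}

func main() {
	rec := &recordingLogger{}
	srv, err := NewServer(rec)
	if err != nil {
		panic(err)
	}
	if err := srv.Predict("req-1"); err != nil {
		panic(err)
	}
	fmt.Println(len(rec.logs), rec.logs[0].RequestID) // 1 req-1
}
```

Because the server depends only on the interface, a test can pass in a recording double while production code passes in a Kafka-backed logger.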

SDK

Validation and UPI Logger init

@leonlnj leonlnj self-assigned this Feb 17, 2023
variants_configuration:
control:
traffic: 0.85
treatment_configuration:
foo: bar
route_name: treatment-a
Contributor Author

Previously the treatment was control but it returned a route_name of treatment-a, and vice versa for treatment-1.

Minor refactoring to fix that and simplify the config

@@ -34,6 +34,7 @@ var cfg config.Config
var defaultDeploymentIntervals = []interface{}{"10m", "5s"}
var defaultDeletionIntervals = []interface{}{"20s", "2s"}
var arbitraryUpdateIntervals = []interface{}{"10s", "1s"}
var istioVirtualServiceIntervals = []interface{}{"60s", "5s"}
Contributor Author

increased and shifted, as 30s is too short for local setup and 60s works better

routerE.POST("/v1/predict").
WithHeaders(defaultPredictHeaders).
WithJSON(json.RawMessage(`{"client": {"id": 4}}`)).
Expect().
Status(http.StatusOK).
JSON().Equal(json.RawMessage(`{"version": "treatment-a"}`))
Contributor Author

This was actually flawed previously: id "4" calls treatment "control", which had route_name=treatment-a.

Since the exp config is patched to pair route and treatment consistently, id=4 now calls control and returns control.

@@ -22,6 +23,15 @@ import (
routerConfig "github.com/caraml-dev/turing/engines/router/missionctl/config"
)

var dummyConfig json.RawMessage

func getDefaultValidator() (*validator.Validate, error) {
Contributor Author

extracting common mock construction for re-use across whole test

}

func (tt routerConfigTestCase) RouterConfig(protocol routerConfig.Protocol) *request.RouterConfig {
Contributor Author

no idea why protocol is in method parameter (written by me 😅 previously i think), removed and refer to input args instead for consistency

@@ -36,6 +36,8 @@ const (
ConsoleLogger ResultLogger = "CONSOLE"
Contributor Author

Note: the console logger is only removed from the API; the implementation isn't removed, as it can be useful for debugging.

@@ -92,6 +94,8 @@ func NewResponse(expPlan *runner.Treatment, expPlanErr error) *Response {
experimentResponse.Error = expPlanErr.Error()
} else {
experimentResponse.Configuration = expPlan.Config
Contributor Author

The treatment and experiment names were never returned before.

Collaborator

For context on why these fields were not being logged previously - the experiment config returned by xp and other internal engines typically also included the experiment name and treatment name in the config. So we didn't care too much about logging these values explicitly (rather, these fields were only used for things like metrics). But I agree it makes sense to include them in the logs (especially since we're not supposed to make assumptions about what the config includes and what it doesn't).

(Including these fields in func (r *Response) Body() { ... would've been a breaking change for the users though - glad we don't see the need to do that yet.)

}
}

func (us *Server) Run(listener net.Listener) {
s := grpc.NewServer()
//TODO: the unmarshalling can be done more efficiently by using partial deserialization
Contributor Author

Initially this optimization could have been done to speed up calls; however, full unmarshalling is now required for logging purposes, so there is no need to explore it further.

@@ -116,5 +116,5 @@ def test_deploy_router_with_std_ensembler():
json={"client": {"id": 4}},
)
assert response.status_code == 200
expected_response = {"version": "treatment-a"}
expected_response = {"version": "control"}
Contributor Author

patching due to correction of the experiment engine config

@@ -153,7 +155,7 @@ def test_deploy_router_upi_traffic_split():
variable_pb2.Variable(
name="client_id",
type=type_pb2.TYPE_STRING,
string_value="1",
Contributor Author

Although named treatment, this was actually control; if the traffic rules malfunctioned and the request fell through to control via the default route, this would have passed anyway.

@@ -64,7 +64,7 @@ def test_deploy_router_upi_traffic_split():
field_source=FieldSource.PREDICTION_CONTEXT,
field="client_id",
operator="in",
values=["1"],
values=["7"],
Contributor Author

1 would return control, which is the default route and doesn't test traffic rules

@@ -175,7 +177,7 @@ def test_deploy_router_upi_traffic_split():
variable_pb2.Variable(
name="client_id",
type=type_pb2.TYPE_STRING,
string_value="1234",
string_value="12",
Contributor Author

Both 7 and 12 return treatment. Traffic rules are therefore tested by sending two requests that should both get treatment; the one that does not fulfil the traffic rule, which is 12, gets control instead. Previously, 1234 returned treatment, which called the control route.
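The test logic described in this comment can be sketched in Go (the e2e tests themselves are Python). This is a simplified model, not the router's implementation: `TrafficRule` and `route` are hypothetical, and the route names "treatment-a" and "control" are assumed from the surrounding config discussion.

```go
package main

import "fmt"

// TrafficRule is a hypothetical simplification of an "in" rule on one field
// of the prediction context.
type TrafficRule struct {
	Field  string
	Values []string
}

// Matches reports whether the request context satisfies the rule.
func (r TrafficRule) Matches(ctx map[string]string) bool {
	v, ok := ctx[r.Field]
	if !ok {
		return false
	}
	for _, allowed := range r.Values {
		if v == allowed {
			return true
		}
	}
	return false
}

// route returns the treatment route only when the traffic rule matches;
// every other request falls back to the default route.
func route(rule TrafficRule, ctx map[string]string, treatmentRoute, defaultRoute string) string {
	if rule.Matches(ctx) {
		return treatmentRoute
	}
	return defaultRoute
}

func main() {
	rule := TrafficRule{Field: "client_id", Values: []string{"7"}}

	// Both clients are assumed to be assigned the treatment by the experiment
	// engine, so any difference in routing comes from the traffic rule alone.
	fmt.Println(route(rule, map[string]string{"client_id": "7"}, "treatment-a", "control"))
	fmt.Println(route(rule, map[string]string{"client_id": "12"}, "treatment-a", "control"))
}
```

Choosing two client IDs that receive the same treatment isolates the traffic rule as the only variable, which is why the earlier values (1 and 1234) did not actually exercise it.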

@@ -126,8 +126,9 @@ def test_deploy_router_upi_std_ensembler():
channel = grpc.insecure_channel(retrieved_router.endpoint)
stub = upi_pb2_grpc.UniversalPredictionServiceStub(channel)

logging.info("send request that satisfy treatment-a")
logging.info("send request that satisfy control")
Contributor Author

Apart from the traffic rules split test, which had some flaws, the other changes merely adapt to the corrected config.

@@ -47,8 +47,9 @@ def __init__(
self.status = RouterStatus(status)
self.name = name
self.monitoring_url = monitoring_url
self.log_config = RouterVersionLogConfig(**kwargs.get("log_config"))
Contributor Author

Because LogConfig validates against the protocol, the RouterConfig has to be initialized first.

@leonlnj leonlnj requested review from pradithya, tiopramayudi and a team February 27, 2023 06:53
@leonlnj leonlnj marked this pull request as ready for review February 27, 2023 06:53
Member

@pradithya pradithya left a comment

Thanks @leonlnj I left some comments below

@@ -168,4 +169,4 @@ def test_deploy_router_upi_std_ensembler():
logging.info(f"received response {response}")

assert response.prediction_result_table == request.prediction_table
assert response.metadata.models[0].name == "control"
Member

Can you add assertions in this test and in test_deploy_router_upi_traffic_split that validate that experiment_name and treatment_name are properly populated?

Contributor Author

this is not possible with the current proprietary engine, it doesn't return those fields.

)
}

func (l *KafkaLogger) WriteUPIRouterLog(routerLog *upiv1.RouterLog) error {
Member

It feels a bit out of place. Can we instead have a separate KafkaLogger (e.g. UPIKafkaLogger) and embed the original KafkaLogger within it and override the write method?

Contributor Author

Hmm, the challenge with this is that a UPI wrapper would need to be created for each logger, including the nop logger, since write would otherwise be declared twice with different parameters. I can change the interface to take a proto.Message, but then all the loggers for HTTP need some refactoring to type-assert back to TuringResultLog. I prefer to do this in another MR if necessary.

// TuringResultLogger is an abstraction for the underlying result logger
type TuringResultLogger interface {
	write(*TuringResultLogEntry) error
}

Collaborator

> is that a UPI wrapper for each logger needs to be created, including the nop logger

I don't think this is needed. Since the logging orchestrator (UPIResultLogger) is itself a separate implementation, the logger in the initialization below need not adhere to the TuringResultLogger interface anymore, right? That logger can be the UPIKafkaLogger, which implements only the UPILogger interface but makes use of the existing Kafka logger under the hood to write the given message.

resultLogger, err := resultlog.InitUPIResultLogger(cfg.AppConfig.Name, logger)

Contributor Author

Yep, UPIKafkaLogger is fine, but I would also need a UPINopLogger, otherwise there would be this clash. Will just add it anyway 😅

// write is a nop method that satisfies the NopLogger interface
func (*NopLogger) write(turLogEntry *TuringResultLogEntry) error {
	return nil
}

// write is a nop method that satisfies the UPILogger interface
func (*NopLogger) write(routerLog *upiv1.RouterLog) error {
	return nil
}
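Go rejects the two `write` declarations above because a type cannot have two methods with the same name. A minimal sketch of the wrapper approach discussed in this thread follows, assuming simplified stand-in types: `RouterLog`, `TuringResultLogEntry`, and the recording `KafkaLogger` here are hypothetical and do not talk to a real broker.

```go
package main

import "fmt"

// Hypothetical stand-ins for the real log payload types.
type TuringResultLogEntry struct{ Payload string }
type RouterLog struct{ Payload string }

// KafkaLogger is a stand-in that records payloads instead of publishing.
type KafkaLogger struct{ sent []string }

// writeToKafka is the shared publishing path.
func (l *KafkaLogger) writeToKafka(payload string) error {
	l.sent = append(l.sent, payload)
	return nil
}

// write keeps the HTTP-side signature on the original logger.
func (l *KafkaLogger) write(e *TuringResultLogEntry) error {
	return l.writeToKafka(e.Payload)
}

// UPILogger is the interface the UPI result logger depends on.
type UPILogger interface {
	write(*RouterLog) error
}

// UPIKafkaLogger embeds *KafkaLogger to reuse writeToKafka. Its own write
// shadows the promoted one, so the two signatures never clash.
type UPIKafkaLogger struct{ *KafkaLogger }

func (l *UPIKafkaLogger) write(r *RouterLog) error {
	return l.writeToKafka(r.Payload)
}

// UPINopLogger is a separate type, so it can declare write freely.
type UPINopLogger struct{}

func (*UPINopLogger) write(*RouterLog) error { return nil }

// Compile-time checks that both wrappers satisfy UPILogger.
var (
	_ UPILogger = (*UPIKafkaLogger)(nil)
	_ UPILogger = (*UPINopLogger)(nil)
)

func main() {
	base := &KafkaLogger{}
	loggers := []UPILogger{&UPIKafkaLogger{KafkaLogger: base}, &UPINopLogger{}}
	for _, l := range loggers {
		if err := l.write(&RouterLog{Payload: "router-log"}); err != nil {
			panic(err)
		}
	}
	fmt.Println(base.sent) // [router-log]
}
```

The trade-off is one small wrapper type per backend, in exchange for leaving the HTTP loggers and their interface untouched.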

Member

@pradithya pradithya left a comment

LGTM!

Collaborator

@krithika369 krithika369 left a comment

LGTM. Thanks, @leonlnj ! Left some small comments.

@echo "Building proprietary experiment engine image..."
cd ../engines/experiment/examples/plugins/hardcoded && make build-local-proprietary-exp-plugin-image
@echo "Building router image..."
cd ../engines/router && make build-local-router-image DOCKER_REGISTRY=localhost:5000 OVERWRITE_VERSION=latest
@echo "Generating cluster cred"
sh ../infra/docker-compose/dev/extract_creds.sh
Collaborator

Thanks for adding this. Amidst the testing improvements and chart changes (move to the umbrella chart), one thing we could do is to let the e2e tests in the component repo stop depending on the chart at all and use this setup instead, which could catch these problems earlier. We'll add this to the list of improvements. CC: @deadlycoconuts

// UPIConfig captures the defaults used by UPI Router
type UPIConfig struct {
// KafkaBrokers is broker which all Router will write to when UPI logging is enabled
KafkaBrokers string
Collaborator

Are we expecting this UPIConfig to hold more values eventually? Instead, should we just put this under the existing KafkaConfig ? For the HTTP_JSON routers, we don't have to use this value (since the broker info will be supplied by the users).

Contributor Author

@leonlnj leonlnj Mar 2, 2023

Hmm, I saw that KafkaConfig contains MaxMessageBytes and CompressionType as defaults applied to all Kafka loggers, whereas the broker applies specifically to UPI routers only, so I didn't reuse it.

> Are we expecting this UPIConfig to hold more values eventually?

Yes, one possible feature is finer-grained logging control, so we can reduce the amount of logs if the user doesn't want them.

@leonlnj leonlnj merged commit 5df71f6 into caraml-dev:main Mar 2, 2023
@leonlnj leonlnj added the enhancement New feature or request label Mar 2, 2023
3 participants