Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2 Control Protocol #29

Merged
merged 19 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 338 additions & 6 deletions elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ package proto;

option cc_enable_arenas = true;
option go_package = "pkg/proto;proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

// Set of services a client may be allowed to use over its connection.
enum ConnInfoServices {
  // Checkin service, V1 protocol.
  Checkin = 0;
  // Checkin service, V2 protocol.
  CheckinV2 = 1;
  // Key-value store service.
  Store = 2;
  // Artifact store service.
  Artifact = 3;
  // Log service.
  Log = 4;
}

// Connection information sent to the application on startup so it knows how to connect back to the Elastic Agent.
//
Expand All @@ -25,6 +41,8 @@ message ConnInfo {
bytes peer_cert = 5;
// Peer private key.
bytes peer_key = 6;
// Allowed services that spawned process can use. (only used in V2)
repeated ConnInfoServices services = 7;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is it expected that processes do with the services list? Is the intent that each process checks to make sure the services it needs are actually available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. A process can only use the services that it is given; if it tries to use any other service, Elastic Agent will return an error back to the caller.

}

// A status observed message is streamed from the application to Elastic Agent.
Expand All @@ -49,7 +67,7 @@ message StateObserved {
// Application is stopping.
STOPPING = 5;
}
// Token that is used to unique identify the application to agent. When agent started this
// Token that is used to uniquely identify the application to agent. When agent started this
// application it would have provided it this token.
string token = 1;
// Current index of the applied configuration.
Expand Down Expand Up @@ -83,12 +101,40 @@ message StateExpected {
// An action request is streamed from the Elastic Agent to the connected application so that the
// application can perform the requested action.
message ActionRequest {
  // Kind of action being requested.
  enum Type {
    // Custom action (registered by the unit).
    CUSTOM = 0;
    // Predefined diagnostics action.
    DIAGNOSTICS = 1;
  }

  // Unique ID of the action.
  string id = 1;
  // Name of the action (ignored when `type` is DIAGNOSTICS).
  string name = 2;
  // Parameters for the action, JSON encoded.
  bytes params = 3;
  // Unique ID of the unit (V2 only).
  string unit_id = 4;
  // Type of the unit (V2 only).
  UnitType unit_type = 5;
  // Type of action to perform (V2 only).
  Type type = 6;
}

// One diagnostic result produced for a diagnostics action; carried in the `diagnostic` field of
// `ActionResponse`.
message ActionDiagnosticUnitResult {
  // Human readable name of this diagnostic content.
  string name = 1;
  // Filename under which the diagnostic should be stored on disk.
  string filename = 2;
  // Human readable description of what this diagnostic provides.
  string description = 3;
  // Content-Type of `content`.
  string content_type = 4;
  // The file content itself.
  bytes content = 5;
  // Time at which the content was generated.
  google.protobuf.Timestamp generated = 6;
}

// An action response is streamed from the application back to the Elastic Agent to provide a result to
Expand All @@ -101,26 +147,312 @@ message ActionResponse {
// Action has failed.
FAILED = 1;
}
// Token that is used to unique identify the application to agent. When agent started this
// Token that is used to uniquely identify the application to agent. When agent started this
// application it would have provided it this token.
string token = 1;
// Unique ID of the action.
string id = 2;
// Status of the action.
Status status = 3;
// JSON encoded result for the action.
// JSON encoded result for the action (empty when diagnostic action response).
bytes result = 4;
// Specific result for diagnostics action. (only used in V2)
repeated ActionDiagnosticUnitResult diagnostic = 5;
AndersonQ marked this conversation as resolved.
Show resolved Hide resolved
}

// State codes for the current state.
//
// Used both for the expected state of a unit (`UnitExpected.state`) and for the state a unit
// reports back (`UnitObserved.state`).
enum State {
// Zero value of the enum. Elastic Agent also marks a unit that is missing from a
// `CheckinObserved` as `STARTING`.
STARTING = 0;
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
// A unit is always marked `CONFIGURING` when its observed `config_state_idx` does not match the
// `config_state_idx` that Elastic Agent expects.
CONFIGURING = 1;
HEALTHY = 2;
DEGRADED = 3;
// Units are automatically marked `FAILED` when no `CheckinObserved` is streamed for 30 seconds.
FAILED = 4;
STOPPING = 5;
STOPPED = 6;
}

// Type of unit.
//
// Paired with a unit ID in the unit-scoped messages of this file (`UnitExpected`, `UnitObserved`,
// `ActionRequest`, `StoreBeginTxRequest`, `LogMessage`).
enum UnitType {
// Zero value of the enum.
INPUT = 0;
OUTPUT = 1;
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
}

// A unit that is part of a collector/shipper.
AndersonQ marked this conversation as resolved.
Show resolved Hide resolved
// Expected state and configuration of a single unit; sent as an element of `CheckinExpected.units`.
message UnitExpected {
// Unique ID of the unit.
string id = 1;
// Unit type.
UnitType type = 2;
// Expected state of the unit.
State state = 3;
// Index of either the current configuration or the new configuration provided.
uint64 config_state_idx = 4;
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
// Resulting configuration. (If the application already has the current `config_state_idx` this
// will be empty.)
string config = 5;
}

// The units the application is expected to run, each with its expected state and configuration.
//
// This is the message type streamed back by the `CheckinV2` RPC.
message CheckinExpected {
  // Expected units.
  repeated UnitExpected units = 1;
}

// Observed status for a unit.
//
// Contains the currently applied `config_state_idx` (0 in the case of initial start, 1 is the first
// applied config index) along with the status of the application. In the case that the sent `config_state_idx`
// doesn't match the `config_state_idx` that Elastic Agent expects, the unit is always marked as
// `CONFIGURING` and a new `UnitExpected` will be sent so it can have the latest configuration.
message UnitObserved {
// Unique ID of the unit.
string id = 1;
// Unit type.
UnitType type = 2;
// Current index of the applied configuration.
uint64 config_state_idx = 3;
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
// State of unit.
State state = 4;
// Human readable message for the state of the unit.
// Exposed to users to provide more detail about the state for this single unit.
string message = 5;
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
// JSON encoded payload for the state.
bytes payload = 6;
}

// Version details reported by the running program.
message CheckinObservedVersionInfo {
  // Binary name.
  string name = 1;
  // Binary version.
  string version = 2;
  // Extra key/value metadata describing the binary.
  map<string, string> meta = 3;
}

// Observed statuses and configuration for defined units.
//
// In the case that a unit is missing from the observation then the Elastic Agent will mark that missing unit
// as `STARTING` and send a new `UnitExpected` for the missing unit.
message CheckinObserved {
// Token that is used to uniquely identify the connection to the Elastic Agent.
string token = 1;
// Units observed state.
repeated UnitObserved units = 2;
// Version information about the running program. Should always be included on the first checkin, and not
// again unless one of the values has changed.
optional CheckinObservedVersionInfo version_info = 3;
AndersonQ marked this conversation as resolved.
Show resolved Hide resolved
}

// Service the client (the application) calls on the Elastic Agent for check-ins and actions.
service ElasticAgent {
  // Reports the state of the application to the Elastic Agent.
  //
  // The client must stream a `StateObserved` at least once every 30 seconds. If it does not, the
  // application is automatically marked as FAILED, and after 60 seconds the Elastic Agent will
  // force kill the entire process and restart it.
  rpc Checkin(stream StateObserved) returns (stream StateExpected);

  // Reports the state of the application to the Elastic Agent over the V2 protocol.
  //
  // The client must stream a `CheckinObserved` at least once every 30 seconds. If it does not, the
  // set of units is automatically marked as FAILED, and after 60 seconds the Elastic Agent will
  // force kill the entire process and restart it.
  rpc CheckinV2(stream CheckinObserved) returns (stream CheckinExpected);

  // Opened by the client when it connects over gRPC, so that the Elastic Agent can stream action
  // requests to the application and the application can stream its responses back.
  //
  // Request and response are swapped here because it is the Elastic Agent that sends the requests,
  // as a stream, to the connected process. Responses may be sent in any order; the response order
  // is allowed to differ from the request order.
  rpc Actions(stream ActionResponse) returns (stream ActionRequest);
}

// Kind of store transaction to start.
enum StoreTxType {
  // Read-only transaction; setting or deleting a key requires `READ_WRITE` instead.
  READ_ONLY = 0;
  // Transaction that may also set and delete keys.
  READ_WRITE = 1;
}

// Request to begin a new transaction.
//
// Every transaction that is started has to be finished by calling either commit or discard.
message StoreBeginTxRequest {
  // Token uniquely identifying the connection to the Elastic Agent.
  string token = 1;
  // Unit ID.
  string unit_id = 2;
  // Unit type.
  UnitType unit_type = 3;
  // Kind of transaction to start.
  StoreTxType type = 4;
}

// Result of starting a transaction.
message StoreBeginTxResponse {
  // ID of the started transaction.
  string id = 1;
}

// Request to read a key from the store.
message StoreGetKeyRequest {
  // Token uniquely identifying the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
  // Key name.
  string name = 3;
}

// Response of the retrieved key.
message StoreGetKeyResponse {
// Status result of the get.
enum Status {
// The key was found; `value` is set.
FOUND = 0;
// The key was not found.
NOT_FOUND = 1;
}
// Whether the requested key was found.
Status status = 1;
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
// Value when `FOUND`.
bytes value = 2;
}

// Request to write a key into the store.
//
// The transaction referenced by `tx_id` must have been started with `READ_WRITE`.
message StoreSetKeyRequest {
  // Token uniquely identifying the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
  // Key name.
  string name = 3;
  // Key value.
  bytes value = 4;
  // Time-to-live of the key, in milliseconds.
  uint64 ttl = 5;
}

// Result of a `SetKey` call.
//
// Has no fields at the moment; it is defined so that return values can be added in the future.
message StoreSetKeyResponse {}
Copy link
Contributor

@ph ph May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I see a TTL maybe or to indicate single-use key.


// Deletes a key in the store.
//
// `tx_id` must be an ID of a transaction that was started with `READ_WRITE`.
//
// Does not error in the case that a key does not exist.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, align with Go's expected behavior.

message StoreDeleteKeyRequest {
  // Token uniquely identifying the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
  // Key name.
  string name = 3;
}

// Result of a `DeleteKey` call.
//
// Has no fields at the moment; it is defined so that return values can be added in the future.
message StoreDeleteKeyResponse {}

// Request to commit a transaction in the store.
//
// If the commit returns an error the whole transaction is discarded, so discard does not need to
// be called afterwards.
message StoreCommitTxRequest {
  // Token uniquely identifying the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
}

// Result of a `CommitTx` call.
//
// Has no fields at the moment; it is defined so that return values can be added in the future.
message StoreCommitTxResponse {}

// Request to discard a transaction in the store.
message StoreDiscardTxRequest {
  // Token uniquely identifying the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
}

// Result of a `DiscardTx` call.
//
// Has no fields at the moment; it is defined so that return values can be added in the future.
message StoreDiscardTxResponse {}

service ElasticAgentStore {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this store needed? Who are the users of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for all users, to ease their implementation. Hopefully filestreams and other inputs will transition to using the key-value store. The benefit is the life-cycle of the state is handled by the Elastic Agent.

Elastic Agent will clean up state for units, as well manage the state across upgrade of versions. No need for the inputs to worry about writing state and managing the life-cycle of it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the surface-level it sounds great. But after thinking about it, I am on the fence. If e.g. filestream uses agent to store state information, it means that we cannot really test filestream without either starting an Agent or mocking it. I am afraid this adds unnecessary overhead to development of inputs. During testing I would like to avoid starting an Agent whenever it is possible. In e2e tests it is fine, but for unit and integration tests it sounds too much. But let's see.

// Key-Value state storage is provided for each unit.
//
// Transactional store is provided to allow multiple key operations to occur before a commit to ensure consistent
// state when multiple keys make up a unit's persistent state.
//
// BeginTx begins a new transaction; a started transaction must either have commit or discard called.
rpc BeginTx(StoreBeginTxRequest) returns (StoreBeginTxResponse);
// Gets a key from the store.
rpc GetKey(StoreGetKeyRequest) returns (StoreGetKeyResponse);
// Sets a key into the store; the transaction must have been started with `READ_WRITE`.
rpc SetKey(StoreSetKeyRequest) returns (StoreSetKeyResponse);
// Deletes a key in the store; the transaction must have been started with `READ_WRITE`. Does not error
// when the key does not exist.
rpc DeleteKey(StoreDeleteKeyRequest) returns (StoreDeleteKeyResponse);
// Commits the transaction; upon error the whole transaction is discarded.
rpc CommitTx(StoreCommitTxRequest) returns (StoreCommitTxResponse);
// Discards the transaction.
rpc DiscardTx(StoreDiscardTxRequest) returns (StoreDiscardTxResponse);
}

// Requests an artifact from the Elastic Agent.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are artifacts? Is it anything hosted by Fleet Server?

For Endpoint, would this be a replacement for Endpoint downloading large files from Fleet Server directly? Or is this meant for another purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is meant as a replacement for Endpoint downloading directly from Fleet Server. We want to remove that and have artifact access to go through the control protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ferullo it's an evolution, we aren't planning to remove it in the short term from fleet-server, there were also needs for inputs in beats world to download larger assets and we would like to have a more uniform way of doing it, so beats don't reach out directly to the fleet server, this also aligns with our goal to lock down more the beats.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@kevinlog can you think of anything more that would be needed here for Endpoint's use case?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are currently POC'ing a feature to request and download files from the Endpoint for 8.4. The user would be able to specify a path on the remote host and the Endpoint would call some function to transfer the file to the server.

We were thinking about putting some logic into Fleet Server to help with storing the files via artifacts (and eventually ES). I don't think this precludes us from doing this, but it's good to have the conversation.

@pzl is working on a POC for this ticket, currently: https://github.com/elastic/security-team/issues/3616. @pzl - any thoughts on the above? Does this change things?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Elastic Agent never talks to Kibana. This is changing the behavior so endpoint does not need to be passed credentials to communicate with Fleet Server. Instead the Elastic Agent will perform the talking to Fleet Server. Only the Elastic Agent should communicate with Fleet Server, everything underneath in the control protocol sense should go through the Elastic Agent on the system.

Nothing actually changes on Fleet Server side, the change is that in v2 Endpoint will not be given the credentials or the URL to even communicate with Fleet Server. Endpoint will request the artifact from Elastic Agent and then Elastic Agent will get that artifact from the Fleet Server and provide it to Endpoint through the GRPC control protocol.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thank you. I think I understand the shifting responsibilities now.

Where this does conflict a little bit with our future plans is that we were looking to expand some of the fleet APIs to handle this bi-directional file handling. And we were looking to have Endpoint fetch and deliver more files outside of the normal Artifact distribution. (We would be putting resources into adding these API changes to fleet server)

If the future-looking plans are to nix the Fleet Server -> client (Endpoint) communications and credentials, then perhaps we need to rethink. Is this something we can include in the V2 protocol here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can add it to the v2 protocol. Let me know what you need, and we can align to ensure that we provide it for you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevinlog One question about uploading: is there any reason that the endpoint doesn't send that artifact directly to an Elasticsearch index? What is the rationale for passing it through fleet server?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For large files (dozens to hundreds of MB, perhaps into GB range), it should be chunked into many documents to my understanding. Endpoint could potentially perform this chunking work on its own and send into ES indices/streams:

but for future-looking scenarios where we have better storage options (either an object store like was recently made available in our cloud deployments, but not yet usable by solutions, or if ES adds native file-storage https://github.com/elastic/dev/issues/1544 ) we don't want to expose the underlying storage mechanism so that the API can remain stable and the storage solution can evolve. The other problem is one of access control and permissions for the Endpoint, which is handled more easily through a fleet server API

// Request message of the `Fetch` RPC of `ElasticAgentArtifact`.
message ArtifactFetchRequest {
// Token that is used to uniquely identify the collection of inputs to the agent. When started this is provided
// in the `ConnInfo`.
string token = 1;
// ID of the artifact.
string id = 2;
// SHA256 of the artifact.
// NOTE(review): the string encoding of the digest (presumably hex) is not stated here — confirm.
string sha256 = 3;
AndersonQ marked this conversation as resolved.
Show resolved Hide resolved
}

// One message of an artifact fetch stream: either a piece of artifact content or the end-of-file
// marker.
message ArtifactFetchResponse {
  oneof content_eof {
    // Piece of the artifact content.
    bytes content = 1;
    // End-of-file marker.
    google.protobuf.Empty eof = 2;
  }
}

// Artifact access provided by the Elastic Agent.
service ElasticAgentArtifact {
  // Retrieves an artifact from the artifact store.
  //
  // Very large artifacts can be split across several `ArtifactFetchResponse` messages. At least
  // two responses are always returned, and the final one has `eof` set.
  rpc Fetch(ArtifactFetchRequest) returns (stream ArtifactFetchResponse);
}

// A single log message attributed to a unit.
message LogMessage {
  // Unit ID.
  string unit_id = 1;
  // Unit type.
  UnitType unit_type = 2;
  // Body of the log message: ECS, JSON encoded.
  bytes message = 3;
}

// Request message of the `Log` RPC; reports one or more log messages in a single call.
message LogMessageRequest {
  // Token uniquely identifying the connection to the Elastic Agent.
  string token = 1;
  // Messages to report together.
  repeated LogMessage messages = 2;
}

// Result of a `Log` call.
//
// Has no fields at the moment; it is defined so that return values can be added in the future.
message LogMessageResponse {}
Comment on lines +445 to +448
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for forward change.


// The log service is only exposed to programs that are not started as sub-processes by Elastic Agent.
//
// It lets services that are not started as sub-processes write to the same stdout as programs that
// are started as sub-processes. A program that is started as a sub-process with stdout connected
// does not have the ability to use this service.
service ElasticAgentLog {
  // Sends log messages to the Elastic Agent.
  rpc Log(LogMessageRequest) returns (LogMessageResponse);
}
Loading