Skip to content

Commit

Permalink
receive: Replication (#1270)
Browse files Browse the repository at this point in the history
* pkg/receive: add replication

This commit adds a new replication feature to the Thanos receiver.
By default, replication is turned off, however, when replication is
enabled (replication factor >=2), the target node for a time series will
replicate the time series to the other nodes concurrently and
synchronously. If the replication to >= (rf+1)/2 nodes fails, the
original write request is failed.

* test/e2e: add e2e tests for replication
  • Loading branch information
squat authored and brancz committed Jun 24, 2019
1 parent 38a9da0 commit 70ba420
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 143 deletions.
22 changes: 16 additions & 6 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri

tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default("THANOS-TENANT").String()

replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default("THANOS-REPLICA").String()

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
Expand Down Expand Up @@ -101,6 +105,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
cw,
*local,
*tenantHeader,
*replicaHeader,
*replicationFactor,
)
}
}
Expand All @@ -123,6 +129,8 @@ func runReceive(
cw *receive.ConfigWatcher,
endpoint string,
tenantHeader string,
replicaHeader string,
replicationFactor uint64,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand All @@ -137,12 +145,14 @@ func runReceive(
localStorage := &tsdb.ReadyStorage{}
receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
Endpoint: endpoint,
TenantHeader: tenantHeader,
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
Endpoint: endpoint,
TenantHeader: tenantHeader,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
})

// Start all components while we wait for TSDB to open but only load
Expand Down
63 changes: 44 additions & 19 deletions docs/proposals/approved/201812_thanos-remote-receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Prometheus has the remote write API to send samples collected by a Prometheus se

## Architecture

The Thanos receiver component seamlessly integrates into the rest of the Thanos components. It acts similarly to what is referred to in Thanos as a "source", in the current set of components, this is typically represented by the Thanos sidecar that is put next to Prometheus to ship tsdb blocks into object storage and reply to store API requests, however in the case of the Thanos receiver, the Thanos sidecar is not necessary anymore, as the data is replicated from the original Prometheus server to the Thanos receiver, and the Thanos receiver participates in the Thanos gossip mesh. The Prometheus server on a tenants infrastructure can therefore be completely vanilla and is just configured to replicate its time-series to the Thanos receiver.
The Thanos receiver component seamlessly integrates into the rest of the Thanos components. It acts similarly to what is referred to in Thanos as a "source", in the current set of components, this is typically represented by the Thanos sidecar that is put next to Prometheus to ship tsdb blocks into object storage and reply to store API requests, however in the case of the Thanos receiver, the Thanos sidecar is not necessary anymore, as the data is replicated from the original Prometheus server to the Thanos receiver, and the Thanos receiver participates in the Thanos gossip mesh. The Prometheus server on a tenant's infrastructure can therefore be completely vanilla and is just configured to replicate its time-series to the Thanos receiver.

Instead of directly scraping metrics, however, the Thanos receiver accepts Prometheus remote-write requests, and writes these into a local instance of the Prometheus tsdb. Once successfully committed to the tenant's tsdbs, the requests returns successfully. To prevent data leaking at the database level, each tenant has an individual tsdb instance, meaning a single Thanos receiver may manage multiple tsdb instances. The receiver answers Thanos store API requests and uploads built blocks of the Prometheus tsdb. Implementation-wise, this just requires wiring up existing components. As tenant's data within object storage are separate objects, it may be enough separation to have a single bucket for all tenants, however, this architecture supports any setup of tenant to object storage bucket combination.

Expand Down Expand Up @@ -76,18 +76,17 @@ For the Thanos receiver to work at scale there are some areas that need further

### Load distribution

In order to scale beyond a single machine, time-series are distributed among all receivers. In order to do this consistently, the Thanos receivers build a hash ring and use consistent hashing to distribute the time-series. Receivers are configured with their identity, and thus their position in the hash ring, by an external system (such as the configuration management system). The position in the hashring decides which time-series are accepted and stored by a Thanos receiver.
In order to scale beyond a single machine, time-series are distributed among all receivers. In order to do this consistently, the Thanos receivers build a hashring and use consistent hashing to distribute the time-series. Receivers are configured with their identity, and thus their position in the hashring, by an external system (such as the configuration management system). The position in the hashring decides which time-series are accepted and stored by a Thanos receiver.

As the Thanos receivers route the requests to the responsible node, an edge load balancer is configured to randomly distribute load to all Thanos receivers available.

Time-series hashes are calculated from the entire label set of that time-series. To ensure that potentially large time-series with common labels do not all end up being ingested by the same node, the tenant’s ID should be included in the hash. The tenant's ID is obtained through a set of labels passed to the receiver via an HTTP header. The label keys defining a tenant must be pre-configured. For example, the receiver is configured with the following flag:
Time-series hashes are calculated from the entire label set of that time-series. To ensure that potentially large time-series with common labels do not all end up being ingested by the same node, the tenant’s ID should be included in the hash. The tenant's ID is passed to the receiver via an HTTP header. The header defining a tenant must be pre-configured. For example, the receiver is configured with the following flag:

```
--tenant-labels=tenant
--tenant-header=X-THANOS-TENANT
--receive.tenant-header=THANOS-TENANT
```

And a valid request would have the `X-THANOS-TENANT` header set to `{tenant="A"}`. When a tenant label is configured, it must be included in the header sent.
A valid request could have the `THANOS-TENANT` header set to `tenant-a`. If the header is not present in a request, then the request is interpreted as belonging to the empty string tenant ``.

Using the tenant's ID in the hash will help to distribute the load across receivers. The hash is roughly calculated as follows.

Expand All @@ -103,21 +102,30 @@ While the routing functionality could be a separate component, we choose to have

In attempts to build similar systems a common fallacy has been to distribute all load from all tenants of the system onto a single set of ingestion nodes. This makes reasoning and management rather simple, however, has turned out to have stronger downsides than upsides. Companies using Cortex in production and offering it to clients have setup entirely separate clusters for different customers, because their load has caused incidents affecting other clients. In order to allow customers to send large amounts of data at irregular intervals the ingestion infrastructure needs to scale without impacting durability. Scaling the ingestion infrastructure can cause endpoints to not accept data temporarily, however, the write-ahead-log based replication can cope with this, as it backs off and continues sending its data once the ingestion infrastructure successfully processes those requests again.

Hard tenants in the Thanos receiver are configured in a configuration file, changes to this configuration must be orchestrated by a configuration management tool. When a remote write request is received by a Thanos receiver it goes through the list of configured hard tenants. For each hard tenant there is a separate hash ring respective to their ingestion nodes as described in the "Load distribution" section. A hard tenant also has the number of associated receive nodes belonging to it. A remote write request can be initially received by any receiver node, however, will only be dispatched to receiver nodes that correspond to that hard tenant.
Hard tenants in the Thanos receiver are configured in a configuration file. Changes to this configuration must be orchestrated by a configuration management tool. When a remote write request is received by a Thanos receiver, it goes through the list of configured hard tenants. For each hard tenant, there is a separate hashring respective to their ingestion nodes as described in the "Load distribution" section. A hard tenant also has the number of associated receive endpoints belonging to it. A remote write request can be initially received by any receiver node, however, will only be dispatched to receiver endpoints that correspond to that hard tenant.

A sample of the configuration of tenants and their respective infrastructure:

```
tenants:
- match: tenant-a
nodes:
- tenant-a-1.metrics.local
- hashmod: 0
nodes:
- soft-tenants-1.metrics.local
```json
[
{
hashring: "tenant-a",
endpoints: ["tenant-a-1.metrics.local:19291/api/v1/receive", "tenant-a-2.metrics.local:19291/api/v1/receive"],
tenants: ["tenant-a"]
},
{
hashring: "tenants-b-c",
endpoints: ["tenant-b-c-1.metrics.local:19291/api/v1/receive", "tenant-b-c-2.metrics.local:19291/api/v1/receive"],
tenants: ["tenant-b", "tenant-c"]
},
{
hashring: "soft-tenants",
endpoints: ["http://soft-tenants-1.metrics.local:19291/api/v1/receive"]
}
]
```

To start, exact matches of tenant IDs will used to distribute requests to receive nodes. Additionally a sharding mechanism performing `hashmod` on the tenant ID, in order to shard the tenants among pools of receivers. Should it be necessary, more sophisticated mechanisms can be added later. When a request is received, the specified tenant is tested against the configured tenant ID until an exact match is found. If the specified tenant is the empty string, then any tenant is considered a valid match. If no hard tenancy is configured, a tenant will automatically land in a soft tenancy hashring.
To start, exact matches of tenant IDs will be used to distribute requests to receive endpoints. Should it be necessary, more sophisticated mechanisms can be added later. When a request is received, the tenant specified in the request is tested against the configured allowed tenants for each hashring until an exact match is found. If a hashring specifies no explicit tenants, then any tenant is considered a valid match; this allows for a cluster to provide soft-tenancy. Requests whose tenant ID matches no other hashring explicitly, will automatically land in this soft tenancy hashring. If no matching hashring is found and no soft tenancy is configured, the receiver responds with an error.

```
Soft tenant hashring
Expand Down Expand Up @@ -157,11 +165,29 @@ To start, exact matches of tenant IDs will used to distribute requests to receiv
+-----------------------+
```

The intention is that the load balancer can just distribute requests randomly to all Thanos receivers independent of the tenant. Should this turn out to cause problems, either an additional “distribution” layer can be inserted that performs this routing or a load balancer per hard tenant hashring can be introduced. The distribution layer would have the advantage of being able to keep a single endpoint for configuration of Prometheus servers for remote write, which could be convenient for users.
The intention is that the load balancer can distribute requests randomly to all Thanos receivers independent of the tenant. Should this turn out to cause problems, a distribution layer can be created with additional Thanos receivers whose names do not appear in the configuration file can be used to perform routing of requests. These receivers will forward all requests to the correct receiver in each hashring without storing any data themselves. Alternatively, a load balancer per hard tenant hashring can be introduced. The distribution layer would have the advantage of being able to keep a single endpoint for configuration of Prometheus servers for remote write, which could be convenient for users.

### Replication

The Thanos receiver supports replication of received time-series to other receivers in the same hashring. The replication factor is controlled by setting a flag on the receivers and indicates the maximum number of copies of any time-series that should be stored in the hashring. If any time-series in a write request received by a Thanos receiver is not successfully written to at least `(REPLICATION_FATOR + 1)/2` nodes, the receiver responds with an error. For example, to attempt to store 3 copies of every time-series and ensure that every time-series is successfully written to at least 2 Thanos receivers in the target hashring, all receivers should be configured with the following flag:

```
--receive.replication-factor=3
```

Thanos receivers identify the replica number of a write request via a 0-indexed uint64 contained in an HTTP header. The header name can be configured via a flag:

```
--receive.replica-header=THANOS-REPLICA
```

If the header is present in a request, the receiver will look for the replica-th node of the hashring that should handle the request. If it is the receiver itself, then the request is stored locally, else it is forwarded to the correct endpoint. If the replica number of the request exceeds the configured replication factor or the total number of nodes in the target hashring, the receiver responds with an error. If the header is not present in a request, then the receiver will replicate the request to `REPLCIATION_FACTOR` nodes, setting the replica header on each request to ensure it is not replicated further.

Note that replicating write requests may require additional compaction and deduplication of object storage as well as significantly increase infrastructure cost.

### Rollout/scaling/failure of receiver nodes

As replication is based on the Prometheus write-ahead-log and retries when the remote write backend is not available, intermediate downtime is tolerable and expected for receivers. Prometheus remote write treats 503 as temporary failures and continues do retry until the remote write receiving end responds again.
Prometheus remote write will retry whenever the remote write backend is not available, thus intermediate downtime is tolerable and expected for receivers. Prometheus remote write treats 503 as temporary failures and continues do retry until the remote write receiving end responds again. If this ingestion downtime is not acceptable, then a replication factor of 3 or more should be specified, ensuring that a write request is accepted in its entirety by at least 2 replicas. This way we can ensure there is no downtime of ingestion.

On rollouts receivers do not need to re-shard data, but instead at shutdown in case of rollout or scaling flush the write-ahead-log to a Prometheus tsdb block and upload it to object storage. Rollouts that include a soft tenant being promoted to a hard tenant, does require all nodes of a hash-ring to upload its content as the hash-ring changes. When the nodes comes back and accepts remote write requests again, the tenant local Prometheus server will continue where it left off. When scaling, all nodes need to perform the above operation as the hashring is resized meaning all nodes will have a new distribution. In the case of a failure, and the hashring is not resized, it will load the write-ahead-log and assume where it left off. Partially succeeding requests return a 503 causing Prometheus to retry the full request. This works as identically existing timestamp-value matches are ignored by tsdb. Prometheus relies on this to de-duplicate federation request results, therefore it is safe to rely on this here as well.

Expand All @@ -178,7 +204,6 @@ Decisions of the design have consequences some of which will show themselves in
* This proposal describes the write-ahead-log based remote write, which is not (yet) merged in Prometheus: https://github.com/prometheus/prometheus/pull/4588. This landing may impact durability characteristics.
- While there is work left the pull request seems to be close to completion
* For compaction to work as described in this proposal, vertical compaction in tsdb needs to be possible. Implemented but not merged yet: https://github.com/prometheus/tsdb/pull/370
* If downtime of ingestion, as in the described `503` for intermediate downtime, and Prometheus resuming when the backend becomes healthy again, turns out not to be an option, we could attempt to duplicate all write requests to 3 (or configurable amount) replicas, where a write request needs to be accepted by at least 2 replicas, this way we can ensure no downtime of ingestion. This may require additional compaction and deduplication of object storage as well as significantly increase infrastructure cost.
* Additional safeguards may need to be put in place to ensure that hashring resizes do not occur on failed nodes, only once they have recovered and have successfully uploaded their blocks.

[xxhash]: http://cyan4973.github.io/xxHash/
Expand Down
Loading

0 comments on commit 70ba420

Please sign in to comment.