Skip to content

Commit

Permalink
Initial support for bootstrap segments.
Browse files Browse the repository at this point in the history
  - Adds a new API in the coordinator.
  - All processes that have storage locations configured (including tasks)
    talk to the coordinator if they can, and fetch bootstrap segments from it.
  - Then load the segments onto the segment cache as part of startup.
  - This addresses the segment bootstrapping logic required by processes before
    they can start serving queries or ingesting.

    This patch also lays the foundation to speed up upgrades.
  • Loading branch information
abhishekrb19 committed Jun 14, 2024
1 parent eb842d3 commit f57cf70
Show file tree
Hide file tree
Showing 21 changed files with 598 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,19 +315,19 @@ public enum Persona
}

/**
* Category of error. The simplest way to describe this is that it exists as a classification of errors that
* Category of error. The simplest way to describe this is that it exists as a classification of errors that
* enables us to identify the expected response code (e.g. HTTP status code) of a specific DruidException
*/
public enum Category
{
/**
* Means that the exception is being created defensively, because we want to validate something but expect that
* it should never actually be hit. Using this category is good to provide an indication to future reviewers and
* it should never actually be hit. Using this category is good to provide an indication to future reviewers and
* developers that the case being checked is not intended to actually be able to occur in the wild.
*/
DEFENSIVE(500),
/**
* Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this
* Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this
* category have messages written either targeting the USER or ADMIN personas as those are the general users
* of the APIs who could generate invalid inputs.
*/
Expand All @@ -340,9 +340,8 @@ public enum Category
* Means that an action that was attempted is forbidden
*/
FORBIDDEN(403),

/**
* Means that the requsted requested resource cannot be found.
* Means that the requested resource cannot be found.
*/
NOT_FOUND(404),
/**
Expand All @@ -357,6 +356,10 @@ public enum Category
* Indicates a server-side failure of some sort at runtime
*/
RUNTIME_FAILURE(500),
/**
* Means that the requested resource could not be found due to a transient unavailability.
*/
UNAVAILABLE(503),
/**
* A timeout happened
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ public JsonParserIterator(
this.hasTimeout = timeoutAt > -1;
}

/**
 * Bypasses Jackson serialization to prevent materialization of results from the {@code future} in memory at once.
 * A shortened version of {@link #JsonParserIterator(JavaType, Future, String, Query, String, ObjectMapper)}
 * where the URL and host parameters, used solely for logging/errors, are not known.
 *
 * @param typeRef      type handed unchanged to the full constructor (presumably the element type to
 *                     deserialize; confirm against the full constructor)
 * @param future       future supplying the {@link InputStream} to read from
 * @param objectMapper mapper handed unchanged to the full constructor
 */
public JsonParserIterator(JavaType typeRef, Future<InputStream> future, ObjectMapper objectMapper)
{
// Delegate with placeholder values: empty URL, null query and empty host.
this(typeRef, future, "", null, "", objectMapper);
}

@Override
public boolean hasNext()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
Expand Down Expand Up @@ -58,6 +59,12 @@ public interface CoordinatorClient
*/
ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(Set<String> datasources);

/**
 * Fetch bootstrap segments from the coordinator. The results must be streamed back to the caller as the
 * result set can be large.
 *
 * @return a future resolving to a closeable iterator over the bootstrap segments.
 *         NOTE(review): the caller presumably owns the iterator and must close it; confirm against callers.
 */
ListenableFuture<CloseableIterator<DataSegment>> fetchBootstrapSegments();

/**
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.coordination.LoadableDataSegment;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
Expand All @@ -42,6 +48,7 @@

public class CoordinatorClientImpl implements CoordinatorClient
{
private static final Logger log = new Logger(CoordinatorClientImpl.class);
private final ServiceClient client;
private final ObjectMapper jsonMapper;

Expand Down Expand Up @@ -156,6 +163,23 @@ public ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(
);
}

/**
 * Sends a GET request to {@code /druid/coordinator/v1/metadata/bootstrapSegments} and, once the response
 * stream is available, wraps it in a {@link JsonParserIterator} typed as {@link LoadableDataSegment}.
 *
 * @return a future resolving to an iterator over the segments read from the response stream
 */
@Override
public ListenableFuture<CloseableIterator<DataSegment>> fetchBootstrapSegments()
{
  final RequestBuilder bootstrapRequest =
      new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/bootstrapSegments");

  return FutureUtils.transform(
      client.asyncRequest(bootstrapRequest, new InputStreamResponseHandler()),
      // The stream is already available here, so hand it to the iterator as an immediate future.
      responseStream -> new JsonParserIterator<>(
          jsonMapper.getTypeFactory().constructType(LoadableDataSegment.class),
          Futures.immediateFuture(responseStream),
          jsonMapper
      )
  );
}

@Override
public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ private <T> JsonParserIterator<T> asJsonParserIterator(final InputStream in, fin
return new JsonParserIterator<>(
jsonMapper.getTypeFactory().constructType(clazz),
Futures.immediateFuture(in),
"", // We don't know URL at this point, but it's OK to use empty; it's used for logs/errors
null,
"", // We don't know host at this point, but it's OK to use empty; it's used for logs/errors
jsonMapper
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.segment.loading;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.io.Files;
import org.apache.druid.java.util.common.FileUtils;
Expand Down Expand Up @@ -117,12 +116,6 @@ public boolean delete()

private static final Logger log = new Logger(LocalDataSegmentPuller.class);

/**
 * Test-facing convenience overload: resolves the file for {@code segment} with {@code getFile(segment)} and
 * passes it, together with {@code dir}, to the file-based {@code getSegmentFiles} overload. The result of
 * that call is discarded.
 *
 * @param segment segment whose file is looked up
 * @param dir     directory forwarded to the file-based overload
 * @throws SegmentLoadingException declared by this method; presumably propagated from the delegate call
 */
@VisibleForTesting
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
  final File segmentSource = getFile(segment);
  getSegmentFiles(segmentSource, dir);
}

public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.server.coordination;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
Expand Down Expand Up @@ -59,8 +58,7 @@ public LoadableDataSegment(
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JacksonInject PruneSpecsHolder pruneSpecsHolder
@JsonProperty("size") long size
)
{
super(
Expand Down
Loading

0 comments on commit f57cf70

Please sign in to comment.