Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A {@link LoadSpec} wrapper that carries partial-projection metadata from the coordinator to a historical alongside
* the original backend-specific load spec. The wrapped {@code delegate} is held as a raw {@link Map} so that the
* concrete backend type (e.g. {@code s3}, {@code local}, {@code hdfs}) is materialized only when needed; this avoids
* pulling backend-specific dependencies onto every node that touches the wire form.
* <p>
* Both {@link #loadSegment(File)} and {@link #openRangeReader()} delegate verbatim to the inner load spec. The
* historical-side partial-load path inspects this wrapper at mount time to learn which projections to range-read and
* the fingerprint identifying the request the coordinator made.
*/
@JsonTypeName(PartialProjectionLoadSpec.TYPE)
public class PartialProjectionLoadSpec implements LoadSpec
{
public static final String TYPE = "partialProjection";

private final Map<String, Object> delegate;
private final List<String> projections;
private final String fingerprint;
private final Supplier<LoadSpec> materializedDelegateSupplier;

@JsonCreator
public PartialProjectionLoadSpec(
@JsonProperty("delegate") Map<String, Object> delegate,
@JsonProperty("projections") List<String> projections,
@JsonProperty("fingerprint") String fingerprint,
@JacksonInject ObjectMapper jsonMapper
)
{
Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.delegate = Preconditions.checkNotNull(delegate, "delegate");
Preconditions.checkArgument(
!CollectionUtils.isNullOrEmpty(projections),
"projections must not be null or empty"
);
this.projections = List.copyOf(projections);
this.fingerprint = Preconditions.checkNotNull(fingerprint, "fingerprint");
this.materializedDelegateSupplier = Suppliers.memoize(() -> jsonMapper.convertValue(delegate, LoadSpec.class));
}

@JsonProperty
public Map<String, Object> getDelegate()
{
return delegate;
}

@JsonProperty
public List<String> getProjections()
{
return projections;
}

@JsonProperty
public String getFingerprint()
{
return fingerprint;
}

@Override
public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
{
return materializedDelegateSupplier.get().loadSegment(destDir);
}

@Override
@Nullable
public SegmentRangeReader openRangeReader() throws IOException
{
return materializedDelegateSupplier.get().openRangeReader();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PartialProjectionLoadSpec that = (PartialProjectionLoadSpec) o;
return Objects.equals(delegate, that.delegate)
&& Objects.equals(projections, that.projections)
&& Objects.equals(fingerprint, that.fingerprint);
}

@Override
public int hashCode()
{
return Objects.hash(delegate, projections, fingerprint);
}

@Override
public String toString()
{
return "PartialProjectionLoadSpec{" +
"delegate=" + delegate +
", projections=" + projections +
", fingerprint=" + fingerprint +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class PartialProjectionLoadSpecTest
{
private static final Map<String, Object> DELEGATE = ImmutableMap.of(
"type", "stub",
"path", "/var/druid/segments/foo"
);
private static final String FINGERPRINT = "v1:abcdef0123456789";

private static ObjectMapper configuredMapper()
{
final ObjectMapper m = new DefaultObjectMapper();
final SimpleModule module = new SimpleModule();
module.registerSubtypes(PartialProjectionLoadSpec.class, StubLoadSpec.class);
m.registerModule(module);
m.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, m));
return m;
}

private final ObjectMapper jsonMapper = configuredMapper();

@Test
void testJsonRoundTrip() throws Exception
{
PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
DELEGATE,
List.of("user_daily", "user_hourly"),
FINGERPRINT,
jsonMapper
);
String json = jsonMapper.writeValueAsString(spec);
LoadSpec reread = jsonMapper.readValue(json, LoadSpec.class);
Assertions.assertInstanceOf(PartialProjectionLoadSpec.class, reread);
Assertions.assertEquals(spec, reread);
}

@Test
void testWireFormHasPartialProjectionType() throws Exception
{
PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
DELEGATE,
List.of("a"),
FINGERPRINT,
jsonMapper
);
Map<String, Object> wireForm = jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
new TypeReference<>()
{
}
);
Assertions.assertEquals("partialProjection", wireForm.get("type"));
Assertions.assertEquals(DELEGATE, wireForm.get("delegate"));
Assertions.assertEquals(List.of("a"), wireForm.get("projections"));
Assertions.assertEquals(FINGERPRINT, wireForm.get("fingerprint"));
}

@Test
void testLoadSegmentDelegatesToInner() throws Exception
{
PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
DELEGATE,
List.of("a"),
FINGERPRINT,
jsonMapper
);
StubLoadSpec.LOAD_CALLS.set(0);
LoadSpec.LoadSpecResult result = spec.loadSegment(new File("/tmp/dest"));
Assertions.assertEquals(1, StubLoadSpec.LOAD_CALLS.get());
Assertions.assertEquals(42L, result.getSize());
}

@Test
void testOpenRangeReaderDelegatesToInner() throws Exception
{
PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
DELEGATE,
List.of("a"),
FINGERPRINT,
jsonMapper
);
StubLoadSpec.RANGE_CALLS.set(0);
SegmentRangeReader reader = spec.openRangeReader();
Assertions.assertNotNull(reader);
Assertions.assertEquals(1, StubLoadSpec.RANGE_CALLS.get());
}

@Test
void testOpenRangeReaderReturnsNullWhenInnerDoesNotSupport() throws Exception
{
PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
ImmutableMap.of("type", "stub", "path", "/", "supportsRange", false),
List.of("a"),
FINGERPRINT,
jsonMapper
);
Assertions.assertNull(spec.openRangeReader());
}

@Test
void testRejectsNullDelegate()
{
Assertions.assertThrows(
NullPointerException.class,
() -> new PartialProjectionLoadSpec(null, List.of("a"), "v1:x", jsonMapper)
);
}

@Test
void testRejectsNullOrEmptyProjections()
{
Assertions.assertThrows(
IllegalArgumentException.class,
() -> new PartialProjectionLoadSpec(DELEGATE, null, "v1:x", jsonMapper)
);
Assertions.assertThrows(
IllegalArgumentException.class,
() -> new PartialProjectionLoadSpec(DELEGATE, List.of(), "v1:x", jsonMapper)
);
}

@Test
void testRejectsNullFingerprint()
{
Assertions.assertThrows(
NullPointerException.class,
() -> new PartialProjectionLoadSpec(DELEGATE, List.of("a"), null, jsonMapper)
);
}

/**
* Stub LoadSpec used to verify delegation. Uses the same JSON "type"=="stub" key as the test {@link #DELEGATE}.
*/
@JsonTypeName("stub")
public static class StubLoadSpec implements LoadSpec
{
static final AtomicInteger LOAD_CALLS = new AtomicInteger(0);
static final AtomicInteger RANGE_CALLS = new AtomicInteger(0);

private final String path;
private final boolean supportsRange;

@JsonCreator
public StubLoadSpec(
@JsonProperty("path") String path,
@JsonProperty("supportsRange") @Nullable Boolean supportsRange
)
{
this.path = path;
this.supportsRange = supportsRange == null || supportsRange;
}

@JsonProperty
public String getPath()
{
return path;
}

@JsonProperty
public boolean isSupportsRange()
{
return supportsRange;
}

@Override
public LoadSpecResult loadSegment(File destDir)
{
LOAD_CALLS.incrementAndGet();
return new LoadSpecResult(42L);
}

@Override
@Nullable
public SegmentRangeReader openRangeReader()
{
if (!supportsRange) {
return null;
}
RANGE_CALLS.incrementAndGet();
return (filename, offset, length) -> new ByteArrayInputStream(new byte[0]);
}
}
}
Loading
Loading