Skip to content

Commit

Permalink
Add system fields to input sources.
Browse files Browse the repository at this point in the history
Main changes:

1) The SystemField enum defines system fields "__file_uri", "__file_path",
   and "__file_bucket". They are associated with each input entity.

2) The SystemFieldInputSource interface can be added to any InputSource
   to make it system-field-capable. It sets up serialization of a list
   of configured "systemFields" in the JSON form of the input source, and
   provides a method getSystemFieldValue for computing the value of each
   system field. The cloud object, HDFS, HTTP, and local input sources now
   implement this interface.
  • Loading branch information
gianm committed Oct 30, 2023
1 parent 7379477 commit c03328b
Show file tree
Hide file tree
Showing 46 changed files with 1,411 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ public Predicate<Throwable> getRetryCondition()
{
return OssUtils.RETRYABLE;
}

/**
 * Returns the {@link CloudObjectLocation} stored in this entity's {@code object} field.
 *
 * Declared without an access modifier, so it is visible only to classes in the same package.
 */
CloudObjectLocation getObject()
{
  return this.object;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.data.input.aliyun;

import com.aliyun.oss.OSS;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -31,23 +30,22 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.storage.aliyun.OssInputDataConfig;
import org.apache.druid.storage.aliyun.OssStorageDruidModule;
import org.apache.druid.storage.aliyun.OssUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -80,10 +78,11 @@ public OssInputSource(
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("objectGlob") @Nullable String objectGlob,
@JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields,
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
)
{
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob, systemFields);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
Preconditions.checkNotNull(client, "client");
this.inputSourceConfig = inputSourceConfig;
Expand Down Expand Up @@ -164,10 +163,28 @@ public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<Lis
null,
split.get(),
getObjectGlob(),
systemFields,
getOssInputSourceConfig()
);
}

/**
 * Computes the value of one system field for the given entity.
 *
 * The entity is cast to {@link OssEntity}; passing any other entity type fails with a
 * {@link ClassCastException}.
 *
 * @param entity entity to read the value from
 * @param field  system field to compute
 *
 * @return the entity URI rendered as a string for {@code URI}, the bucket of the entity's object for
 * {@code BUCKET}, the path of the entity's object for {@code PATH}, and null for any other field
 */
@Override
public Object getSystemFieldValue(InputEntity entity, SystemField field)
{
  final OssEntity ossEntity = (OssEntity) entity;
  final Object value;

  switch (field) {
    case URI:
      value = ossEntity.getUri().toString();
      break;
    case BUCKET:
      value = ossEntity.getObject().getBucket();
      break;
    case PATH:
      value = ossEntity.getObject().getPath();
      break;
    default:
      // Fields this input source does not know how to compute have no value.
      value = null;
      break;
  }

  return value;
}

@Override
public int hashCode()
{
Expand Down Expand Up @@ -198,30 +215,8 @@ public String toString()
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", objectGlob=" + getObjectGlob() +
(systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) +
", ossInputSourceConfig=" + getOssInputSourceConfig() +
'}';
}

/**
 * Builds an {@link Iterable} over the object summaries listed under the configured prefixes.
 *
 * Each call to {@code iterator()} on the result starts a new listing through
 * {@link OssUtils#objectSummaryIterator}. When an object glob is configured (not blank), only summaries
 * whose key matches the glob are returned.
 */
private Iterable<OSSObjectSummary> getIterableObjectsFromPrefixes()
{
  return () -> {
    final Iterator<OSSObjectSummary> listedObjects = OssUtils.objectSummaryIterator(
        clientSupplier.get(),
        getPrefixes(),
        inputDataConfig.getMaxListingLength()
    );

    // No glob configured: every listed object is returned as-is.
    if (StringUtils.isBlank(getObjectGlob())) {
      return listedObjects;
    }

    // Skip files that didn't match glob filter.
    final PathMatcher globMatcher = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
    return Iterators.filter(
        listedObjects,
        summary -> globMatcher.matches(Paths.get(summary.getKey()))
    );
  };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
Expand All @@ -54,6 +55,8 @@
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
Expand Down Expand Up @@ -83,6 +86,7 @@
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -153,10 +157,33 @@ public void testSerdeWithUris() throws Exception
null,
null,
null,
null,
null
);
final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
Assert.assertEquals(withUris, serdeWithUris);
Assert.assertEquals(Collections.emptySet(), serdeWithUris.getConfiguredSystemFields());
}

/**
 * Round-trips a URI-based input source with all three system fields through JSON and checks that both
 * the input source and its configured system fields survive serialization.
 */
@Test
public void testSerdeWithUrisAndSystemFields() throws Exception
{
  final OssInputSource original = new OssInputSource(
      OSSCLIENT,
      INPUT_DATA_CONFIG,
      EXPECTED_URIS,
      null, // prefixes
      null, // objects
      null, // objectGlob
      new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH)),
      null  // properties
  );

  final String json = MAPPER.writeValueAsString(original);
  final OssInputSource roundTripped = MAPPER.readValue(json, OssInputSource.class);
  Assert.assertEquals(original, roundTripped);

  final EnumSet<SystemField> expectedFields = EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH);
  Assert.assertEquals(expectedFields, roundTripped.getConfiguredSystemFields());
}

@Test
Expand All @@ -169,6 +196,7 @@ public void testSerdeWithPrefixes() throws Exception
PREFIXES,
null,
null,
null,
null
);
final OssInputSource serdeWithPrefixes =
Expand All @@ -186,6 +214,7 @@ public void testSerdeWithObjects() throws Exception
null,
EXPECTED_LOCATION,
null,
null,
null
);
final OssInputSource serdeWithPrefixes =
Expand All @@ -210,6 +239,7 @@ public void testInputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCre
null,
EXPECTED_LOCATION,
null,
null,
mockConfigPropertiesWithoutKeyAndSecret
);
Assert.assertNotNull(withPrefixes);
Expand All @@ -230,6 +260,7 @@ public void testSerdeOssClientLazyInitializedWithCrediential() throws Exception
null,
EXPECTED_LOCATION,
null,
null,
CLOUD_CONFIG_PROPERTIES
);
final OssInputSource serdeWithPrefixes =
Expand All @@ -250,6 +281,7 @@ public void testSerdeOssClientLazyInitializedWithoutCrediential() throws Excepti
null,
EXPECTED_LOCATION,
null,
null,
null
);
final OssInputSource serdeWithPrefixes =
Expand All @@ -268,6 +300,7 @@ public void testSerdeWithExtraEmptyLists() throws Exception
ImmutableList.of(),
EXPECTED_LOCATION,
null,
null,
null
);
final OssInputSource serdeWithPrefixes =
Expand All @@ -287,6 +320,7 @@ public void testSerdeWithInvalidArgs()
PREFIXES,
EXPECTED_LOCATION,
null,
null,
null
);
}
Expand All @@ -303,6 +337,7 @@ public void testSerdeWithOtherInvalidArgs()
PREFIXES,
ImmutableList.of(),
null,
null,
null
);
}
Expand All @@ -319,6 +354,7 @@ public void testSerdeWithOtherOtherInvalidArgs()
PREFIXES,
EXPECTED_LOCATION,
null,
null,
null
);
}
Expand All @@ -340,6 +376,7 @@ public void testWithUrisSplit()
null,
null,
null,
null,
null
);

Expand Down Expand Up @@ -367,6 +404,7 @@ public void testWithPrefixesSplit()
PREFIXES,
null,
null,
null,
null
);

Expand Down Expand Up @@ -394,6 +432,7 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint()
PREFIXES,
null,
null,
null,
null
);

Expand Down Expand Up @@ -424,6 +463,7 @@ public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects()
PREFIXES,
null,
null,
null,
null
);

Expand Down Expand Up @@ -453,6 +493,7 @@ public void testAccessDeniedWhileListingPrefix()
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null,
null
);

Expand Down Expand Up @@ -484,6 +525,7 @@ public void testReader() throws IOException
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null,
null
);

Expand Down Expand Up @@ -530,6 +572,7 @@ public void testCompressedReader() throws IOException
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
null,
null,
null
);

Expand Down Expand Up @@ -569,12 +612,48 @@ public void testGetTypes()
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
null,
null,
null
);

Assert.assertEquals(ImmutableSet.of(OssStorageDruidModule.SCHEME), inputSource.getTypes());
}

/**
 * Checks that the configured system fields are reported by the input source, and that the URI, bucket,
 * and path values are computed from an entity located at bucket "foo", path "bar".
 */
@Test
public void testSystemFields()
{
  final OssInputSource source = new OssInputSource(
      OSSCLIENT,
      INPUT_DATA_CONFIG,
      null, // uris
      ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
      null, // objects
      null, // objectGlob
      new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH)),
      null  // properties
  );

  final EnumSet<SystemField> expectedFields = EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH);
  Assert.assertEquals(expectedFields, source.getConfiguredSystemFields());

  // The entity is built with a null first argument; only its location is read below.
  final CloudObjectLocation location = new CloudObjectLocation("foo", "bar");
  final OssEntity entity = new OssEntity(null, location);

  final Object uriValue = source.getSystemFieldValue(entity, SystemField.URI);
  final Object bucketValue = source.getSystemFieldValue(entity, SystemField.BUCKET);
  final Object pathValue = source.getSystemFieldValue(entity, SystemField.PATH);

  Assert.assertEquals("oss://foo/bar", uriValue);
  Assert.assertEquals("foo", bucketValue);
  Assert.assertEquals("bar", pathValue);
}

/**
 * Runs EqualsVerifier against {@link OssInputSource} to check its equals/hashCode implementation.
 */
@Test
public void testEquals()
{
EqualsVerifier.forClass(OssInputSource.class)
// Tell EqualsVerifier that equals() compares classes with getClass() rather than instanceof.
.usingGetClass()
// These two fields are excluded from the check; presumably they do not take part in equals/hashCode
// (the class body is not visible here, confirm against OssInputSource).
.withIgnoredFields("clientSupplier", "inputDataConfig")
.verify();
}

private static void expectListObjects(URI prefix, List<URI> uris, byte[] content)
{
final ObjectListing result = new ObjectListing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ protected String getPath()
{
return location.getPath();
}

/**
 * Returns the {@link CloudObjectLocation} stored in this entity's {@code location} field.
 *
 * Declared without an access modifier, so it is visible only to classes in the same package.
 */
CloudObjectLocation getLocation()
{
  return this.location;
}
}
Loading

0 comments on commit c03328b

Please sign in to comment.