From 5d2cc7ba682f6bdb42220c0f249d5a81f04b5c31 Mon Sep 17 00:00:00 2001 From: zhchen Date: Tue, 10 Dec 2019 10:17:43 -0800 Subject: [PATCH 1/6] [GOBBLIN-1001] Implement TimePartitionGlobFinder --- .../dataset/DefaultFileSystemGlobFinder.java | 12 +- .../dataset/EmptyFileSystemDataset.java | 10 ++ .../dataset/SimpleFileSystemDataset.java | 25 ++++ .../dataset/TimePartitionGlobFinder.java | 131 ++++++++++++++++++ 4 files changed, 167 insertions(+), 11 deletions(-) create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java index b3c7805d29c..c94e8d4b5d9 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java @@ -33,16 +33,6 @@ public DefaultFileSystemGlobFinder(FileSystem fs, Properties properties) throws } public FileSystemDataset datasetAtPath(final Path path) throws IOException { - return new FileSystemDataset() { - @Override - public Path datasetRoot() { - return path; - } - - @Override - public String datasetURN() { - return path.toString(); - } - }; + return new SimpleFileSystemDataset(path); } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java new file mode 100644 index 00000000000..ebc997115aa --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java @@ -0,0 +1,10 @@ +package org.apache.gobblin.data.management.dataset; + +import org.apache.hadoop.fs.Path; + + +public class EmptyFileSystemDataset extends SimpleFileSystemDataset { + public EmptyFileSystemDataset(Path path) { + super(path); + } +} diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java new file mode 100644 index 00000000000..08b8b8e63bb --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java @@ -0,0 +1,25 @@ +package org.apache.gobblin.data.management.dataset; + +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.dataset.FileSystemDataset; + + +public class SimpleFileSystemDataset implements FileSystemDataset { + + private final Path path; + + public SimpleFileSystemDataset(Path path) { + this.path = path; + } + + @Override + public Path datasetRoot() { + return path; + } + + @Override + public String datasetURN() { + return path.toString(); + } +} \ No newline at end of file diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java new file mode 100644 index 00000000000..f5798bf8402 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java @@ -0,0 +1,131 @@ +package org.apache.gobblin.data.management.dataset; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder; +import org.apache.gobblin.dataset.DatasetsFinder; +import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.util.PathUtils; + + +@Slf4j +public class TimePartitionGlobFinder implements DatasetsFinder { + private static final String CONF_PREFIX = "timePartitionGlobFinder."; + + public static final String PARTITION_PREFIX = CONF_PREFIX + "partitionPrefix"; + public static final String TIME_FORMAT = CONF_PREFIX + "timeFormat"; + public static final String TIME_ZONE = CONF_PREFIX + "timeZone"; + public static final String ENABLE_EMPTY_PARTITION = CONF_PREFIX + "enableEmptyPartition"; + + private static final String DEFAULT_TIME_ZONE = "America/Los_Angeles"; + + private final String datasetPattern; + private final String datasetPartitionPattern; + private final Path yesterdayPartition; + + private final Properties props; + private final FileSystem fs; + private final boolean enableEmptyPartition; + + public TimePartitionGlobFinder(FileSystem fs, Properties properties) { + datasetPattern = properties.getProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY); + Path datasetPartitionPath = new Path(datasetPattern); + + String partitionPrefix = properties.getProperty(PARTITION_PREFIX, ""); + String timeFormat = properties.getProperty(TIME_FORMAT).trim(); + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(timeFormat); + datasetPartitionPattern = new Path(datasetPartitionPath, + partitionPrefix + derivePartitionPattern(timeFormat)).toString(); + log.info("Dataset partition pattern is {}", datasetPartitionPattern); + + ZonedDateTime curTime = ZonedDateTime.now(ZoneId.of(properties.getProperty(TIME_ZONE, DEFAULT_TIME_ZONE))); + yesterdayPartition = new Path(partitionPrefix + timeFormatter.format(curTime.minusDays(1))); + log.info("Yesterday partition is {}", yesterdayPartition); + + enableEmptyPartition = Boolean.valueOf(properties.getProperty(ENABLE_EMPTY_PARTITION)); + + props = properties; + this.fs = fs; + } + + /** + * Derive partition glob pattern from time format. For example: + *

given {@code timePattern=yyyy/MM/dd}, return '*'/'*'/'*' (adding single quote to avoid breaking comments) + *

given {@code timePattern=yyyy-MM-dd}, return * + */ + private String derivePartitionPattern(String timeFormat) { + String[] parts = timeFormat.split("/"); + StringBuilder pattern = new StringBuilder("*"); + for (int i = 1; i < parts.length; i++) { + pattern.append("/*"); + } + return pattern.toString(); + } + + @Override + public List findDatasets() + throws IOException { + try { + return doFindDatasets(); + } finally { + // Recover ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY config + this.props.setProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, datasetPattern); + } + } + + private List doFindDatasets() throws IOException { + if (!enableEmptyPartition) { + log.info("Will not create empty file system dataset"); + return findDatasets(datasetPartitionPattern); + } + + // Find datasets + List datasets = findDatasets(datasetPattern); + // Identify yesterday dataset partitions + Set yesterdayDatasetPartitions = new HashSet<>(); + datasets.forEach(dataset -> yesterdayDatasetPartitions.add(createYesterdayDatasetPartition(dataset))); + + // Find all dataset time partitions + List datasetPartitions = findDatasets(datasetPartitionPattern); + for (FileSystemDataset datasetPartition : datasetPartitions) { + // Remove yesterday dataset partitions already created + yesterdayDatasetPartitions.remove(datasetPartition.datasetRoot()); + } + + // Create the remaining ones + yesterdayDatasetPartitions.forEach(path -> { + log.info("Creating empty yesterday partition {}", path); + datasetPartitions.add(new EmptyFileSystemDataset(path)); + }); + + return datasetPartitions; + } + + private List findDatasets(String pattern) + throws IOException { + this.props.setProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, pattern); + DefaultFileSystemGlobFinder datasetFinder = new DefaultFileSystemGlobFinder(this.fs, this.props); + return datasetFinder.findDatasets(); + } + + private Path createYesterdayDatasetPartition(FileSystemDataset dataset) { + return new Path(dataset.datasetRoot(), yesterdayPartition); + } + + @Override + public Path commonDatasetRoot() { + return PathUtils.deepestNonGlobPath(new Path(this.datasetPattern)); + } +} From 06403c7e7b9222d5ebabfb284192731926876539 Mon Sep 17 00:00:00 2001 From: zhchen Date: Tue, 10 Dec 2019 10:22:29 -0800 Subject: [PATCH 2/6] Add test file and apache licenses --- .../dataset/EmptyFileSystemDataset.java | 17 +++ .../dataset/SimpleFileSystemDataset.java | 17 +++ .../dataset/TimePartitionGlobFinder.java | 17 +++ .../TimePartitionedGlobFinderTest.java | 142 ++++++++++++++++++ 4 files changed, 193 insertions(+) create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java index ebc997115aa..a2e43ea55f1 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gobblin.data.management.dataset; import org.apache.hadoop.fs.Path; diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java index 08b8b8e63bb..2b94a52422b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gobblin.data.management.dataset; import org.apache.hadoop.fs.Path; diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java index f5798bf8402..07f6c7f56ff 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gobblin.data.management.dataset; import java.io.IOException; diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java new file mode 100644 index 00000000000..665a5171b1e --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.dataset; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Paths; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.junit.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.apache.gobblin.dataset.FileSystemDataset; + + +public class TimePartitionedGlobFinderTest { + + private Path testRootDir; + private FileSystem localFs; + private DateTime startTime; + + @BeforeClass + private void setup() + throws IOException { + localFs = FileSystem.getLocal(new Configuration()); + testRootDir = new Path(Paths.get("").toAbsolutePath().toString(), + getClass().getSimpleName()); + if (localFs.exists(testRootDir)) { + localFs.delete(testRootDir, true); + } + localFs.mkdirs(testRootDir); + localFs.deleteOnExit(testRootDir); + startTime = DateTime.now(DateTimeZone.forID("America/Los_Angeles")); + } + + @Test + public void testDayPartitions() + throws IOException { + String hourlyFormat = "yyyy/MM/dd/HH"; + String hourlyPrefix = "hourly/"; + String dayFormat = "yyyy/MM/dd"; + + // create an empty dataset /db1/table1/hourly + Path ds1 = createDatasetPath("db1/table1"); + // create dataset /db2/table2/hourly + Path ds2 = createDatasetPath("db2/table2"); + createPartitions(ds2, hourlyPrefix,1, 2, hourlyFormat); + createPartitions(ds2, hourlyPrefix,2, 2, hourlyFormat); + + String datasetPattern = new Path(testRootDir, "*/*").toString(); + + // Test glob finder without creating empty partition + Properties props = new Properties(); + props.setProperty("gobblin.dataset.pattern", datasetPattern); + props.setProperty("timePartitionGlobFinder.partitionPrefix", hourlyPrefix); + props.setProperty("timePartitionGlobFinder.timeFormat", dayFormat); + TimePartitionGlobFinder finder = new TimePartitionGlobFinder(localFs, props); + List datasets = finder.findDatasets(); + Assert.assertEquals(datasets.size(), 2); + // Verify there are 2 day partitions for /db2/table2 + Assert.assertNotNull(find(getPartitionPath(ds2, hourlyPrefix, 1, dayFormat), datasets)); + Assert.assertNotNull(find(getPartitionPath(ds2, hourlyPrefix, 2, dayFormat), datasets)); + + // Test glob finder with creating empty partition + props.setProperty("timePartitionGlobFinder.enableEmptyPartition", "true"); + finder = new TimePartitionGlobFinder(localFs, props); + datasets = finder.findDatasets(); + Assert.assertEquals(datasets.size(), 3); + // Verify yesterday partition for /db1/table1 + FileSystemDataset dt1Dataset = find(getPartitionPath(ds1, hourlyPrefix, 1, dayFormat), datasets); + Assert.assertNotNull(dt1Dataset); + Assert.assertTrue(dt1Dataset instanceof EmptyFileSystemDataset); + } + + private Path getPartitionPath(Path dataset, String prefix, int dayOffset, String format) { + DateTimeFormatter formatter = DateTimeFormat.forPattern(format); + return new Path(dataset, prefix + formatter.print(startTime.minusDays(dayOffset))); + } + + private FileSystemDataset find(Path path, List list) { + for (FileSystemDataset dataset : list) { + if (dataset.datasetRoot().equals(path)) { + return dataset; + } + } + return null; + } + + private Path createDatasetPath(String dataset) + throws IOException { + Path datasetPath = new Path(testRootDir, dataset); + localFs.mkdirs(datasetPath); + return datasetPath; + } + + private void createPartitions(Path dataset, String prefix, int dayOffset, int hours, String format) + throws IOException { + DateTimeFormatter formatter = DateTimeFormat.forPattern(format); + DateTime dayTime = startTime.minusDays(dayOffset); + for (int i = 0; i < hours; i++) { + Path hourPath = new Path(formatter.print(dayTime.withHourOfDay(i))); + Path datasetPartitionPath = new Path(dataset, prefix + hourPath); + Path dataFile = new Path(datasetPartitionPath, "dataFile"); + try (OutputStream outputStream = localFs.create(dataFile, true)) { + outputStream.write(i); + } + } + } + + @AfterClass + private void cleanup() + throws IOException { + if (localFs != null && localFs.exists(testRootDir)) { + localFs.delete(testRootDir, true); + } + } +} From de6c2da8e40babd109075297bbf42feef4434ea7 Mon Sep 17 00:00:00 2001 From: zhchen Date: Fri, 13 Dec 2019 17:27:31 -0800 Subject: [PATCH 3/6] A more general TimePartitionGlobFinder --- .../verify/PinotAuditCountVerifierTest.java | 14 +- .../dataset/EmptyFileSystemDataset.java | 27 --- .../dataset/SimpleFileSystemDataset.java | 10 + .../dataset/TimePartitionGlobFinder.java | 210 ++++++++++++++---- .../org/apache/gobblin/time/TimeIterator.java | 75 +++++++ .../TimePartitionedGlobFinderTest.java | 104 ++++++--- 6 files changed, 331 insertions(+), 109 deletions(-) delete mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java index f9c855ab7a1..2c4372feee9 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java @@ -32,6 +32,7 @@ import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset; import org.apache.gobblin.dataset.FileSystemDataset; /** @@ -51,17 +52,8 @@ public void testTier() throws Exception { final String inputSub = "hourly"; final String outputSub = "hourly"; TestAuditCountClient client = new TestAuditCountClient(); - FileSystemDataset dataset = new FileSystemDataset() { - @Override - public Path datasetRoot() { - return new Path (input + topic + inputSub + "/2017/04/03/10"); - } - - @Override - public String datasetURN() { - return input + topic + inputSub + "/2017/04/03/10"; - } - }; + FileSystemDataset dataset = new SimpleFileSystemDataset( + new Path(input + topic + inputSub + "/2017/04/03/10")); State props = new State(); props.setProp (CompactionAuditCountVerifier.PRODUCER_TIER, PRODUCER_TIER); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java deleted file mode 100644 index a2e43ea55f1..00000000000 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/EmptyFileSystemDataset.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.data.management.dataset; - -import org.apache.hadoop.fs.Path; - - -public class EmptyFileSystemDataset extends SimpleFileSystemDataset { - public EmptyFileSystemDataset(Path path) { - super(path); - } -} diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java index 2b94a52422b..68d1561c058 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java @@ -25,9 +25,15 @@ public class SimpleFileSystemDataset implements FileSystemDataset { private final Path path; + private final boolean isVirtual; public SimpleFileSystemDataset(Path path) { + this(path, false); + } + + public SimpleFileSystemDataset(Path path, boolean isVirtual) { this.path = path; + this.isVirtual = isVirtual; } @Override @@ -39,4 +45,8 @@ public Path datasetRoot() { public String datasetURN() { return path.toString(); } + + public boolean getIsVirtual() { + return isVirtual; + } } \ No newline at end of file diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java index 07f6c7f56ff..46d9d9a043e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java @@ -18,79 +18,185 @@ package org.apache.gobblin.data.management.dataset; import java.io.IOException; +import java.time.Duration; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.regex.Pattern; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder; import org.apache.gobblin.dataset.DatasetsFinder; import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.time.TimeIterator; import org.apache.gobblin.util.PathUtils; +/** + * A {@link TimePartitionGlobFinder} finds all dataset time partitions within time window + * [current time - look back time, current time]. It derives an efficient dataset partition pattern based + * on the time window and a supported {@value #TIME_FORMAT}. + * + *

If {@value #ENABLE_VIRTUAL_PARTITION} is set, it will create virtual {@link SimpleFileSystemDataset} + * instances if a partition within the time window doesn't exist + */ @Slf4j public class TimePartitionGlobFinder implements DatasetsFinder { private static final String CONF_PREFIX = "timePartitionGlobFinder."; public static final String PARTITION_PREFIX = CONF_PREFIX + "partitionPrefix"; public static final String TIME_FORMAT = CONF_PREFIX + "timeFormat"; + public static final String ENABLE_VIRTUAL_PARTITION = CONF_PREFIX + "enableVirtualPartition"; + /** + * Options are enumerated in {@link org.apache.gobblin.time.TimeIterator.Granularity} + */ + public static final String GRANULARITY = CONF_PREFIX + "granularity"; public static final String TIME_ZONE = CONF_PREFIX + "timeZone"; - public static final String ENABLE_EMPTY_PARTITION = CONF_PREFIX + "enableEmptyPartition"; + public static final String LOOKBACK_SPEC = CONF_PREFIX + "lookbackSpec"; private static final String DEFAULT_TIME_ZONE = "America/Los_Angeles"; + private static final Pattern SUPPORTED_TIME_FORMAT = Pattern.compile("(yyyy/MM(/.*)*)|(yyyy-MM(-.*)*)"); + private final String datasetPattern; private final String datasetPartitionPattern; - private final Path yesterdayPartition; + private final String partitionPrefix; + private final DateTimeFormatter timeFormatter; + private final boolean enableVirtualPartition; + + private final ZonedDateTime startTime; + private final ZonedDateTime endTime; + private final TimeIterator.Granularity granularity; private final Properties props; private final FileSystem fs; - private final boolean enableEmptyPartition; public TimePartitionGlobFinder(FileSystem fs, Properties properties) { + this(fs, properties, + ZonedDateTime.now(ZoneId.of(properties.getProperty(TIME_ZONE, DEFAULT_TIME_ZONE)))); + } + + @VisibleForTesting + TimePartitionGlobFinder(FileSystem fs, Properties properties, ZonedDateTime curTime) { datasetPattern = properties.getProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY); - Path datasetPartitionPath = new Path(datasetPattern); + Path datasetPath = new Path(datasetPattern); - String partitionPrefix = properties.getProperty(PARTITION_PREFIX, ""); + partitionPrefix = properties.getProperty(PARTITION_PREFIX, ""); String timeFormat = properties.getProperty(TIME_FORMAT).trim(); - DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(timeFormat); - datasetPartitionPattern = new Path(datasetPartitionPath, - partitionPrefix + derivePartitionPattern(timeFormat)).toString(); - log.info("Dataset partition pattern is {}", datasetPartitionPattern); + Preconditions.checkState(isTimeFormatSupported(timeFormat), + String.format("Unsupported time format %s, expecting %s", timeFormat, SUPPORTED_TIME_FORMAT)); + timeFormatter = DateTimeFormatter.ofPattern(timeFormat); - ZonedDateTime curTime = ZonedDateTime.now(ZoneId.of(properties.getProperty(TIME_ZONE, DEFAULT_TIME_ZONE))); - yesterdayPartition = new Path(partitionPrefix + timeFormatter.format(curTime.minusDays(1))); - log.info("Yesterday partition is {}", yesterdayPartition); + endTime = curTime; + Duration lookback = Duration.parse(properties.getProperty(LOOKBACK_SPEC)); + startTime = endTime.minus(lookback); + granularity = TimeIterator.Granularity.valueOf(properties.getProperty(GRANULARITY).toUpperCase()); - enableEmptyPartition = Boolean.valueOf(properties.getProperty(ENABLE_EMPTY_PARTITION)); + datasetPartitionPattern = new Path(datasetPath, + partitionPrefix + derivePartitionPattern(startTime, endTime, timeFormat)).toString(); + log.info("Dataset partition pattern is {}", datasetPartitionPattern); + + enableVirtualPartition = Boolean.valueOf(properties.getProperty(ENABLE_VIRTUAL_PARTITION, "false")); props = properties; this.fs = fs; } /** - * Derive partition glob pattern from time format. For example: - *

given {@code timePattern=yyyy/MM/dd}, return '*'/'*'/'*' (adding single quote to avoid breaking comments) - *

given {@code timePattern=yyyy-MM-dd}, return * + * The finder supports time format matching {@link #SUPPORTED_TIME_FORMAT} + */ + @VisibleForTesting + static boolean isTimeFormatSupported(String timeFormat) { + return SUPPORTED_TIME_FORMAT.matcher(timeFormat).matches(); + } + + /** + * Derive partition glob pattern from time format. It tries its best to provide + * a fine pattern by refining year and month options from reasoning + * start time, end time and {@link #SUPPORTED_TIME_FORMAT} */ - private String derivePartitionPattern(String timeFormat) { - String[] parts = timeFormat.split("/"); - StringBuilder pattern = new StringBuilder("*"); - for (int i = 1; i < parts.length; i++) { - pattern.append("/*"); + @VisibleForTesting + static String derivePartitionPattern(ZonedDateTime start, + ZonedDateTime end, String timeFormat) { + // Refine year options + int startYear = start.getYear(); + int endYear = end.getYear(); + StringBuilder yearOptions = new StringBuilder("{" + startYear); + appendOptions(yearOptions, startYear + 1, endYear); + yearOptions.append("}"); + + // Get month options + StringBuilder monthOptions = buildMonthOptions(start, end); + + StringBuilder pattern = new StringBuilder(yearOptions); + if (timeFormat.contains("-")) { + pattern.append("-"); + pattern.append(monthOptions); + // + if (!monthOptions.toString().equals("*")) { + pattern.append("*"); + } + } else { + pattern.append("/"); + pattern.append(monthOptions); + String[] parts = timeFormat.split("/"); + // We already processed year and month components + for (int i = 2; i < parts.length; i++) { + pattern.append("/*"); + } } + return pattern.toString(); } + /** + * Refine month options + */ + private static StringBuilder buildMonthOptions(ZonedDateTime start, + ZonedDateTime end) { + int startMonth = start.getMonthValue(); + int endMonth = end.getMonthValue(); + int yearDiff = end.getYear() - start.getYear(); + if ( yearDiff > 1 || (yearDiff == 1 && endMonth > startMonth)) { + // All 12 months + return new StringBuilder("*"); + } + StringBuilder monthOptions = new StringBuilder("{" + startMonth); + if (endMonth > startMonth) { + appendOptions(monthOptions, startMonth + 1, endMonth); + } else { + // from [startMonth + 1, 12] of start year + appendOptions(monthOptions, startMonth + 1, 12); + // from [1, endMonth] of current year + appendOptions(monthOptions, 1, endMonth); + } + monthOptions.append("}"); + return monthOptions; + } + + private static void appendOptions(StringBuilder stringBuilder, int start, int end) { + for (int i = start; i <= end; i++) { + stringBuilder.append(","); + if (i < 10) { + stringBuilder.append("0"); + } + stringBuilder.append(i); + } + } + @Override public List findDatasets() throws IOException { @@ -103,31 +209,49 @@ public List findDatasets() } private List doFindDatasets() throws IOException { - if (!enableEmptyPartition) { - log.info("Will not create empty file system dataset"); - return findDatasets(datasetPartitionPattern); - } - // Find datasets List datasets = findDatasets(datasetPattern); - // Identify yesterday dataset partitions - Set yesterdayDatasetPartitions = new HashSet<>(); - datasets.forEach(dataset -> yesterdayDatasetPartitions.add(createYesterdayDatasetPartition(dataset))); - - // Find all dataset time partitions - List datasetPartitions = findDatasets(datasetPartitionPattern); - for (FileSystemDataset datasetPartition : datasetPartitions) { - // Remove yesterday dataset partitions already created - yesterdayDatasetPartitions.remove(datasetPartition.datasetRoot()); + + // Compute partitions in theory based on startTime and endTime + Set computedPartitions = new HashSet<>(); + datasets.forEach(dataset -> computedPartitions.addAll(computePartitions(dataset))); + + // This is the final result + List resultPartitions = new ArrayList<>(computedPartitions.size()); + + // Find all physical dataset time partitions + List actualPartitions = findDatasets(datasetPartitionPattern); + + String pathStr; + for (FileSystemDataset physicalPartition : actualPartitions) { + pathStr = physicalPartition.datasetRoot().toString(); + if (computedPartitions.contains(pathStr)) { + resultPartitions.add(physicalPartition); + computedPartitions.remove(pathStr); + } + } + + // Create virtual ones; + if (enableVirtualPartition) { + computedPartitions.forEach(partition -> { + log.info("Creating virtual partition {}", partition); + resultPartitions.add(new SimpleFileSystemDataset(new Path(partition), true)); + }); + } else { + log.info("Will not create virtual partitions"); } - // Create the remaining ones - yesterdayDatasetPartitions.forEach(path -> { - log.info("Creating empty yesterday partition {}", path); - datasetPartitions.add(new EmptyFileSystemDataset(path)); - }); + return resultPartitions; + } - return datasetPartitions; + private Collection computePartitions(FileSystemDataset dataset) { + List partitions = new ArrayList<>(); + TimeIterator iterator = new TimeIterator(startTime, endTime, granularity); + while (iterator.hasNext()) { + partitions.add(new Path(dataset.datasetRoot(), + partitionPrefix + timeFormatter.format(iterator.next())).toString()); + } + return partitions; } private List findDatasets(String pattern) @@ -137,10 +261,6 @@ private List findDatasets(String pattern) return datasetFinder.findDatasets(); } - private Path createYesterdayDatasetPartition(FileSystemDataset dataset) { - return new Path(dataset.datasetRoot(), yesterdayPartition); - } - @Override public Path commonDatasetRoot() { return PathUtils.deepestNonGlobPath(new Path(this.datasetPattern)); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java new file mode 100644 index 00000000000..4630a7a55dc --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.time; + +import java.time.ZonedDateTime; +import java.util.Iterator; + + +/** + * A {@link TimeIterator} iterates over time points within [{@code startTime}, {@code endTime}]. It + * supports time points in various granularities (See {@link Granularity} + */ +public class TimeIterator implements Iterator { + + public enum Granularity { + MINUTE, HOUR, DAY, MONTH + } + + private ZonedDateTime startTime; + private ZonedDateTime endTime; + private Granularity granularity; + + public TimeIterator(ZonedDateTime startTime, ZonedDateTime endTime, Granularity granularity) { + this.startTime = startTime; + this.endTime = endTime; + this.granularity = granularity; + } + + @Override + public boolean hasNext() { + return !startTime.isAfter(endTime); + } + + @Override + public ZonedDateTime next() { + ZonedDateTime dateTime = startTime; + + switch (granularity) { + case MINUTE: + startTime = startTime.plusMinutes(1); + break; + case HOUR: + startTime = startTime.plusHours(1); + break; + case DAY: + startTime = startTime.plusDays(1); + break; + case MONTH: + startTime = startTime.plusMonths(1); + break; + } + + return dateTime; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java index 665a5171b1e..c7cf03bf170 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java @@ -20,17 +20,17 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.file.Paths; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.junit.Assert; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -42,7 +42,8 @@ public class TimePartitionedGlobFinderTest { private Path testRootDir; private FileSystem localFs; - private DateTime startTime; + private ZoneId zone; + private ZonedDateTime curTime; @BeforeClass private void setup() @@ -55,7 +56,9 @@ private void setup() } localFs.mkdirs(testRootDir); localFs.deleteOnExit(testRootDir); - startTime = DateTime.now(DateTimeZone.forID("America/Los_Angeles")); + zone = ZoneId.of("America/Los_Angeles"); + LocalDateTime localTime = LocalDateTime.of(2019,1,1,0,0); + curTime = ZonedDateTime.of(localTime, zone); } @Test @@ -69,8 +72,8 @@ public void testDayPartitions() Path ds1 = createDatasetPath("db1/table1"); // create dataset /db2/table2/hourly Path ds2 = createDatasetPath("db2/table2"); - createPartitions(ds2, hourlyPrefix,1, 2, hourlyFormat); - createPartitions(ds2, hourlyPrefix,2, 2, hourlyFormat); + createPartitions(ds1, hourlyPrefix,0, 2, hourlyFormat); + createPartitions(ds2, hourlyPrefix,-1, 2, hourlyFormat); String datasetPattern = new Path(testRootDir, "*/*").toString(); @@ -79,33 +82,37 @@ public void testDayPartitions() props.setProperty("gobblin.dataset.pattern", datasetPattern); props.setProperty("timePartitionGlobFinder.partitionPrefix", hourlyPrefix); props.setProperty("timePartitionGlobFinder.timeFormat", dayFormat); - TimePartitionGlobFinder finder = new TimePartitionGlobFinder(localFs, props); + props.setProperty("timePartitionGlobFinder.lookbackSpec", "P2D"); + props.setProperty("timePartitionGlobFinder.granularity", "DAY"); + TimePartitionGlobFinder finder = new TimePartitionGlobFinder(localFs, props, curTime); List datasets = finder.findDatasets(); Assert.assertEquals(datasets.size(), 2); // Verify there are 2 day partitions for /db2/table2 - Assert.assertNotNull(find(getPartitionPath(ds2, hourlyPrefix, 1, dayFormat), datasets)); - Assert.assertNotNull(find(getPartitionPath(ds2, hourlyPrefix, 2, dayFormat), datasets)); + Assert.assertNotNull(find(getPartitionPath(ds1, hourlyPrefix, 0, dayFormat), datasets)); + Assert.assertNotNull(find(getPartitionPath(ds2, hourlyPrefix, -1, dayFormat), datasets)); // Test glob finder with creating empty partition - props.setProperty("timePartitionGlobFinder.enableEmptyPartition", "true"); - finder = new TimePartitionGlobFinder(localFs, props); + props.setProperty("timePartitionGlobFinder.enableVirtualPartition", "true"); + finder = new TimePartitionGlobFinder(localFs, props, curTime); datasets = finder.findDatasets(); - Assert.assertEquals(datasets.size(), 3); - // Verify yesterday partition for /db1/table1 - FileSystemDataset dt1Dataset = find(getPartitionPath(ds1, hourlyPrefix, 1, dayFormat), datasets); - Assert.assertNotNull(dt1Dataset); - Assert.assertTrue(dt1Dataset instanceof EmptyFileSystemDataset); + Assert.assertEquals(datasets.size(), 6); + // Verify virtual partitions for /db1/table1 + Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat), datasets).getIsVirtual()); + Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual()); + // Verify virtual partitions for /db2/table2 + Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat), datasets).getIsVirtual()); + Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual()); } private Path getPartitionPath(Path dataset, String prefix, int dayOffset, String format) { - DateTimeFormatter formatter = DateTimeFormat.forPattern(format); - return new Path(dataset, prefix + formatter.print(startTime.minusDays(dayOffset))); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format); + return new Path(dataset, prefix + formatter.format(curTime.plusDays(dayOffset))); } - private FileSystemDataset find(Path path, List list) { + private SimpleFileSystemDataset find(Path path, List list) { for (FileSystemDataset dataset : list) { if (dataset.datasetRoot().equals(path)) { - return dataset; + return (SimpleFileSystemDataset)dataset; } } return null; @@ -120,10 +127,10 @@ private Path createDatasetPath(String dataset) private void createPartitions(Path dataset, String prefix, int dayOffset, int hours, String format) throws IOException { - DateTimeFormatter formatter = DateTimeFormat.forPattern(format); - DateTime dayTime = startTime.minusDays(dayOffset); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format); + ZonedDateTime dayTime = curTime.plusDays(dayOffset); for (int i = 0; i < hours; i++) { - Path hourPath = new Path(formatter.print(dayTime.withHourOfDay(i))); + Path hourPath = new Path(formatter.format(dayTime.withHour(i))); Path datasetPartitionPath = new Path(dataset, prefix + hourPath); Path dataFile = new Path(datasetPartitionPath, "dataFile"); try (OutputStream outputStream = localFs.create(dataFile, true)) { @@ -132,6 +139,51 @@ private void createPartitions(Path dataset, String prefix, int dayOffset, int ho } } + @Test + public void testDerivePartitionPattern() { + String slashTimeFormat = "yyyy/MM/dd"; + String dashTimeFormat = "yyyy-MM-dd"; + + // Case 1: 2019/11/30 - 2019/12/3 + LocalDateTime localTime = LocalDateTime.of(2019,12,3,0,0); + ZonedDateTime end = ZonedDateTime.of(localTime, zone); + ZonedDateTime start = end.withMonth(11).withDayOfMonth(30); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), + "{2019}/{11,12}/*"); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), + "{2019}-{11,12}*"); + + // Case 2: 2018/11/30 - 2019/12/3 + start = end.withYear(2018).withMonth(11).withDayOfMonth(30); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), + "{2018,2019}/*/*"); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), + "{2018,2019}-*"); + + // Case 3: 2018/11/30 - 2019/01/3 + end = end.withMonth(1); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), + "{2018,2019}/{11,12,01}/*"); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), + "{2018,2019}-{11,12,01}*"); + + // Test hourly + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, "yyyy/MM/dd/HH"), + "{2018,2019}/{11,12,01}/*/*"); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, "yyyy-MM-dd-HH"), + "{2018,2019}-{11,12,01}*"); + } + + @Test + public void testSupportedTimeFormat() { + Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy/MM/dd/HH")); + Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy/MM/dd")); + Assert.assertFalse(TimePartitionGlobFinder.isTimeFormatSupported("MM/dd/yyyy")); + Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy-MM-dd")); + Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy-MM-dd-HH")); + Assert.assertFalse(TimePartitionGlobFinder.isTimeFormatSupported("MM-dd-yyyy")); + } + @AfterClass private void cleanup() throws IOException { From f8158652f1950731ba9775156c1a0906178650dd Mon Sep 17 00:00:00 2001 From: zhchen Date: Tue, 17 Dec 2019 17:24:28 -0800 Subject: [PATCH 4/6] Comment DateRangeIterator --- .../management/copy/TimeAwareRecursiveCopyableDataset.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java index 56f772a7ee1..4802d6255a4 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java @@ -84,6 +84,10 @@ public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Propertie } } + /** + * TODO: Replace it with {@link org.apache.gobblin.time.TimeIterator} as {@link LocalDateTime} will not adjust time + * to a given time zone + */ public static class DateRangeIterator implements Iterator { private LocalDateTime startDate; private LocalDateTime endDate; From f8f9c54b78a690cec56aeec322ba61ee9377bb2e Mon Sep 17 00:00:00 2001 From: zhchen Date: Fri, 20 Dec 2019 10:21:03 -0800 Subject: [PATCH 5/6] Fix derive partition bugs --- .../dataset/TimePartitionGlobFinder.java | 4 ++-- .../TimePartitionedGlobFinderTest.java | 22 +++++++++++++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java index 46d9d9a043e..8a707cce37e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java @@ -170,12 +170,12 @@ private static StringBuilder buildMonthOptions(ZonedDateTime start, int startMonth = start.getMonthValue(); int endMonth = end.getMonthValue(); int yearDiff = end.getYear() - start.getYear(); - if ( yearDiff > 1 || (yearDiff == 1 && endMonth > startMonth)) { + if ( yearDiff > 1 || (yearDiff == 1 && endMonth >= startMonth)) { // All 12 months return new StringBuilder("*"); } StringBuilder monthOptions = new StringBuilder("{" + startMonth); - if (endMonth > startMonth) { + if (endMonth >= startMonth) { appendOptions(monthOptions, startMonth + 1, endMonth); } else { // from [startMonth + 1, 12] of start year diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java index c7cf03bf170..fca8bb36d1f 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java @@ -144,23 +144,37 @@ public void testDerivePartitionPattern() { String slashTimeFormat = "yyyy/MM/dd"; String dashTimeFormat = "yyyy-MM-dd"; - // Case 1: 2019/11/30 - 2019/12/3 + // 2019/12/1 - 2019/12/3 LocalDateTime localTime = LocalDateTime.of(2019,12,3,0,0); ZonedDateTime end = ZonedDateTime.of(localTime, zone); - ZonedDateTime start = end.withMonth(11).withDayOfMonth(30); + ZonedDateTime start = end.withDayOfMonth(1); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), + "{2019}/{12}/*"); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), + "{2019}-{12}*"); + + // 2019/11/30 - 2019/12/3 + start = end.withMonth(11).withDayOfMonth(30); Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), "{2019}/{11,12}/*"); Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), "{2019}-{11,12}*"); - // Case 2: 2018/11/30 - 2019/12/3 + // 2018/12/1 - 2019/12/3 + start = end.withYear(2018).withMonth(12).withDayOfMonth(1); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), + "{2018,2019}/*/*"); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), + "{2018,2019}-*"); + + // 2018/11/30 - 2019/12/3 start = end.withYear(2018).withMonth(11).withDayOfMonth(30); Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), "{2018,2019}/*/*"); Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), "{2018,2019}-*"); - // Case 3: 2018/11/30 - 2019/01/3 + // 2018/11/30 - 2019/01/3 end = end.withMonth(1); Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), "{2018,2019}/{11,12,01}/*"); From 2d5c8f47f21b83055d66b61373f8571aa5435602 Mon Sep 17 00:00:00 2001 From: zhchen Date: Fri, 20 Dec 2019 10:41:11 -0800 Subject: [PATCH 6/6] Add comments --- .../data/management/dataset/SimpleFileSystemDataset.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java index 68d1561c058..338596c47ff 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java @@ -22,6 +22,10 @@ import org.apache.gobblin.dataset.FileSystemDataset; +/** + * A basic implementation of {@link FileSystemDataset}. It can represent a virtual + * file system dataset which doesn't have a physical file/folder + */ public class SimpleFileSystemDataset implements FileSystemDataset { private final Path path; @@ -46,6 +50,9 @@ public String datasetURN() { return path.toString(); } + /** + * @return true if the dataset doesn't have a physical file/folder + */ public boolean getIsVirtual() { return isVirtual; }