Skip to content

Commit

Permalink
[GOBBLIN-573] Add option to use finer level granularity at the hour l…
Browse files Browse the repository at this point in the history
…evel for TimeAwareDatasetfinder

Closes #2438 from sv2000/hourly
  • Loading branch information
sv2000 authored and htran1 committed Aug 31, 2018
1 parent 749b5bd commit ef438c8
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 34 deletions.
Expand Up @@ -17,51 +17,58 @@

package org.apache.gobblin.data.management.copy;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.testng.Assert;

import com.google.common.collect.Lists;

public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset {
private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive";
public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern";
public static final String LOOKBACK_DAYS_KEY = CONFIG_PREFIX + ".lookback.days";
public static final String LOOKBACK_TIME_KEY = CONFIG_PREFIX + ".lookback.time";

private final Integer lookbackDays;
private final String lookbackTime;
private final String datePattern;
private final Period lookbackPeriod;
private final boolean isPatternHourly;

public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) {
super(fs, rootPath, properties, glob);
this.lookbackDays = Integer.parseInt(properties.getProperty(LOOKBACK_DAYS_KEY));
this.lookbackTime = properties.getProperty(LOOKBACK_TIME_KEY);
PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
this.isPatternHourly = isDatePatternHourly(datePattern);

//Daily directories cannot have a "hourly" lookback pattern. But hourly directories can accept lookback pattern with days.
if (!this.isPatternHourly) {
Assert.assertTrue(isLookbackTimeStringDaily(this.lookbackTime), "Expected day format for lookback time; found hourly format");
}
}

public static class DateRangeIterator implements Iterator {
private LocalDateTime startDate;
private LocalDateTime endDate;
private boolean isDatePatternHourly;

public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, String datePattern) {
public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, boolean isDatePatternHourly) {
this.startDate = startDate;
this.endDate = endDate;
this.isDatePatternHourly = isDatePatternHourly(datePattern);
}

private boolean isDatePatternHourly(String datePattern) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
LocalDateTime refDateTime = LocalDateTime.of(2017, 01, 01, 10, 0, 0);
String refDateTimeString = refDateTime.format(formatter);
LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHour(0);
String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.format(formatter);
return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
this.isDatePatternHourly = isDatePatternHourly;
}

@Override
Expand All @@ -72,7 +79,7 @@ public boolean hasNext() {
@Override
public LocalDateTime next() {
LocalDateTime dateTime = startDate;
startDate = isDatePatternHourly ? startDate.plusHours(1) : startDate.plusDays(1);
startDate = this.isDatePatternHourly ? startDate.plusHours(1) : startDate.plusDays(1);
return dateTime;
}

Expand All @@ -82,15 +89,35 @@ public void remove() {
}
}

private boolean isDatePatternHourly(String datePattern) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
String refDateTimeString = refDateTime.toString(formatter);
LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
}

private boolean isLookbackTimeStringDaily(String lookbackTime) {
PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
try {
periodFormatter.parsePeriod(lookbackTime);
return true;
} catch (Exception e) {
return false;
}
}

@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
LocalDateTime endDate = LocalDateTime.now();
LocalDateTime startDate = endDate.minusDays(lookbackDays);
DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, datePattern);
LocalDateTime startDate = endDate.minus(this.lookbackPeriod);

DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, isPatternHourly);
List<FileStatus> fileStatuses = Lists.newArrayList();
while (dateRangeIterator.hasNext()) {
Path pathWithDateTime = new Path(path, dateRangeIterator.next().format(formatter));
Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
}
return fileStatuses;
Expand Down
Expand Up @@ -17,8 +17,9 @@

package org.apache.gobblin.data.management.copy;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.joda.time.LocalDateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -29,28 +30,28 @@ public class DateRangeIteratorTest {

@Test
public void testIterator() {
LocalDateTime endDate = LocalDateTime.of(2017, 1, 1, 0, 0, 0);
LocalDateTime endDate = new LocalDateTime(2017, 1, 1, 0, 0, 0);
LocalDateTime startDate = endDate.minusHours(2);
String datePattern = "HH/yyyy/MM/dd";
DateTimeFormatter format = DateTimeFormatter.ofPattern(datePattern);
DateTimeFormatter format = DateTimeFormat.forPattern(datePattern);
TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator =
new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, datePattern);
new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, true);
LocalDateTime dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.format(format), "22/2016/12/31");
Assert.assertEquals(dateTime.toString(format), "22/2016/12/31");
dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.format(format), "23/2016/12/31");
Assert.assertEquals(dateTime.toString(format), "23/2016/12/31");
dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.format(format), "00/2017/01/01");
Assert.assertEquals(dateTime.toString(format), "00/2017/01/01");
Assert.assertEquals(dateRangeIterator.hasNext(), false);

datePattern = "yyyy/MM/dd";
format = DateTimeFormatter.ofPattern(datePattern);
format = DateTimeFormat.forPattern(datePattern);
startDate = endDate.minusDays(1);
dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, datePattern);
dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, false);
dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.format(format), "2016/12/31");
Assert.assertEquals(dateTime.toString(format), "2016/12/31");
dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.format(format), "2017/01/01");
Assert.assertEquals(dateTime.toString(format), "2017/01/01");
Assert.assertEquals(dateRangeIterator.hasNext(), false);
}
}
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.HiddenFilter;

public class TimeAwareRecursiveCopyableDatasetTest {
private FileSystem fs;
private Path baseDir1;
private Path baseDir2;

private static final String NUM_LOOKBACK_DAYS_STR = "2d";
private static final Integer NUM_LOOKBACK_DAYS = 2;
private static final String NUM_LOOKBACK_HOURS_STR = "4h";
private static final Integer NUM_LOOKBACK_HOURS = 4;
private static final Integer MAX_NUM_DAILY_DIRS = 4;
private static final Integer MAX_NUM_HOURLY_DIRS = 48;
private static final String NUM_LOOKBACK_DAYS_HOURS_STR = "1d1h";
private static final Integer NUM_DAYS_HOURS_DIRS = 25;

@BeforeClass
public void setUp() throws IOException {
Assert.assertTrue(NUM_LOOKBACK_DAYS < MAX_NUM_DAILY_DIRS);
Assert.assertTrue(NUM_LOOKBACK_HOURS < MAX_NUM_HOURLY_DIRS);

this.fs = FileSystem.getLocal(new Configuration());

baseDir1 = new Path("/tmp/src/ds1/hourly");
if (fs.exists(baseDir1)) {
fs.delete(baseDir1, true);
}
fs.mkdirs(baseDir1);

baseDir2 = new Path("/tmp/src/ds1/daily");
if (fs.exists(baseDir2)) {
fs.delete(baseDir2, true);
}
fs.mkdirs(baseDir2);
PeriodFormatter formatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR);
}

@Test
public void testGetFilesAtPath() throws IOException {
String datePattern = "yyyy/MM/dd/HH";
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);

LocalDateTime endDate = LocalDateTime.now();

Set<String> candidateFiles = new HashSet<>();
for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
String startDate = endDate.minusHours(i).toString(formatter);
Path subDirPath = new Path(baseDir1, new Path(startDate));
fs.mkdirs(subDirPath);
Path filePath = new Path(subDirPath, i + ".avro");
fs.create(filePath);
if (i < (NUM_LOOKBACK_HOURS + 1)) {
candidateFiles.add(filePath.toString());
}
}

//Lookback time = "4h"
Properties properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_HOURS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");

PathFilter pathFilter = new HiddenFilter();
TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
new Path("/tmp/src/*/hourly"));
List<FileStatus> fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);

Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_HOURS + 1);

for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}

//Lookback time = "1d1h"
properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");
dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
new Path("/tmp/src/*/hourly"));
fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);
candidateFiles = new HashSet<>();
datePattern = "yyyy/MM/dd/HH";
formatter = DateTimeFormat.forPattern(datePattern);

for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
String startDate = endDate.minusHours(i).toString(formatter);
Path subDirPath = new Path(baseDir1, new Path(startDate));
Path filePath = new Path(subDirPath, i + ".avro");
if (i < NUM_DAYS_HOURS_DIRS + 1) {
candidateFiles.add(filePath.toString());
}
}

Assert.assertEquals(fileStatusList.size(), NUM_DAYS_HOURS_DIRS + 1);
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}

//Lookback time = "2d"
datePattern = "yyyy/MM/dd";
formatter = DateTimeFormat.forPattern(datePattern);
endDate = LocalDateTime.now();

candidateFiles = new HashSet<>();
for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
String startDate = endDate.minusDays(i).toString(formatter);
Path subDirPath = new Path(baseDir2, new Path(startDate));
fs.mkdirs(subDirPath);
Path filePath = new Path(subDirPath, i + ".avro");
fs.create(filePath);
if (i < (NUM_LOOKBACK_DAYS + 1)) {
candidateFiles.add(filePath.toString());
}
}

properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");

dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
new Path("/tmp/src/*/daily"));
fileStatusList = dataset.getFilesAtPath(fs, baseDir2, pathFilter);

Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
}

@Test (expectedExceptions = AssertionError.class)
public void testInstantiationError() {
//Daily directories, but look back time has days and hours. We should expect an assertion error.
Properties properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");

TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
new Path("/tmp/src/*/daily"));
}

@AfterClass
public void clean() throws IOException {
//Delete tmp directories
this.fs.delete(baseDir1, true);
this.fs.delete(baseDir2, true);
}
}

0 comments on commit ef438c8

Please sign in to comment.