[GOBBLIN-1001] Implement TimePartitionGlobFinder #2846

Closed
wants to merge 6 commits into from

Conversation

zxcware
Contributor

@zxcware zxcware commented Dec 10, 2019

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR:
    • TimePartitionGlobFinder can create an empty file system dataset when a partition has no data

Tests

  • My PR adds the following unit tests:
    • TimePartitionGlobFinder.testDayPartition verifies that an empty day-partition dataset is created when TimePartitionGlobFinder.enableEmptyPartition is true.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@codecov-io

codecov-io commented Dec 10, 2019

Codecov Report

Merging #2846 into master will decrease coverage by 0.02%.
The diff coverage is 88.33%.

@@             Coverage Diff              @@
##             master    #2846      +/-   ##
============================================
- Coverage     45.59%   45.57%   -0.03%     
- Complexity     8984     9031      +47     
============================================
  Files          1904     1908       +4     
  Lines         71347    71729     +382     
  Branches       7876     7912      +36     
============================================
+ Hits          32534    32692     +158     
- Misses        35806    36031     +225     
+ Partials       3007     3006       -1
| Impacted Files | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| ...gement/copy/TimeAwareRecursiveCopyableDataset.java | 91.17% <ø> (ø) | 15 <0> (ø) | ⬇️ |
| ...anagement/dataset/DefaultFileSystemGlobFinder.java | 100% <100%> (+100%) | 2 <1> (+2) | ⬆️ |
| ...ain/java/org/apache/gobblin/time/TimeIterator.java | 63.15% <63.15%> (ø) | 4 <4> (?) | |
| ...ta/management/dataset/SimpleFileSystemDataset.java | 88.88% <88.88%> (ø) | 4 <4> (?) | |
| ...ta/management/dataset/TimePartitionGlobFinder.java | 93.4% <93.4%> (ø) | 23 <23> (?) | |
| ...rg/apache/gobblin/salesforce/SalesforceSource.java | 19.81% <0%> (-2.95%) | 12% <0%> (+1%) | |
| ...in/java/org/apache/gobblin/cluster/HelixUtils.java | 32.71% <0%> (-2.81%) | 11% <0%> (-1%) | |
| ...service/modules/orchestration/DagManagerUtils.java | 85.54% <0%> (-1.21%) | 37% <0%> (ø) | |
| ...main/java/org/apache/gobblin/yarn/YarnService.java | 14.84% <0%> (-0.85%) | 3% <0%> (-1%) | |
| ...nversion/hive/query/HiveAvroORCQueryGenerator.java | 64.51% <0%> (-0.27%) | 85% <0%> (+1%) | |

... and 31 more

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0a1debc...2d5c8f4. Read the comment docs.

Contributor

@autumnust autumnust left a comment

Why not use an existing glob finder with your time-partition pattern instead of inventing another finder? Did you search for other alternatives?
I don't remember exactly which finder it is, but I have a rough impression that something else already serves the same purpose. Just curious.

@zxcware
Contributor Author

zxcware commented Dec 11, 2019

@autumnust It does use an existing GlobFinder, DefaultFileSystemGlobFinder, through composition rather than inheritance.
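To illustrate the composition-over-inheritance point, here is a minimal sketch. It is not the PR's code: `PathFinder`, `InMemoryGlobFinder`, and `TimePartitionFinderSketch` are hypothetical names, and the in-memory listing stands in for a real file system lookup.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a datasets finder; not Gobblin's DatasetsFinder interface.
interface PathFinder {
  List<String> find(String globPattern);
}

// Hypothetical finder over an in-memory listing, standing in for DefaultFileSystemGlobFinder.
class InMemoryGlobFinder implements PathFinder {
  private final List<String> paths;

  InMemoryGlobFinder(List<String> paths) {
    this.paths = paths;
  }

  @Override
  public List<String> find(String globPattern) {
    // Assumption: only a single trailing "*" wildcard is supported in this sketch.
    boolean wildcard = globPattern.endsWith("*");
    String prefix = wildcard ? globPattern.substring(0, globPattern.length() - 1) : globPattern;
    List<String> matches = new ArrayList<>();
    for (String path : paths) {
      if (wildcard ? path.startsWith(prefix) : path.equals(prefix)) {
        matches.add(path);
      }
    }
    return matches;
  }
}

// Composition: the time-partition finder holds a delegate instead of extending it.
class TimePartitionFinderSketch implements PathFinder {
  private final PathFinder delegate;
  private final String partitionSuffix;

  TimePartitionFinderSketch(PathFinder delegate, String partitionSuffix) {
    this.delegate = delegate;
    this.partitionSuffix = partitionSuffix;
  }

  @Override
  public List<String> find(String datasetPattern) {
    // Narrow the dataset pattern to a partition pattern, then reuse the delegate's lookup.
    return delegate.find(datasetPattern + partitionSuffix);
  }
}
```

With a delegate, the wrapper can add behavior around the lookup (for example, filling in missing partitions) without changing or subclassing the existing finder.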

@autumnust
Contributor

> @autumnust It does use an existing GlobFinder, DefaultFileSystemGlobFinder, through composition rather than inheritance.

OK, gotcha. If enableEmptyPartition is not enabled, it does nothing more than set a different pattern using DefaultFileSystemGlobFinder. If it is enabled, it creates empty datasets for yesterday's partitions. This doesn't look like a generic finder to me; it serves a very specific purpose. Would you consider implementing it internally as a separate extension of GlobFinder instead of open-sourcing it?

@zxcware
Contributor Author

zxcware commented Dec 11, 2019

@autumnust Yeah, yesterdayPartition is really specific. I'm thinking about generalizing it to enforcePreviousN (looking for better name suggestions) partitions. Its main responsibility is to create an EmptyFileSystemDataset if any of the previous N partitions doesn't exist, signaling quiet time. In addition, it focuses on time partitions and supports different time formats (not limited to yyyy/MM/dd), unlike the vanilla DefaultFileSystemGlobFinder. (I'm adding comments about its usage.)

With enforcePreviousN, it is tied even less to company requirements, which makes it more justifiable to open-source. In our use case, we capture the quiet-time signal to publish the compaction watermark. Others can capture it to do different operations.

Another consideration: if EmptyFileSystemDataset is made internal, we would have to make internal copies of the open source compaction constructs (MRTask, Verifier, CompactionAction). Compared with making EmptyFileSystemDataset a first-class citizen of the open source compaction flow, the implementation and maintenance cost of internalization is high, given that most of our pipelines use the open source compaction constructs.
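As a rough illustration of the "previous N partitions" idea with a configurable time format, the sketch below computes the day partitions that precede a given time. The class and method names are hypothetical and it only handles day granularity; it is not the PR's implementation.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the name and signature are assumptions for illustration.
class PreviousPartitionsSketch {
  // Returns the partition strings for the N days before `now`, oldest first.
  static List<String> previousDayPartitions(ZonedDateTime now, int lookBackDays, String pattern) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    List<String> partitions = new ArrayList<>();
    for (int daysAgo = lookBackDays; daysAgo >= 1; daysAgo--) {
      partitions.add(now.minusDays(daysAgo).format(formatter));
    }
    return partitions;
  }
}
```

A caller could compare these expected partitions against what actually exists on the file system and treat each missing one as a quiet-time signal.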

Contributor

@autumnust autumnust left a comment

Discussed the context and use case offline and agreed on the implementation approach.
Several comments below.

@@ -33,16 +33,6 @@ public DefaultFileSystemGlobFinder(FileSystem fs, Properties properties) throws
}

public FileSystemDataset datasetAtPath(final Path path) throws IOException {
return new FileSystemDataset() {
Contributor

There is more than one place that has this anonymous class implementation. Shall we refactor them all if you would like to replace it with a real implementation?

Contributor Author

Fixed PinotAuditCountVerifierTest. Its usage in CompactionSuiteBase will be replaced by a new ser/de mechanism.

import org.apache.hadoop.fs.Path;


public class EmptyFileSystemDataset extends SimpleFileSystemDataset {
Contributor

This looks like overkill if the only thing you want is a different class. Shall we add a flag field to SimpleFileSystemDataset to identify an empty dataset instead?
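The flag-field alternative could look roughly like the sketch below. `DatasetSketch` is a hypothetical, simplified class that uses a `String` path so it stays self-contained; it is not the actual SimpleFileSystemDataset.

```java
// Hypothetical, simplified dataset; the real class would wrap a Hadoop Path.
class DatasetSketch {
  private final String path;
  private final boolean isVirtual;

  // A dataset backed by a physical file or folder.
  DatasetSketch(String path) {
    this(path, false);
  }

  // isVirtual = true marks a dataset that has no physical file or folder.
  DatasetSketch(String path, boolean isVirtual) {
    this.path = path;
    this.isVirtual = isVirtual;
  }

  String datasetURN() {
    return path;
  }

  boolean isVirtual() {
    return isVirtual;
  }
}
```

A single class with a flag keeps type checks out of downstream code: consumers ask the dataset whether it is virtual instead of testing for a subclass.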

import org.apache.gobblin.dataset.FileSystemDataset;


public class SimpleFileSystemDataset implements FileSystemDataset {
Contributor

Please add Javadoc explaining why we need this class.



@Slf4j
public class TimePartitionGlobFinder implements DatasetsFinder<FileSystemDataset> {
Contributor

Please add Javadoc to differentiate it from DefaultFileSystemGlobFinder if you decide not to override.

datasets.forEach(dataset -> yesterdayDatasetPartitions.add(createYesterdayDatasetPartition(dataset)));

// Find all dataset time partitions
List<FileSystemDataset> datasetPartitions = findDatasets(datasetPartitionPattern);
Contributor

Could this code block be reduced to a single expression, like:
datasetPartitions.addAll(yesterdayDatasetPartitions.stream().filter(x -> !datasetPartitions.contains(x)).map(x -> createEmptyFileDataset(x)).collect(Collectors.toList())) ?
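A runnable version of that single-expression idea might look like the sketch below. It uses `filter` to select the missing partitions (`allMatch` is a terminal operation that returns a single boolean, so nothing can be mapped after it). Partitions are plain strings here, and the " (empty)" marker is a hypothetical stand-in for creating an empty dataset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; names are assumptions for illustration only.
class MissingPartitionsSketch {
  // Returns the found partitions plus a marked placeholder for each expected one that is missing.
  static List<String> withEmptyPartitions(List<String> found, List<String> expected) {
    List<String> result = new ArrayList<>(found);
    result.addAll(expected.stream()
        .filter(partition -> !found.contains(partition)) // keep only the missing partitions
        .map(partition -> partition + " (empty)")        // stand-in for creating an empty dataset
        .collect(Collectors.toList()));
    return result;
  }
}
```

Note that `List.contains` is linear, so for large listings a `Set` of found partitions would be the cheaper lookup.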

@zxcware
Contributor Author

zxcware commented Dec 14, 2019

@autumnust I generalized the finder: it now supports a look-back window and creates a virtual partition dataset for any partition within that window that doesn't have a physical folder.

Contributor

@autumnust autumnust left a comment

Minor comments

ZonedDateTime.now(ZoneId.of(properties.getProperty(TIME_ZONE, DEFAULT_TIME_ZONE))));
}

@VisibleForTesting
Contributor

From what I learned before, this annotation does not help you test methods, and it does not change how the annotated method is loaded or accessed in the JVM: https://stackoverflow.com/questions/24051476/guava-visiblefortesting-help-me-with-a-complete-example. It is mostly used for documentation purposes.

Can you double-check whether you need it?

Contributor Author

@zxcware zxcware Dec 16, 2019

Yeah, I learned that and meant to use it for documentation purposes unless we find an alternative.

List<FileSystemDataset> actualPartitions = findDatasets(datasetPartitionPattern);

String pathStr;
for (FileSystemDataset physicalPartition : actualPartitions) {
Contributor

Isn't this for-loop an intersection of actualPartitions and computedPartitions, with a tweak on the element type?

Contributor Author

Right. The loop also results in the diff between computedPartitions and actualPartitions
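For illustration, a single pass can produce both the intersection and the difference, as in the sketch below. It assumes partitions can be compared by their path strings; `PartitionDiffSketch` and its fields are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper that splits computed partitions into physical and virtual ones.
class PartitionDiffSketch {
  final List<String> physical = new ArrayList<>();
  final List<String> virtual = new ArrayList<>();

  PartitionDiffSketch(List<String> computedPartitions, List<String> actualPartitions) {
    // LinkedHashSet keeps the computed order for the leftover (virtual) partitions.
    Set<String> remaining = new LinkedHashSet<>(computedPartitions);
    for (String actual : actualPartitions) {
      // remove() returns true only if the path was computed: this is the intersection.
      if (remaining.remove(actual)) {
        physical.add(actual);
      }
    }
    // Whatever is left was computed but not found: this is the difference.
    virtual.addAll(remaining);
  }
}
```

In this sketch, actual partitions that fall outside the computed set are simply ignored; whether the real finder keeps or drops them is not stated in this thread.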

Contributor

@autumnust autumnust left a comment

LGTM overall

@@ -46,6 +50,9 @@ public String datasetURN() {
return path.toString();
}

/**
* @return true if the dataset doesn't have a physical file/folder
*/
public boolean getIsVirtual() {
Contributor

Would isVirtual sound more intuitive?

Contributor

@htran1 htran1 left a comment

+1

@asfgit asfgit closed this in 0700092 Dec 21, 2019
@zxcware zxcware deleted the comp branch February 7, 2020 01:11
haojiliu pushed a commit to haojiliu/incubator-gobblin that referenced this pull request Apr 9, 2020
jhsenjaliya pushed a commit to jhsenjaliya/incubator-gobblin that referenced this pull request Apr 26, 2020