[GOBBLIN-1011] adjust compaction flow to work with virtual partition #2856
Conversation
Codecov Report
@@ Coverage Diff @@
## master #2856 +/- ##
============================================
+ Coverage 45.57% 45.61% +0.03%
- Complexity 9032 9052 +20
============================================
Files 1908 1909 +1
Lines 71729 71766 +37
Branches 7912 7918 +6
============================================
+ Hits 32693 32738 +45
+ Misses 36031 36018 -13
- Partials 3005 3010 +5
Continue to review full report at Codecov.
@@ -46,14 +50,17 @@
@Slf4j
public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
  public static final String SERIALIZE_COMPACTION_FILE_PATH_NAME = "compaction-file-path-name";
Consider removing this SERIALIZE_COMPACTION_FILE_PATH_NAME?
+1
DateTime startTime = result.getTime();
DateTime endTime = startTime.plusHours(1);
CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
ZonedDateTime startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(result.getTime().getMillis()), zone);
I'm not quite sure we really need to convert to ZonedDateTime first. The current Joda DateTime already supports plusMinutes, plusHours, plusDays, and plusMonths.
Because `TimeIterator.inc` accepts `ZonedDateTime`. I implemented `TimeIterator` using java.time, since Joda-Time is effectively deprecated on Java 8+:
The standard date and time classes prior to Java SE 8 are poor. By tackling this problem head-on, Joda-Time became the de facto standard date and time library for Java prior to Java SE 8. Note that from Java SE 8 onwards, users are asked to migrate to java.time (JSR-310) - a core part of the JDK which replaces this project.
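The bridging step under discussion can be sketched as follows. This is a minimal illustration, not the project's actual code: it assumes only the epoch millis from Joda's `DateTime.getMillis()` and a target `ZoneId`, which is all `ZonedDateTime.ofInstant` needs.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class JodaBridgeSketch {
    // Convert epoch millis (as returned by Joda's DateTime.getMillis())
    // into a java.time ZonedDateTime in the given zone.
    public static ZonedDateTime toZoned(long epochMillis, ZoneId zone) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        ZonedDateTime start = toZoned(0L, utc);
        // java.time offers the same arithmetic Joda does (plusHours, plusDays, ...).
        ZonedDateTime end = start.plusHours(1);
        System.out.println(start);  // 1970-01-01T00:00Z[UTC]
        System.out.println(end);    // 1970-01-01T01:00Z[UTC]
    }
}
```

Once the boundary value is a `ZonedDateTime`, a java.time-based iterator can consume it directly without further Joda dependencies.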
@@ -75,6 +78,8 @@ public String getProgress() {
  public void run() {
    try {
      this.underlyingTask.run();
    } catch (Exception e) {
      log.error(String.format("Task %s completed with exception", this.taskContext.getTaskState().getTaskId()), e);
Catching the exception here seems to be an anti-pattern if the `run` method itself does not throw. I wonder what the reason for this change is?
I found that when `underlyingTask` threw an exception (in my case an NPE), there was no message at all (the exception was swallowed and never got a chance to be logged while bubbling up), and this wrapper continued to the `finally` block, marking the task as a success.
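The failure mode described above can be sketched in isolation. This is a hypothetical stand-in for the task wrapper (the class and `status` field are invented for illustration): the catch logs the exception before it is swallowed upstream, and rethrows so callers can mark the attempt failed instead of letting the `finally` block imply success.

```java
public class TaskWrapperSketch {
    static String status = "UNKNOWN";

    // Without the catch, an exception from run() would skip any logging
    // and the finally block would still run, masking the failure.
    public static void runTask(Runnable underlyingTask, String taskId) {
        try {
            underlyingTask.run();
        } catch (Exception e) {
            // Log before the exception can be swallowed further up the stack.
            System.err.println(String.format("Task %s completed with exception: %s", taskId, e));
            throw e;  // propagate so the caller does not mark the task successful
        } finally {
            status = "DONE";
        }
    }

    public static void main(String[] args) {
        try {
            runTask(() -> { throw new NullPointerException("boom"); }, "task_1");
        } catch (NullPointerException expected) {
            System.out.println("propagated");
        }
        System.out.println(status);  // DONE: finally ran either way
    }
}
```

The rethrow is the key design choice: logging alone still leaves the wrapper reporting success, which is exactly the behavior the review thread calls out.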
@@ -473,6 +475,10 @@ public void dropTableIfExists(String dbName, String tableName) throws IOException
public void dropPartitionIfExists(String dbName, String tableName, List<Column> partitionKeys,
    List<String> partitionValues) throws IOException {
  try (AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
    if (client.get().getPartition(dbName, tableName, partitionValues) == null) {
- Could you clean up the catch block on `NoSuchObjectException` if you decide to use an existence check instead of relying on the exception to determine there's no such object?
- I am not familiar with this getPartition API, but sending a list of partition values while returning a single partition object seems a little strange. Can you double-check the semantics? Alternatively, there's a `listPartitions` method, which is usually the API I use for checking existence.
It's nice to catch `NoSuchObjectException`; `getPartition` also throws `NoSuchObjectException`.
My understanding is that a partition has multiple partition columns, and each value in the list corresponds to a column.
Verified with a test job that we also need this catch block to detect that the partition does not exist.
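The lookup semantics discussed above can be illustrated with a toy stand-in for the metastore (everything here is invented for illustration; the real client is `IMetaStoreClient`): the key is a list of partition values, one per partition column, and a miss surfaces as an exception rather than a null return, which is why the catch block is still needed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class PartitionLookupSketch {
    // Toy metastore: keys are partition-value lists, one value per
    // partition column (e.g. [year, month, day]); values are locations.
    public static boolean partitionExists(Map<List<String>, String> store,
                                          List<String> partitionValues) {
        try {
            String location = store.get(partitionValues);
            if (location == null) {
                // Mimic a metastore client that throws on a miss
                // instead of returning null.
                throw new NoSuchElementException("partition not found: " + partitionValues);
            }
            return true;
        } catch (NoSuchElementException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<List<String>, String> store = new HashMap<>();
        store.put(List.of("2020", "01", "15"), "/data/tracking/event/2020/01/15");
        System.out.println(partitionExists(store, List.of("2020", "01", "15")));  // true
        System.out.println(partitionExists(store, List.of("2020", "01", "16")));  // false
    }
}
```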
public Builder withPartitionKeys(List<Column> partitionKeys) {
  this.partitionKeys = partitionKeys;
  return this;
}

public Builder withTableParameters(Map<String, String> tableParameters) {
There's a reason that TableParameters are not included in the builder: we internally keep some timestamp and record-count values in the table parameters. Including these values in the table object could cause diff-check failures and issue many more updatePartition calls to the Hive metastore. Could you double-check the potential write amplification with the relevant team?
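One way to address the concern above is to drop volatile bookkeeping parameters before they enter the builder, so an equality diff-check does not churn on them. This is a sketch only; the parameter names below are examples of the kind of volatile keys the reviewer mentions, not a confirmed list from this codebase.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TableParamsSketch {
    // Example volatile keys (timestamps, record counts) that would make
    // two otherwise-identical table objects compare unequal.
    private static final Set<String> VOLATILE_PARAMS =
        Set.of("transient_lastDdlTime", "numRows");

    // Return a copy of the parameters with volatile entries removed,
    // suitable for a stable diff-check before issuing metastore updates.
    public static Map<String, String> stableParameters(Map<String, String> raw) {
        Map<String, String> stable = new HashMap<>(raw);
        stable.keySet().removeAll(VOLATILE_PARAMS);
        return stable;
    }

    public static void main(String[] args) {
        Map<String, String> raw = Map.of("numRows", "100", "owner", "gobblin");
        System.out.println(stableParameters(raw));  // only {owner=gobblin} survives
    }
}
```

Filtering before the diff-check keeps the builder usable while avoiding the extra updatePartition traffic the comment warns about.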
protected State state;
protected CompactionJobConfigurator configurator;
private static final Gson GSON = GsonInterfaceAdapter.getGson(FileSystemDataset.class);
private static final String SERIALIZED_DATASET = "compaction.serializedDataset";
Please remove the old key name if you decide to replace it with a new one. Also, why this change?
.setConfiguration(TimePartitionGlobFinder.ENABLE_VIRTUAL_PARTITION, "true");

JobExecutionResult result = embeddedGobblin.run();
Assert.assertTrue(result.isSuccessful());
Should we verify the contents of the execution output beyond simply checking that the job is successful?
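A minimal version of that extra verification might look like this. The helper and paths are hypothetical (the real test would inspect the compaction output directory); the point is only that asserting on the produced files catches regressions that a bare success flag misses.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class OutputVerificationSketch {
    // Hypothetical check: beyond the success flag, verify the job
    // actually produced a non-empty output file.
    public static boolean outputLooksValid(Path outputFile) throws IOException {
        return Files.exists(outputFile) && Files.size(outputFile) > 0;
    }

    public static void main(String[] args) throws IOException {
        // Stand in for a compaction output file with a temp file.
        Path tmp = Files.createTempFile("compaction-output", ".avro");
        Files.writeString(tmp, "record-data");
        System.out.println(outputLooksValid(tmp));  // true
    }
}
```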
@@ -130,7 +127,6 @@ public String datasetURN() {
 * @return a map-reduce job which will compact files against {@link org.apache.gobblin.dataset.Dataset}
 */
public Job createJob (FileSystemDataset dataset) throws IOException {
  configurator = CompactionJobConfigurator.instantiateConfigurator(this.state);
It seems the configurator has to be instantiated lazily. Check the `optionalInit` method, which can inject overriding properties into the state object.
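The lazy-instantiation pattern suggested here can be sketched generically (the class below is illustrative, not the project's code): deferring construction until first use means any properties injected into the state after the owner is built are still visible to the factory.

```java
import java.util.function.Supplier;

public class LazyConfiguratorSketch<T> {
    private final Supplier<T> factory;
    private T instance;

    public LazyConfiguratorSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    // Build on first use, not at construction time, so the factory sees
    // the latest state (e.g. properties injected by an init hook).
    public synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        LazyConfiguratorSketch<String> lazy = new LazyConfiguratorSketch<>(() -> {
            calls[0]++;
            return "configurator";
        });
        System.out.println(calls[0]);    // 0: nothing built yet
        System.out.println(lazy.get());  // builds now
        System.out.println(lazy.get());  // cached, factory not re-invoked
        System.out.println(calls[0]);    // 1
    }
}
```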
@@ -75,6 +78,8 @@ public String getProgress() {
  public void run() {
    try {
      this.underlyingTask.run();
    } catch (Exception e) {
For the compaction job, we already have logs in MRTask. Maybe we should propagate the exception back to GobblinMultiTaskAttempt.
Removed.
LGTM
@@ -73,6 +74,11 @@ public CompactionCompleteFileOperationAction (State state, CompactionJobConfigurator
 * Create a record count file containing the number of records that have been processed.
 */
public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException {
  if (dataset instanceof SimpleFileSystemDataset
How about extending the FileSystemDataset interface with an isVirtual() method that defaults to false to avoid this specific instanceof and casting?
+1
+1
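The default-method suggestion can be sketched as follows. The interface and class here are simplified stand-ins for `FileSystemDataset` and `SimpleFileSystemDataset`, and the URN is invented; the point is that call sites branch on `isVirtual()` instead of `instanceof` plus a cast.

```java
public class VirtualDatasetSketch {
    interface Dataset {
        String datasetURN();
        // Concrete datasets inherit the default; only virtual ones override.
        default boolean isVirtual() { return false; }
    }

    static class VirtualDataset implements Dataset {
        public String datasetURN() { return "virtual:/data/tracking/event"; }
        @Override public boolean isVirtual() { return true; }
    }

    static class ConcreteDataset implements Dataset {
        public String datasetURN() { return "/data/tracking/event"; }
    }

    public static void main(String[] args) {
        Dataset virtual = new VirtualDataset();
        Dataset concrete = new ConcreteDataset();
        // No instanceof or casting needed at the call site.
        System.out.println(virtual.isVirtual());   // true
        System.out.println(concrete.isVirtual());  // false
    }
}
```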
Closes apache#2856 from zxcware/comp2
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
- Adjust `CompactionVerifier`s and `CompactionCompleteAction`s to work with a virtual simple file system dataset properly
- `FileSystemDataset` in `CompactionSuiteBase`

Tests

- `TimeIteratorTest` covers functions in `TimeIterator`
- `AvroCompactionTaskTest.testCompactVirtualDataset` covers that existing compaction constructs can handle a virtual `SimpleFileSystemDataset` correctly
- `HiveMetaStoreUtilsTest.testGetTableAvro` covers that table parameters are loaded correctly

Commits