New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Making dataset name as part of staging directory in gobblin-distcp (f… #3054
Conversation
@@ -104,6 +106,7 @@ | |||
private final Options.Rename renameOptions; | |||
private final FileContext fileContext; | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove this empty line if being added accidentally ?
@@ -393,6 +393,7 @@ | |||
public static final String SIMPLE_WRITER_DELIMITER = "simple.writer.delimiter"; | |||
public static final String SIMPLE_WRITER_PREPEND_SIZE = "simple.writer.prepend.size"; | |||
public static final String WRITER_ADD_TASK_TIMESTAMP = WRITER_PREFIX + ".addTaskTimestamp"; | |||
public static final String DATASET_DEFINED_STAGING_DIR_FLAG = "dataset.defined.staging.dir.flag"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we rename it ? since it is not clear to me what does "dataset.defined" mean here. Also, Let's avoid words like flag since it is usually ambiguous.
} else { | ||
this.stagingDir = this.writerAttemptIdOptional.isPresent() ? WriterUtils.getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get()) | ||
: WriterUtils.getWriterStagingDir(state, numBranches, branchId); | ||
} | ||
|
||
this.outputDir = getOutputDir(state); | ||
this.copyableDatasetMetadata = | ||
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
|
||
if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) { | ||
this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR)); | ||
} else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I think there's something missing here.
Please take a look at org.apache.gobblin.util.WriterUtils#getWriterStagingDir(org.apache.gobblin.configuration.State, int, int)
method, which is called below. The determination of a staging directory is not totally based on user-defined configuration, but some runtime behavior.
Specifically within the getWriterStagingDir
, the root path of staging dir comes from org.apache.gobblin.configuration.ConfigurationKeys#WRITER_STAGING_DIR
which is a conf set dynamically in the gobblin job. In the method it also contains logic to differentiate different staging directory for different forks. The current change you made will lost that feature.
We can sync offline on this if that could help.
Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString2); | ||
|
||
//testing dataset defined staging directory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we split this large test into several smaller unit tests that are focusing on specific use case? Then it will be easier to understand what are we testing and expecting. The common logic can be moved to a separate methods that we call explicitly. TestNG can also call them before each test, if we put @BeforeTest/@BeforeClass annotation on them.
//testing dataset defined staging directory | ||
CopyableDatasetMetadata metadata2 = new CopyableDatasetMetadata(new TestCopyableDataset(new Path(testRootDir + "/db1"))); | ||
WorkUnitState state3 = TestUtils.createTestWorkUnitState(); | ||
state3.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a missing space here. You can run reformat code command in IntelliJ to fix all the code style issues. If you want to fix only your code, you can first select it in the editor, and then run the command.
|
||
if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) { | ||
this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR)); | ||
} else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) { | ||
this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() + STAGING_DIR_SUFFIX); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can pass multiple arguments to Path to concatenate them. It's better to avoid adding path strings to each other, because you can end up with multiple of no slashes in the path. For example, depending on user input, you can end up with "/first/second", "/firstsecond" or "/first//second".
@@ -88,6 +89,7 @@ | |||
public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false; | |||
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit"; | |||
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false; | |||
public static final String STAGING_DIR_SUFFIX = "/staging"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a couple of things we need to keep in mind regarding this folder:
- Since we are going to put those in every dataset folder, we need to have a way to tell apart folders that are part of the dataset vs our temporary storage places.
- If our folder name starts with the dot ".", it will be hidden by default in UI and CLI tools, so users wouldn't be distracted by those folders.
- We'll need to have a way to find all such folders and delete them from time to time. If job gets interrupted and does not clean up after itself, some automation would need to go and delete old temporary files. Otherwise those abandoned temp files will consume all of the storage space.
I think we can go with ".tmp" to ensure that this folder is hidden and is clearly marked as temporary.
|
||
if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) { | ||
this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR)); | ||
} else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) { | ||
this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() + STAGING_DIR_SUFFIX); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
URN is a standard with a special format like "urn:linkedin:data". Looks like we'll have a path and not a URN here. I wonder if there are other methods that will return data set location, or whenever we can rename this method to something like getDatasetURI() or getDatasetURL(), depending on what we can get here. See also the discussion on the differences between URI/URL/URN - https://stackoverflow.com/a/28865728
37bac1d
to
098354f
Compare
098354f
to
f99b612
Compare
…ile-based)
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Making dataset name as part of staging directory in gobblin-distcp (file-based)
Description
Currently, Embedded distcp creates it's own temp or user defined staging directory. But, this PR adds the option to have a staging directory which is being created within datasets.
Tests
My PR incorporates change to the following unit tests:
FileAwareInputStreamDataWriterTest.java
Commits