[GOBBLIN-706]enable dynamic mappers #2576

Closed
wants to merge 6 commits into from

Contributor

@ZihanLi58 ZihanLi58 commented Mar 21, 2019

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Today the number of mappers is hard-coded and that limit is almost always reached, yet many mappers do very little work. This tends to produce small files and short jobs that are dominated by overhead. This change lets users add a job property named target.mapper.size to set the target mapper size; Gobblin then dynamically scales the number of mappers up or down depending on the total load. If the property is not set, the number of mappers is still the value of mr.job.max.mappers.
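The fallback behavior described above can be sketched as follows. This is only an illustration under stated assumptions: `java.util.Properties` stands in for Gobblin's job state, the class and method names are hypothetical, the unit of the size values is left unspecified, and the rounding and clamping shown here are one reasonable choice rather than what the PR necessarily does.

```java
import java.util.Properties;

// Hypothetical sketch; not Gobblin code.
class MapperCountFallbackSketch {
    // Property names as given in the PR description.
    static final String MAX_MAPPERS_KEY = "mr.job.max.mappers";
    static final String TARGET_MAPPER_SIZE_KEY = "target.mapper.size";

    /**
     * Returns the number of mappers to request.
     * If no target mapper size is configured, the configured maximum is used unchanged.
     */
    static int numMappers(Properties props, double totalEstSize, int defaultMaxMappers) {
        int maxMappers = Integer.parseInt(
            props.getProperty(MAX_MAPPERS_KEY, String.valueOf(defaultMaxMappers)));
        String target = props.getProperty(TARGET_MAPPER_SIZE_KEY);
        if (target == null) {
            return maxMappers; // property not set: keep the old behavior
        }
        double targetSize = Double.parseDouble(target);
        // Assumption: round up so a partial mapper's worth of data still gets a mapper.
        int wanted = (int) Math.ceil(totalEstSize / targetSize);
        // Never exceed the maximum, never go below one mapper.
        return Math.max(1, Math.min(maxMappers, wanted));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(MAX_MAPPERS_KEY, "20");
        System.out.println(numMappers(props, 35.0, 100)); // no target size set
        props.setProperty(TARGET_MAPPER_SIZE_KEY, "10");
        System.out.println(numMappers(props, 35.0, 100)); // scaled to the load
    }
}
```

With a maximum of 20 and no target size the sketch returns 20; once a target size of 10 is set, an estimated load of 35 yields 4 mappers.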

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@ZihanLi58
Contributor Author

@ibuenros @autumnust @htran1

Contributor

@ibuenros ibuenros left a comment

Can you also write some basic unit tests for this functionality?

@@ -617,6 +617,7 @@
/** Specifies a static location in HDFS to upload jars to. Useful for sharing jars across different Gobblin runs.*/
public static final String MR_JARS_DIR = "mr.jars.dir";
public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
public static final String TARGET_MAPPER_SIZE = "target.mapper.size";
Contributor

Can you add the mr prefix to the key similar to the other configuration keys? e.g. mr.target.mapper.size

double totalEstDataSize = KafkaWorkUnitPacker.getInstance(this, state).getWorkUnitEstSizes(workUnits);
LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.TARGET_MAPPER_SIZE);
numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize);
Contributor

You probably need to add 1 to this. If totalEstDataSize < targetMapperSize you would end up with 0 mappers :)

int numOfMultiWorkunits =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
if(state.contains(ConfigurationKeys.TARGET_MAPPER_SIZE)) {
double totalEstDataSize = KafkaWorkUnitPacker.getInstance(this, state).getWorkUnitEstSizes(workUnits);
Contributor

It would be better to instantiate KafkaWorkUnitPacker once in line 264 and then use the same object both here and in line 270.

@@ -362,6 +362,15 @@ public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSou
throw new IllegalArgumentException("WorkUnit packer type " + packerType + " not found");
}
}
public double getWorkUnitEstSizes(Map<String, List<WorkUnit>> workUnitsByTopic){
Contributor

Is there a reason we can't just use setWorkUnitEstSizes? The logic in here is almost identical.

Contributor Author

The method setWorkUnitEstSizes is protected, so I created a new public method. The new method only calculates the size instead of setting the property on the work units.

Contributor

You can make the other method public. In general, it is a bad idea to have the same logic duplicated in different locations.

Also, please add javadoc to any public methods.
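One possible way to avoid the duplication is a single public, documented method that both records the per-unit estimate and returns the total. The sketch below is purely illustrative: `Unit`, `setEstSizesAndGetTotal`, and the `avgRecordSize` parameter are hypothetical stand-ins, not Gobblin's `WorkUnit` or `KafkaWorkUnitPacker` API, and the real estimation logic may differ.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not Gobblin code.
class SizeEstimationSketch {
    /** Hypothetical stand-in for a work unit. */
    static final class Unit {
        final long records;   // assumed input to the size estimate
        double estSize = -1;  // filled in by setEstSizesAndGetTotal

        Unit(long records) {
            this.records = records;
        }
    }

    /**
     * Estimates the size of every work unit, stores the estimate on the unit,
     * and returns the sum over all topics.
     *
     * @param unitsByTopic  work units grouped by topic
     * @param avgRecordSize assumed average record size used for the estimate
     * @return the total estimated size of all work units
     */
    public static double setEstSizesAndGetTotal(
            Map<String, List<Unit>> unitsByTopic, double avgRecordSize) {
        double total = 0;
        for (List<Unit> units : unitsByTopic.values()) {
            for (Unit unit : units) {
                unit.estSize = unit.records * avgRecordSize;
                total += unit.estSize;
            }
        }
        return total;
    }
}
```

A caller that only needs the total can use the return value, while the packing step can keep reading the per-unit estimates, so the estimation logic lives in one place.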

double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize);
numOfMultiWorkunits = numOfMultiWorkunits>maxMapperNum? maxMapperNum:numOfMultiWorkunits;
numOfMultiWorkunits = numOfMultiWorkunits==0? 1: numOfMultiWorkunits;
Contributor

You actually need to add 1 in general. Since the cast to int truncates, you always want to add 1 (e.g. if the total size is 1.99 and targetMapperSize is 1, the current algorithm would use 1 mapper).
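To make the rounding discussion concrete, here is a small sketch comparing the truncating cast, the suggested "add 1", and a ceiling-based variant. The class and method names are hypothetical, and `Math.ceil` is an alternative introduced here for comparison; it is not what the reviewer proposed and not necessarily what was merged.

```java
// Hypothetical sketch; not Gobblin code.
class MapperRoundingSketch {
    /** Truncating cast, as in the diff under review. */
    static int truncating(double totalSize, double targetSize) {
        return (int) (totalSize / targetSize);
    }

    /** Truncate, then add 1, as suggested in the review comment. */
    static int plusOne(double totalSize, double targetSize) {
        return (int) (totalSize / targetSize) + 1;
    }

    /** Round up; gives the same result as plusOne unless the division is exact. */
    static int ceiling(double totalSize, double targetSize) {
        return (int) Math.ceil(totalSize / targetSize);
    }

    /** Keep the result between 1 and the configured maximum. */
    static int clamp(int wanted, int maxMappers) {
        return Math.max(1, Math.min(maxMappers, wanted));
    }

    public static void main(String[] args) {
        System.out.println(truncating(1.99, 1.0)); // 1: the case from the comment
        System.out.println(plusOne(1.99, 1.0));    // 2
        System.out.println(ceiling(1.99, 1.0));    // 2
        System.out.println(plusOne(2.0, 1.0));     // 3: one more than strictly needed
        System.out.println(ceiling(2.0, 1.0));     // 2
    }
}
```

The two corrected variants differ only when the total is an exact multiple of the target size, where "add 1" requests one extra mapper.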

LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize);
numOfMultiWorkunits = numOfMultiWorkunits>maxMapperNum? maxMapperNum:numOfMultiWorkunits;
Contributor

nit: You can use Math.min(maxMapperNum, numOfMultiWorkunits)

Contributor

@ibuenros ibuenros left a comment

+1 @htran1 can you merge?

@asfgit asfgit closed this in bd35490 Mar 26, 2019