
MLCOMPUTE-1203 | Configure Spark driver pod memory and cores based on Spark args #3892

Merged (10 commits) on Jun 17, 2024

Conversation

CaptainSame (Contributor)

No description provided.

nemacysts (Member) commented Jun 10, 2024

@CaptainSame is this for the new soaconfigs config style? if so, we probably need to figure out how to reconcile this with the usual way of specifying resources

CaptainSame (Contributor Author)

> @CaptainSame is this for the new soaconfigs config style? if so, we probably need to figure out how to reconcile this with the usual way of specifying resources

Yes. There was a case where cpus and mem were not specified in the soa config but spark.driver.memory and spark.driver.cores were, so the Spark driver container was started with the default resources, which were quite low (see the sketch below).

Do you think it makes sense to:

1. override the cpus and mem values with the spark.driver.* values if both are specified, OR
2. take the maximum of both sets of values if both are specified, OR
3. ask users to keep both values equal and just read cpus and mem from soaconfigs, as in the status quo?
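
For illustration, a minimal sketch of the situation described above, written as a Python dict rather than the actual yelpsoa-configs YAML; the option values are only placeholders:

    # Hypothetical tron action config (values invented for illustration):
    # cpus/mem are omitted, but driver resources are set via spark_args.
    action_config = {
        "executor": "spark",
        "spark_args": {
            "spark.driver.cores": "2",
            "spark.driver.memory": "8g",
        },
        # "cpus" and "mem" are not set, so the driver pod fell back to the
        # (quite low) paasta defaults instead of the requested 2 cores / 8g.
    }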

chi-yelp (Contributor)

In paasta spark-run, people can use Spark configs (spark.driver.cores and spark.driver.memory) to specify the driver resources they need. I think it's still important to support doing it this way. Also, I'm a little worried that if we allow using the paasta config's cpus and mem for Spark jobs, people might get confused about whether they apply to the Spark driver or to the executors.

Comment on lines 265 to 269
try:
    memory_bytes = float(mem)
except ValueError:
    print(f"Unable to parse memory value {mem}")
memory_unit = memory_bytes / MEM_MULTIPLIER[unit]
Member

it's probably worth doing some validation in paasta validate / the schema to ensure that this is correctly specified: right now, if there's an issue with the value provided, this will return 0

(also, we probably want to verify that the provided unit is valid too - otherwise the division line will throw an exception when indexing into MEM_MULTIPLIER)

Member

(oh, i see - we hardcode the unit when we call this)

Contributor Author

Changed it to return a default 2 GB memory
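
As a rough sketch (not the exact paasta_tools code, which may differ in details), the parse-with-default behaviour described above could look like this, reusing the MEM_MULTIPLIER table from the snippet under review:

    # Sketch of get_spark_memory_in_unit with a 2 GB fallback; the real
    # implementation lives in paasta_tools/spark_tools.py.
    MEM_MULTIPLIER = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}

    def get_spark_memory_in_unit(mem: str, unit: str) -> float:
        """Convert a JVM memory string like '2g' into the requested unit."""
        try:
            if mem[-1] in MEM_MULTIPLIER:
                memory_bytes = float(mem[:-1]) * MEM_MULTIPLIER[mem[-1]]
            else:
                memory_bytes = float(mem)
        except (ValueError, IndexError):
            print(f"Unable to parse memory value {mem}, defaulting to 2g")
            memory_bytes = 2 * MEM_MULTIPLIER["g"]
        return memory_bytes / MEM_MULTIPLIER[unit]

    print(get_spark_memory_in_unit("2g", "m"))  # 2048.0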

Comment on lines 287 to 296
def get_cpus(self) -> float:
    # set Spark driver pod CPU if it is specified by Spark arguments
    cpus = 0.0
    if (
        self.action_spark_config
        and "spark.driver.cores" in self.action_spark_config
    ):
        cpus = float(self.action_spark_config["spark.driver.cores"])
    # use the soa config otherwise
    return cpus or super().get_cpus()
Member

Suggested change:

def get_cpus(self) -> float:
    if (
        self.get_executor() == "spark"
        and "spark.driver.cores" in self.action_spark_config
    ):
        return self.action_spark_config["spark.driver.cores"]
    # NOTE: we fallback to this if there's no spark.driver.cores config to
    # use the paasta default
    return super().get_cpus()

Member

that said, maybe we want a different default CPU for spark drivers? the paasta default for both cpu and memory might be too small for drivers :)

Member

(also, imo we should add spark.driver.cores to the spark_args schema and enforce that it's a float there so that we don't need to remember to cast it in the python code :))
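
A rough sketch of what that schema constraint could look like, using the jsonschema library directly; the actual paasta schema files and property layout may differ:

    # Hypothetical jsonschema fragment enforcing that spark.driver.cores is a
    # number, so the Python code no longer needs to cast it.
    from jsonschema import ValidationError, validate

    spark_args_schema = {
        "type": "object",
        "properties": {
            "spark.driver.cores": {"type": "number"},
        },
    }

    try:
        validate({"spark.driver.cores": "two"}, spark_args_schema)
    except ValidationError as e:
        print(f"Invalid spark_args: {e.message}")  # 'two' is not of type 'number'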

Contributor Author

I would rather enforce the default for spark.driver.cores and spark.driver.memory in service_configuration_lib than use another default value here.
Even if we enforce the float in the schema, it passes through operations in service_configuration_lib and everything is returned as a string.

Member

we need this to return the resources that are actually used tho (either user-specified or the defaults that are used) - this can call service_configuration_lib code if necessary :)

re: s_c_l: i think it's also worth slowly refactoring that code - it's sorta painful to work with when everything in that library is stringly-typed :)

Comment on lines 298 to 310
def get_mem(self) -> float:
    # set Spark driver pod memory if it is specified by Spark arguments
    mem_mb = 0.0
    if (
        self.action_spark_config
        and "spark.driver.memory" in self.action_spark_config
    ):
        # need to set mem in MB based on tron schema
        mem_mb = spark_tools.get_spark_memory_in_unit(
            self.action_spark_config["spark.driver.memory"], "m"
        )
    # use the soa config otherwise
    return mem_mb or super().get_mem()
Member

Suggested change:

def get_mem(self) -> float:
    if (
        self.get_executor() == "spark"
        and "spark.driver.memory" in self.action_spark_config
    ):
        return spark_tools.get_spark_memory_in_unit(
            self.action_spark_config["spark.driver.memory"], "m"
        )
    # NOTE: we fallback to this if there's no spark.driver.memory config to
    # use the paasta default
    return super().get_mem()

Member

(we probably also want to do the same thing i mentioned above re: making the schema ensure that spark.driver.memory is a number :))

Member

and i think we might need to make sure this returns an int? i don't think we allow for fractional mem values

Contributor Author

spark.driver.memory doesn't have to be a number. We allow the values allowed by Spark, which are JVM memory strings. I saw that the signature of the original get_mem method returns a float, so I kept it the same here.
Should I change it to return int in both places?

Member

re: spark.driver.memory: i don't think that would work with the code as-is since we're hardcoding that the unit is "m"

that said, we can still add some schema/paasta validate validation so that we can ensure that we're not getting junk data from users

re: changing the signature: we can probably leave this for another time - i see that flink is using floats for things like mem: 1.5Gi

Member

oh, re: unit="m" - i re-read the code with fresh eyes today and realized i'd been misreading it, the code is converting the value to MB :p

Member

that said, all the pods i'm spot-checking only have integer PAASTA_RESOURCE_MEM values and the flink pods that are setting fractional mem values don't add that env var - so casting to an int here is probably a good idea to keep the data looking the same (mypy shouldn't complain about returning an int from a function typed to return a float from my testing)
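
As a tiny illustration of that cast (driver_mem_mb is a made-up helper name, reusing the get_spark_memory_in_unit sketch from earlier in this thread):

    # Truncate to whole MB so PAASTA_RESOURCE_MEM stays an integer, even for
    # values like "1.5g"; callers typed to return float can still return this.
    def driver_mem_mb(spark_config: dict) -> int:
        mem_str = spark_config.get("spark.driver.memory", "2g")
        return int(get_spark_memory_in_unit(mem_str, "m"))

    print(driver_mem_mb({"spark.driver.memory": "1.5g"}))  # 1536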

(Resolved, outdated review threads on paasta_tools/spark_tools.py and paasta_tools/tron_tools.py)
error_msgs.append(
    f"{self.get_job_name()}.{self.get_action_name()} is a Spark job. `mem` config is not allowed. "
    f"Please specify the driver memory using `spark.driver.memory`."
)
chi-yelp (Contributor) commented Jun 14, 2024

We can add checks in the yelpsoa_configs pre-commit hook (after this), so users can notice the problem earlier, before the next run.

Member

it's probably worth adding this to paasta validate now - it'd be essentially dropping this same code in there :)

Member

oh actually, this is already being called by paasta validate :)

Comment on lines 262 to 264
if mem:
    if mem[-1] in MEM_MULTIPLIER:
        memory_bytes = float(mem[:-1]) * MEM_MULTIPLIER[mem[-1]]
Member

if we don't add validation for the user-provided value here now, then this needs to be wrapped in a try-except as well (since otherwise this will crash if a user puts in "lolk" or 1B as a spark.driver.memory value)
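
A small sketch of what such up-front validation might look like; is_valid_spark_memory is a made-up helper, and the accepted suffixes are an assumption based on the MEM_MULTIPLIER keys above:

    import re

    # Made-up validator: accept JVM-style memory strings ("512m", "2g", ...)
    # or a bare number, and reject junk like "lolk" or "1B" before parsing.
    _SPARK_MEM_RE = re.compile(r"^\d+(\.\d+)?[kmgt]?$")

    def is_valid_spark_memory(mem: str) -> bool:
        return bool(_SPARK_MEM_RE.match(mem))

    assert is_valid_spark_memory("2g")
    assert not is_valid_spark_memory("lolk")
    assert not is_valid_spark_memory("1B")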
