Skip to content

SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP#1576

Merged
mynameborat merged 1 commit intoapache:masterfrom
lakshmi-manasa-g:elasticity-factor-intro
Feb 4, 2022
Merged

SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP#1576
mynameborat merged 1 commit intoapache:masterfrom
lakshmi-manasa-g:elasticity-factor-intro

Conversation

@lakshmi-manasa-g
Copy link
Contributor

Feature: Elasticity (SAMZA-2687) for a Samza job allows job to have more tasks than the number of input SystemStreamPartition(SSP). Thus, a job can scale up beyond its input partition count without needing the repartition the input stream.

  • This is achieved by having elastic tasks which is the same as a task for all practical purposes. But an elastic task consumes only a subset of the messages of an SSP.
  • With an elasticity factor F (integer), the number of elastic tasks will be F times N with N = original task count.
  • The F elastic tasks per original task all consume subsets of same SSP as the original task. There will be F subsets (aka key bucket) per SSP and a message falls into an SSP bucket 'i' if its message.key.hash()%F == i.

Changes:

  1. introduce the config for enabling elasticity as job.elasticity.factor. If the job without elasticity has N tasks then with factor = F > 1, there will be F times N (elastic) tasks
  2. Add "key bucket" (an integer ranging 0-F) to SSP which will identify the messages within the SSP
  3. Compute the key bucket the IncomingMessageEnvelope falls into given elasticity factor F.
  4. SamzaObjectMapper changes to serde keyBucket component of SSP.

Tests: updated unit tests and added new ones.

API Changes: no public API changes

Upgrade Instructions: N/A

Usage Instructions: set the config job.elasticity.factor > 1 to enable elasticity for the job.

super(system, stream);
this.partition = partition;
this.hash = computeHashCode();
this.keyBucket = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not default the keyBucket to just 0 or 1 however you are basing the bucket index? Cut shorts all the additional handling of -1 and so-on and also logically makes sense that in the event of keyBucket < 1 there is one bucket to which all the of the messages belonging to the task is assigned to.

Copy link
Contributor

@rayman7718 rayman7718 Jan 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, keyBucket can just be 0 by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed that keeping keyBucket = 0 when there is no elasticity makes this particular SSP class simpler.

my reason for having it to -1 was as follows.
making keyBucket = 0 and removing the check keyBucket == -1 means every time SSP is printed into logs and task model, it will be `SystemStreamPartition [system, stream, partition, 0] with no elasticity enabled which can be confusing.

this check of keyBucket-1 will be restricted to this class and hence i think it is better to have this additional check rather than having to introduce SSP containing 0 in all places.

pl lmk if this makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

during the second pr #1580 dev, i realized that there is a bigger problem with having keyBucket=0 in the ssp.tostring when elasticity is not enabled.

GroupBySystemStreamPartition uses ssp.tostring as the taksName and task level metrics (like sends etc) use the task name. keeping keyBucket=0 in all ssp.tostring even when elasticity is not enabled will result in loss of metrics history.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But cant you still have the !=0 check in toString to preserve existing behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that is doable but imo doesnt differ much from the current !=-1 check. pl lmk if you feel strongly about this and i can update.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How exactly is this doable? is your bucket based on 1-index is it? when you have elasticity enabled, one of the bucket will have 0 as the index and hence its representation would be different from others.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes you are right.currently my indexes are 0-based and hence using -1 as the default when elasticity is off.

if i choose keyBucket=0 for elasticity off, then i will have to start my elasticity enabled indexes from 1,2.. but this does not differ from default when elasticity off = -1 and start indexes from 0,1,2..

return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
}

public boolean getElasticityEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why this is needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the question why getElasticityEnabled is needed when we have getElasticityFactor?
callers wanting to know if elasticity is enabled need not and should not be required to know that elasticity.factor = 1 means disabled and >1 means enabled.

Copy link
Contributor

@rayman7718 rayman7718 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couple of minor comments, feel free to fix and ship

final int prime = 31;
int result = super.hashCode();
result = prime * result + ((partition == null) ? 0 : partition.hashCode());
result = prime * result + ((keyBucket == -1) ? 0 : keyBucket);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized this hashing function yields the same hash regardless of elasticity enabled or not for bucket 0 vs -1 respectively.

I feel you could treat 0 as default and modify the toString() to include keyBucket regardless and just regenerate dashboard or make this is API break change in metrics name explicitly as we are anyways going to run into it.

Copy link
Contributor

@mynameborat mynameborat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync'd up with Rayman! I am fine with sentinel value being either -1 or 0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants