[BEAM-12333] Changing TimerKey to include TimerFamilyId #14802

Merged
merged 1 commit into from May 17, 2021

@ajothomas ajothomas commented May 12, 2021

SamzaTimerInternalsFactory stores timer state, for event time and processing time timers alike, in a MapState<TimerKey<K>, Long>. TimerKey, however, doesn't include the TimerFamilyId, so the timer family information is not persisted. Event time timers, on the other hand, use an additional time-sorted set to persist the TimerData (which contains the TimerFamilyId). We need to include the timer family id in TimerKey to ensure that it is also persisted for processing time timers.

This PR adds TimerFamilyId to TimerKey. Additionally, it switches TimerKey to AutoValue to reduce boilerplate code.
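
To illustrate the description above, here is a minimal sketch of a map keyed by a timer key that carries the family id. It is not the code in this PR: TimerKeySketch, its String fields, and the helper methods are hypothetical stand-ins, and a plain HashMap stands in for the Samza MapState. It only shows that whatever is part of the key is what can be read back from the stored state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for TimerKey; not the class from this PR. */
final class TimerKeySketch {
  final String key;
  final String namespace;
  final String timerId;
  final String timerFamilyId; // "" when the timer has no family

  TimerKeySketch(String key, String namespace, String timerId, String timerFamilyId) {
    this.key = key;
    this.namespace = namespace;
    this.timerId = timerId;
    this.timerFamilyId = timerFamilyId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TimerKeySketch)) {
      return false;
    }
    TimerKeySketch other = (TimerKeySketch) o;
    return key.equals(other.key)
        && namespace.equals(other.namespace)
        && timerId.equals(other.timerId)
        && timerFamilyId.equals(other.timerFamilyId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, namespace, timerId, timerFamilyId);
  }

  /** Stores one timer and reads its family id back from the map key alone. */
  static String familyIdAfterStoring(String timerFamilyId) {
    Map<TimerKeySketch, Long> timerState = new HashMap<>();
    timerState.put(new TimerKeySketch("user-1", "window-0", "timer-a", timerFamilyId), 1000L);
    // The key is the only place the stored entry can carry the family id.
    return timerState.keySet().iterator().next().timerFamilyId;
  }

  /** Two timers that differ only in family id occupy separate entries. */
  static int countEntriesForTwoFamilies() {
    Map<TimerKeySketch, Long> timerState = new HashMap<>();
    timerState.put(new TimerKeySketch("user-1", "window-0", "timer-a", "family-x"), 1000L);
    timerState.put(new TimerKeySketch("user-1", "window-0", "timer-a", "family-y"), 2000L);
    return timerState.size();
  }

  public static void main(String[] args) {
    System.out.println(familyIdAfterStoring("family-x"));
    System.out.println(countEntriesForTwoFamilies());
  }
}
```

In the PR itself the equals and hashCode boilerplate is what AutoValue is meant to generate.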


@ajothomas
Contributor Author

@xinyuiscool @kw2542

@ajothomas ajothomas changed the title Changing TimerKey to include TimerFamilyId [BEAM-12333] Changing TimerKey to include TimerFamilyId May 12, 2021
// encode a byte (1/0) to indicate the presence/absence of timerFamilyId
// We can use this approach to add additional fields in the future
if (!Strings.isNullOrEmpty(value.getTimerFamilyId())) {
  BOOLEAN_CODER.encode(true, outStream);
Contributor

Could you help me understand why we need a boolean flag? I suppose inStream.available() > 0 would be sufficient to indicate that extra fields follow.

Contributor Author

This was just for future-proofing in case we add more fields to TimerKey. TimerFamilyId is optional, and when it is null or empty I wanted to encode a flag indicating its absence. If we add more fields in the future, we can use the flag to decode in a backward-compatible way.

Probably the only way to omit the boolean flag is to encode an empty string ("") whenever the family id is not present.

Contributor

Got it. Actually, we do not need the flag: when the timer family id is not present, we still encode null in the bytes, which itself indicates that the timer family id is null.

Contributor

@ajothomas: I think we should be able to blindly encode timerFamilyId, as it defaults to the blank string "". See TimerInternals: line 198. The new TimerDataCoder (TimerDataCoderV2) also encodes it directly without a null check. We should be able to follow that.

Contributor Author

Thanks for catching this. As long as it defaults to an empty string, StringUtf8Coder will be able to encode and decode it. I will remove the boolean flag.

// check if the stream has more available bytes. This is to ensure backward compatibility with
// old rocksdb state
// which does not encode timer family data
if (inStream.available() > 0 && BOOLEAN_CODER.decode(inStream)) {
Contributor

As commented above, we can ignore this bool flag.
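
The outcome of this thread can be sketched roughly as follows: the family id is written unconditionally (as "" when absent), with no boolean flag in front of it. The sketch assumes the decoder keeps the inStream.available() > 0 check from the snippet under review, so that bytes written before this change still decode. It is not the coder from this PR: TimerKeyCodecSketch and its methods are hypothetical, only two fields are modeled, and DataOutputStream.writeUTF stands in for StringUtf8Coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of the byte layout; not the coder from this PR. */
final class TimerKeyCodecSketch {

  /** Writes each field as a length-prefixed string, in order. */
  private static byte[] encode(String... fields) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      for (String field : fields) {
        out.writeUTF(field);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Old layout: no family id at all. */
  static byte[] encodeOld(String timerId) {
    return encode(timerId);
  }

  /** New layout: family id always written, "" when the timer has no family. */
  static byte[] encodeNew(String timerId, String timerFamilyId) {
    return encode(timerId, timerFamilyId);
  }

  /** Returns {timerId, timerFamilyId}; old-layout bytes yield an empty family id. */
  static String[] decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      String timerId = in.readUTF();
      // Old state ends here, so only read the family id if bytes remain.
      String timerFamilyId = in.available() > 0 ? in.readUTF() : "";
      return new String[] {timerId, timerFamilyId};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(String.join("|", decode(encodeNew("timer-a", "family-x"))));
    System.out.println(String.join("|", decode(encodeOld("timer-a"))));
  }
}
```

Because an empty string still occupies bytes in this layout, an absent family id in new state and a missing field in old state both decode to "", which is why no separate flag is needed.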

@ajothomas ajothomas force-pushed the TimerKeyChanges branch 3 times, most recently from c1c5ecc to 48c17e4 Compare May 17, 2021 21:42
@kw2542 kw2542 left a comment

LGTM


return new TimerKey<>(key, namespace, timerId);
return timerKeyBuilder.build();
Contributor

nit: we do not need to declare the builder at the beginning.

We can do:

return TimerKey.builder().setX().setY().setZ().build();

Contributor

Right, it seems nicer to have the whole builder chain at the end instead of splitting it in half.

Contributor Author

Changed it.
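
For readers unfamiliar with the pattern, the suggestion amounts to building the key in a single chained expression at the return site. Below is a minimal hand-written sketch of that shape. ChainedBuilderSketch, its setter names, and the of(...) helper are hypothetical; in the PR the builder would be generated by AutoValue rather than written by hand.

```java
/** Hypothetical hand-written builder; AutoValue would generate the real one. */
final class ChainedBuilderSketch {
  final String key;
  final String timerId;
  final String timerFamilyId;

  private ChainedBuilderSketch(Builder builder) {
    this.key = builder.key;
    this.timerId = builder.timerId;
    this.timerFamilyId = builder.timerFamilyId;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private String key;
    private String timerId;
    private String timerFamilyId = ""; // assumed default when no family is set

    Builder setKey(String key) {
      this.key = key;
      return this;
    }

    Builder setTimerId(String timerId) {
      this.timerId = timerId;
      return this;
    }

    Builder setTimerFamilyId(String timerFamilyId) {
      this.timerFamilyId = timerFamilyId;
      return this;
    }

    ChainedBuilderSketch build() {
      return new ChainedBuilderSketch(this);
    }
  }

  /** The whole builder chain lives in the return statement, with no local builder variable. */
  static ChainedBuilderSketch of(String key, String timerId, String timerFamilyId) {
    return builder().setKey(key).setTimerId(timerId).setTimerFamilyId(timerFamilyId).build();
  }

  public static void main(String[] args) {
    System.out.println(of("user-1", "timer-a", "family-x").timerFamilyId);
  }
}
```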

}

STRING_CODER.encode(value.getTimerFamilyId(), outStream);
Contributor

I guess the answer is yes, but just to double-check: do we want to encode it even if the timer family id is an empty string?

@xinyuiscool xinyuiscool left a comment

LGTM!

@xinyuiscool xinyuiscool merged commit 67fbf95 into apache:master May 17, 2021
@boyuanzz
Contributor

Hi there, this PR breaks https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/. I just filed https://issues.apache.org/jira/browse/BEAM-12358 for tracking; please consider either reverting this change or submitting a fix.

@xinyuiscool
Contributor

Thanks, @boyuanzz, we are looking into it.

@boyuanzz
Contributor

Thank you : )

dxichen pushed a commit to linkedin/beam that referenced this pull request Aug 9, 2021
dxichen pushed a commit to linkedin/beam that referenced this pull request Sep 22, 2021