Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added new config for job.changelog.system #31

Closed
wants to merge 3 commits into from
Closed

added new config for job.changelog.system #31

wants to merge 3 commits into from

Conversation

sborya
Copy link
Contributor

@sborya sborya commented Dec 9, 2016

SAMZA-1060.
Allow to specify a changelog system separately, so user can only specify stream name for each store.
If user specifies both (system and stream) it overwrites the job.changelog.system setting.

Copy link
Contributor

@navina navina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes look good. I do suggest adding documentation to the website regarding the new config.

Copy link
Contributor

@nickpan47 nickpan47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on @navina's comment on the documentation. LGTM, just one suggestion on an error case.

&& changelogSystem.isDefined) {
// get the system name
Some(changelogSystem.get + "." + systemStream)
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it error out if systemStream only has stream name and changelogSystem is not defined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that was current behavior. I will add this check.

def testIsChangelogSystemSetting {
val configMap = Map[String, String](
FACTORY.format("store1") -> "some.factory.Class",
CHANGELOG_STREAM.format("store1") -> "system1.stream1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, with this change, I think that we will need to update our internal config rewriter as well, to detect all systems used by the job.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, but I think this change is not required in the open source.

// these values will be combined into <asystem>.<astream>
val systemStream = getOption(CHANGELOG_STREAM format name)
val changelogSystem = getOption(CHANGELOG_SYSTEM)
val systemStreamRes = if ( systemStream.isDefined
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be you can simplify this if-else statement as :

if (systemStream.isDefined && !systemStream.getOrElse("").contains(".")) {
   if (changelogSystem.isDefined) {
     Some(changelogSystem.get + "." + systemStream.get)
   } else {
     throw new SamzaException(...)
    }
} else {
  ...
}

Copy link
Contributor

@navina navina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 comment. Otherwise, looks good!

Copy link
Contributor

@navina navina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. +1

@asfgit asfgit closed this in 97edf10 Dec 15, 2016
fredji97 pushed a commit to fredji97/samza that referenced this pull request May 11, 2018
As per subject, it's a demo of stream-to-table join using RocksDb.

Author: Wei Song <wsong@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>

Closes apache#31 from weisong44/latest
khandelwal-ayush pushed a commit to khandelwal-ayush/samza that referenced this pull request Jun 12, 2024
* SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (apache#1692)

* SAMZA-2798: Populate worker.opts in environment variable only if available (apache#1693)

Description
Populate worker.opts in the environment variable only if available in the configs.

Changes
Check if worker.opts is present and then add it to environment variable

Tests
Updated unit tests

* Add MAX_BACKGROUND_JOBS config for RocksDB (apache#1694)

* SAMZA-2784: Remove excessive commit logs (apache#1695)

* SAMZA-2799: Remove worker.opts handling in shell command builder (apache#1696)

---------

Co-authored-by: ajo thomas <ajo.thomas24@gmail.com>
Co-authored-by: Bharath Kumarasubramanian <bharathkk@apache.org>
Co-authored-by: Shekhar Sharma <72765053+shekhars-li@users.noreply.github.com>
Co-authored-by: Daniel Chen <xrchen@uwaterloo.ca>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants