SAMZA-1165. cleanup old zk versions. #239

Closed
wants to merge 27 commits into from

@sborya sborya commented Jun 30, 2017

No description provided.

}

public void deleteOldVersionPath(String path, List<String> zNodeIds, int numVersionsToLeave, Comparator<String> c) {
  if (StringUtils.isEmpty(path) || zNodeIds == null) {
Contributor

Please add tests for all the delete methods.

Contributor Author

added two tests.

@@ -219,6 +219,8 @@ public void onNewJobModelConfirmed(String version) {
    if (coordinatorListener != null) {
      coordinatorListener.onNewJobModel(processorId, jobModel);
    }

    zkUtils.cleanupZK(10);
Contributor

Two questions here:
A) Can we please extract this 10 into a static constant in this class (like NUM_CLEANUP_VERSIONS)?

B) Is this called from the leader alone or from all processors? There are two benefits to doing it only in the leader:

  • It helps us avoid potential concurrency problems with delete.
  • It removes the extra overhead of issuing deletes from every stream processor that receives this event.

Contributor Author

done.
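
For readers following the thread, here is a minimal sketch of the two requests above: a named constant instead of the literal 10, and cleanup issued only by the leader. It is not the code from this PR. The `Cleaner` interface, the `isLeader` flag, and the `simulate` helper are hypothetical stand-ins, and the constant name is the one suggested in the review (a later hunk in this conversation shows `NUM_VERSIONS_TO_LEAVE`).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the coordinator class; not the actual Samza code. */
final class LeaderOnlyCleanupSketch {
  // Named constant replacing the magic number 10 (name taken from the review suggestion).
  private static final int NUM_CLEANUP_VERSIONS = 10;

  /** Hypothetical minimal interface standing in for the ZK helper. */
  interface Cleaner {
    void cleanupZK(int numVersionsToLeave);
  }

  private final boolean isLeader; // assumption: leadership is known locally
  private final Cleaner cleaner;

  LeaderOnlyCleanupSketch(boolean isLeader, Cleaner cleaner) {
    this.isLeader = isLeader;
    this.cleaner = cleaner;
  }

  void onNewJobModelConfirmed(String version) {
    // Listener notification would happen here in the real callback.
    if (isLeader) {
      // Only one processor issues deletes, so concurrent deletes are avoided.
      cleaner.cleanupZK(NUM_CLEANUP_VERSIONS);
    }
  }

  /** Runs the callback once and returns the arguments passed to cleanupZK. */
  static List<Integer> simulate(boolean isLeader) {
    List<Integer> calls = new ArrayList<>();
    new LeaderOnlyCleanupSketch(isLeader, n -> calls.add(n)).onNewJobModelConfirmed("1");
    return calls;
  }
}
```

In this sketch a follower that receives the same event simply skips the cleanup call.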


public void deleteOldVersionPath(String path, List<String> zNodeIds, int numVersionsToLeave, Comparator<String> c) {
  if (StringUtils.isEmpty(path) || zNodeIds == null) {
    LOG.warn("cannot cleanup empty path or empty list in ZK");
Contributor

The right behavior here would be to return or throw an exception (maybe you forgot).

Contributor Author

right. Good catch. Returned.
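
A minimal sketch of the guard-clause fix discussed above, assuming the intent is to stop before any path is built when the input is unusable. The class and method names are hypothetical, `System.err` stands in for the logger, and the method returns the candidate paths instead of deleting anything so that it runs without ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of returning early after the warning. */
final class GuardClauseSketch {
  /** Returns the child paths that would be considered; empty when the input is unusable. */
  static List<String> candidatePaths(String path, List<String> zNodeIds) {
    if (path == null || path.isEmpty() || zNodeIds == null) {
      System.err.println("cannot cleanup empty path or empty list in ZK");
      return new ArrayList<>(); // return here instead of falling through
    }
    List<String> result = new ArrayList<>();
    for (String znodeId : zNodeIds) {
      result.add(path + "/" + znodeId);
    }
    return result;
  }
}
```

Without the `return`, the loop would run against a null list or build paths under an empty prefix.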

LOG.info("starting cleaning of barrier versions. size=" + size + "; num to leave=" + numVersionsToLeave);
for (String znodeId: zNodeIds) {
  i++;
  if (size - i < numVersionsToLeave) {
Contributor

Minor; please change it if it makes sense.
Just wondering if we could simplify this check.

We could select the versions from the beginning of zNodeIds with subList or subArray (since zNodeIds.size() > numVersion) and remove this check completely. If this suggestion makes sense and is acceptable, please change it.

Contributor Author

ok.
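
The subList idea above can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: the list is assumed to be sorted oldest first, and the class and method names are hypothetical. To leave the newest `numVersionsToLeave` entries in place, the sublist to delete ends at index `size - numVersionsToLeave`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: picks the entries to delete from a list sorted oldest first. */
final class OldVersionSelectionSketch {
  static List<String> selectForDeletion(List<String> sortedIds, int numVersionsToLeave) {
    int size = sortedIds.size();
    if (numVersionsToLeave < 0 || size <= numVersionsToLeave) {
      return Collections.emptyList(); // nothing is old enough to delete
    }
    // Everything before the last numVersionsToLeave entries is old.
    return new ArrayList<>(sortedIds.subList(0, size - numVersionsToLeave));
  }
}
```

With five versions and two to leave, the first three are selected; with five or fewer to leave out of five, nothing is selected, which also removes the need for the in-loop `size - i` check.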

  LOG.info(path + "/" + znodeId);
  zkClient.deleteRecursive(path + "/" + znodeId);
}

Contributor

Minor; please change it if it makes sense:
It would be great to log a debug statement here in an else branch saying that we're not cleaning up since zNodeIds.size() <= numVersion.

Contributor Author

That may be too much. I added logging of what we are deleting and how many we leave in place.

@@ -214,4 +212,7 @@ String getBarrierStatePath(String version) {
}
}

static public int getVersion(String barrierPath) {
Contributor

Minor suggestion:

s/static public/public static

Contributor Author

done.

private void deleteOldBarrierVersions(int numVersionsToLeave) {
  // read current list of barriers
  String path = keyBuilder.getJobModelVersionBarrierPrefix();
  LOG.info("jm path=" + path);
Contributor

s/jm path/barrier path

Contributor Author

done.

deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
    // jm version name format is <num>
Contributor

The job model name format comment is in the barrier method, and the barrier name format comment is in the job model method. Please fix it.

Contributor Author

done.
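
The comparator matters because znode names are strings. A minimal sketch, assuming names in the `<num>` format quoted in the code comment above (the other name format is not shown in this conversation, so it is not covered here). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration of ordering "<num>" version names numerically. */
final class VersionOrderSketch {
  // Assumption: every name is a plain decimal integer; anything else throws NumberFormatException.
  static final Comparator<String> NUMERIC =
      (o1, o2) -> Integer.compare(Integer.parseInt(o1), Integer.parseInt(o2));

  /** Returns a sorted copy, oldest (smallest number) first. */
  static List<String> sortOldestFirst(List<String> ids) {
    List<String> copy = new ArrayList<>(ids);
    copy.sort(NUMERIC);
    return copy;
  }
}
```

Plain string ordering would place "10" before "9", so a cleanup that relies on it could delete a newer version while keeping an older one.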

if (size - i < numVersionsToLeave) {
  break;
}
LOG.info(path + "/" + znodeId);
Contributor

Please change the message to something like "deleting zk path since it's old".

Just logging the path without the context of the action taken might not be useful.

Contributor Author

fixed.

  }
  LOG.info(path + "/" + znodeId);
  zkClient.deleteRecursive(path + "/" + znodeId);
}
Contributor

Minor; please change it if it makes sense:
It would help to record metrics here when we delete ZooKeeper nodes. That will aid debugging.

Contributor Author

I don't think we need metrics for cleanup. I think logging should be sufficient. Will add log.

@shanthoosh shanthoosh left a comment

LGTM, I have a few minor suggestions. Please land after fixing them.

Thanks.

@@ -237,6 +240,110 @@ public void testPublishNewJobModel() {
Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
}

@Test
public void testClenaupZkJobModels() {
Contributor

s/testClenaupZkJobModels/testCleanUpZkJobModels

Contributor Author

done.

@@ -60,6 +60,7 @@
// TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
// with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
private static final int METADATA_CACHE_TTL_MS = 5000;
public static final int NUM_VERSIONS_TO_LEAVE = 10;
Contributor

Can we please make this private (no usages outside)?

Contributor Author

done.

String path = keyBuilder.getJobModelVersionBarrierPrefix();
LOG.info("about to delete old barrier paths from " + path);
List<String> znodeIds = zkClient.getChildren(path);
LOG.info("all ids are ids = " + znodeIds);
Contributor

It would be better to have a clearer log message, e.g. ("list of zNodeIds " + zNodeIds).

Contributor Author

done.

// get the znodes to delete
int size = zNodeIds.size();
List<String> zNodesToDelete = zNodeIds.subList(0, numVersionsToLeave);
LOG.info("starting cleaning of barrier versions. from size=" + size + "to size " + zNodesToDelete.size() + "; num to leave=" + numVersionsToLeave);
Contributor

s/starting cleaning of barrier versions/starting cleanUp of zKNodes.
This method is shared between deleteJobModel and deleteBarrier, so a barrier-specific log message is not valid.

Contributor Author

done.

@navina navina left a comment

couple of nits. Otherwise, looks good! +1

LOG.info("Starting cleanup of barrier version zkNodes. From size=" + size + " to size " + zNodesToDelete.size() + "; num to leave=" + numVersionsToLeave);
for (String znodeId : zNodesToDelete) {
  String pathToDelete = path + "/" + znodeId;
  LOG.info(pathToDelete);
Contributor

nit: redundant log line. Line 516 conveys the same info. We can remove this.

Contributor Author

Right! Removed.

@@ -48,6 +48,8 @@
// Action name when the Processor membership changes
public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";

public static final String ON_ZK_CLEANUP = "OnCleanUp";
Contributor

nit: can we document this cleanup policy here?

Contributor Author

sure.

@asfgit asfgit closed this in 4eb5153 Jul 21, 2017
jiayunwu pushed a commit to jiayunwu/samza-zf that referenced this pull request Jul 24, 2017
Author: Boris Shkolnik <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Navina <navina@apache.org>, Shanthoosh V<svenkata@linkedin.com>

Closes apache#239 from sborya/zkCleanUpBarrier1
@sborya sborya deleted the zkCleanUpBarrier1 branch October 25, 2017 00:52