Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spurious update fix #247

Merged
merged 9 commits into from
Oct 25, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -386,7 +387,19 @@ public boolean renewLease(T lease)
+ " because the lease counter was not " + lease.getLeaseCounter());
}

return false;
// If we had a spurious retry during the Dynamo update, then this conditional PUT failure
// might be incorrect. So, we get the item straight away and check if the lease owner + lease counter
// are what we expected.
String expectedOwner = lease.getLeaseOwner();
Long expectedCounter = lease.getLeaseCounter() + 1;
T updatedLease = getLease(lease.getLeaseKey());
if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) ||
!expectedCounter.equals(updatedLease.getLeaseCounter())) {
return false;
}

LOG.info("Detected spurious renewal failure for lease with key " + lease.getLeaseKey()
+ ", but recovered");
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;

import java.util.logging.Logger;

import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Ignore;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;

Expand Down Expand Up @@ -108,7 +108,8 @@ public void testHoldUpdatedLease() throws LeasingException {

KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey());

leaseManager.renewLease(lease);
// lose lease
leaseManager.takeLease(lease, "bar");

Assert.assertFalse(leaseManager.renewLease(leaseCopy));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;

import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;

public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {

Expand Down Expand Up @@ -58,7 +57,9 @@ public void testLeaseLoss() throws LeasingException {
builder.addLeasesToRenew(renewer, "1", "2");
KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");

leaseManager.updateLease(renewedLease);
// lose lease 2
leaseManager.takeLease(renewedLease, "bar");

builder.renewMutateAssert(renewer, "1");
}

Expand Down Expand Up @@ -96,17 +97,19 @@ public void testGetCurrentlyHeldLease() throws LeasingException {
public void testGetCurrentlyHeldLeases() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);

KinesisClientLease lease2 = builder.withLease("1", "foo").withLease("2", "foo").build().get("2");
builder.withLease("1", "foo").withLease("2", "foo").build();
builder.addLeasesToRenew(renewer, "1", "2");
builder.renewMutateAssert(renewer, "1", "2");
KinesisClientLease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2");

// This should be a copy that doesn't get updated
Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases();
Assert.assertEquals(2, heldLeases.size());
Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter());
Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter());

leaseManager.updateLease(lease2); // lose lease 2
// lose lease 2
leaseManager.takeLease(lease2, "bar");

// Do another renewal and make sure the copy doesn't change
builder.renewMutateAssert(renewer, "1");

Expand Down Expand Up @@ -176,7 +179,7 @@ public void testUpdateOldLease() throws LeasingException {
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");

// cause lease loss such that the renewer knows the lease has been lost when update is called
leaseManager.renewLease(lease);
leaseManager.takeLease(lease, "bar");
builder.renewMutateAssert(renewer);

lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint"));
Expand All @@ -195,7 +198,7 @@ public void testUpdateRegainedLease() throws LeasingException {
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");

// cause lease loss such that the renewer knows the lease has been lost when update is called
leaseManager.renewLease(lease);
leaseManager.takeLease(lease, "bar");
builder.renewMutateAssert(renewer);

// regain the lease
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;

Expand All @@ -35,6 +35,7 @@ public class TestHarnessBuilder {

private Map<String, KinesisClientLease> leases = new HashMap<String, KinesisClientLease>();
private KinesisClientLeaseManager leaseManager;
private Map<String, KinesisClientLease> originalLeases = new HashMap<>();

private Callable<Long> timeProvider = new Callable<Long>() {

Expand All @@ -54,6 +55,15 @@ public TestHarnessBuilder withLease(String shardId) {
}

public TestHarnessBuilder withLease(String shardId, String owner) {
KinesisClientLease lease = createLease(shardId, owner);
KinesisClientLease originalLease = createLease(shardId, owner);

leases.put(shardId, lease);
originalLeases.put(shardId, originalLease);
return this;
}

private KinesisClientLease createLease(String shardId, String owner) {
KinesisClientLease lease = new KinesisClientLease();
lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint"));
lease.setOwnerSwitchesSinceCheckpoint(0L);
Expand All @@ -62,8 +72,7 @@ public TestHarnessBuilder withLease(String shardId, String owner) {
lease.setParentShardIds(Collections.singleton("parentShardId"));
lease.setLeaseKey(shardId);

leases.put(shardId, lease);
return this;
return lease;
}

public Map<String, KinesisClientLease> build() throws LeasingException {
Expand Down Expand Up @@ -147,7 +156,7 @@ public Map<String, KinesisClientLease> renewMutateAssert(ILeaseRenewer<KinesisCl
Assert.assertEquals(renewedShardIds.length, heldLeases.size());

for (String shardId : renewedShardIds) {
KinesisClientLease original = leases.get(shardId);
KinesisClientLease original = originalLeases.get(shardId);
Assert.assertNotNull(original);

KinesisClientLease actual = heldLeases.get(shardId);
Expand Down