Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HOTFIX: Hotfix for trunk failing #3492

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -1812,7 +1813,7 @@ public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() thro

}

@Test
@Ignore
public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
final TaskId taskId = new TaskId(0, 0);

Expand All @@ -1839,7 +1840,6 @@ public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() thro
final TaskId taskId = new TaskId(0, 0);

final StreamThread thread = setupTest(taskId);
thread.start();

final StateDirectory testStateDir = new StateDirectory(
applicationId,
Expand All @@ -1856,7 +1856,7 @@ public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() thro
}
}

private StreamThread setupTest(final TaskId taskId) {
private StreamThread setupTest(final TaskId taskId) throws InterruptedException {
final TopologyBuilder builder = new TopologyBuilder();
builder.setApplicationId(applicationId);
builder.addSource("source", "topic");
Expand Down Expand Up @@ -1896,12 +1896,21 @@ protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPar
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
activeTasks.put(testStreamTask.id, testStreamTask.partitions);
thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));

thread.start();
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return thread.state() == StreamThread.State.RUNNING;
}
}, 10 * 1000, "Thread never started.");
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);

return thread;
}

@Test
@Ignore
public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
final TaskId taskId = new TaskId(0, 0);

Expand All @@ -1922,12 +1931,11 @@ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exce
}
}

@Test
@Ignore
public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
final TaskId taskId = new TaskId(0, 0);

final StreamThread thread = setupStandbyTest(taskId);
thread.start();

final StateDirectory testStateDir = new StateDirectory(applicationId,
config.getString(StreamsConfig.STATE_DIR_CONFIG),
Expand All @@ -1943,7 +1951,7 @@ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSus
}
}

private StreamThread setupStandbyTest(final TaskId taskId) {
private StreamThread setupStandbyTest(final TaskId taskId) throws InterruptedException {
final String storeName = "store";
final String changelogTopic = applicationId + "-" + storeName + "-changelog";

Expand Down Expand Up @@ -1997,8 +2005,15 @@ public void commit() {

final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));

thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
thread.start();
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return thread.state() == StreamThread.State.RUNNING;
}
}, 10 * 1000, "Thread never started.");

return thread;
}
Expand Down