Skip to content
Permalink
Browse files
[FLINK-26043][runtime][security] Add periodic kerberos relogin to Ker…
…berosDelegationTokenManager

[FLINK-27605][tests] Updated Mockito version to 3.4.6 in order to use static method mocking
  • Loading branch information
gaborgsomogyi authored and dmvk committed May 23, 2022
1 parent 18a967f commit 220ef999d2a353fd52cc0aa1a93c26d9b696c1ce
Showing 15 changed files with 508 additions and 148 deletions.
@@ -38,5 +38,11 @@
<td>Boolean</td>
<td>Indicates whether to read from your Kerberos ticket cache.</td>
</tr>
<tr>
<td><h5>security.kerberos.relogin.period</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
</tr>
</tbody>
</table>
@@ -50,6 +50,12 @@
<td>Boolean</td>
<td>Indicates whether to read from your Kerberos ticket cache.</td>
</tr>
<tr>
<td><h5>security.kerberos.relogin.period</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
</tr>
<tr>
<td><h5>security.module.factory.classes</h5></td>
<td style="word-wrap: break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td>
@@ -68,6 +68,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -97,13 +98,14 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/** Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle. */
@RunWith(PowerMockRunner.class)
@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
@PrepareForTest(FlinkKinesisConsumer.class)
public class FlinkKinesisConsumerTest extends TestLogger {

// ----------------------------------------------------------------------
@@ -324,13 +326,12 @@ public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoi
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();

// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);

TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) {
TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
}
}

@Test
@@ -374,28 +375,28 @@ public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);

// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);

// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------

TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) {

// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------

TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
}
}
}

@@ -451,40 +452,40 @@ public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exceptio
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);

// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);

// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------

TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredStateForOthers.entrySet()) {
// should never get restored state not belonging to itself
Mockito.verify(mockedFetcher, never())
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
}
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredState.entrySet()) {
// should get restored state belonging to itself
Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) {

// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------

TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredStateForOthers.entrySet()) {
// should never get restored state not belonging to itself
Mockito.verify(mockedFetcher, never())
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
}
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredState.entrySet()) {
// should get restored state belonging to itself
Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
}
}
}

@@ -564,33 +565,35 @@ public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShar
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);

// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) {

// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------

TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

fakeRestoredState.put(
new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
fakeRestoredState.put(
new StreamShardHandle(
"fakeStream2",
new Shard()
.withShardId(
KinesisShardIdGenerator.generateFromShardOrder(2))),
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
restoredShard.getKey()),
restoredShard.getKey(),
restoredShard.getValue()));
}
}
}

@@ -709,26 +712,26 @@ public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheSt
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);

// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) {

// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------

TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
closedStreamShardHandle),
closedStreamShardHandle,
fakeRestoredState.get(closedStreamShardHandle)));
TestableFlinkKinesisConsumer consumer =
new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

Mockito.verify(mockedFetcher)
.registerNewSubscribedShardState(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(
closedStreamShardHandle),
closedStreamShardHandle,
fakeRestoredState.get(closedStreamShardHandle)));
}
}

private static final class TestingListState<T> implements ListState<T> {
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;

import java.time.Duration;
import java.util.List;

import static org.apache.flink.configuration.ConfigOptions.key;
@@ -123,6 +124,14 @@ public class SecurityOptions {
+ "You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, "
+ "to handle delegation tokens.");

@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
public static final ConfigOption<Duration> KERBEROS_RELOGIN_PERIOD =
key("security.kerberos.relogin.period")
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription(
"The time period when keytab login happens automatically in order to always have a valid TGT.");

// ------------------------------------------------------------------------
// ZooKeeper Security Options
// ------------------------------------------------------------------------
@@ -32,7 +32,6 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
@@ -48,7 +47,6 @@
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.hadoop.HadoopDependency;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -68,8 +66,7 @@
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
@@ -392,11 +389,11 @@ protected void initializeServices(Configuration configuration, PluginManager plu
configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
heartbeatServices = createHeartbeatServices(configuration);
delegationTokenManager =
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
&& HadoopDependency.isHadoopCommonOnClasspath(
getClass().getClassLoader())
? new KerberosDelegationTokenManager(configuration)
: new NoOpDelegationTokenManager();
KerberosDelegationTokenManagerFactory.create(
getClass().getClassLoader(),
configuration,
commonRpcService.getScheduledExecutor(),
ioExecutor);
metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

final RpcService metricQueryServiceRpcService =

0 comments on commit 220ef99

Please sign in to comment.