diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml index 75cd67dc17763..f85194f201d36 100644 --- a/flink-connectors/flink-connector-kinesis/pom.xml +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -169,6 +169,13 @@ under the License. test + + org.testcontainers + testcontainers + 1.15.3 + test + + org.apache.flink @@ -336,6 +343,18 @@ under the License. + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.1 + + + true + true + + + diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 7f30dcacd44c2..b0a729fb11eb5 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -401,7 +401,6 @@ public void cancel() { running = false; KinesisDataFetcher fetcher = this.fetcher; - this.fetcher = null; // this method might be called before the subtask actually starts running, // so we must check if the fetcher is actually created @@ -409,7 +408,6 @@ public void cancel() { try { // interrupt the fetcher of any work fetcher.shutdownFetcher(); - fetcher.awaitTermination(); } catch (Exception e) { LOG.warn("Error while closing Kinesis data fetcher", e); } @@ -419,6 +417,9 @@ public void cancel() { @Override public void close() throws Exception { cancel(); + // safe-guard when the fetcher has been interrupted, make sure to not leak resources + fetcher.awaitTermination(); + this.fetcher = null; super.close(); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java new file mode 100644 index 0000000000000..0b224c35c4650 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.testcontainers.utility.DockerImageName; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertThat; + +/** IT cases for using Kinesis consumer/producer based on Kinesalite. */ +public class FlinkKinesisITCase { + public static final String TEST_STREAM = "test_stream"; + + @ClassRule + public static MiniClusterWithClientResource miniCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder().build()); + + @ClassRule + public static KinesaliteContainer kinesalite = + new KinesaliteContainer( + DockerImageName.parse("instructure/kinesalite").withTag("latest")); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema(); + + private KinesisPubsubClient client; + + @Before + public void setupClient() { + client = new KinesisPubsubClient(kinesalite.getContainerProperties()); + } + + /** + * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170). + * + *
    + *
  1. The test sets up a stream with 100 records and creates a Flink job that reads them + * very slowly (using up a large chunk of the mailbox time).
  2. After ensuring that consumption has started, the job is stopped in a parallel thread. + *
  3. Without the fix for FLINK-17170, the job has a high chance of deadlocking during + * cancellation.
  4. With the fix, the job proceeds and we can lift the backpressure. + *
+ */ + @Test + public void testStopWithSavepoint() throws Exception { + client.createTopic(TEST_STREAM, 1, new Properties()); + + // add elements to the test stream + int numElements = 10; + List elements = + IntStream.range(0, numElements) + .mapToObj(String::valueOf) + .collect(Collectors.toList()); + for (String element : elements) { + client.sendMessage(TEST_STREAM, element); + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + Properties config = kinesalite.getContainerProperties(); + config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name()); + FlinkKinesisConsumer consumer = + new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config); + + // call stop with savepoint in another thread + ForkJoinTask stopTask = + ForkJoinPool.commonPool() + .submit( + () -> { + WaitingMapper.firstElement.await(); + stopWithSavepoint(); + WaitingMapper.stopped = true; + return null; + }); + + try { + List result = + env.addSource(consumer).map(new WaitingMapper()).executeAndCollect(10000); + // stop with savepoint will most likely only return a small subset of the elements + // validate that the prefix is as expected + assertThat(result, hasSize(lessThan(numElements))); + assertThat(result, equalTo(elements.subList(0, result.size()))); + } finally { + stopTask.cancel(true); + } + } + + private String stopWithSavepoint() throws Exception { + JobStatusMessage job = + miniCluster.getClusterClient().listJobs().get().stream().findFirst().get(); + return miniCluster + .getClusterClient() + .stopWithSavepoint(job.getJobId(), true, temp.getRoot().getAbsolutePath()) + .get(); + } + + private static class WaitingMapper implements MapFunction { + static CountDownLatch firstElement = new CountDownLatch(1); + static volatile boolean stopped = false; + + @Override + public String map(String value) throws Exception { + if (firstElement.getCount() > 0) { + firstElement.countDown(); + } + if (!stopped) { + Thread.sleep(100); + } + return value; + } + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java new file mode 100644 index 0000000000000..800ab96f3ad03 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.testutils; + +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.model.ListStreamsResult; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy; +import org.testcontainers.utility.DockerImageName; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static com.amazonaws.SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR; +import static com.amazonaws.SDKGlobalConfiguration.SECRET_KEY_ENV_VAR; + +/** + * A testcontainer based on Kinesalite. + * + *

Note that the more obvious localstack container with Kinesis took 1 minute to start vs 10 + * seconds of Kinesalite. + */ +public class KinesaliteContainer extends GenericContainer { + private static final String ACCESS_KEY = "access key"; + private static final String SECRET_KEY = "secret key"; + + public KinesaliteContainer(DockerImageName imageName) { + super(imageName); + + withEnv(ACCESS_KEY_ENV_VAR, ACCESS_KEY); + withEnv(SECRET_KEY_ENV_VAR, ACCESS_KEY); + withExposedPorts(4567); + waitingFor(new ListStreamsWaitStrategy()); + withCreateContainerCmdModifier( + cmd -> + cmd.withEntrypoint( + "/tini", + "--", + "/usr/src/app/node_modules/kinesalite/cli.js", + "--path", + "/var/lib/kinesalite", + "--ssl")); + } + + /** Returns the endpoint url to access the container from outside the docker network. */ + public String getContainerEndpointUrl() { + return String.format("https://%s:%s", getContainerIpAddress(), getMappedPort(4567)); + } + + /** Returns the endpoint url to access the host from inside the docker network. */ + public String getHostEndpointUrl() { + return String.format("https://%s:%s", getHost(), getMappedPort(4567)); + } + + public String getAccessKey() { + return ACCESS_KEY; + } + + public String getSecretKey() { + return SECRET_KEY; + } + + /** Returns the properties to access the container from outside the docker network. */ + public Properties getContainerProperties() { + return getProperties(getContainerEndpointUrl()); + } + + /** Returns the properties to access the host from inside the docker network. */ + public Properties getHostProperties() { + return getProperties(getHostEndpointUrl()); + } + + /** Returns the client to access the container from outside the docker network. */ + public AmazonKinesis getContainerClient() { + return getClient(getContainerEndpointUrl()); + } + + /** Returns the client to access the host from inside the docker network. */ + public AmazonKinesis getHostClient() { + return getClient(getHostEndpointUrl()); + } + + private AmazonKinesis getClient(String endPoint) { + return AmazonKinesisClientBuilder.standard() + .withCredentials( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(getAccessKey(), getSecretKey()))) + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(endPoint, "us-east-1")) + .build(); + } + + private Properties getProperties(String endpointUrl) { + Properties config = new Properties(); + config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + config.setProperty(AWSConfigConstants.AWS_ENDPOINT, endpointUrl); + config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, getAccessKey()); + config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, getSecretKey()); + return config; + } + + private class ListStreamsWaitStrategy extends AbstractWaitStrategy { + @Override + protected void waitUntilReady() { + Unreliables.retryUntilSuccess( + (int) this.startupTimeout.getSeconds(), + TimeUnit.SECONDS, + () -> this.getRateLimiter().getWhenReady(() -> tryList())); + } + + private ListStreamsResult tryList() { + return getContainerClient().listStreams(); + } + } +} diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml index 37f88c6411103..45bd306c4bee3 100644 --- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml @@ -125,6 +125,22 @@ under the License. 
+ + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.1 + + + + + true + true + true + true + + + diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java index bed4cf446b333..5256a50e59275 100644 --- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.kinesis.test; import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient; -import org.apache.flink.streaming.kinesis.test.containers.KinesaliteContainer; import org.apache.flink.streaming.kinesis.test.model.Order; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.tests.util.categories.TravisGroup1; @@ -34,11 +34,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.Timeout; import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; import java.nio.file.Files; import java.nio.file.Path; @@ -49,62 +49,52 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ACCESS_KEY_ID; -import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT; -import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** End-to-end test for Kinesis Table API using Kinesalite. 
*/ @Category(value = {TravisGroup1.class}) public class KinesisTableApiITCase extends TestLogger { private static final String ORDERS_STREAM = "orders"; private static final String LARGE_ORDERS_STREAM = "large_orders"; + private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite"; private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar"); - private final Network network = Network.newNetwork(); + private static final Network network = Network.newNetwork(); @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); - @Rule - public final KinesaliteContainer kinesalite = new KinesaliteContainer().withNetwork(network); + @ClassRule + public static final KinesaliteContainer KINESALITE = + new KinesaliteContainer( + DockerImageName.parse("instructure/kinesalite").withTag("latest")) + .withNetwork(network) + .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS); private KinesisPubsubClient kinesisClient; - @Rule - public final FlinkContainer flink = + @ClassRule + public static final FlinkContainer FLINK = FlinkContainer.builder() .build() - .withEnv("AWS_ACCESS_KEY_ID", "fakeid") - .withEnv("AWS_SECRET_KEY", "fakekey") + .withEnv("AWS_ACCESS_KEY_ID", KINESALITE.getAccessKey()) + .withEnv("AWS_SECRET_KEY", KINESALITE.getSecretKey()) .withEnv("AWS_CBOR_DISABLE", "1") .withEnv( "FLINK_ENV_JAVA_OPTS", "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking") .withNetwork(network) - .dependsOn(kinesalite); + .dependsOn(KINESALITE); @Before public void setUp() throws Exception { - // Required for Kinesalite. - // Including shaded and non-shaded conf to support test running from Maven and IntelliJ - System.setProperty("com.amazonaws.sdk.disableCertChecking", "1"); - System.setProperty("com.amazonaws.sdk.disableCbor", "1"); - System.setProperty( - "org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "1"); - System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "1"); - - Properties properties = new Properties(); - properties.setProperty(AWS_ENDPOINT, kinesalite.getEndpointUrl()); - properties.setProperty(AWS_ACCESS_KEY_ID, "ak"); - properties.setProperty(AWS_SECRET_ACCESS_KEY, "sk"); + Properties properties = KINESALITE.getContainerProperties(); kinesisClient = new KinesisPubsubClient(properties); kinesisClient.createTopic(ORDERS_STREAM, 1, properties); kinesisClient.createTopic(LARGE_ORDERS_STREAM, 1, properties); } - @Test(timeout = 120_000) + @Test public void testTableApiSourceAndSink() throws Exception { List smallOrders = ImmutableList.of(new Order("A", 5), new Order("B", 10)); @@ -118,8 +108,7 @@ public void testTableApiSourceAndSink() throws Exception { executeSqlStatements(readSqlFile("filter-large-orders.sql")); List result = readAllOrdersFromKinesis(kinesisClient); - assertEquals(expected.size(), result.size()); - assertTrue(result.containsAll(expected)); + assertEquals(expected, result); } private List readAllOrdersFromKinesis(final KinesisPubsubClient client) @@ -138,11 +127,11 @@ private List readAllOrdersFromKinesis(final KinesisPubsubClient client) } private List readSqlFile(final String resourceName) throws Exception { - return Files.readAllLines(Paths.get(getClass().getResource(resourceName).toURI())); + return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI())); } private void executeSqlStatements(final List sqlLines) throws Exception { - flink.submitSQLJob( + FLINK.submitSQLJob( new 
SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) .addJars(sqlConnectorKinesisJar) .build()); diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java deleted file mode 100644 index 5162106646aae..0000000000000 --- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.kinesis.test.containers; - -import org.testcontainers.containers.GenericContainer; - -/** A test Kinesis Data Streams container using Kinesalite. */ -public class KinesaliteContainer extends GenericContainer { - public static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite"; - - private static final String IMAGE = "instructure/kinesalite:latest"; - private static final int PORT = 4567; - - public KinesaliteContainer() { - super(IMAGE); - } - - @Override - protected void configure() { - withExposedPorts(PORT); - withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS); - withCreateContainerCmdModifier( - cmd -> - cmd.withEntrypoint( - "/tini", - "--", - "/usr/src/app/node_modules/kinesalite/cli.js", - "--path", - "/var/lib/kinesalite", - "--ssl")); - } - - public String getEndpointUrl() { - return "https://" + getHost() + ":" + getMappedPort(PORT); - } -} diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql similarity index 100% rename from flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql rename to flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000000..e463a0e1a088c --- /dev/null +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
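
For reference, below is a minimal sketch of how the `KinesaliteContainer` and `KinesisPubsubClient` test utilities added in this patch could be reused from another IT case. It only relies on calls that appear in the diff above (`getContainerProperties()`, `createTopic(...)`, `sendMessage(...)`, `executeAndCollect(n)`); the class name `KinesaliteSmokeTest` and the stream name are illustrative, not part of the patch.

package org.apache.flink.streaming.connectors.kinesis;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;

import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.utility.DockerImageName;

import java.util.List;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.junit.Assert.assertEquals;

/** Hypothetical smoke test illustrating the intended usage of the Kinesalite test utilities. */
public class KinesaliteSmokeTest {

    @ClassRule
    public static final KinesaliteContainer KINESALITE =
            new KinesaliteContainer(
                    DockerImageName.parse("instructure/kinesalite").withTag("latest"));

    @Test
    public void readsBackWhatWasWritten() throws Exception {
        // Endpoint, region, and the fake credentials all come from the container.
        Properties properties = KINESALITE.getContainerProperties();
        properties.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());

        // Create a stream and publish one record through the pubsub helper.
        KinesisPubsubClient client = new KinesisPubsubClient(properties);
        client.createTopic("smoke_stream", 1, properties);
        client.sendMessage("smoke_stream", "hello");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // executeAndCollect(1) returns as soon as one record has been received
        // and then cancels the job, exercising the consumer shutdown path.
        List<String> records =
                env.addSource(
                                new FlinkKinesisConsumer<>(
                                        "smoke_stream", new SimpleStringSchema(), properties))
                        .executeAndCollect(1);
        assertEquals("hello", records.get(0));
    }
}

Like FlinkKinesisITCase above, running such a test only requires a local Docker daemon for Testcontainers; no AWS credentials or real Kinesis streams are involved.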