Added LS configuration variable 'pipeline.separate_logs' to separate logs per pipeline - use log4j RoutingAppender - avoid output to main log files when log per pipeline is enabled - closes 10427

Fixes #11108
andsel committed Oct 8, 2019
1 parent e86cd4e commit e58a6e0
Showing 13 changed files with 443 additions and 5 deletions.
3 changes: 3 additions & 0 deletions config/jvm.options
@@ -79,3 +79,6 @@

# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true

# Avoid Nashorn deprecation logs in JDK 11 and later
-Dnashorn.args=--no-deprecation-warning
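
This flag is needed because the ScriptFilter and routing scripts added to log4j2.properties below are written in JavaScript, which runs on the Nashorn engine; Nashorn is deprecated from JDK 11 onward and would otherwise print a deprecation warning at startup.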
34 changes: 34 additions & 0 deletions config/log4j2.properties
@@ -26,6 +26,11 @@ appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 30
appender.rolling.avoid_pipelined_filter.type = ScriptFilter
appender.rolling.avoid_pipelined_filter.script.type = Script
appender.rolling.avoid_pipelined_filter.script.name = filter_no_pipelined
appender.rolling.avoid_pipelined_filter.script.language = JavaScript
appender.rolling.avoid_pipelined_filter.script.value = ${sys:ls.pipeline.separate_logs} == false || !(logEvent.getContextData().containsKey("pipeline.id"))

appender.json_rolling.type = RollingFile
appender.json_rolling.name = json_rolling
@@ -42,10 +47,39 @@ appender.json_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.json_rolling.policies.size.size = 100MB
appender.json_rolling.strategy.type = DefaultRolloverStrategy
appender.json_rolling.strategy.max = 30
appender.json_rolling.avoid_pipelined_filter.type = ScriptFilter
appender.json_rolling.avoid_pipelined_filter.script.type = Script
appender.json_rolling.avoid_pipelined_filter.script.name = filter_no_pipelined
appender.json_rolling.avoid_pipelined_filter.script.language = JavaScript
appender.json_rolling.avoid_pipelined_filter.script.value = ${sys:ls.pipeline.separate_logs} == false || !(logEvent.getContextData().containsKey("pipeline.id"))

appender.routing.type = Routing
appender.routing.name = pipeline_routing_appender
appender.routing.routes.type = Routes
appender.routing.routes.script.type = Script
appender.routing.routes.script.name = routing_script
appender.routing.routes.script.language = JavaScript
appender.routing.routes.script.value = logEvent.getContextData().containsKey("pipeline.id") ? logEvent.getContextData().getValue("pipeline.id") : "sink";
appender.routing.routes.route_pipelines.type = Route
appender.routing.routes.route_pipelines.rolling.type = RollingFile
appender.routing.routes.route_pipelines.rolling.name = appender-${ctx:pipeline.id}
appender.routing.routes.route_pipelines.rolling.fileName = ${sys:ls.logs}/pipeline_${ctx:pipeline.id}.log
appender.routing.routes.route_pipelines.rolling.filePattern = ${sys:ls.logs}/pipeline_${ctx:pipeline.id}.%i.log.gz
appender.routing.routes.route_pipelines.rolling.layout.type = PatternLayout
appender.routing.routes.route_pipelines.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n
appender.routing.routes.route_pipelines.rolling.policy.type = SizeBasedTriggeringPolicy
appender.routing.routes.route_pipelines.rolling.policy.size = 100MB
appender.routing.routes.route_pipelines.strategy.type = DefaultRolloverStrategy
appender.routing.routes.route_pipelines.strategy.max = 30
appender.routing.routes.route_sink.type = Route
appender.routing.routes.route_sink.key = sink
appender.routing.routes.route_sink.null.type = Null
appender.routing.routes.route_sink.null.name = drop-appender

rootLogger.level = ${sys:ls.log.level}
rootLogger.appenderRef.console.ref = ${sys:ls.log.format}_console
rootLogger.appenderRef.rolling.ref = ${sys:ls.log.format}_rolling
rootLogger.appenderRef.routing.ref = pipeline_routing_appender

# Slowlog

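How the pieces fit: pipeline worker threads tag the log4j ThreadContext with a pipeline.id entry (inherited by child threads thanks to the isThreadContextMapInheritable flag in jvm.options), the routing script picks the per-pipeline route by that key and falls back to the "sink" Null route, and the ScriptFilter on both rolling appenders rejects tagged events so, when pipeline.separate_logs is enabled, they appear only in their pipeline_<id>.log file. A minimal sketch of this contract from the logging side (illustrative class only, not part of this commit):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class PipelineRoutingSketch {
    private static final Logger LOGGER = LogManager.getLogger(PipelineRoutingSketch.class);

    public static void main(String[] args) {
        // Assuming -Dls.pipeline.separate_logs=true and the configuration above:
        ThreadContext.put("pipeline.id", "main");
        LOGGER.info("lands only in ${sys:ls.logs}/pipeline_main.log");

        ThreadContext.remove("pipeline.id");
        LOGGER.info("no pipeline.id: dropped by the routing sink, kept by the main appenders");
    }
}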
4 changes: 4 additions & 0 deletions config/logstash.yml
@@ -212,6 +212,10 @@
# Where to find custom plugins
# path.plugins: []
#
# Flag to write the log lines of each pipeline to a separate log file. Each log filename contains the pipeline.id
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
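To try the feature, uncomment that last line and set it to true (pipeline.separate_logs: true); the runner change below forwards the setting to log4j as the ls.pipeline.separate_logs system property.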
13 changes: 8 additions & 5 deletions logstash-core/benchmarks/build.gradle
@@ -63,13 +63,16 @@ task jmh(type: JavaExec, dependsOn: [':logstash-core-benchmarks:clean', ':logsta

main = "-jar"

def include = project.properties.get('include', '')

doFirst {
    args = [
        "-Djava.io.tmpdir=${buildDir.absolutePath}",
        "-XX:+UseParNewGC", "-XX:+UseConcMarkSweepGC", "-XX:CMSInitiatingOccupancyFraction=75",
        "-XX:+UseCMSInitiatingOccupancyOnly", "-XX:+DisableExplicitGC",
        "-XX:+HeapDumpOnOutOfMemoryError", "-Xms2g", "-Xmx2g",
        shadowJar.archivePath,
        include
    ]
}
}
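
The new include project property is appended as the JMH runner's final argument; JMH treats trailing non-option arguments as regular expressions selecting which benchmarks to run, so a single benchmark class can presumably be targeted with something along the lines of gradle jmh -Pinclude=LogPerPipelineBenchmark.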
60 changes: 60 additions & 0 deletions logstash-core/benchmarks/src/main/java/org/logstash/benchmark/LogPerPipelineBenchmark.java
@@ -0,0 +1,60 @@
package org.logstash.benchmark;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
import org.openjdk.jmh.annotations.*;

import java.util.concurrent.TimeUnit;

@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 100, timeUnit = TimeUnit.MILLISECONDS)
@Fork(1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class LogPerPipelineBenchmark {

    private static final int EVENTS_PER_INVOCATION = 10_000_000;

    @Setup
    public void setUp() {
        System.setProperty("ls.log.format", "plain");
    }

    @Benchmark
    @OperationsPerInvocation(EVENTS_PER_INVOCATION)
    public final void logWithScriptingCodeToExecuteAndOneLogPerPipelineEnabled() {
        System.setProperty("log4j.configurationFile", "log4j2-with-script.properties");
        System.setProperty("ls.pipeline.separate_logs", "true");
        logManyLines();
    }

    @Benchmark
    @OperationsPerInvocation(EVENTS_PER_INVOCATION)
    public final void logWithScriptingCodeToExecuteAndOneLogPerPipelineDisabled() {
        System.setProperty("log4j.configurationFile", "log4j2-with-script.properties");
        System.setProperty("ls.pipeline.separate_logs", "false");
        logManyLines();
    }

    @Benchmark
    @OperationsPerInvocation(EVENTS_PER_INVOCATION)
    public final void logWithoutScriptingCodeToExecute() {
        System.setProperty("log4j.configurationFile", "log4j2-without-script.properties");
        logManyLines();
    }

    private void logManyLines() {
        LoggerContext context = LoggerContext.getContext(false);
        context.reconfigure();
        ThreadContext.put("pipeline.id", "pipeline_1");
        Logger logger = LogManager.getLogger(LogPerPipelineBenchmark.class);

        for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) {
            logger.info("log for pipeline 1");
        }
    }
}
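
Each benchmark method selects its configuration purely through system properties and then forces a LoggerContext.reconfigure(), so all three variants run the same 10-million-message loop; comparing the with-script and without-script runs isolates the per-event cost of the JavaScript filter and routing scripts, while the enabled/disabled pair shows what remains of that cost when ls.pipeline.separate_logs is false and the filter accepts everything.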
49 changes: 49 additions & 0 deletions logstash-core/benchmarks/src/main/resources/log4j2-with-script.properties
@@ -0,0 +1,49 @@
status = error
name = LogstashPropertiesConfig

appender.rolling.type = RollingFile
appender.rolling.name = plain_rolling
appender.rolling.fileName = ${sys:ls.logs}/logstash-${sys:ls.log.format}.log
appender.rolling.filePattern = ${sys:ls.logs}/logstash-${sys:ls.log.format}-%d{yyyy-MM-dd}-%i.log.gz
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %-.10000m%n
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 30
appender.rolling.avoid_pipelined_filter.type = ScriptFilter
appender.rolling.avoid_pipelined_filter.script.type = Script
appender.rolling.avoid_pipelined_filter.script.name = filter_no_pipelined
appender.rolling.avoid_pipelined_filter.script.language = JavaScript
appender.rolling.avoid_pipelined_filter.script.value = ${sys:ls.pipeline.separate_logs} == false || !(logEvent.getContextData().containsKey("pipeline.id"))

appender.routing.type = Routing
appender.routing.name = pipeline_routing_appender
appender.routing.routes.type = Routes
appender.routing.routes.script.type = Script
appender.routing.routes.script.name = routing_script
appender.routing.routes.script.language = JavaScript
appender.routing.routes.script.value = logEvent.getContextData().containsKey("pipeline.id") ? logEvent.getContextData().getValue("pipeline.id") : "sink";
appender.routing.routes.route_pipelines.type = Route
appender.routing.routes.route_pipelines.rolling.type = RollingFile
appender.routing.routes.route_pipelines.rolling.name = appender-${ctx:pipeline.id}
appender.routing.routes.route_pipelines.rolling.fileName = ${sys:ls.logs}/pipeline_${ctx:pipeline.id}.log
appender.routing.routes.route_pipelines.rolling.filePattern = ${sys:ls.logs}/pipeline_${ctx:pipeline.id}.%i.log.gz
appender.routing.routes.route_pipelines.rolling.layout.type = PatternLayout
appender.routing.routes.route_pipelines.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n
appender.routing.routes.route_pipelines.rolling.policy.type = SizeBasedTriggeringPolicy
appender.routing.routes.route_pipelines.rolling.policy.size = 100MB
appender.routing.routes.route_pipelines.strategy.type = DefaultRolloverStrategy
appender.routing.routes.route_pipelines.strategy.max = 30
appender.routing.routes.route_sink.type = Route
appender.routing.routes.route_sink.key = sink
appender.routing.routes.route_sink.null.type = Null
appender.routing.routes.route_sink.null.name = drop-appender

rootLogger.level = INFO
rootLogger.appenderRef.rolling.ref = ${sys:ls.log.format}_rolling
rootLogger.appenderRef.routing.ref = pipeline_routing_appender
21 changes: 21 additions & 0 deletions logstash-core/benchmarks/src/main/resources/log4j2-without-script.properties
@@ -0,0 +1,21 @@
status = error
name = LogstashPropertiesConfig

appender.rolling.type = RollingFile
appender.rolling.name = plain_rolling
appender.rolling.fileName = ${sys:ls.logs}/logstash-${sys:ls.log.format}.log
appender.rolling.filePattern = ${sys:ls.logs}/logstash-${sys:ls.log.format}-%d{yyyy-MM-dd}-%i.log.gz
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %-.10000m%n
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 30

rootLogger.level = INFO
rootLogger.appenderRef.rolling.ref = ${sys:ls.log.format}_rolling
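
This file is the benchmark's baseline configuration: the same plain rolling appender, but with no ScriptFilter and no routing appender, so any throughput gap against log4j2-with-script.properties is attributable to the per-event scripting machinery.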
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
@@ -44,6 +44,7 @@ module Environment
Setting::Boolean.new("pipeline.java_execution", true),
Setting::Boolean.new("pipeline.reloadable", true),
Setting::Boolean.new("pipeline.plugin_classloaders", false),
Setting::Boolean.new("pipeline.separate_logs", false),
Setting.new("path.plugins", Array, []),
Setting::NullableString.new("interactive", nil, false),
Setting::Boolean.new("config.debug", false),
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/runner.rb
@@ -254,6 +254,7 @@ def execute
java.lang.System.setProperty("ls.logs", setting("path.logs"))
java.lang.System.setProperty("ls.log.format", setting("log.format"))
java.lang.System.setProperty("ls.log.level", setting("log.level"))
java.lang.System.setProperty("ls.pipeline.separate_logs", setting("pipeline.separate_logs").to_s)
unless java.lang.System.getProperty("log4j.configurationFile")
log4j_config_location = ::File.join(setting("path.settings"), "log4j2.properties")

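The ordering here matters: ls.pipeline.separate_logs must be set before log4j reads log4j2.properties, both so the ${sys:ls.pipeline.separate_logs} lookups in the ScriptFilters resolve and so the LogstashConfigurationFactory below sees the final value when deciding whether to strip the routing appender.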
50 changes: 50 additions & 0 deletions logstash-core/src/main/java/org/logstash/log/LogstashConfigurationFactory.java
@@ -0,0 +1,50 @@
package org.logstash.log;

import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.ConfigurationFactory;
import org.apache.logging.log4j.core.config.ConfigurationSource;
import org.apache.logging.log4j.core.config.Order;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
import org.apache.logging.log4j.core.config.properties.PropertiesConfigurationBuilder;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

@Plugin(name = "LogstashConfigurationFactory", category = ConfigurationFactory.CATEGORY)
@Order(9)
public class LogstashConfigurationFactory extends ConfigurationFactory {

    static final String PIPELINE_ROUTING_APPENDER_NAME = "pipeline_routing_appender";
    public static final String PIPELINE_SEPARATE_LOGS = "ls.pipeline.separate_logs";

    @Override
    protected String[] getSupportedTypes() {
        return new String[] {".properties"};
    }

    @Override
    public PropertiesConfiguration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
        final Properties properties = new Properties();
        try (final InputStream configStream = source.getInputStream()) {
            properties.load(configStream);
        } catch (final IOException ioe) {
            throw new ConfigurationException("Unable to load " + source.toString(), ioe);
        }
        PropertiesConfiguration propertiesConfiguration = new PropertiesConfigurationBuilder()
                .setConfigurationSource(source)
                .setRootProperties(properties)
                .setLoggerContext(loggerContext)
                .build();

        if (System.getProperty(PIPELINE_SEPARATE_LOGS, "false").equals("false")) {
            // force initialization now, so that removing the appender below is not
            // undone by a later (re)initialization of the appenders section
            propertiesConfiguration.initialize();
            propertiesConfiguration.removeAppender(PIPELINE_ROUTING_APPENDER_NAME);
        }

        return propertiesConfiguration;
    }
}
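
@Order(9) registers this factory ahead of log4j's stock PropertiesConfigurationFactory (order 8; higher order wins), so it intercepts every .properties logging configuration and can drop the routing appender when per-pipeline logging is off. A rough sketch of exercising it directly, with hypothetical paths and setup (not part of this commit):

import java.io.FileInputStream;
import org.apache.logging.log4j.core.config.ConfigurationSource;
import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
import org.logstash.log.LogstashConfigurationFactory;

public class FactorySketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical property values, just so the ${sys:...} lookups resolve.
        System.setProperty("ls.logs", "build/logs");
        System.setProperty("ls.log.format", "plain");
        System.setProperty("ls.log.level", "info");
        System.setProperty(LogstashConfigurationFactory.PIPELINE_SEPARATE_LOGS, "false");

        ConfigurationSource source =
                new ConfigurationSource(new FileInputStream("config/log4j2.properties"));
        PropertiesConfiguration config =
                new LogstashConfigurationFactory().getConfiguration(null, source);

        // With separate logs disabled the factory initializes the configuration and
        // removes the routing appender, so per-pipeline files are never created.
        System.out.println(
                config.getAppenders().containsKey("pipeline_routing_appender")); // false
    }
}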
