diff --git a/apache_stdout.conf b/apache_stdout.conf new file mode 100644 index 00000000000..8e996824890 --- /dev/null +++ b/apache_stdout.conf @@ -0,0 +1,88 @@ +input { + file { + id => "logfileRead" + start_position => beginning + ignore_older => 0 + path => "/Users/andrewvc/projects/ls_apache_materials/apache_access_logs" + } + + stdin {id => logStdin} +} + +filter { + grok { + id => "apacheCommonLog" + match => { + "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}' + } + } + + geoip { + id => "clientGeo" + source => clientip + target => geoip + } + + useragent { + id => "clientUA" + source => agent + target => useragent + } + + + date { + id => "clientDate" + match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ] + locale => en + } + + if [geoip][country_code2] == "US" { + mutate { + id => "addUsRegion" + add_field => { "aws-region" => "us-east-1" } + } + } else if [geoip][country_code2] == "CA" { + if [referrer] =~ /google/ { + sleep { + id => "pointlessSleep" + time => 0.001 + } + } + } else { + mutate { + id => addOtherRegion + add_field => { "aws-region" => "eu-central-1" } + } + } + + + if [request] =~ /(?i)\.(png|jpg|gif)$/ { + grok { + id => grokImage + match => { + request => "%{(?i)\.(png|jpg|gif)$:extension}" + } + add_tag => ["image"] + } + + mutate { + id => addCanadianRegion + add_field => { "aws-region" => "ca-central-1" } + } + } else if [request] =~ /articles/ { + mutate { + id => tagArticle + add_tag => ["article"] + } + } +} + +output { + elasticsearch { + id => "mainEs" + index => "%{@type}-" + } + if [geoip][country_code2] != "US" { + stdout { id => "linuxStdout" codec => json_lines } + } +} diff --git a/gradlew.bat b/gradlew.bat deleted file mode 100644 index f6d5974e72f..00000000000 --- a/gradlew.bat +++ /dev/null @@ -1,90 +0,0 @@ -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windows variants - -if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega diff --git a/logstash-core/lib/logstash-core_jars.rb b/logstash-core/lib/logstash-core_jars.rb index d1c7bc4332d..b759e1d5833 100644 --- a/logstash-core/lib/logstash-core_jars.rb +++ b/logstash-core/lib/logstash-core_jars.rb @@ -1,8 +1,18 @@ # this is a generated file, to avoid over-writing it just delete this comment -require 'jar_dependencies' +begin + require 'jar_dependencies' +rescue LoadError + require 'org/apache/logging/log4j/log4j-core/2.6.2/log4j-core-2.6.2.jar' + require 'org/apache/logging/log4j/log4j-api/2.6.2/log4j-api-2.6.2.jar' + require 'com/fasterxml/jackson/core/jackson-core/2.7.4/jackson-core-2.7.4.jar' + require 'com/fasterxml/jackson/core/jackson-annotations/2.7.0/jackson-annotations-2.7.0.jar' + require 'com/fasterxml/jackson/core/jackson-databind/2.7.4/jackson-databind-2.7.4.jar' +end -require_jar( 'org.apache.logging.log4j', 'log4j-core', '2.6.2' ) -require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.7.0' ) -require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.7.4' ) -require_jar( 'org.apache.logging.log4j', 'log4j-api', '2.6.2' ) -require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.7.4' ) +if defined? Jars + require_jar( 'org.apache.logging.log4j', 'log4j-core', '2.6.2' ) + require_jar( 'org.apache.logging.log4j', 'log4j-api', '2.6.2' ) + require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.7.4' ) + require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.7.0' ) + require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.7.4' ) +end diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 16ba2e583f7..6e9052a2ab8 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -314,6 +314,7 @@ def start_pipeline(id) if !t.alive? return false elsif pipeline.ready? + dispatcher.fire(:pipeline_started, pipeline) return true else sleep 0.01 @@ -327,6 +328,7 @@ def stop_pipeline(id) @logger.warn("stopping pipeline", :id => id) pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) } @pipelines[id].thread.join + dispatcher.fire(:pipeline_stopped, pipeline) end def start_pipelines diff --git a/logstash-core/lib/logstash/compiler/lscl.rb b/logstash-core/lib/logstash/compiler/lscl.rb index 53948b2f419..25953d603f6 100644 --- a/logstash-core/lib/logstash/compiler/lscl.rb +++ b/logstash-core/lib/logstash/compiler/lscl.rb @@ -478,10 +478,21 @@ class MethodCall < Node; end class RegexpExpression < Node def expr - selector, operator_method, regexp = recursive_select(Selector, LogStash::Compiler::LSCL::AST::RegExpOperator, LogStash::Compiler::LSCL::AST::RegExp).map(&:expr) - - raise "Expected a selector #{text_value}!" unless selector - raise "Expected a regexp #{text_value}!" unless regexp + selector, operator_method, regexp = recursive_select( + Selector, + LogStash::Compiler::LSCL::AST::RegExpOperator, + LogStash::Compiler::LSCL::AST::RegExp, + LogStash::Compiler::LSCL::AST::String # Strings work as rvalues! :p + ).map(&:expr) + + # Handle string rvalues, they just get turned into regexps + # Maybe we really shouldn't handle these anymore... + if regexp.class == org.logstash.config.ir.expression.ValueExpression + regexp = jdsl.eRegex(regexp.get) + end + + raise "Expected a selector in #{text_value}!" unless selector + raise "Expected a regexp in #{text_value}!" unless regexp operator_method.call(source_meta, selector, regexp); end diff --git a/logstash-core/lib/logstash/instrument/metric_store.rb b/logstash-core/lib/logstash/instrument/metric_store.rb index 3967cefbfb7..f072499ea8e 100644 --- a/logstash-core/lib/logstash/instrument/metric_store.rb +++ b/logstash-core/lib/logstash/instrument/metric_store.rb @@ -218,7 +218,7 @@ def get_recursively(key_paths, map, new_hash) key_candidates = extract_filter_keys(key_paths.shift) key_candidates.each do |key_candidate| - raise MetricNotFound, "For path: #{key_candidate}" if map[key_candidate].nil? + raise MetricNotFound, "For path: #{key_candidate}. Map keys: #{map.keys}" if map[key_candidate].nil? if key_paths.empty? # End of the user requested path if map[key_candidate].is_a?(Concurrent::Map) diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 24e28b0d4da..312fbfa1288 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -21,6 +21,7 @@ require "logstash/instrument/collector" require "logstash/output_delegator" require "logstash/filter_delegator" +require 'logstash/compiler' module LogStash; class Pipeline include LogStash::Util::Loggable @@ -41,7 +42,8 @@ module LogStash; class Pipeline :metric, :filter_queue_client, :input_queue_client, - :queue + :queue, + :lir MAX_INFLIGHT_WARN_THRESHOLD = 10_000 @@ -53,13 +55,16 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil) @logger = self.logger @config_str = config_str @config_hash = Digest::SHA1.hexdigest(@config_str) + + @lir = compile_lir + # Every time #plugin is invoked this is incremented to give each plugin # a unique id when auto-generating plugin ids @plugin_counter ||= 0 @settings = settings @pipeline_id = @settings.get_value("pipeline.id") || self.object_id @reporter = PipelineReporter.new(@logger, self) - + # A list of plugins indexed by id @plugins_by_id = {} @inputs = nil @@ -119,6 +124,10 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil) @running = Concurrent::AtomicBoolean.new(false) @flushing = Concurrent::AtomicReference.new(false) end # def initialize + + def compile_lir + LogStash::Compiler.compile_pipeline(self.config_str) + end def build_queue_from_settings queue_type = settings.get("queue.type") diff --git a/logstash-core/spec/logstash/compiler/compiler_spec.rb b/logstash-core/spec/logstash/compiler/compiler_spec.rb index 9f4856110bc..29f54fba101 100644 --- a/logstash-core/spec/logstash/compiler/compiler_spec.rb +++ b/logstash-core/spec/logstash/compiler/compiler_spec.rb @@ -392,6 +392,15 @@ def compose(*statements) it "should compile correctly" do expect(c_expression).to ir_eql(j.eRegexEq(j.eEventValue("[foo]"), j.eRegex('^abc$'))) end + + # Believe it or not, "\.\." is a valid regexp! + describe "when given a quoted regexp" do + let(:expression) { '[foo] =~ "\\.\\."' } + + it "should compile correctly" do + expect(c_expression).to ir_eql(j.eRegexEq(j.eEventValue("[foo]"), j.eRegex('\\.\\.'))) + end + end end describe "'!~'" do diff --git a/logstash-core/src/main/java/org/logstash/common/Util.java b/logstash-core/src/main/java/org/logstash/common/Util.java index e5be66b0cb8..be0b73af01c 100644 --- a/logstash-core/src/main/java/org/logstash/common/Util.java +++ b/logstash-core/src/main/java/org/logstash/common/Util.java @@ -9,16 +9,21 @@ */ public class Util { // Modified from http://stackoverflow.com/a/11009612/11105 - public static String sha256(String base) { + + public static MessageDigest defaultMessageDigest() { try { - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - byte[] hash = digest.digest(base.getBytes(StandardCharsets.UTF_8)); - return bytesToHexString(hash); + return MessageDigest.getInstance("SHA-256"); } catch (NoSuchAlgorithmException e) { - throw new RuntimeException("Your system is (somehow) missing the SHA-256 algorithm!", e); + throw new RuntimeException(e); } } + public static String digest(String base) { + MessageDigest digest = defaultMessageDigest(); + byte[] hash = digest.digest(base.getBytes(StandardCharsets.UTF_8)); + return bytesToHexString(hash); + } + public static String bytesToHexString(byte[] bytes) { StringBuilder hexString = new StringBuilder(); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/IHashable.java b/logstash-core/src/main/java/org/logstash/config/ir/IHashable.java index 38ab60bbb29..99b04f5cc11 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/IHashable.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/IHashable.java @@ -9,6 +9,6 @@ public interface IHashable { String hashSource(); default String uniqueHash() { - return Util.sha256(this.hashSource()); + return Util.digest(this.hashSource()); } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/Pipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/Pipeline.java index 927b63e6b04..66b155abaab 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/Pipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/Pipeline.java @@ -30,6 +30,11 @@ public SpecialVertex getFilterOut() { private final SpecialVertex filterOut; public Pipeline(Graph inputSection, Graph filterSection, Graph outputSection) throws InvalidIRException { + // Validate all incoming graphs, we can't turn an invalid graph into a Pipeline! + inputSection.validate(); + filterSection.validate(); + outputSection.validate(); + Graph tempGraph = inputSection.copy(); // The input section are our roots, so we can import that wholesale // Connect all the input vertices out to the queue diff --git a/logstash-core/src/main/java/org/logstash/config/ir/expression/BinaryBooleanExpression.java b/logstash-core/src/main/java/org/logstash/config/ir/expression/BinaryBooleanExpression.java index d707ca11e0e..2045bd2b06a 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/expression/BinaryBooleanExpression.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/expression/BinaryBooleanExpression.java @@ -48,6 +48,6 @@ public String toRubyString() { @Override public String hashSource() { - return getLeft().hashSource() + this.getClass().getCanonicalName() + getRight().hashSource(); + return this.getClass().getCanonicalName() + "[" + getLeft().hashSource() + "|" + getRight().hashSource() + "]"; } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/expression/Expression.java b/logstash-core/src/main/java/org/logstash/config/ir/expression/Expression.java index 96a5c7509a0..8a6bb653fac 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/expression/Expression.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/expression/Expression.java @@ -7,7 +7,7 @@ import org.logstash.config.ir.SourceComponent; import org.logstash.config.ir.SourceMetadata; -/** +/* * [foo] == "foostr" eAnd [bar] > 10 * eAnd(eEq(eventValueExpr("foo"), value("foostr")), eEq(eEventValue("bar"), value(10))) * diff --git a/logstash-core/src/main/java/org/logstash/config/ir/expression/UnaryBooleanExpression.java b/logstash-core/src/main/java/org/logstash/config/ir/expression/UnaryBooleanExpression.java index 1d5f1a47a0e..51e175caa98 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/expression/UnaryBooleanExpression.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/expression/UnaryBooleanExpression.java @@ -22,6 +22,6 @@ public UnaryBooleanExpression(SourceMetadata meta, @Override public String hashSource() { - return this.getClass().getCanonicalName() + this.expression.hashSource(); + return this.getClass().getCanonicalName() + "[" + this.expression.hashSource() + "]"; } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/BooleanEdge.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/BooleanEdge.java index 814af86e66f..d1cbaa56dce 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/BooleanEdge.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/BooleanEdge.java @@ -1,5 +1,6 @@ package org.logstash.config.ir.graph; +import org.logstash.common.Util; import org.logstash.config.ir.ISourceComponent; import org.logstash.config.ir.InvalidIRException; @@ -50,7 +51,12 @@ public BooleanEdge(Boolean edgeType, Vertex outVertex, Vertex inVertex) throws I @Override public String individualHashSource() { - return this.getClass().getCanonicalName() + "|" + this.getEdgeType(); + return this.getClass().getCanonicalName() + "|" + this.getEdgeType() + "|"; + } + + @Override + public String getId() { + return Util.digest(this.getFrom().getId() + "[" + this.getEdgeType() + "]->" + this.getTo().getId()); } public String toString() { diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/Edge.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/Edge.java index 44422e99ae3..f45b903ce62 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/Edge.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/Edge.java @@ -89,6 +89,9 @@ public boolean sourceComponentEquals(ISourceComponent sourceComponent) { public abstract String individualHashSource(); + + public abstract String getId(); + @Override public SourceMetadata getMeta() { return null; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java index 92bf99de3f4..463bfe2fc3c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java @@ -1,5 +1,6 @@ package org.logstash.config.ir.graph; +import org.logstash.common.Util; import org.logstash.config.ir.IHashable; import org.logstash.config.ir.ISourceComponent; import org.logstash.config.ir.InvalidIRException; @@ -8,6 +9,7 @@ import org.logstash.config.ir.graph.algorithms.GraphDiff; import org.logstash.config.ir.graph.algorithms.TopologicalSort; +import java.security.MessageDigest; import java.util.*; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -17,14 +19,15 @@ * Created by andrewvc on 9/15/16. */ public class Graph implements ISourceComponent, IHashable { - private final Set vertices = new HashSet<>(); + public final Set vertices = new HashSet<>(); private final Set edges = new HashSet<>(); private Map vertexRanks = new HashMap<>(); private final Map> outgoingEdgeLookup = new HashMap<>(); private final Map> incomingEdgeLookup = new HashMap<>(); private List sortedVertices; - + // Builds a graph that has the specified vertices and edges + // Note that this does *not* validate the result public Graph(Collection vertices, Collection edges) throws InvalidIRException { for (Vertex vertex : vertices) { this.addVertex(vertex, false); } for (Edge edge : edges) { this.addEdge(edge, false); } @@ -195,7 +198,10 @@ public Collection threadVerticesById(Edge.EdgeFactory edgeFactory, String. return threadVertices(edgeFactory, argVertices); } - public Collection threadVertices(Edge.EdgeFactory edgeFactory, Vertex... argVertices) throws InvalidIRException { + // Will not validate the graph after running! + // You must invoke validate the graph yourself + // after invoking + public Collection threadVerticesUnsafe(Edge.EdgeFactory edgeFactory, Vertex... argVertices) throws InvalidIRException { List importedVertices = new ArrayList<>(argVertices.length); for (Vertex va : argVertices) { importedVertices.add(this.importVertex(va)); @@ -206,12 +212,12 @@ public Collection threadVertices(Edge.EdgeFactory edgeFactory, Vertex... a Vertex from = importedVertices.get(i); Vertex to = importedVertices.get(i+1); - this.addVertex(from); - this.addVertex(to); + this.addVertex(from, false); + this.addVertex(to, false); Edge edge = edgeFactory.make(from, to); newEdges.add(edge); - this.addEdge(edge); + this.addEdge(edge, false); } refresh(); @@ -219,6 +225,12 @@ public Collection threadVertices(Edge.EdgeFactory edgeFactory, Vertex... a return newEdges; } + public Collection threadVertices(Edge.EdgeFactory edgeFactory, Vertex... argVertices) throws InvalidIRException { + Collection edges = threadVerticesUnsafe(edgeFactory, argVertices); + validate(); + return edges; + } + public Edge threadVertices(Vertex a, Vertex b) throws InvalidIRException { return threadVertices(PlainEdge.factory, a, b).stream().findFirst().get(); } @@ -240,7 +252,6 @@ public Collection threadVertices(boolean bool, Vertex... vertices) throws public void refresh() throws InvalidIRException { this.calculateRanks(); this.calculateTopologicalSort(); - this.validate(); } private void calculateTopologicalSort() throws InvalidIRException { @@ -267,8 +278,10 @@ public Map> verticesByHash() { } public void validate() throws InvalidIRException { + if (this.isEmpty()) return; + if (this.getVertices().stream().noneMatch(Vertex::isLeaf)) { - throw new InvalidIRException("Graph has no leaf vertices!" + this.toString()); + throw new InvalidIRException("Graph has no leaf vertices!\n" + this.toString()); } List> duplicates = verticesByHash().values().stream().filter((group) -> group.size() > 1).collect(Collectors.toList()); @@ -278,7 +291,7 @@ public void validate() throws InvalidIRException { String joinedErrorMessageGroups = errorMessageGroups.collect(Collectors.joining("\n---\n")); - throw new InvalidIRException("Some nodes on the graph are fully redundant!\n" + joinedErrorMessageGroups); + throw new InvalidIRException("Some nodes on the graph are fully redundant!\n" + this + "|" + joinedErrorMessageGroups); } } @@ -335,10 +348,12 @@ public String toString() { edgelessVerticesStr = ""; } - return "\n" + - edgesToFormat.map(Edge::toString).collect(Collectors.joining("\n")) + - edgelessVerticesStr + - "\n"; + return "**GRAPH**\n" + + "Vertices: " + this.vertices.size()+ " Edges: " + this.edges().count() + "\n" + + "----------------------" + + edgesToFormat.map(Edge::toString).collect(Collectors.joining("\n")) + + edgelessVerticesStr + + "\n**GRAPH**"; } public Stream isolatedVertices() { @@ -411,6 +426,9 @@ public Stream edges() { @Override public String hashSource() { - return this.vertices.stream().map(Vertex::hashSource).sorted().collect(Collectors.joining("\n")); + MessageDigest lineageDigest = Util.defaultMessageDigest(); + List sources = this.vertices.stream().parallel().map(Vertex::uniqueHash).sorted().map(String::getBytes).collect(Collectors.toList()); + sources.forEach(lineageDigest::update); + return Util.bytesToHexString(lineageDigest.digest()); } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/IfVertex.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/IfVertex.java index b6374e99a3d..014d77a0f41 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/IfVertex.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/IfVertex.java @@ -93,6 +93,6 @@ public IfVertex copy() { @Override public String individualHashSource() { - return this.getClass().getCanonicalName() + "|" + this.booleanExpression.hashSource(); + return this.getClass().getCanonicalName() + "{" + this.booleanExpression.hashSource() + "}"; } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/PlainEdge.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/PlainEdge.java index 32c9d6f68e1..64ee584305c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/PlainEdge.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/PlainEdge.java @@ -1,5 +1,6 @@ package org.logstash.config.ir.graph; +import org.logstash.common.Util; import org.logstash.config.ir.InvalidIRException; /** @@ -20,6 +21,11 @@ public String individualHashSource() { return this.getClass().getCanonicalName(); } + @Override + public String getId() { + return Util.digest(this.getFrom().getId() + "->" + this.getTo().getId()); + } + public PlainEdge(Vertex from, Vertex to) throws InvalidIRException { super(from, to); } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java index 9bb71a5421b..20e0c1d8e7c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java @@ -41,18 +41,19 @@ public PluginVertex(SourceMetadata meta, PluginDefinition pluginDefinition) { } public String toString() { - return "P[" + pluginDefinition + "]"; + return "P[" + pluginDefinition + "|" + this.getMeta() + "]"; } @Override public String individualHashSource() { - return Util.sha256(this.getClass().getCanonicalName() + "|" + + return Util.digest(this.getClass().getCanonicalName() + "|" + (this.id != null ? this.id : "NOID") + "|" + + //this.getMeta().getSourceLine() + "|" + this.getMeta().getSourceColumn() + "|" + // Temp hack REMOVE BEFORE RELEASE this.getPluginDefinition().hashSource()); } public String individualHash() { - return Util.sha256(individualHashSource()); + return Util.digest(individualHashSource()); } @Override diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java index 846c2ef4623..0196869ed9a 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java @@ -7,10 +7,8 @@ import org.logstash.config.ir.SourceMetadata; import org.logstash.config.ir.graph.algorithms.DepthFirst; -import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.Collection; import java.util.Collections; import java.util.stream.Collectors; @@ -123,29 +121,36 @@ public int rank() { } @Override - public String hashSource() { + public String uniqueHash() { // Sort the lineage to ensure consistency. We prepend each item with a lexicographically sortable // encoding of its rank (using hex notation) so that the sort order is identical to the traversal order. // This is a required since there may be individually identical components in different locations in the graph. // It is, however, illegal to have functionally identical vertices, that is to say two vertices with the same // contents that have the same lineage. - try { - MessageDigest lineageDigest = MessageDigest.getInstance("SHA-256"); - - // The lineage can be quite long and we want to avoid the quadratic complexity of string concatenation - lineage(). - map(Vertex::contextualHashSource). - sorted(). - forEachOrdered(v -> { - byte[] bytes = v.getBytes(StandardCharsets.UTF_8); - lineageDigest.update(bytes); - }); - - return hashPrefix() + Util.bytesToHexString(lineageDigest.digest()); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } + MessageDigest lineageDigest = Util.defaultMessageDigest(); + + lineageDigest.update(hashPrefix().getBytes()); + + // The lineage can be quite long and we want to avoid the quadratic complexity of string concatenation + // Thus, in this case there's no real way to get the hash source, we just hash as we go. + lineage(). + map(Vertex::contextualHashSource). + sorted(). + forEachOrdered(v -> { + byte[] bytes = v.getBytes(StandardCharsets.UTF_8); + lineageDigest.update(bytes); + }); + + String digest = Util.bytesToHexString(lineageDigest.digest()); + + return digest; + } + + @Override + public String hashSource() { + // In this case the source can be quite large, so we never actually use this function. + return this.uniqueHash(); } public String hashPrefix() { @@ -154,14 +159,14 @@ public String hashPrefix() { public String contextualHashSource() { // This string must be lexicographically sortable hence the ID at the front. It also must have the individualHashSource - // repeated at the front for the case of a graph with two nodes at the same rank + // repeated at the front for the case of a graph with two nodes at the same rank, same contents, but different lineages StringBuilder result = new StringBuilder(); result.append(hashPrefix()); + result.append(individualHashSource()); - - result.append("INCOMING="); + result.append("I:"); this.incomingEdges().map(Edge::individualHashSource).sorted().forEachOrdered(result::append); - result.append("OUTGOING="); + result.append("O:"); this.outgoingEdges().map(Edge::individualHashSource).sorted().forEachOrdered(result::append); return result.toString(); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/imperative/IfStatement.java b/logstash-core/src/main/java/org/logstash/config/ir/imperative/IfStatement.java index 871de9ebace..76efbbe0759 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/imperative/IfStatement.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/imperative/IfStatement.java @@ -93,11 +93,11 @@ public Graph toGraph() throws InvalidIRException { newGraph.addVertex(ifVertex); for (Vertex v : trueRoots) { - newGraph.threadVertices(BooleanEdge.trueFactory, ifVertex, v); + newGraph.threadVerticesUnsafe(BooleanEdge.trueFactory, ifVertex, v); } for (Vertex v : falseRoots) { - newGraph.threadVertices(BooleanEdge.falseFactory, ifVertex, v); + newGraph.threadVerticesUnsafe(BooleanEdge.falseFactory, ifVertex, v); } return newGraph; diff --git a/logstash-core/src/test/java/org/logstash/config/ir/IRHelpers.java b/logstash-core/src/test/java/org/logstash/config/ir/IRHelpers.java index 1379fee5694..96a5e517965 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/IRHelpers.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/IRHelpers.java @@ -99,6 +99,11 @@ public Edge copy(Vertex from, Vertex to) throws InvalidIRException { public String individualHashSource() { return "TEdge"; } + + @Override + public String getId() { + return individualHashSource(); + } } public static BooleanExpression testExpression() throws InvalidIRException { diff --git a/logstash-core/src/test/java/org/logstash/config/ir/graph/GraphTest.java b/logstash-core/src/test/java/org/logstash/config/ir/graph/GraphTest.java index 238f09c56f0..91740ac3959 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/graph/GraphTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/graph/GraphTest.java @@ -1,8 +1,12 @@ package org.logstash.config.ir.graph; import org.junit.Test; +import org.logstash.config.ir.DSL; import org.logstash.config.ir.IRHelpers; import org.logstash.config.ir.InvalidIRException; +import org.logstash.config.ir.PluginDefinition; +import org.logstash.config.ir.graph.algorithms.GraphDiff; +import org.logstash.config.ir.imperative.IfStatement; import java.util.ArrayList; import java.util.Collection; @@ -82,11 +86,11 @@ public void SimpleConsistencyTest() throws InvalidIRException { } @Test - public void ComplexConsistencyTest() throws InvalidIRException { + public void complexConsistencyTest() throws InvalidIRException { Graph g1 = IRHelpers.samplePipeline().getGraph(); Graph g2 = IRHelpers.samplePipeline().getGraph(); - assertEquals(g1.hashSource(), g2.hashSource()); + assertEquals(g1.uniqueHash(), g2.uniqueHash()); } @Test @@ -139,6 +143,27 @@ public void copyTest() throws InvalidIRException { assertTrue(rv.sourceComponentEquals(lv)); } + @Test + public void uniqueHashingOfSimilarLeaves() throws InvalidIRException { + // the initial implementation didn't handle this well, so we'll leave it here as a tricky test + + IfStatement imperative = DSL.iIf( + DSL.eTruthy(DSL.eValue("1")), + DSL.iPlugin(PluginDefinition.Type.FILTER, "drop"), + DSL.iIf( + DSL.eTruthy(DSL.eValue("2")), + DSL.iPlugin(PluginDefinition.Type.FILTER, "drop"), + DSL.iIf( + DSL.eTruthy(DSL.eValue("3")), + DSL.iPlugin(PluginDefinition.Type.FILTER, "drop") + ) + ) + ); + + Graph g = imperative.toGraph(); + g.validate(); + } + private void assertVerticesConnected(Graph graph, String fromId, String toId) { Vertex from = graph.getVertexById(fromId); assertNotNull(from); diff --git a/logstash-core/src/test/java/org/logstash/config/ir/imperative/ImperativeToGraphtest.java b/logstash-core/src/test/java/org/logstash/config/ir/imperative/ImperativeToGraphtest.java index e3ad7d7b5a8..10265244a02 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/imperative/ImperativeToGraphtest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/imperative/ImperativeToGraphtest.java @@ -19,6 +19,8 @@ public class ImperativeToGraphtest { @Test public void convertSimpleExpression() throws InvalidIRException { Graph imperative = iComposeSequence(iPlugin(FILTER, "json"), iPlugin(FILTER, "stuff")).toGraph(); + imperative.validate(); // Verify this is a valid graph + Graph regular = Graph.empty(); regular.threadVertices(gPlugin(FILTER, "json"), gPlugin(FILTER, "stuff")); @@ -28,6 +30,8 @@ public void convertSimpleExpression() throws InvalidIRException { @Test public void testIdsDontAffectSourceComponentEquality() throws InvalidIRException { Graph imperative = iComposeSequence(iPlugin(FILTER, "json", "oneid"), iPlugin(FILTER, "stuff", "anotherid")).toGraph(); + imperative.validate(); // Verify this is a valid graph + Graph regular = Graph.empty(); regular.threadVertices(gPlugin(FILTER, "json", "someotherid"), gPlugin(FILTER, "stuff", "graphid")); @@ -36,7 +40,7 @@ public void testIdsDontAffectSourceComponentEquality() throws InvalidIRException @Test public void convertComplexExpression() throws InvalidIRException { - Graph generated = iComposeSequence( + Graph imperative = iComposeSequence( iPlugin(FILTER, "p1"), iPlugin(FILTER, "p2"), iIf(eAnd(eTruthy(eValue(5l)), eTruthy(eValue(null))), @@ -44,6 +48,7 @@ public void convertComplexExpression() throws InvalidIRException { iComposeSequence(iPlugin(FILTER, "p4"), iPlugin(FILTER, "p5")) ) ).toGraph(); + imperative.validate(); // Verify this is a valid graph PluginVertex p1 = gPlugin(FILTER, "p1"); PluginVertex p2 = gPlugin(FILTER, "p2"); @@ -58,14 +63,14 @@ public void convertComplexExpression() throws InvalidIRException { expected.threadVertices(false, testIf, p4); expected.threadVertices(p4, p5); - assertGraphEquals(expected, generated); + assertGraphEquals(expected, imperative); } // This test has an imperative grammar with nested ifs and dangling // partial leaves. This makes sure they all wire-up right @Test public void deepDanglingPartialLeaves() throws InvalidIRException { - Graph generated = iComposeSequence( + Graph imperative = iComposeSequence( iPlugin(FILTER, "p0"), iIf(eTruthy(eValue(1)), iPlugin(FILTER, "p1"), @@ -79,6 +84,7 @@ public void deepDanglingPartialLeaves() throws InvalidIRException { iPlugin(FILTER, "pLast") ).toGraph(); + imperative.validate(); // Verify this is a valid graph IfVertex if1 = gIf(eTruthy(eValue(1))); IfVertex if2 = gIf(eTruthy(eValue(2))); @@ -104,7 +110,7 @@ public void deepDanglingPartialLeaves() throws InvalidIRException { expected.threadVertices(p3, pLast); expected.threadVertices(p4,pLast); - assertGraphEquals(generated, expected); + assertGraphEquals(imperative, expected); } // This is a good test for what the filter block will do, where there @@ -112,7 +118,7 @@ public void deepDanglingPartialLeaves() throws InvalidIRException { // single node @Test public void convertComplexExpressionWithTerminal() throws InvalidIRException { - Graph generated = iComposeSequence( + Graph imperative = iComposeSequence( iPlugin(FILTER, "p1"), iIf(eTruthy(eValue(1)), iComposeSequence( @@ -126,6 +132,7 @@ public void convertComplexExpressionWithTerminal() throws InvalidIRException { ), iPlugin(FILTER, "terminal") ).toGraph(); + imperative.validate(); // Verify this is a valid graph PluginVertex p1 = gPlugin(FILTER,"p1"); PluginVertex p2 = gPlugin(FILTER, "p2"); @@ -154,7 +161,7 @@ public void convertComplexExpressionWithTerminal() throws InvalidIRException { expected.threadVertices(p4, p5); expected.threadVertices(p5, terminal); - assertGraphEquals(generated, expected); + assertGraphEquals(imperative, expected); } } diff --git a/rakelib/compile.rake b/rakelib/compile.rake index b2df88ec222..d4246376dd7 100644 --- a/rakelib/compile.rake +++ b/rakelib/compile.rake @@ -10,20 +10,27 @@ namespace "compile" do desc "Compile the config grammar" task "grammar" => "logstash-core/lib/logstash/config/grammar.rb" + + def safe_system(*args) + if !system(*args) + status = $? + raise "Got exit status #{status.exitstatus} attempting to execute #{args.inspect}!" + end + end task "logstash-core-java" do puts("Building logstash-core using gradle") - system("./gradlew", "jar", "-p", "./logstash-core") + safe_system("./gradlew", "jar", "-p", "./logstash-core") end task "logstash-core-event-java" do puts("Building logstash-core-event-java using gradle") - system("./gradlew", "jar", "-p", "./logstash-core-event-java") + safe_system("./gradlew", "jar", "-p", "./logstash-core-event-java") end task "logstash-core-queue-jruby" do puts("Building logstash-core-queue-jruby using gradle") - system("./gradlew", "jar", "-p", "./logstash-core-queue-jruby") + safe_system("./gradlew", "jar", "-p", "./logstash-core-queue-jruby") end desc "Build everything"