Skip to content

Commit

Permalink
Fix graph hashing (#6677)
Browse files Browse the repository at this point in the history
Fix graph hashing and add UniversalPlugin hooks for pipeline events
  • Loading branch information
andrewvc committed Feb 14, 2017
1 parent fa2ca09 commit ebc54ad
Show file tree
Hide file tree
Showing 24 changed files with 212 additions and 78 deletions.
22 changes: 16 additions & 6 deletions logstash-core/lib/logstash-core_jars.rb
@@ -1,8 +1,18 @@
# this is a generated file, to avoid over-writing it just delete this comment
require 'jar_dependencies'
begin
require 'jar_dependencies'
rescue LoadError
require 'org/apache/logging/log4j/log4j-core/2.6.2/log4j-core-2.6.2.jar'
require 'org/apache/logging/log4j/log4j-api/2.6.2/log4j-api-2.6.2.jar'
require 'com/fasterxml/jackson/core/jackson-core/2.7.4/jackson-core-2.7.4.jar'
require 'com/fasterxml/jackson/core/jackson-annotations/2.7.0/jackson-annotations-2.7.0.jar'
require 'com/fasterxml/jackson/core/jackson-databind/2.7.4/jackson-databind-2.7.4.jar'
end

require_jar( 'org.apache.logging.log4j', 'log4j-core', '2.6.2' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.7.0' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.7.4' )
require_jar( 'org.apache.logging.log4j', 'log4j-api', '2.6.2' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.7.4' )
if defined? Jars
require_jar( 'org.apache.logging.log4j', 'log4j-core', '2.6.2' )
require_jar( 'org.apache.logging.log4j', 'log4j-api', '2.6.2' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.7.4' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.7.0' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.7.4' )
end
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Expand Up @@ -403,6 +403,7 @@ def start_pipeline(id)
if !t.alive?
return false
elsif pipeline.running?
dispatcher.fire(:pipeline_started, pipeline)
return true
else
sleep 0.01
Expand All @@ -416,6 +417,7 @@ def stop_pipeline(id)
@logger.warn("stopping pipeline", :id => id)
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
@pipelines[id].thread.join
dispatcher.fire(:pipeline_stopped, pipeline)
end

def start_pipelines
Expand Down
19 changes: 15 additions & 4 deletions logstash-core/lib/logstash/compiler/lscl.rb
Expand Up @@ -478,10 +478,21 @@ class MethodCall < Node; end

class RegexpExpression < Node
def expr
selector, operator_method, regexp = recursive_select(Selector, LogStash::Compiler::LSCL::AST::RegExpOperator, LogStash::Compiler::LSCL::AST::RegExp).map(&:expr)

raise "Expected a selector #{text_value}!" unless selector
raise "Expected a regexp #{text_value}!" unless regexp
selector, operator_method, regexp = recursive_select(
Selector,
LogStash::Compiler::LSCL::AST::RegExpOperator,
LogStash::Compiler::LSCL::AST::RegExp,
LogStash::Compiler::LSCL::AST::String # Strings work as rvalues! :p
).map(&:expr)

# Handle string rvalues, they just get turned into regexps
# Maybe we really shouldn't handle these anymore...
if regexp.class == org.logstash.config.ir.expression.ValueExpression
regexp = jdsl.eRegex(regexp.get)
end

raise "Expected a selector in #{text_value}!" unless selector
raise "Expected a regexp in #{text_value}!" unless regexp

operator_method.call(source_meta, selector, regexp);
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/instrument/metric_store.rb
Expand Up @@ -218,7 +218,7 @@ def get_recursively(key_paths, map, new_hash)
key_candidates = extract_filter_keys(key_paths.shift)

key_candidates.each do |key_candidate|
raise MetricNotFound, "For path: #{key_candidate}" if map[key_candidate].nil?
raise MetricNotFound, "For path: #{key_candidate}. Map keys: #{map.keys}" if map[key_candidate].nil?

if key_paths.empty? # End of the user requested path
if map[key_candidate].is_a?(Concurrent::Map)
Expand Down
15 changes: 12 additions & 3 deletions logstash-core/lib/logstash/pipeline.rb
Expand Up @@ -20,16 +20,20 @@
require "logstash/output_delegator"
require "logstash/filter_delegator"
require "logstash/queue_factory"
require 'logstash/compiler'

module LogStash; class BasePipeline
include LogStash::Util::Loggable

attr_reader :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id

attr_reader :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id, :lir
def initialize(config_str, settings = SETTINGS)
@logger = self.logger
@config_str = config_str
@config_hash = Digest::SHA1.hexdigest(@config_str)

@lir = compile_lir

# Every time #plugin is invoked this is incremented to give each plugin
# a unique id when auto-generating plugin ids
@plugin_counter ||= 0
Expand Down Expand Up @@ -62,6 +66,10 @@ def initialize(config_str, settings = SETTINGS)
raise e
end
end

def compile_lir
LogStash::Compiler.compile_pipeline(self.config_str)
end

def plugin(plugin_type, name, *args)
@plugin_counter += 1
Expand Down Expand Up @@ -164,6 +172,8 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil)
@running = Concurrent::AtomicBoolean.new(false)
@flushing = Concurrent::AtomicReference.new(false)
end # def initialize



def ready?
@ready.value
Expand Down Expand Up @@ -499,7 +509,6 @@ def flush
end
end


# Calculate the uptime in milliseconds
#
# @return [Fixnum] Uptime in milliseconds, 0 if the pipeline is not started
Expand Down
9 changes: 9 additions & 0 deletions logstash-core/spec/logstash/compiler/compiler_spec.rb
Expand Up @@ -392,6 +392,15 @@ def compose(*statements)
it "should compile correctly" do
expect(c_expression).to ir_eql(j.eRegexEq(j.eEventValue("[foo]"), j.eRegex('^abc$')))
end

# Believe it or not, "\.\." is a valid regexp!
describe "when given a quoted regexp" do
let(:expression) { '[foo] =~ "\\.\\."' }

it "should compile correctly" do
expect(c_expression).to ir_eql(j.eRegexEq(j.eEventValue("[foo]"), j.eRegex('\\.\\.')))
end
end
end

describe "'!~'" do
Expand Down
15 changes: 10 additions & 5 deletions logstash-core/src/main/java/org/logstash/common/Util.java
Expand Up @@ -9,16 +9,21 @@
*/
public class Util {
// Modified from http://stackoverflow.com/a/11009612/11105
public static String sha256(String base) {

public static MessageDigest defaultMessageDigest() {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(base.getBytes(StandardCharsets.UTF_8));
return bytesToHexString(hash);
return MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Your system is (somehow) missing the SHA-256 algorithm!", e);
throw new RuntimeException(e);
}
}

public static String digest(String base) {
MessageDigest digest = defaultMessageDigest();
byte[] hash = digest.digest(base.getBytes(StandardCharsets.UTF_8));
return bytesToHexString(hash);
}

public static String bytesToHexString(byte[] bytes) {
StringBuilder hexString = new StringBuilder();

Expand Down
Expand Up @@ -9,6 +9,6 @@ public interface IHashable {
String hashSource();

default String uniqueHash() {
return Util.sha256(this.hashSource());
return Util.digest(this.hashSource());
}
}
Expand Up @@ -30,6 +30,11 @@ public SpecialVertex getFilterOut() {
private final SpecialVertex filterOut;

public Pipeline(Graph inputSection, Graph filterSection, Graph outputSection) throws InvalidIRException {
// Validate all incoming graphs, we can't turn an invalid graph into a Pipeline!
inputSection.validate();
filterSection.validate();
outputSection.validate();

Graph tempGraph = inputSection.copy(); // The input section are our roots, so we can import that wholesale

// Connect all the input vertices out to the queue
Expand Down
Expand Up @@ -48,6 +48,6 @@ public String toRubyString() {

@Override
public String hashSource() {
return getLeft().hashSource() + this.getClass().getCanonicalName() + getRight().hashSource();
return this.getClass().getCanonicalName() + "[" + getLeft().hashSource() + "|" + getRight().hashSource() + "]";
}
}
Expand Up @@ -7,7 +7,7 @@
import org.logstash.config.ir.SourceComponent;
import org.logstash.config.ir.SourceMetadata;

/**
/*
* [foo] == "foostr" eAnd [bar] > 10
* eAnd(eEq(eventValueExpr("foo"), value("foostr")), eEq(eEventValue("bar"), value(10)))
*
Expand Down
Expand Up @@ -22,6 +22,6 @@ public UnaryBooleanExpression(SourceMetadata meta,

@Override
public String hashSource() {
return this.getClass().getCanonicalName() + this.expression.hashSource();
return this.getClass().getCanonicalName() + "[" + this.expression.hashSource() + "]";
}
}
@@ -1,5 +1,6 @@
package org.logstash.config.ir.graph;

import org.logstash.common.Util;
import org.logstash.config.ir.ISourceComponent;
import org.logstash.config.ir.InvalidIRException;

Expand Down Expand Up @@ -50,7 +51,12 @@ public BooleanEdge(Boolean edgeType, Vertex outVertex, Vertex inVertex) throws I

@Override
public String individualHashSource() {
return this.getClass().getCanonicalName() + "|" + this.getEdgeType();
return this.getClass().getCanonicalName() + "|" + this.getEdgeType() + "|";
}

@Override
public String getId() {
return Util.digest(this.getFrom().getId() + "[" + this.getEdgeType() + "]->" + this.getTo().getId());
}

public String toString() {
Expand Down
Expand Up @@ -89,6 +89,9 @@ public boolean sourceComponentEquals(ISourceComponent sourceComponent) {

public abstract String individualHashSource();


public abstract String getId();

@Override
public SourceMetadata getMeta() {
return null;
Expand Down
46 changes: 32 additions & 14 deletions logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java
@@ -1,5 +1,6 @@
package org.logstash.config.ir.graph;

import org.logstash.common.Util;
import org.logstash.config.ir.IHashable;
import org.logstash.config.ir.ISourceComponent;
import org.logstash.config.ir.InvalidIRException;
Expand All @@ -8,6 +9,7 @@
import org.logstash.config.ir.graph.algorithms.GraphDiff;
import org.logstash.config.ir.graph.algorithms.TopologicalSort;

import java.security.MessageDigest;
import java.util.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand All @@ -17,14 +19,15 @@
* Created by andrewvc on 9/15/16.
*/
public class Graph implements ISourceComponent, IHashable {
private final Set<Vertex> vertices = new HashSet<>();
public final Set<Vertex> vertices = new HashSet<>();
private final Set<Edge> edges = new HashSet<>();
private Map<Vertex, Integer> vertexRanks = new HashMap<>();
private final Map<Vertex,Set<Edge>> outgoingEdgeLookup = new HashMap<>();
private final Map<Vertex,Set<Edge>> incomingEdgeLookup = new HashMap<>();
private List<Vertex> sortedVertices;


// Builds a graph that has the specified vertices and edges
// Note that this does *not* validate the result
public Graph(Collection<Vertex> vertices, Collection<Edge> edges) throws InvalidIRException {
for (Vertex vertex : vertices) { this.addVertex(vertex, false); }
for (Edge edge : edges) { this.addEdge(edge, false); }
Expand Down Expand Up @@ -195,7 +198,10 @@ public Collection<Edge> threadVerticesById(Edge.EdgeFactory edgeFactory, String.
return threadVertices(edgeFactory, argVertices);
}

public Collection<Edge> threadVertices(Edge.EdgeFactory edgeFactory, Vertex... argVertices) throws InvalidIRException {
// Will not validate the graph after running!
// You must invoke validate the graph yourself
// after invoking
public Collection<Edge> threadVerticesUnsafe(Edge.EdgeFactory edgeFactory, Vertex... argVertices) throws InvalidIRException {
List<Vertex> importedVertices = new ArrayList<>(argVertices.length);
for (Vertex va : argVertices) {
importedVertices.add(this.importVertex(va));
Expand All @@ -206,19 +212,25 @@ public Collection<Edge> threadVertices(Edge.EdgeFactory edgeFactory, Vertex... a
Vertex from = importedVertices.get(i);
Vertex to = importedVertices.get(i+1);

this.addVertex(from);
this.addVertex(to);
this.addVertex(from, false);
this.addVertex(to, false);

Edge edge = edgeFactory.make(from, to);
newEdges.add(edge);
this.addEdge(edge);
this.addEdge(edge, false);
}

refresh();

return newEdges;
}

public Collection<Edge> threadVertices(Edge.EdgeFactory edgeFactory, Vertex... argVertices) throws InvalidIRException {
Collection<Edge> edges = threadVerticesUnsafe(edgeFactory, argVertices);
validate();
return edges;
}

public Edge threadVertices(Vertex a, Vertex b) throws InvalidIRException {
return threadVertices(PlainEdge.factory, a, b).stream().findFirst().get();
}
Expand All @@ -240,7 +252,6 @@ public Collection<Edge> threadVertices(boolean bool, Vertex... vertices) throws
public void refresh() throws InvalidIRException {
this.calculateRanks();
this.calculateTopologicalSort();
this.validate();
}

private void calculateTopologicalSort() throws InvalidIRException {
Expand All @@ -267,8 +278,10 @@ public Map<String, List<Vertex>> verticesByHash() {
}

public void validate() throws InvalidIRException {
if (this.isEmpty()) return;

if (this.getVertices().stream().noneMatch(Vertex::isLeaf)) {
throw new InvalidIRException("Graph has no leaf vertices!" + this.toString());
throw new InvalidIRException("Graph has no leaf vertices!\n" + this.toString());
}

List<List<Vertex>> duplicates = verticesByHash().values().stream().filter((group) -> group.size() > 1).collect(Collectors.toList());
Expand All @@ -278,7 +291,7 @@ public void validate() throws InvalidIRException {

String joinedErrorMessageGroups = errorMessageGroups.collect(Collectors.joining("\n---\n"));

throw new InvalidIRException("Some nodes on the graph are fully redundant!\n" + joinedErrorMessageGroups);
throw new InvalidIRException("Some nodes on the graph are fully redundant!\n" + this + "|" + joinedErrorMessageGroups);
}
}

Expand Down Expand Up @@ -335,10 +348,12 @@ public String toString() {
edgelessVerticesStr = "";
}

return "<GRAPH>\n" +
edgesToFormat.map(Edge::toString).collect(Collectors.joining("\n")) +
edgelessVerticesStr +
"\n</GRAPH>";
return "**GRAPH**\n" +
"Vertices: " + this.vertices.size()+ " Edges: " + this.edges().count() + "\n" +
"----------------------" +
edgesToFormat.map(Edge::toString).collect(Collectors.joining("\n")) +
edgelessVerticesStr +
"\n**GRAPH**";
}

public Stream<Vertex> isolatedVertices() {
Expand Down Expand Up @@ -411,6 +426,9 @@ public Stream<Edge> edges() {

@Override
public String hashSource() {
return this.vertices.stream().map(Vertex::hashSource).sorted().collect(Collectors.joining("\n"));
MessageDigest lineageDigest = Util.defaultMessageDigest();
List<byte[]> sources = this.vertices.stream().parallel().map(Vertex::uniqueHash).sorted().map(String::getBytes).collect(Collectors.toList());
sources.forEach(lineageDigest::update);
return Util.bytesToHexString(lineageDigest.digest());
}
}
Expand Up @@ -93,6 +93,6 @@ public IfVertex copy() {

@Override
public String individualHashSource() {
return this.getClass().getCanonicalName() + "|" + this.booleanExpression.hashSource();
return this.getClass().getCanonicalName() + "{" + this.booleanExpression.hashSource() + "}";
}
}

0 comments on commit ebc54ad

Please sign in to comment.