Skip to content

Commit

Permalink
Fix graph hashing
Browse files Browse the repository at this point in the history
Add UniversalPlugin hooks for pipeline events
  • Loading branch information
andrewvc committed Feb 13, 2017
1 parent ab65407 commit f4b8f58
Show file tree
Hide file tree
Showing 26 changed files with 299 additions and 167 deletions.
88 changes: 88 additions & 0 deletions apache_stdout.conf
@@ -0,0 +1,88 @@
input {
file {
id => "logfileRead"
start_position => beginning
ignore_older => 0
path => "/Users/andrewvc/projects/ls_apache_materials/apache_access_logs"
}

stdin {id => logStdin}
}

filter {
grok {
id => "apacheCommonLog"
match => {
"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
}
}

geoip {
id => "clientGeo"
source => clientip
target => geoip
}

useragent {
id => "clientUA"
source => agent
target => useragent
}


date {
id => "clientDate"
match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
locale => en
}

if [geoip][country_code2] == "US" {
mutate {
id => "addUsRegion"
add_field => { "aws-region" => "us-east-1" }
}
} else if [geoip][country_code2] == "CA" {
if [referrer] =~ /google/ {
sleep {
id => "pointlessSleep"
time => 0.001
}
}
} else {
mutate {
id => addOtherRegion
add_field => { "aws-region" => "eu-central-1" }
}
}


if [request] =~ /(?i)\.(png|jpg|gif)$/ {
grok {
id => grokImage
match => {
request => "%{(?i)\.(png|jpg|gif)$:extension}"
}
add_tag => ["image"]
}

mutate {
id => addCanadianRegion
add_field => { "aws-region" => "ca-central-1" }
}
} else if [request] =~ /articles/ {
mutate {
id => tagArticle
add_tag => ["article"]
}
}
}

output {
elasticsearch {
id => "mainEs"
index => "%{@type}-"
}
if [geoip][country_code2] != "US" {
stdout { id => "linuxStdout" codec => json_lines }
}
}
90 changes: 0 additions & 90 deletions gradlew.bat

This file was deleted.

22 changes: 16 additions & 6 deletions logstash-core/lib/logstash-core_jars.rb
@@ -1,8 +1,18 @@
# this is a generated file, to avoid over-writing it just delete this comment
require 'jar_dependencies'
begin
require 'jar_dependencies'
rescue LoadError
require 'org/apache/logging/log4j/log4j-core/2.6.2/log4j-core-2.6.2.jar'
require 'org/apache/logging/log4j/log4j-api/2.6.2/log4j-api-2.6.2.jar'
require 'com/fasterxml/jackson/core/jackson-core/2.7.4/jackson-core-2.7.4.jar'
require 'com/fasterxml/jackson/core/jackson-annotations/2.7.0/jackson-annotations-2.7.0.jar'
require 'com/fasterxml/jackson/core/jackson-databind/2.7.4/jackson-databind-2.7.4.jar'
end

require_jar( 'org.apache.logging.log4j', 'log4j-core', '2.6.2' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.7.0' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.7.4' )
require_jar( 'org.apache.logging.log4j', 'log4j-api', '2.6.2' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.7.4' )
if defined? Jars
require_jar( 'org.apache.logging.log4j', 'log4j-core', '2.6.2' )
require_jar( 'org.apache.logging.log4j', 'log4j-api', '2.6.2' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.7.4' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.7.0' )
require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.7.4' )
end
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Expand Up @@ -314,6 +314,7 @@ def start_pipeline(id)
if !t.alive?
return false
elsif pipeline.ready?
dispatcher.fire(:pipeline_started, pipeline)
return true
else
sleep 0.01
Expand All @@ -327,6 +328,7 @@ def stop_pipeline(id)
@logger.warn("stopping pipeline", :id => id)
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
@pipelines[id].thread.join
dispatcher.fire(:pipeline_stopped, pipeline)
end

def start_pipelines
Expand Down
19 changes: 15 additions & 4 deletions logstash-core/lib/logstash/compiler/lscl.rb
Expand Up @@ -478,10 +478,21 @@ class MethodCall < Node; end

class RegexpExpression < Node
def expr
selector, operator_method, regexp = recursive_select(Selector, LogStash::Compiler::LSCL::AST::RegExpOperator, LogStash::Compiler::LSCL::AST::RegExp).map(&:expr)

raise "Expected a selector #{text_value}!" unless selector
raise "Expected a regexp #{text_value}!" unless regexp
selector, operator_method, regexp = recursive_select(
Selector,
LogStash::Compiler::LSCL::AST::RegExpOperator,
LogStash::Compiler::LSCL::AST::RegExp,
LogStash::Compiler::LSCL::AST::String # Strings work as rvalues! :p
).map(&:expr)

# Handle string rvalues, they just get turned into regexps
# Maybe we really shouldn't handle these anymore...
if regexp.class == org.logstash.config.ir.expression.ValueExpression
regexp = jdsl.eRegex(regexp.get)
end

raise "Expected a selector in #{text_value}!" unless selector
raise "Expected a regexp in #{text_value}!" unless regexp

operator_method.call(source_meta, selector, regexp);
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/instrument/metric_store.rb
Expand Up @@ -218,7 +218,7 @@ def get_recursively(key_paths, map, new_hash)
key_candidates = extract_filter_keys(key_paths.shift)

key_candidates.each do |key_candidate|
raise MetricNotFound, "For path: #{key_candidate}" if map[key_candidate].nil?
raise MetricNotFound, "For path: #{key_candidate}. Map keys: #{map.keys}" if map[key_candidate].nil?

if key_paths.empty? # End of the user requested path
if map[key_candidate].is_a?(Concurrent::Map)
Expand Down
13 changes: 11 additions & 2 deletions logstash-core/lib/logstash/pipeline.rb
Expand Up @@ -21,6 +21,7 @@
require "logstash/instrument/collector"
require "logstash/output_delegator"
require "logstash/filter_delegator"
require 'logstash/compiler'

module LogStash; class Pipeline
include LogStash::Util::Loggable
Expand All @@ -41,7 +42,8 @@ module LogStash; class Pipeline
:metric,
:filter_queue_client,
:input_queue_client,
:queue
:queue,
:lir

MAX_INFLIGHT_WARN_THRESHOLD = 10_000

Expand All @@ -53,13 +55,16 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil)
@logger = self.logger
@config_str = config_str
@config_hash = Digest::SHA1.hexdigest(@config_str)

@lir = compile_lir

# Every time #plugin is invoked this is incremented to give each plugin
# a unique id when auto-generating plugin ids
@plugin_counter ||= 0
@settings = settings
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
@reporter = PipelineReporter.new(@logger, self)

# A list of plugins indexed by id
@plugins_by_id = {}
@inputs = nil
Expand Down Expand Up @@ -119,6 +124,10 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil)
@running = Concurrent::AtomicBoolean.new(false)
@flushing = Concurrent::AtomicReference.new(false)
end # def initialize

def compile_lir
LogStash::Compiler.compile_pipeline(self.config_str)
end

def build_queue_from_settings
queue_type = settings.get("queue.type")
Expand Down
9 changes: 9 additions & 0 deletions logstash-core/spec/logstash/compiler/compiler_spec.rb
Expand Up @@ -392,6 +392,15 @@ def compose(*statements)
it "should compile correctly" do
expect(c_expression).to ir_eql(j.eRegexEq(j.eEventValue("[foo]"), j.eRegex('^abc$')))
end

# Believe it or not, "\.\." is a valid regexp!
describe "when given a quoted regexp" do
let(:expression) { '[foo] =~ "\\.\\."' }

it "should compile correctly" do
expect(c_expression).to ir_eql(j.eRegexEq(j.eEventValue("[foo]"), j.eRegex('\\.\\.')))
end
end
end

describe "'!~'" do
Expand Down
15 changes: 10 additions & 5 deletions logstash-core/src/main/java/org/logstash/common/Util.java
Expand Up @@ -9,16 +9,21 @@
*/
public class Util {
// Modified from http://stackoverflow.com/a/11009612/11105
public static String sha256(String base) {

public static MessageDigest defaultMessageDigest() {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(base.getBytes(StandardCharsets.UTF_8));
return bytesToHexString(hash);
return MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Your system is (somehow) missing the SHA-256 algorithm!", e);
throw new RuntimeException(e);
}
}

public static String digest(String base) {
MessageDigest digest = defaultMessageDigest();
byte[] hash = digest.digest(base.getBytes(StandardCharsets.UTF_8));
return bytesToHexString(hash);
}

public static String bytesToHexString(byte[] bytes) {
StringBuilder hexString = new StringBuilder();

Expand Down
Expand Up @@ -9,6 +9,6 @@ public interface IHashable {
String hashSource();

default String uniqueHash() {
return Util.sha256(this.hashSource());
return Util.digest(this.hashSource());
}
}
Expand Up @@ -30,6 +30,11 @@ public SpecialVertex getFilterOut() {
private final SpecialVertex filterOut;

public Pipeline(Graph inputSection, Graph filterSection, Graph outputSection) throws InvalidIRException {
// Validate all incoming graphs, we can't turn an invalid graph into a Pipeline!
inputSection.validate();
filterSection.validate();
outputSection.validate();

Graph tempGraph = inputSection.copy(); // The input section are our roots, so we can import that wholesale

// Connect all the input vertices out to the queue
Expand Down
Expand Up @@ -48,6 +48,6 @@ public String toRubyString() {

@Override
public String hashSource() {
return getLeft().hashSource() + this.getClass().getCanonicalName() + getRight().hashSource();
return this.getClass().getCanonicalName() + "[" + getLeft().hashSource() + "|" + getRight().hashSource() + "]";
}
}
Expand Up @@ -7,7 +7,7 @@
import org.logstash.config.ir.SourceComponent;
import org.logstash.config.ir.SourceMetadata;

/**
/*
* [foo] == "foostr" eAnd [bar] > 10
* eAnd(eEq(eventValueExpr("foo"), value("foostr")), eEq(eEventValue("bar"), value(10)))
*
Expand Down
Expand Up @@ -22,6 +22,6 @@ public UnaryBooleanExpression(SourceMetadata meta,

@Override
public String hashSource() {
return this.getClass().getCanonicalName() + this.expression.hashSource();
return this.getClass().getCanonicalName() + "[" + this.expression.hashSource() + "]";
}
}

0 comments on commit f4b8f58

Please sign in to comment.