Skip to content

Commit

Permalink
Add JSONPath for hash paths and add JSON parsing to the WebsiteAgent.
Browse files Browse the repository at this point in the history
  • Loading branch information
cantino committed Mar 17, 2013
1 parent 1e28e84 commit b876759
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 61 deletions.
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -8,6 +8,7 @@ gem 'kaminari'
gem 'bootstrap-kaminari-views'
gem "rufus-scheduler", :require => false
gem 'json', '>= 1.7.7'
gem 'jsonpath'

gem 'delayed_job', :git => 'https://github.com/wok/delayed_job' # Until the YAML issues are fixed in master.
gem 'delayed_job_active_record', "~> 0.3.3" # newer was giving a strange MySQL error
Expand Down
3 changes: 3 additions & 0 deletions Gemfile.lock
Expand Up @@ -115,6 +115,8 @@ GEM
jquery-rails
railties (>= 3.1.0)
json (1.7.7)
jsonpath (0.5.1)
multi_json
kaminari (0.14.1)
actionpack (>= 3.0.0)
activesupport (>= 3.0.0)
Expand Down Expand Up @@ -275,6 +277,7 @@ DEPENDENCIES
geokit-rails3
jquery-rails
json (>= 1.7.7)
jsonpath
kaminari
kramdown
mysql2
Expand Down
22 changes: 1 addition & 21 deletions app/models/agent.rb
Expand Up @@ -85,27 +85,7 @@ def validate_schedule
end

def make_message(payload, message = options[:message])
message.gsub(/<([^>]+)>/) { value_at(payload, $1) || "??" }
end

def value_at(data, path)
if data.is_a?(Hash)
path.split(".").inject(data) { |memo, segment|
if memo
if memo[segment]
memo[segment]
elsif memo[segment.to_sym]
memo[segment.to_sym]
else
nil
end
else
nil
end
}.to_s
else
data
end
message.gsub(/<([^>]+)>/) { Utils.value_at(payload, $1) || "??" }
end

def set_default_schedule
Expand Down
6 changes: 3 additions & 3 deletions app/models/agents/peak_detector_agent.rb
Expand Up @@ -7,7 +7,7 @@ class PeakDetectorAgent < Agent
description <<-MD
Use a PeakDetectorAgent to watch for peaks in an event stream. When a peak is detected, the resulting Event will have a payload message of `message`. You can include extractions in the message, for example: `I saw a bar of: <foo.bar>`
The `value_path` value is a hash path to the value of interest. `group_by_path` is a hash path that will be used to group values, if present.
The `value_path` value is a [JSONPaths](http://goessner.net/articles/JsonPath/) to the value of interest. `group_by_path` is a hash path that will be used to group values, if present.
Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.
Expand Down Expand Up @@ -106,13 +106,13 @@ def peak_spacing
end

def group_for(event)
((options[:group_by_path].present? && value_at(event.payload, options[:group_by_path])) || 'no_group').to_sym
((options[:group_by_path].present? && Utils.value_at(event.payload, options[:group_by_path])) || 'no_group').to_sym
end

def remember(group, event)
memory[:data] ||= {}
memory[:data][group] ||= []
memory[:data][group] << [value_at(event.payload, options[:value_path]), event.created_at.to_i]
memory[:data][group] << [Utils.value_at(event.payload, options[:value_path]), event.created_at.to_i]
cleanup group
end

Expand Down
11 changes: 2 additions & 9 deletions app/models/agents/trigger_agent.rb
Expand Up @@ -7,14 +7,7 @@ class TriggerAgent < Agent
description <<-MD
Use a TriggerAgent to watch for a specific value in an Event payload.
The `rules` array contains hashes of `path`, `value`, and `type`. The `path` value is a dotted path through a hash, for example `foo.bar` would return `hello` from this structure:
{
:foo => {
:bar => "hello"
},
:something => "else"
}
The `rules` array contains hashes of `path`, `value`, and `type`. The `path` value is a dotted path through a hash in [JSONPaths](http://goessner.net/articles/JsonPath/) syntax.
The `type` can be one of #{VALID_COMPARISON_TYPES.map { |t| "`#{t}`" }.to_sentence} and compares with the `value`.
Expand Down Expand Up @@ -55,7 +48,7 @@ def working?
def receive(incoming_events)
incoming_events.each do |event|
match = options[:rules].all? do |rule|
value_at_path = value_at(event[:payload], rule[:path])
value_at_path = Utils.value_at(event[:payload], rule[:path])
case rule[:type]
when "regex"
value_at_path.to_s =~ Regexp.new(rule[:value], Regexp::IGNORECASE)
Expand Down
75 changes: 59 additions & 16 deletions app/models/agents/website_agent.rb
Expand Up @@ -7,25 +7,36 @@ class WebsiteAgent < Agent
cannot_receive_events!

description <<-MD
The WebsiteAgent scrapes a website and creates Events based on any changes in the results.
The WebsiteAgent scrapes a website, XML document, or JSON feed and creates Events based on the results.
Specify the website's `url` and select a `mode` for when to create Events based on the scraped data, either `all` or `on_change`.
Specify a `url` and select a `mode` for when to create Events based on the scraped data, either `all` or `on_change`.
To tell the Agent how to scrape the site, specify `extract` as a hash with keys naming the extractions and values of hashes.
These subhashes specify how to extract with a `:css` CSS selector and either `:text => true` or `attr` pointing to an attribute name to grab. An example:
The `type` value can be `xml`, `html`, or `json`.
To tell the Agent how to parse the content, specify `extract` as a hash with keys naming the extractions and values of hashes.
When parsing HTML or XML, these sub-hashes specify how to extract with a `:css` CSS selector and either `:text => true` or `attr` pointing to an attribute name to grab. An example:
:extract => {
:url => { :css => "#comic img", :attr => "src" },
:title => { :css => "#comic img", :attr => "title" },
:body_text => { :css => "div.main", :text => true }
}
Note that whatever you extract MUST have the same number of matches for each extractor. E.g., if you're extracting rows, all extractors must match all rows.
When parsing JSON, these sub-hashes specify [JSONPaths](http://goessner.net/articles/JsonPath/) to the values that you care about. For example:
:extract => {
:title => { :path => "results.data[*].title" },
:description => { :path => "results.data[*].description" }
}
Note that for all of the formats, whatever you extract MUST have the same number of matches for each extractor. E.g., if you're extracting rows, all extractors must match all rows.
Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.
MD

event_description do <<-MD
event_description do
<<-MD
Events will have the fields you specified. Your options look like:
#{PP.pp(options[:extract], "")}
Expand All @@ -44,6 +55,7 @@ def default_options
{
:expected_update_period_in_days => "2",
:url => "http://xkcd.com",
:type => "html",
:mode => :on_change,
:extract => {
:url => {:css => "#comic img", :attr => "src"},
Expand All @@ -60,18 +72,22 @@ def check
hydra = Typhoeus::Hydra.new
request = Typhoeus::Request.new(options[:url], :followlocation => true)
request.on_complete do |response|
doc = (options[:type].to_s == "xml" || options[:url] =~ /\.(rss|xml)$/i) ? Nokogiri::XML(response.body) : Nokogiri::HTML(response.body)
doc = parse(response.body)
output = {}
options[:extract].each do |name, extraction_details|
output[name] = doc.css(extraction_details[:css]).map { |node|
if extraction_details[:attr]
node.attr(extraction_details[:attr])
elsif extraction_details[:text]
node.text()
else
raise StandardError, ":attr or :text is required on each of the extraction patterns."
end
}
if extraction_type == "json"
output[name] = Utils.values_at(doc, extraction_details[:path])
else
output[name] = doc.css(extraction_details[:css]).map { |node|
if extraction_details[:attr]
node.attr(extraction_details[:attr])
elsif extraction_details[:text]
node.text()
else
raise StandardError, ":attr or :text is required on HTML or XML extraction patterns"
end
}
end
end

num_unique_lengths = options[:extract].keys.map { |name| output[name].length }.uniq
Expand All @@ -94,5 +110,32 @@ def check
hydra.queue request
hydra.run
end

private

def extraction_type
(options[:type] || begin
if options[:url] =~ /\.(rss|xml)$/i
"xml"
elsif options[:url] =~ /\.json$/i
"json"
else
"html"
end
end).to_s
end

def parse(data)
case extraction_type
when "xml"
Nokogiri::XML(data)
when "json"
JSON.parse(data)
when "html"
Nokogiri::HTML(data)
else
raise "Unknown extraction type #{extraction_type}"
end
end
end
end
2 changes: 1 addition & 1 deletion config/initializers/multi_xml_patch.rb
Expand Up @@ -15,7 +15,7 @@ def initialize(type)
end
end

DISALLOWED_XML_TYPES = %w(symbol yaml)
DISALLOWED_XML_TYPES = %w(symbol yaml) unless defined?(DISALLOWED_XML_TYPES)

class << self
def parse(xml, options={})
Expand Down
10 changes: 10 additions & 0 deletions lib/utils.rb
@@ -1,3 +1,5 @@
require 'jsonpath'

module Utils
# Unindents if the indentation is 2 or more characters.
def self.unindent(s)
Expand All @@ -14,4 +16,12 @@ def self.recursively_symbolize_keys(object)
object
end
end

def self.value_at(data, path)
values_at(data, path).first
end

def self.values_at(data, path)
JsonPath.new(path).on(data.is_a?(String) ? data : data.to_json)
end
end
22 changes: 22 additions & 0 deletions spec/lib/utils_spec.rb
@@ -0,0 +1,22 @@
require 'spec_helper'

describe Utils do
describe "#value_at" do
it "returns the value at a JSON path" do
Utils.value_at({ :foo => { :bar => :baz }}.to_json, "foo.bar").should == "baz"
Utils.value_at({ :foo => { :bar => { :bing => 2 } }}, "foo.bar.bing").should == 2
end

it "returns nil when the path cannot be followed" do
Utils.value_at({ :foo => { :bar => :baz }}, "foo.bing").should be_nil
end
end

describe "#values_at" do
it "returns arrays of matching values" do
Utils.values_at({ :foo => { :bar => :baz }}, "foo.bar").should == %w[baz]
Utils.values_at({ :foo => [ { :bar => :baz }, { :bar => :bing } ]}, "foo[*].bar").should == %w[baz bing]
Utils.values_at({ :foo => [ { :bar => :baz }, { :bar => :bing } ]}, "foo[*].bar").should == %w[baz bing]
end
end
end
8 changes: 4 additions & 4 deletions spec/models/agents/peak_detector_agent_spec.rb
Expand Up @@ -22,25 +22,25 @@
events = build_events(:keys => [:count, :filter],
:values => [[1, "something"], [2, "something"], [3, "else"]])
@agent.receive events
@agent.memory[:data][:something].map(&:first).should == %w[1 2]
@agent.memory[:data][:something].map(&:first).should == [1, 2]
@agent.memory[:data][:something].last.last.should be_within(10).of((100 - 1).hours.ago.to_i)
@agent.memory[:data][:else].first.first.should == "3"
@agent.memory[:data][:else].first.first.should == 3
@agent.memory[:data][:else].first.last.should be_within(10).of((100 - 2).hours.ago.to_i)
end

it "works without a group_by_path as well" do
@agent.options[:group_by_path] = ""
events = build_events(:keys => [:count], :values => [[1], [2]])
@agent.receive events
@agent.memory[:data][:no_group].map(&:first).should == %w[1 2]
@agent.memory[:data][:no_group].map(&:first).should == [1, 2]
end

it "keeps a rolling window of data" do
@agent.options[:window_duration] = 5.hours
@agent.receive build_events(:keys => [:count],
:values => [1, 2, 3, 4, 5, 6, 7, 8].map {|i| [i]},
:pattern => { :filter => "something" })
@agent.memory[:data][:something].map(&:first).should == %w[4 5 6 7 8]
@agent.memory[:data][:something].map(&:first).should == [4, 5, 6, 7, 8]
end

it "finds peaks" do
Expand Down
7 changes: 0 additions & 7 deletions spec/models/agents/trigger_agent_spec.rb
Expand Up @@ -120,13 +120,6 @@
@checker.receive([@event])
}.should_not change { Event.count }


@event.payload = "world"
@checker.options[:rules].first[:path] = "anything"
lambda {
@checker.receive([@event])
}.should change { Event.count }.by(1)

@checker.options[:rules].first[:value] = "hi"
lambda {
@checker.receive([@event])
Expand Down

0 comments on commit b876759

Please sign in to comment.