Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
compare: 30fd2f7e23
Checking mergeability… Don't worry, you can still create the pull request.
  • 3 commits
  • 7 files changed
  • 0 commit comments
  • 1 contributor
View
4 Gemfile
@@ -2,3 +2,7 @@ source "http://rubygems.org"
# Specify your gem's dependencies in the gemspec
gemspec
+
+gem 'elasticsearch'
+gem 'elasticsearch-extensions' # Needed for testing
+gem 'pry'
View
2  README.md
@@ -87,7 +87,7 @@ Pupa.rb offers several ways to significantly improve performance.
In an example case, reducing disk I/O and skipping validation as described below reduced the time to scrape 10,000 documents from 100 cached HTTP responses from 100 seconds down to 5 seconds. Like fast tests, fast scrapers make development smoother.
-The `import` action's performance is currently limited by MongoDB when a dependency graph is used to determine the evaluation order. If a dependency graph cannot be used because you don't know a related object's ID, [several optimizations](https://github.com/opennorth/pupa-ruby/issues/12) can be implemented to improve performance.
+The `import` action's performance is currently limited by the database when a dependency graph is used to determine the evaluation order. If a dependency graph cannot be used because you don't know a related object's ID, [several optimizations](https://github.com/opennorth/pupa-ruby/issues/12) can be implemented to improve performance.
### Reducing HTTP requests
View
3  lib/pupa/processor/connection.rb
@@ -1,5 +1,6 @@
require 'pupa/processor/connection_adapters/mongodb_adapter'
require 'pupa/processor/connection_adapters/postgresql_adapter'
+require 'pupa/processor/connection_adapters/elasticsearch_adapter'
module Pupa
class Processor
@@ -17,6 +18,8 @@ def self.new(database_url)
PostgreSQLAdapter.new(database_url)
when 'mongodb'
MongoDBAdapter.new(database_url)
+ when /^http/
+ ElasticsearchAdapter.new(database_url)
else
raise NotImplementedError
end
View
92 lib/pupa/processor/connection_adapters/elasticsearch_adapter.rb
@@ -0,0 +1,92 @@
+require 'elasticsearch'
+require 'pry'
+
+module Pupa
+ class Processor
+ class Connection
+ # A proxy class to save plain old Ruby objects to Elasticsearch.
+ class ElasticsearchAdapter
+ attr_reader :raw_connection
+
+ # @param [Hash] config the Elasticsearch::Client connection config
+ def initialize(config)
+ @raw_connection = Elasticsearch::Client.new(config)
+ end
+
+ # Finds a document matching the selection criteria.
+ #
+ # The selection criteria *must* set a `_type` key in order to determine
+ # the collection to query.
+ #
+ # @param [Hash] selector the selection criteria
+ # @return [Hash,nil] the matched document, or nil
+ # @raises [Pupa::Errors::TooManyMatches] if multiple documents are found
+ def find(selector)
+ type_name = selector[:_type]
+ collection_name = collection_name_from_class_name(type_name.camelize)
+ body = selector.except(:_type)
+ if body.empty?
+ raise Errors::EmptySelectorError, "selector is empty during find in collection #{collection_name}"
+ end
+
+ result = raw_connection.search index: collection_name, type: type_name, body: { query: { term: body } }, size: 2
+
+ if result['hits']['hits'].any?
+ result['hits']['hits'].first['_source']
+ end
+ # TODO ?
+ # raise Errors::TooManyMatches, "selector matches multiple documents during find in collection #{collection_name}: #{JSON.dump(selector)}"
+ end
+
+ # Inserts or replaces a document in Elasticsearch.
+ #
+ # @param [Object] object an object
+ # @return [Array] whether the object was inserted and the object's database ID
+ # @raises [Pupa::Errors::TooManyMatches] if multiple documents would be updated
+ def save(object)
+ selector = object.fingerprint
+
+ collection_name = collection_name_from_class_name(object.class.to_s)
+ if selector.empty?
+ raise Errors::EmptySelectorError, "selector is empty during save in collection #{collection_name} for #{object._id}"
+ end
+
+ type_name = object._type
+ # Unfortunatly Elasticsearch does not support "update or by query"
+ # See https://github.com/elasticsearch/elasticsearch/issues/1607
+ binding.pry
+ if existing = find(selector.merge(_type: type_name))
+ # binding.pry
+ result = raw_connection.update index: collection_name,
+ type: type_name,
+ id: existing['_id'],
+ body: object.to_h(persist: true),
+ refresh: true
+ return [result['created'], result['_id']]
+ else
+ # binding.pry
+ result = raw_connection.index index: collection_name,
+ type: type_name,
+ body: object.to_h(persist: true),
+ refresh: true
+ return [result['created'], result['_id']]
+ end
+ # TODO ?
+ # run_callbacks(:create)
+ # run_callbacks(:save)
+ # raise Errors::TooManyMatches, "selector matches multiple documents during save in collection #{collection_name} for #{object._id}: #{JSON.dump(selector)}"
+ end
+
+ private
+
+ # Returns the name of the collection in which to save the object.
+ #
+ # @param [String] class_name the name of the object's class
+ # @return [String] the name of the collection in which to save the object
+ def collection_name_from_class_name(class_name)
+ class_name.demodulize.underscore.pluralize.to_sym
+ end
+ end
+ end
+ end
+end
View
2  lib/pupa/runner.rb
@@ -86,7 +86,7 @@ def opts
opts.on('--value_max_bytes BYTES', "The maximum Memcached item size") do |v|
options.value_max_bytes = v
end
- opts.on('-d', '--database_url SCHEME://USERNAME:PASSWORD@HOST:PORT/DATABASE', 'The database URL') do |v|
+ opts.on('-d', '--database_url', 'The database URL (e.g. mongodb://USER:PASSWORD@localhost:27017/pupa or postgres://USER:PASSWORD@localhost:5432/pupa') do |v|
options.database_url = v
end
opts.on('--[no-]validate', 'Validate JSON documents') do |v|
View
2  lib/pupa/version.rb
@@ -1,3 +1,3 @@
module Pupa
- VERSION = "0.1.3"
+ VERSION = "0.1.4"
end
View
77 spec/processor/connection_adapters/elasticsearch_adapter_spec.rb
@@ -0,0 +1,77 @@
+require File.expand_path(File.dirname(__FILE__) + '/../../spec_helper')
+require 'elasticsearch/extensions/test/cluster'
+
+describe Pupa::Processor::Connection::ElasticsearchAdapter do
+ def _type
+ if testing_python_compatibility?
+ 'person'
+ else
+ 'pupa/person'
+ end
+ end
+
+ def connection
+ Pupa::Processor::Connection::ElasticsearchAdapter.new(host: 'localhost:9250')
+ end
+
+ before :all do
+ Elasticsearch::Extensions::Test::Cluster.start nodes: 1
+ connection.save(Pupa::Person.new(_id: 'existing', name: 'existing', email: 'existing@example.com'))
+ 2.times do
+ connection.raw_connection.index index: :people,
+ type: 'pupa/person',
+ body: {
+ name: 'non-unique'
+ }
+ end
+ connection.raw_connection.index index: :people,
+ type: 'pupa/person',
+ body: {
+ name: 'unique'
+ },
+ refresh: true # Refresh index after all
+ end
+
+ after :all do
+ Elasticsearch::Extensions::Test::Cluster.stop
+ end
+
+ describe '.find' do
+ it 'should raise an error if selector is empty' do
+ expect{connection.find(_type: _type)}.to raise_error(Pupa::Errors::EmptySelectorError)
+ end
+
+ it 'should return nil if no matches' do
+ connection.find(_type: _type, name: 'nonexistent').should == nil
+ end
+
+ it 'should return a document if one match' do
+ connection.find(_type: _type, name: 'unique').should be_a(Hash)
+ end
+
+ #
+ # it 'should raise an error if many matches' do
+ # expect{connection.find(_type: 'pupa/person', name: 'non-unique')}.to raise_error(Pupa::Errors::TooManyMatches)
+ # end
+ end
+
+ describe '.save' do
+ it 'should raise an error if selector is empty' do
+ expect{connection.save(Pupa::Person.new)}.to raise_error(Pupa::Errors::EmptySelectorError)
+ end
+
+ it 'should insert a document if no matches' do
+ connection.save(Pupa::Person.new(_id: 'new', name: 'new', email: 'new@example.com')).should == [true, 'new']
+ connection.find(_type: _type, name: 'new')['email'].should == 'new@example.com'
+ end
+
+ it 'should update a document if one match' do
+ connection.save(Pupa::Person.new(_id: 'changed', name: 'existing', email: 'changed@example.com')).should == [false, 'existing']
+ connection.find(_type: _type, name: 'existing')['email'].should == 'changed@example.com'
+ end
+
+ # it 'should raise an error if many matches' do
+ # expect{connection.save(Pupa::Person.new(name: 'non-unique'))}.to raise_error(Pupa::Errors::TooManyMatches)
+ # end
+ end
+end

No commit comments for this range

Something went wrong with that request. Please try again.