Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

[8.6] Fix: [MongoDB Connector] Missing aggregation pipeline crashes sync job (#495) #496

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/connectors/mongodb/connector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ def create_db_cursor_on_collection(collection)
def create_aggregate_cursor(collection)
aggregate = @advanced_filter_config[:aggregate]

pipeline = aggregate[:pipeline]
pipeline = aggregate[:pipeline] || []
options = extract_options(aggregate)

if !pipeline.nil? && pipeline.empty? && !options.present?
if pipeline.empty? && options.empty?
Utility::Logger.warn('\'Aggregate\' was specified with an empty pipeline and empty options.')
end

Expand Down
114 changes: 49 additions & 65 deletions spec/connectors/mongodb/connector_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -565,93 +565,77 @@
end
end

context 'aggregate field exists (with pipeline and options) in an active advanced filtering config' do
# find should not exist
let(:advanced_snippet) {
{
:aggregate => aggregate
}
}

it 'calls the mongo client aggregate method with pipeline and options' do
expect(actual_collection).to receive(:aggregate).with(pipeline, options)
expect(mongo_collection_cursor).to receive(:each)
context 'when aggregate is present' do
shared_examples_for 'calls aggregate' do |expected_pipeline, expected_options|
it '' do
expect(actual_collection).to receive(:aggregate).with(expected_pipeline, expected_options)
expect(mongo_collection_cursor).to receive(:each)

# aggregate does not expose this functionality
expect(mongo_collection_cursor).to_not receive(:skip)
expect(mongo_collection_cursor).to_not receive(:limit)
# aggregate does not expose this functionality directly (only through pipeline stages)
expect(mongo_collection_cursor).to_not receive(:skip)
expect(mongo_collection_cursor).to_not receive(:limit)

subject.yield_documents
subject.yield_documents
end
end
end

context 'aggregate field exists (with pipeline and options) in an active advanced filtering config' do
let(:aggregate) {
{
:pipeline => pipeline,
# empty options
:options => {}
}
}
# necessary to reuse it as parameter for it_behaves_like and let()
pipeline = [{ '$skip' => 5 }]
options = { 'batch_size' => 10 }

let(:pipeline) { pipeline }

let(:options) { options }

# find should not exist
let(:advanced_snippet) {
{
:aggregate => aggregate
}
}

it 'calls the mongo client aggregate method with pipeline and empty options' do
expect(actual_collection).to receive(:aggregate).with(pipeline, {})
expect(mongo_collection_cursor).to receive(:each)
it_behaves_like 'calls aggregate', pipeline, options

subject.yield_documents
context 'when options are not present' do
context 'when options are empty' do
let(:options) { {} }

it_behaves_like 'calls aggregate', pipeline, {}
end

context 'when options are nil' do
let(:options) { nil }

it_behaves_like 'calls aggregate', pipeline, {}
end
end
end

context 'aggregate field exists (with pipeline and options) in an active advanced filtering config' do
let(:aggregate) {
{
# empty pipeline
:pipeline => [],
:options => options
}
}
context 'when pipeline is not present' do
context 'when pipeline is empty' do
let(:pipeline) { [] }

# find should not exist
let(:advanced_snippet) {
{
:aggregate => aggregate
}
}
it_behaves_like 'calls aggregate', [], options
end

it 'calls the mongo client aggregate method with pipeline and empty options' do
expect(actual_collection).to receive(:aggregate).with([], options)
expect(mongo_collection_cursor).to receive(:each)
context 'when pipeline is nil' do
let(:pipeline) { nil }

subject.yield_documents
# still calls aggregate with an empty pipeline. Nil pipeline would lead to the aggregation to crash
it_behaves_like 'calls aggregate', [], options
end
end
end

context 'aggregate exists, but pipeline and options are empty' do
let(:aggregate) {
{
:pipeline => [],
:options => {}
}
}
context 'when pipeline and options are nil' do
let(:pipeline) { nil }
let(:options) { nil }

let(:advanced_snippet) {
{
:aggregate => aggregate
}
}
it_behaves_like 'calls aggregate', [], {}

it 'logs a warning' do
expect(Utility::Logger).to receive(:warn).with('\'Aggregate\' was specified with an empty pipeline and empty options.')
expect(mongo_collection_cursor).to receive(:each)
it 'logs a warning' do
expect(Utility::Logger).to receive(:warn).with('\'Aggregate\' was specified with an empty pipeline and empty options.')
expect(mongo_collection_cursor).to receive(:each)

subject.yield_documents
subject.yield_documents
end
end
end
end
Expand Down