Skip to content

Commit

Permalink
🎉 improvement: speed up mongodb schema discovery (#2851)
Browse files Browse the repository at this point in the history
Signed-off-by: fut <fut.wrk@gmail.com>
  • Loading branch information
FUT committed Apr 15, 2021
1 parent abb54c9 commit 6bdceef
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 24 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mongodb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ RUN bundle install
ENTRYPOINT ["ruby", "/airbyte/source.rb"]

LABEL io.airbyte.name=airbyte/source-mongodb
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.3.0
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,13 @@ def after_stream_processed
private

def determine_cursor_field_type
explorer = MongodbTypesExplorer.new(collection: @client[stream_name], field: cursor_field) do |type|
MongodbTypesExplorer.run(collection: @client[stream_name], field: cursor_field) do |type|
if DATETIME_TYPES.include?(type)
CURSOR_TYPES[:datetime]
else
CURSOR_TYPES[:integer]
end
end

explorer.field_type
end

def convert_cursor(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def discover(config:)
@config = JSON.parse(File.read(config))

streams = client.collections.map do |collection|
AirbyteLogger.log("Discovering stream #{collection.name}")
MongodbStream.new(collection: collection).discover
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
require_relative './airbyte_protocol.rb'
require_relative './airbyte_logger.rb'

require_relative './mongodb_types_explorer.rb'

class MongodbStream
DISCOVER_LIMIT = 10_000

AIRBYTE_TYPES = {
boolean: 'boolean',
number: 'number',
Expand Down Expand Up @@ -46,11 +49,9 @@ def discover


def discover_property_type(property)
explorer = MongodbTypesExplorer.new(collection: @collection, field: property) do |type|
MongodbTypesExplorer.run(collection: @collection, field: property) do |type|
TYPES_MAPPING[type] || FALLBACK_TYPE
end

explorer.field_type || FALLBACK_TYPE
end || FALLBACK_TYPE
end

def discover_properties
Expand All @@ -62,13 +63,14 @@ def discover_properties
raw: true,
}

view = Mongo::Collection::View.new(@collection)
view = Mongo::Collection::View.new(@collection, {}, limit: DISCOVER_LIMIT)
props = view.map_reduce(map, reduce, opts).map do |obj|
obj['_id']
end

props.each do |prop|
@properties[prop] = { 'type' => discover_property_type(prop) }
AirbyteLogger.log(" #{@collection.name}.#{prop} TYPE IS #{@properties[prop]['type']}")
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,36 @@
class MongodbTypesExplorer
EXPLORE_LIMIT = 1_000

attr_reader :field_type
@@cache = {}

def initialize(collection:, field:, limit: EXPLORE_LIMIT, &type_mapping_block)
@collection = collection
@field = field
@limit = limit
@type_mapping_block = type_mapping_block
def self.run(collection:, field:, limit: EXPLORE_LIMIT, &type_mapping_block)
determine_field_types_for_collection(collection: collection, limit: limit, &type_mapping_block)

@field_type = determine_field_type
@@cache[collection.name][field]
end

private

def determine_field_type
airbyte_types = Set[]
def self.determine_field_types_for_collection(collection:, limit:, &type_mapping_block)
return if @@cache[collection.name]

@collection.find(@field => { "$nin": [nil] }).limit(@limit).each do |item|
mapped_value = @type_mapping_block[item[@field].class]
airbyte_types.add(mapped_value)
airbyte_types = {}

collection.find.limit(limit).each do |item|
item.each_pair do |key, value|
mapped_value = type_mapping_block[value.class]

airbyte_types[key] ||= Set[]
airbyte_types[key].add(mapped_value)
end
end

# Has one specific type
if airbyte_types.count == 1
airbyte_types.first
@@cache[collection.name] = {}
airbyte_types.each_pair do |field, types|
# Has one specific type
if types.count == 1
@@cache[collection.name][field] = types.first
end
end
end
end

0 comments on commit 6bdceef

Please sign in to comment.