CCOL-2039: Post process valid and invalid records from batch consumption #207
slice(messages).
  each(&method(:update_database))
valid_records = []
invalid_records = []
Can we rename one of these variables? They represent two different things (BatchRecord and ActiveRecord).
upserted.each_slice(max_db_batch_size) do |group|
  valid, invalid = upsert_records(group)
  valid_upserts.push(*valid)
  invalid_upserts.push(*invalid)
Simpler to do `invalid_upserts.concat(invalid)` instead of `.push(*invalid)`. Same above/below.
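To illustrate the suggestion, a minimal sketch with placeholder values (the variable names mirror the diff, the data is made up). Both calls append in place; `concat` simply avoids splatting the array into separate arguments.

```ruby
# Placeholder data; only the two Array methods are the point here.
valid_upserts = [1, 2]
valid = [3, 4]

with_push = valid_upserts.dup
with_push.push(*valid)      # splats `valid` into separate arguments

with_concat = valid_upserts.dup
with_concat.concat(valid)   # appends the elements of `valid` directly

puts with_push == with_concat
```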
  invalid_upserts.push(*invalid)
end
valid_upserts.compact!
invalid_upserts.compact!
Is there a reason you're compacting inside each iteration instead of after all iterations are done?
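One way to read this question, as a rough sketch: accumulate across all slices first, then compact a single time. `upsert_group` is a hypothetical stand-in for `upsert_records`, and the integers and `nil` entries are placeholder data.

```ruby
# Hypothetical stand-in for upsert_records: returns [valid, invalid] for a group.
# nil entries model records that produced no result.
def upsert_group(group)
  group.partition { |n| n.nil? || n.even? }
end

upserted = [2, nil, 3, 4, 5, nil, 6]
valid_upserts = []
invalid_upserts = []

upserted.each_slice(3) do |group|
  valid, invalid = upsert_group(group)
  valid_upserts.concat(valid)
  invalid_upserts.concat(invalid)
end

# Compact once, after every slice has been processed.
valid_upserts.compact!
invalid_upserts.compact!
```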
# @param block [Proc]
# @return [Array<BatchRecord>]
def partition!(method=nil, &block)
  valid, invalid = if method.nil?
This should never be `nil` since it's defined on the base class, no?
Had a case where we had to override `filter_records`, like so:

    def filter_records(records)
      some_filter_query = filter_query(records)
      records.partition! do |r|
        some_filter_query.exclude?(r.record.id) && valid_batch_record?(r.record, r.associations)
      end
    end

Could this be done in a `proc`?
Not sure... I think I'd need to see this. It feels like you're diving too deep into Deimos guts to get this working - it feels wrong for calling code to have to call so many internal methods.
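For reference, a rough sketch of a `partition!` that accepts either a callable or a block. `RecordList` is a hypothetical stand-in, not the Deimos class, and returning the rejected entries is an assumption based on the diff above.

```ruby
# Hypothetical stand-in for a batch record list; not the Deimos class.
class RecordList
  attr_reader :records

  def initialize(records)
    @records = records
  end

  # Keeps entries that pass the check and returns the rejected ones.
  # Accepts either a callable (e.g. a Method object) or a block.
  def partition!(callable = nil, &block)
    check = callable || block
    raise ArgumentError, 'callable or block required' if check.nil?

    valid, invalid = @records.partition { |r| check.call(r) }
    @records = valid
    invalid
  end
end

def positive?(n)
  n > 0
end

list = RecordList.new([1, -2, 3])
rejected_by_method = list.partition!(method(:positive?))

other = RecordList.new([1, -2, 3])
rejected_by_block = other.partition! { |n| n.odd? }
```

With this shape, an overriding consumer could pass a block without touching the default method-based path.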
end
end

context 'with post processing' do
I'm confused - what's the difference between `describe 'post processing'` and `context 'with post processing'`?
This should be with compacted messages
I still don't fully understand the use case here. We seem to be adding a lot of functionality for a very laser-specific use case.
5ebe94c to 8813fa4
@@ -41,9 +42,17 @@ def consume_batch(payloads, metadata)
uncompacted_update(messages)
end
end
process_valid_records(@valid_active_records)
Is there a reason this also can't be a notification?
This could also be a notification
 # @return [Boolean]
-def should_consume?(_record)
+def should_consume?(_batch_record)
Why did this need to change? Did it actually change?
It did change types from `ActiveRecord::Base` to `BatchRecord`. This is in order to validate associations that are not yet saved.
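A rough sketch of what the type change allows. `BatchRecord` and `Widget` here are hypothetical structs, not the Deimos classes, and the rule inside `should_consume?` is an assumed example.

```ruby
# Hypothetical stand-ins; the real BatchRecord in Deimos has more behaviour.
BatchRecord = Struct.new(:record, :associations)
Widget = Struct.new(:name)

# Assumed rule for illustration: consume only records that carry
# at least one unsaved association.
def should_consume?(batch_record)
  !batch_record.associations.empty?
end

with_parts = BatchRecord.new(Widget.new('a'), { parts: [{ label: 'p1' }] })
without_parts = BatchRecord.new(Widget.new('b'), {})

puts should_consume?(with_parts)
puts should_consume?(without_parts)
```

With a plain `ActiveRecord::Base` argument, associations that have not been saved yet would not be visible to the check.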
lib/deimos/instrumentation.rb
Outdated

ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args|
  payload = ActiveSupport::Notifications::Event.new(*args).payload
  payload[:consumer].process_invalid_records(payload[:records])
Calling code should be able to do this themselves rather than us having a method that we call explicitly.
@@ -167,7 +178,13 @@ def upsert_records(messages)
 col_proc: col_proc,
 replace_associations: self.class.replace_associations,
 bulk_import_id_generator: self.class.bulk_import_id_generator)
-updater.mass_update(record_list)
+@valid_active_records.concat(updater.mass_update(record_list))
Can we get this working without the instance variable? I.e. return the updated records from `mass_update` to the calling code and have the concatenation happen there?
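Roughly what that could look like, as a sketch only. `FakeUpdater` and the method signatures are hypothetical and simplified; the real methods take messages and build the record list internally.

```ruby
# Hypothetical updater; mass_update here just returns what it "saved".
class FakeUpdater
  def mass_update(record_list)
    record_list.map { |r| "saved:#{r}" }
  end
end

# Returns the updated records instead of appending to an instance variable.
def upsert_records(updater, record_list)
  updater.mass_update(record_list)
end

# The caller owns the accumulation.
def update_database(updater, groups)
  groups.each_with_object([]) do |group, saved|
    saved.concat(upsert_records(updater, group))
  end
end

result = update_database(FakeUpdater.new, [%w[a b], %w[c]])
```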
else
  uncompacted_update(messages)
end
@tags = %W(topic:#{metadata[:topic]})
I think it's weird to pass metric tags so far down into the workflow. Can we do something like
Deimos.config.tracer.active_span.set_tag("topic", metadata[:topic])
...
Deimos.config.tracer.active_span.get_tag("topic")
We'd have to add a `get_tag` method to the provider class, but it feels a lot better because we can separate out the tracing/metric logic from the actual business logic.
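A minimal sketch of the provider idea, under assumptions: `TracingProvider` and `MemoryTracer` are hypothetical names, and the tags live in a plain hash rather than on a real span.

```ruby
# Hypothetical provider interface mirroring the idea in the comment.
class TracingProvider
  def set_tag(_tag, _value)
    raise NotImplementedError
  end

  def get_tag(_tag)
    raise NotImplementedError
  end
end

# In-memory implementation, for illustration only.
class MemoryTracer < TracingProvider
  def initialize
    @tags = {}
  end

  def set_tag(tag, value)
    @tags[tag] = value
  end

  def get_tag(tag)
    @tags[tag]
  end
end

tracer = MemoryTracer.new
tracer.set_tag('topic', 'my-topic')
puts tracer.get_tag('topic')
```

Business code would then only set and read tags through the provider, without carrying a tags array through each method call.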
@@ -155,19 +151,35 @@ def update_database(messages)
 # @return [void]
 def upsert_records(messages)
 record_list = build_records(messages)
-record_list.filter!(self.method(:should_consume?).to_proc)
+invalid = filter_records(record_list)
+unless invalid.blank?
Better to use positive conditions: if invalid.any?
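A small sketch of the positive form, using plain Ruby only (`blank?` comes from ActiveSupport and is left out here; the record names are placeholders). One caveat worth knowing: `any?` without a block looks for truthy elements, so it is not the same as "not empty" for an array holding only `nil` or `false`.

```ruby
invalid = %w[rec1 rec2]
handled = []

# Positive condition instead of a negated emptiness check.
handled.concat(invalid) if invalid.any?

# Caveat: any? without a block looks for truthy elements.
only_nil = [nil]
puts only_nil.any?    # false
puts only_nil.empty?  # false
```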

-Deimos.instrument('ar_consumer.consume_batch', @tags) do
+Deimos.instrument('ar_consumer.consume_batch',
+                  Deimos.config.tracer.active_span.get_tag('topic')) do
lol you don't need to do `get_tag` here, you already have the topic.
@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer
- Feature: Add global `replace_assocations` value for for all consumers
- Feature: Add individual `replace_assocations` value for for individual consumers
- Feature: `should_consume?` method accepts BatchRecord associations
- Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion
- Feature: Return valid and invalid records saved during consumption for further processing in `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications
This should be added to the README.
# @param tag [String]
def get_tag(tag)
  raise NotImplementedError
end
We need to implement this for datadog.
lib/deimos/consumer.rb
Outdated
# in the event the schema class is an override, the inherited
# schema and namespace will be applied
schema_class = "Schemas::#{config[:schema]}".constantize.new
Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace)
Shouldn't this be moved into the `schema_backend` method rather than just used here?
lib/deimos/tracing/datadog.rb
Outdated
@@ -11,15 +11,12 @@ def initialize(config)
raise 'Tracing config must specify service_name' if config[:service_name].nil?

@service = config[:service_name]
@tracer = ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing
This feels more like a method than an instance variable. It isn't actually state that needs to be stored.
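A sketch of the method form, assuming nothing about the real library: `FakeVendor` is a hypothetical stand-in module used only so the snippet runs on its own.

```ruby
# Hypothetical stand-in for a vendor module; not the real Datadog API.
module FakeVendor
  module Tracing; end
end

class Provider
  # Resolved on each call instead of being stored in initialize.
  def tracer
    FakeVendor.respond_to?(:tracer) ? FakeVendor.tracer : FakeVendor::Tracing
  end
end

provider = Provider.new
puts provider.tracer
```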
spec/consumer_spec.rb
Outdated
}
end
end
stub_const('Schemas::MySchema', schema_class)
Is there a reason we have to add this in all four specs? If we're just trying to test overriding, can we have a dedicated spec for that? Also, why does the overriding class have to reimplement every method?
`Schemas::MySchema` wasn't defined anywhere, so tests where `SCHEMAS_CLASS_SETTING = on` were failing.
It should be in `spec/schemas/my_namespace`. If it's not showing up, maybe the settings are wrong for the tests? Or do we need to add it to `spec/schemas` and require it similarly to the other generated schema files?
Looks much better! One tiny comment left :)
Filename is backwards :)
Oof. Updated lol
Time to hit the big green button! Thanks @lionelpereira!
Pull Request Template
Description
`should_consume?` method accepts validation of associations
Fixes # (issue)