diff --git a/app/lib/application_extension.rb b/app/lib/application_extension.rb index 9245e00c14763..400c51a023d09 100644 --- a/app/lib/application_extension.rb +++ b/app/lib/application_extension.rb @@ -25,8 +25,13 @@ def confirmation_redirect_uri def push_to_streaming_api # TODO: #28793 Combine into a single topic - access_tokens.in_batches.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + access_tokens.in_batches do |tokens| + redis.pipelined do |pipeline| + tokens.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end + end end end end diff --git a/app/models/user.rb b/app/models/user.rb index 39f7b3b0cf92b..388be31fab797 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -346,8 +346,11 @@ def revoke_access! # Revoke each access token for the Streaming API, since `update_all`` # doesn't trigger ActiveRecord Callbacks: # TODO: #28793 Combine into a single topic - batch.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + redis.pipelined do |pipeline| + batch.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end end end end diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index 051e920454b84..3cc804af43168 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -438,8 +438,10 @@ let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) } let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) } + let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } + before do - allow(redis).to receive_messages(publish: nil) + allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) user.reset_password! end @@ -457,7 +459,7 @@ end it 'revokes streaming access for all access tokens' do - expect(redis).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once + expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once end it 'removes push subscriptions' do