Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: IMAP sync email update uniqueness across groups and minor improvements #10332

Merged
merged 12 commits into from Aug 3, 2020
Merged
14 changes: 8 additions & 6 deletions app/models/incoming_email.rb
Expand Up @@ -52,13 +52,15 @@ class IncomingEmail < ActiveRecord::Base
# imap_uid_validity :integer
# imap_uid :integer
# imap_sync :boolean
# imap_group_id :bigint
#
# Indexes
#
# index_incoming_emails_on_created_at (created_at)
# index_incoming_emails_on_error (error)
# index_incoming_emails_on_imap_sync (imap_sync)
# index_incoming_emails_on_message_id (message_id)
# index_incoming_emails_on_post_id (post_id)
# index_incoming_emails_on_user_id (user_id) WHERE (user_id IS NOT NULL)
# index_incoming_emails_on_created_at (created_at)
# index_incoming_emails_on_error (error)
# index_incoming_emails_on_imap_group_id (imap_group_id)
# index_incoming_emails_on_imap_sync (imap_sync)
# index_incoming_emails_on_message_id (message_id)
# index_incoming_emails_on_post_id (post_id)
# index_incoming_emails_on_user_id (user_id) WHERE (user_id IS NOT NULL)
#
22 changes: 22 additions & 0 deletions db/migrate/20200728072038_add_imap_group_id_to_incoming_email.rb
@@ -0,0 +1,22 @@
# frozen_string_literal: true

class AddImapGroupIdToIncomingEmail < ActiveRecord::Migration[6.0]
disable_ddl_transaction!

def up
execute <<~SQL
ALTER TABLE incoming_emails ADD COLUMN IF NOT EXISTS imap_group_id bigint NULL
SQL

execute <<~SQL
CREATE INDEX CONCURRENTLY IF NOT EXISTS
index_incoming_emails_on_imap_group_id ON incoming_emails USING btree (imap_group_id)
SQL
end

def down
execute <<~SQL
ALTER TABLE incoming_emails DROP COLUMN IF EXISTS imap_group_id
SQL
end
end
14 changes: 9 additions & 5 deletions lib/demon/email_sync.rb
Expand Up @@ -23,6 +23,7 @@ def suppress_stderr
def start_thread(db, group)
Thread.new do
RailsMultisite::ConnectionManagement.with_connection(db) do
puts "[EmailSync] Thread started for group #{group.name} (id = #{group.id}) in db #{db}"
begin
obj = Imap::Sync.for_group(group)
rescue Net::IMAP::NoResponseError => e
Expand All @@ -36,6 +37,7 @@ def start_thread(db, group)
idle = false

while @running && group.reload.imap_mailbox_name.present? do
puts "[EmailSync] Processing IMAP mailbox for group #{group.name} (id = #{group.id}) in db #{db}"
status = obj.process(
idle: obj.can_idle? && status && status[:remaining] == 0,
old_emails_limit: status && status[:remaining] > 0 ? 0 : nil,
Expand Down Expand Up @@ -74,14 +76,14 @@ def kill_threads
end

def after_fork
puts "Loading EmailSync in process id #{Process.pid}"
puts "[EmailSync] Loading EmailSync in process id #{Process.pid}"

loop do
break if Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL, nx: true)
sleep HEARTBEAT_INTERVAL
end

puts "Starting EmailSync main thread"
puts "[EmailSync] Starting EmailSync main thread"

@running = true
@sync_data = {}
Expand Down Expand Up @@ -122,7 +124,7 @@ def after_fork
if !groups[group_id]
puts("[EmailSync] Killing thread for group (id = #{group_id}) because mailbox is no longer synced")
else
puts("[EmailSync] Thread for group #{groups[group_id].name} is dead")
puts("[EmailSync] Thread for group #{groups[group_id].name} (id = #{group_id}) is dead")
end

data[:thread].kill
Expand All @@ -135,8 +137,10 @@ def after_fork
# Spawn new threads for groups that are now synchronized.
groups.each do |group_id, group|
if !@sync_data[db][group_id]
puts("[EmailSync] Starting thread for group #{group.name} and mailbox #{group.imap_mailbox_name}")
@sync_data[db][group_id] = { thread: start_thread(db, group), obj: nil }
puts("[EmailSync] Starting thread for group #{group.name} (id = #{group.id}) and mailbox #{group.imap_mailbox_name}")
@sync_data[db][group_id] = {
thread: start_thread(db, group), obj: nil
}
end
end
end
Expand Down
30 changes: 23 additions & 7 deletions lib/email/receiver.rb
Expand Up @@ -66,11 +66,13 @@ def process!
id_hash = Digest::SHA1.hexdigest(@message_id)
DistributedMutex.synchronize("process_email_#{id_hash}") do
begin
@incoming_email = IncomingEmail.find_by(message_id: @message_id)
if @incoming_email
@incoming_email.update(imap_uid_validity: @opts[:uid_validity], imap_uid: @opts[:uid], imap_sync: false)
return
end

# if we find an existing incoming email record with the
# exact same message id, be sure to update it with the correct IMAP
# metadata based on sync. this is so we do not double-create emails.
@incoming_email = find_existing_and_update_imap
return if @incoming_email

ensure_valid_address_lists
ensure_valid_date
@from_email, @from_display_name = parse_from_field
Expand All @@ -89,6 +91,19 @@ def process!
end
end

def find_existing_and_update_imap
@incoming_email = IncomingEmail.find_by(message_id: @message_id)
return if !@incoming_email

@incoming_email.update(
imap_uid_validity: @opts[:imap_uid_validity],
imap_uid: @opts[:imap_uid],
imap_group_id: @opts[:imap_group_id],
imap_sync: false
)
@incoming_email
end

def ensure_valid_address_lists
[:to, :cc, :bcc].each do |field|
addresses = @mail[field]
Expand Down Expand Up @@ -118,8 +133,9 @@ def create_incoming_email
from_address: @from_email,
to_addresses: @mail.to&.map(&:downcase)&.join(";"),
cc_addresses: @mail.cc&.map(&:downcase)&.join(";"),
imap_uid_validity: @opts[:uid_validity],
imap_uid: @opts[:uid],
imap_uid_validity: @opts[:imap_uid_validity],
imap_uid: @opts[:imap_uid],
imap_group_id: @opts[:imap_group_id],
imap_sync: false
)
end
Expand Down
18 changes: 15 additions & 3 deletions lib/imap/providers/generic.rb
Expand Up @@ -4,6 +4,9 @@

module Imap
module Providers

class WriteDisabledError < StandardError; end

class Generic

def initialize(server, options = {})
Expand Down Expand Up @@ -65,7 +68,9 @@ def labels

def open_mailbox(mailbox_name, write: false)
if write
raise 'two-way IMAP sync is disabled' if !SiteSetting.enable_imap_write
if !SiteSetting.enable_imap_write
raise WriteDisabledError.new("Two-way IMAP sync is disabled! Cannot write to inbox.")
end
imap.select(mailbox_name)
else
imap.examine(mailbox_name)
Expand All @@ -77,7 +82,14 @@ def open_mailbox(mailbox_name, write: false)
end

def emails(uids, fields, opts = {})
imap.uid_fetch(uids, fields).map do |email|
fetched = imap.uid_fetch(uids, fields)

# This will happen if the email does not exist in the provided mailbox.
# It may have been deleted or otherwise moved, e.g. if deleted in Gmail
# it will end up in "[Gmail]/Bin"
return [] if fetched.nil?

fetched.map do |email|
attributes = {}

fields.each do |field|
Expand Down Expand Up @@ -105,7 +117,7 @@ def tag_to_flag(tag)
end

def tag_to_label(tag)
labels[tag]
tag
end

def list_mailboxes
Expand Down
26 changes: 16 additions & 10 deletions lib/imap/sync.rb
Expand Up @@ -59,12 +59,12 @@ def process(idle: false, import_limit: nil, old_emails_limit: nil, new_emails_li
# If UID validity changes, the whole mailbox must be synchronized (all
# emails are considered new and will be associated to existent topics
# in Email::Reciever by matching Message-Ids).
Rails.logger.warn("[IMAP] UIDVALIDITY = #{@status[:uid_validity]} does not match expected #{@group.imap_uid_validity}, invalidating IMAP cache and resyncing emails for group #{@group.name} and mailbox #{@group.imap_mailbox_name}")
Rails.logger.warn("[IMAP] (#{@group.name}) UIDVALIDITY = #{@status[:uid_validity]} does not match expected #{@group.imap_uid_validity}, invalidating IMAP cache and resyncing emails for group #{@group.name} and mailbox #{@group.imap_mailbox_name}")
@group.imap_last_uid = 0
end

if idle && !can_idle?
Rails.logger.warn("[IMAP] IMAP server for group #{@group.name} cannot IDLE")
Rails.logger.warn("[IMAP] (#{@group.name}) IMAP server for group cannot IDLE")
idle = false
end

Expand Down Expand Up @@ -95,7 +95,7 @@ def process(idle: false, import_limit: nil, old_emails_limit: nil, new_emails_li
# Sometimes, new_uids contains elements from old_uids.
new_uids = new_uids - old_uids

Rails.logger.debug("[IMAP] Remote email server has #{old_uids.size} old emails and #{new_uids.size} new emails")
Rails.logger.debug("[IMAP] (#{@group.name}) Remote email server has #{old_uids.size} old emails and #{new_uids.size} new emails")

all_old_uids_size = old_uids.size
all_new_uids_size = new_uids.size
Expand All @@ -111,7 +111,7 @@ def process(idle: false, import_limit: nil, old_emails_limit: nil, new_emails_li
new_uids = new_uids[0..new_emails_limit - 1] if new_emails_limit > 0

if old_uids.present?
Rails.logger.debug("[IMAP] Syncing #{old_uids.size} randomly-selected old emails")
Rails.logger.debug("[IMAP] (#{@group.name}) Syncing #{old_uids.size} randomly-selected old emails")
emails = @provider.emails(old_uids, ['UID', 'FLAGS', 'LABELS'], mailbox: @group.imap_mailbox_name)
emails.each do |email|
incoming_email = IncomingEmail.find_by(
Expand All @@ -122,13 +122,13 @@ def process(idle: false, import_limit: nil, old_emails_limit: nil, new_emails_li
if incoming_email.present?
update_topic(email, incoming_email, mailbox_name: @group.imap_mailbox_name)
else
Rails.logger.warn("[IMAP] Could not find old email (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']})")
Rails.logger.warn("[IMAP] (#{@group.name}) Could not find old email (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']})")
end
end
end

if new_uids.present?
Rails.logger.debug("[IMAP] Syncing #{new_uids.size} new emails (oldest first)")
Rails.logger.debug("[IMAP] (#{@group.name}) Syncing #{new_uids.size} new emails (oldest first)")

emails = @provider.emails(new_uids, ['UID', 'FLAGS', 'LABELS', 'RFC822'], mailbox: @group.imap_mailbox_name)
processed = 0
Expand All @@ -142,13 +142,14 @@ def process(idle: false, import_limit: nil, old_emails_limit: nil, new_emails_li
allow_auto_generated: true,
import_mode: import_mode,
destinations: [@group],
uid_validity: @status[:uid_validity],
uid: email['UID']
imap_uid_validity: @status[:uid_validity],
imap_uid: email['UID'],
imap_group_id: @group.id
)
receiver.process!
update_topic(email, receiver.incoming_email, mailbox_name: @group.imap_mailbox_name)
rescue Email::Receiver::ProcessingError => e
Rails.logger.warn("[IMAP] Could not process (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']}): #{e.message}")
Rails.logger.warn("[IMAP] (#{@group.name}) Could not process (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']}): #{e.message}")
end

processed += 1
Expand All @@ -167,7 +168,7 @@ def process(idle: false, import_limit: nil, old_emails_limit: nil, new_emails_li
if to_sync.size > 0
@provider.open_mailbox(@group.imap_mailbox_name, write: true)
to_sync.each do |incoming_email|
Rails.logger.debug("[IMAP] Updating email for #{@group.name} and incoming email ID = #{incoming_email.id}")
Rails.logger.debug("[IMAP] (#{@group.name}) Updating email and incoming email ID = #{incoming_email.id}")
update_email(@group.imap_mailbox_name, incoming_email)
end
end
Expand Down Expand Up @@ -234,6 +235,11 @@ def update_topic_tags(email, incoming_email, opts = {})
def update_email(mailbox_name, incoming_email)
return if !SiteSetting.tagging_enabled || !SiteSetting.allow_staff_to_tag_pms
return if incoming_email&.post&.post_number != 1 || !incoming_email.imap_sync

# if email is nil, the UID does not exist in the provider, meaning....
#
# A) the email has been deleted/moved to a different mailbox in the provider
# B) the UID does not belong to the provider
return unless email = @provider.emails(incoming_email.imap_uid, ['FLAGS', 'LABELS'], mailbox: mailbox_name).first
incoming_email.update(imap_sync: false)

Expand Down
10 changes: 6 additions & 4 deletions spec/components/imap/sync_spec.rb
Expand Up @@ -84,6 +84,7 @@
expect(incoming_email.imap_uid_validity).to eq(1)
expect(incoming_email.imap_uid).to eq(100)
expect(incoming_email.imap_sync).to eq(false)
expect(incoming_email.imap_group_id).to eq(group.id)
end

it 'does not duplicate topics' do
Expand Down Expand Up @@ -111,6 +112,7 @@
expect(incoming_email.imap_uid_validity).to eq(1)
expect(incoming_email.imap_uid).to eq(100)
expect(incoming_email.imap_sync).to eq(false)
expect(incoming_email.imap_group_id).to eq(group.id)
end
end

Expand Down Expand Up @@ -285,8 +287,8 @@
.and change { Post.where(post_type: Post.types[:regular]).count }.by(2)
.and change { IncomingEmail.count }.by(2)

imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid)
expect(imap_data).to contain_exactly([1, 100], [1, 200])
imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid, :imap_group_id)
expect(imap_data).to contain_exactly([1, 100, group.id], [1, 200, group.id])

provider.stubs(:open_mailbox).returns(uid_validity: 2)
provider.stubs(:uids).with.returns([111, 222])
Expand Down Expand Up @@ -326,8 +328,8 @@
.and change { Post.where(post_type: Post.types[:regular]).count }.by(0)
.and change { IncomingEmail.count }.by(0)

imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid)
expect(imap_data).to contain_exactly([2, 111], [2, 222])
imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid, :imap_group_id)
expect(imap_data).to contain_exactly([2, 111, group.id], [2, 222, group.id])
end
end
end