Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: IMAP sync email update uniqueness across groups and minor improvements #10332

Merged
merged 12 commits into from Aug 3, 2020
Merged
15 changes: 9 additions & 6 deletions app/models/incoming_email.rb
Expand Up @@ -4,6 +4,7 @@ class IncomingEmail < ActiveRecord::Base
belongs_to :user
belongs_to :topic
belongs_to :post
belongs_to :group, foreign_key: :imap_group_id, class_name: 'Group'

scope :errored, -> { where("NOT is_bounce AND error IS NOT NULL") }

Expand Down Expand Up @@ -52,13 +53,15 @@ class IncomingEmail < ActiveRecord::Base
# imap_uid_validity :integer
# imap_uid :integer
# imap_sync :boolean
# imap_group_id :bigint
#
# Indexes
#
# index_incoming_emails_on_created_at (created_at)
# index_incoming_emails_on_error (error)
# index_incoming_emails_on_imap_sync (imap_sync)
# index_incoming_emails_on_message_id (message_id)
# index_incoming_emails_on_post_id (post_id)
# index_incoming_emails_on_user_id (user_id) WHERE (user_id IS NOT NULL)
# index_incoming_emails_on_created_at (created_at)
# index_incoming_emails_on_error (error)
# index_incoming_emails_on_imap_group_id (imap_group_id)
# index_incoming_emails_on_imap_sync (imap_sync)
# index_incoming_emails_on_message_id (message_id)
# index_incoming_emails_on_post_id (post_id)
# index_incoming_emails_on_user_id (user_id) WHERE (user_id IS NOT NULL)
#
22 changes: 22 additions & 0 deletions db/migrate/20200728072038_add_imap_group_id_to_incoming_email.rb
@@ -0,0 +1,22 @@
# frozen_string_literal: true

# Adds a nullable bigint imap_group_id column to incoming_emails together
# with a btree index on that column.
class AddImapGroupIdToIncomingEmail < ActiveRecord::Migration[6.0]
  # The index is built with CREATE INDEX CONCURRENTLY, which cannot run
  # inside a transaction block (PostgreSQL restriction), so the implicit
  # DDL transaction is switched off for this migration.
  disable_ddl_transaction!

  # Both statements carry IF NOT EXISTS, so neither fails when the column
  # or the index is already present.
  def up
    add_column_sql = <<~SQL
      ALTER TABLE incoming_emails ADD COLUMN IF NOT EXISTS imap_group_id bigint NULL
    SQL

    add_index_sql = <<~SQL
      CREATE INDEX CONCURRENTLY IF NOT EXISTS
      index_incoming_emails_on_imap_group_id ON incoming_emails USING btree (imap_group_id)
    SQL

    execute(add_column_sql)
    execute(add_index_sql)
  end

  # Only the column is dropped explicitly; PostgreSQL removes indexes that
  # involve a dropped column along with it.
  def down
    drop_column_sql = <<~SQL
      ALTER TABLE incoming_emails DROP COLUMN IF EXISTS imap_group_id
    SQL

    execute(drop_column_sql)
  end
end
16 changes: 11 additions & 5 deletions lib/demon/email_sync.rb
Expand Up @@ -23,6 +23,7 @@ def suppress_stderr
def start_thread(db, group)
Thread.new do
RailsMultisite::ConnectionManagement.with_connection(db) do
puts "[EmailSync] Thread started for group #{group.name} (id = #{group.id}) in db #{db}"
begin
obj = Imap::Sync.for_group(group)
rescue Net::IMAP::NoResponseError => e
Expand All @@ -36,12 +37,15 @@ def start_thread(db, group)
idle = false

while @running && group.reload.imap_mailbox_name.present? do
puts "[EmailSync] Processing IMAP mailbox for group #{group.name} (id = #{group.id}) in db #{db}"
status = obj.process(
idle: obj.can_idle? && status && status[:remaining] == 0,
old_emails_limit: status && status[:remaining] > 0 ? 0 : nil,
)

if !obj.can_idle? && status[:remaining] == 0
puts "[EmailSync] Going to sleep for group #{group.name} (id = #{group.id}) in db #{db} to wait for new emails."

# Thread goes into sleep for a bit so it is better to return any
# connection back to the pool.
ActiveRecord::Base.connection_handler.clear_active_connections!
Expand Down Expand Up @@ -74,14 +78,14 @@ def kill_threads
end

def after_fork
puts "Loading EmailSync in process id #{Process.pid}"
puts "[EmailSync] Loading EmailSync in process id #{Process.pid}"

loop do
break if Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL, nx: true)
sleep HEARTBEAT_INTERVAL
end

puts "Starting EmailSync main thread"
puts "[EmailSync] Starting EmailSync main thread"

@running = true
@sync_data = {}
Expand Down Expand Up @@ -122,7 +126,7 @@ def after_fork
if !groups[group_id]
puts("[EmailSync] Killing thread for group (id = #{group_id}) because mailbox is no longer synced")
else
puts("[EmailSync] Thread for group #{groups[group_id].name} is dead")
puts("[EmailSync] Thread for group #{groups[group_id].name} (id = #{group_id}) is dead")
end

data[:thread].kill
Expand All @@ -135,8 +139,10 @@ def after_fork
# Spawn new threads for groups that are now synchronized.
groups.each do |group_id, group|
if !@sync_data[db][group_id]
puts("[EmailSync] Starting thread for group #{group.name} and mailbox #{group.imap_mailbox_name}")
@sync_data[db][group_id] = { thread: start_thread(db, group), obj: nil }
puts("[EmailSync] Starting thread for group #{group.name} (id = #{group.id}) and mailbox #{group.imap_mailbox_name}")
@sync_data[db][group_id] = {
thread: start_thread(db, group), obj: nil
}
end
end
end
Expand Down
63 changes: 50 additions & 13 deletions lib/email/receiver.rb
Expand Up @@ -66,11 +66,13 @@ def process!
id_hash = Digest::SHA1.hexdigest(@message_id)
DistributedMutex.synchronize("process_email_#{id_hash}") do
begin
@incoming_email = IncomingEmail.find_by(message_id: @message_id)
if @incoming_email
@incoming_email.update(imap_uid_validity: @opts[:uid_validity], imap_uid: @opts[:uid], imap_sync: false)
return
end

# if we find an existing incoming email record with the
# exact same message id, be sure to update it with the correct IMAP
# metadata based on sync. this is so we do not double-create emails.
@incoming_email = find_existing_and_update_imap
return if @incoming_email

ensure_valid_address_lists
ensure_valid_date
@from_email, @from_display_name = parse_from_field
Expand All @@ -89,6 +91,32 @@ def process!
end
end

# Looks for an IncomingEmail carrying the same Message-ID as the email being
# processed and, when processing on behalf of IMAP, stamps it with the IMAP
# metadata found in @opts.
#
# Returns:
# * the existing record (or nil), untouched, when @opts[:imap_uid] is blank.
#   A non-nil result tells the caller this Message-ID was already processed.
# * nil when no record exists, or when the Message-ID does not match
#   message_id_post_id_regexp (the email is then treated as a new one coming
#   from the IMAP server).
# * the record, after updating its IMAP columns, otherwise.
def find_existing_and_update_imap
  existing = IncomingEmail.find_by(message_id: @message_id)

  # Not an IMAP run: just report what was found so the same Message-ID is
  # never processed twice.
  return existing if @opts[:imap_uid].blank?

  return unless existing

  # A Message-ID matching the post id regexp was generated by us rather than
  # by the IMAP server (e.g. in GroupSmtpEmail), so the existing record only
  # needs its IMAP details refreshed. Anything else is a completely new
  # email from the IMAP server.
  return unless @message_id =~ message_id_post_id_regexp

  imap_attributes = {
    imap_uid_validity: @opts[:imap_uid_validity],
    imap_uid: @opts[:imap_uid],
    imap_group_id: @opts[:imap_group_id],
    imap_sync: false
  }
  existing.update(imap_attributes)

  existing
end

def ensure_valid_address_lists
[:to, :cc, :bcc].each do |field|
addresses = @mail[field]
Expand Down Expand Up @@ -118,8 +146,9 @@ def create_incoming_email
from_address: @from_email,
to_addresses: @mail.to&.map(&:downcase)&.join(";"),
cc_addresses: @mail.cc&.map(&:downcase)&.join(";"),
imap_uid_validity: @opts[:uid_validity],
imap_uid: @opts[:uid],
imap_uid_validity: @opts[:imap_uid_validity],
imap_uid: @opts[:imap_uid],
imap_group_id: @opts[:imap_group_id],
imap_sync: false
)
end
Expand Down Expand Up @@ -913,12 +942,8 @@ def find_related_post(force: false)
message_ids = Email::Receiver.extract_reply_message_ids(@mail, max_message_id_count: 5)
return if message_ids.empty?

host = Email::Sender.host_for(Discourse.base_url)
post_id_regexp = Regexp.new "topic/\\d+/(\\d+)@#{Regexp.escape(host)}"
topic_id_regexp = Regexp.new "topic/(\\d+)@#{Regexp.escape(host)}"

post_ids = message_ids.map { |message_id| message_id[post_id_regexp, 1] }.compact.map(&:to_i)
post_ids << Post.where(topic_id: message_ids.map { |message_id| message_id[topic_id_regexp, 1] }.compact, post_number: 1).pluck(:id)
post_ids = message_ids.map { |message_id| message_id[message_id_post_id_regexp, 1] }.compact.map(&:to_i)
post_ids << Post.where(topic_id: message_ids.map { |message_id| message_id[message_id_topic_id_regexp, 1] }.compact, post_number: 1).pluck(:id)
post_ids << EmailLog.where(message_id: message_ids).pluck(:post_id)
post_ids << IncomingEmail.where(message_id: message_ids).pluck(:post_id)

Expand All @@ -931,6 +956,18 @@ def find_related_post(force: false)
Post.where(id: post_ids).order(:created_at).last
end

# Host computed by Email::Sender.host_for from Discourse.base_url.
# The value is memoized in @host after the first call.
def host
  return @host if @host

  @host = Email::Sender.host_for(Discourse.base_url)
end

# Memoized pattern for Message-IDs of the form topic/<digits>/<digits>@<host>.
# Capture group 1 holds the second number. The pattern is not anchored, so
# it may match anywhere inside a longer string.
def message_id_post_id_regexp
  return @message_id_post_id_regexp if @message_id_post_id_regexp

  escaped_host = Regexp.escape(host)
  @message_id_post_id_regexp = Regexp.new("topic/\\d+/(\\d+)@#{escaped_host}")
end

# Memoized pattern for Message-IDs of the form topic/<digits>@<host>.
# Capture group 1 holds the number. The pattern is not anchored, so it may
# match anywhere inside a longer string.
def message_id_topic_id_regexp
  return @message_id_topic_id_regexp if @message_id_topic_id_regexp

  escaped_host = Regexp.escape(host)
  @message_id_topic_id_regexp = Regexp.new("topic/(\\d+)@#{escaped_host}")
end

def self.extract_reply_message_ids(mail, max_message_id_count:)
message_ids = [mail.in_reply_to, Email::Receiver.extract_references(mail.references)]
message_ids.flatten!
Expand Down
25 changes: 22 additions & 3 deletions lib/imap/providers/generic.rb
Expand Up @@ -4,6 +4,9 @@

module Imap
module Providers

# Raised by open_mailbox when a mailbox is requested with write: true while
# SiteSetting.enable_imap_write is turned off.
class WriteDisabledError < StandardError
end

class Generic

def initialize(server, options = {})
Expand Down Expand Up @@ -65,19 +68,31 @@ def labels

def open_mailbox(mailbox_name, write: false)
if write
raise 'two-way IMAP sync is disabled' if !SiteSetting.enable_imap_write
if !SiteSetting.enable_imap_write
raise WriteDisabledError.new("Two-way IMAP sync is disabled! Cannot write to inbox.")
end
imap.select(mailbox_name)
else
imap.examine(mailbox_name)
end

@open_mailbox_name = mailbox_name
@open_mailbox_write = write

{
uid_validity: imap.responses['UIDVALIDITY'][-1]
}
end

def emails(uids, fields, opts = {})
imap.uid_fetch(uids, fields).map do |email|
fetched = imap.uid_fetch(uids, fields)

# This will happen if the email does not exist in the provided mailbox.
# It may have been deleted or otherwise moved, e.g. if deleted in Gmail
# it will end up in "[Gmail]/Bin"
return [] if fetched.nil?

fetched.map do |email|
attributes = {}

fields.each do |field|
Expand Down Expand Up @@ -105,12 +120,16 @@ def tag_to_flag(tag)
end

def tag_to_label(tag)
labels[tag]
tag
end

# Lists the names of all mailboxes reported by the server for the pattern
# '*' under the empty reference name.
#
# Returns an Array of names. When the LIST command yields no result the
# array is empty instead of raising NoMethodError on nil.
# NOTE(review): assumes `imap` is a Net::IMAP connection, whose older
# versions return nil rather than [] when no LIST responses arrive.
def list_mailboxes
  mailboxes = imap.list('', '*') || []
  mailboxes.map(&:name)
end

# Archives the email identified by +uid+.
#
# The generic provider deliberately has no implementation: the method body
# is empty and returns nil. The Gmail provider defines its own #archive.
def archive(uid)
  # do nothing by default, just removing the Inbox label should be enough
end
end
end
end
36 changes: 34 additions & 2 deletions lib/imap/providers/gmail.rb
Expand Up @@ -4,13 +4,18 @@ module Imap
module Providers
class Gmail < Generic
X_GM_LABELS = 'X-GM-LABELS'
X_GM_THRID = 'X-GM-THRID'

# Returns the connection provided by the parent implementation after passing
# it through apply_gmail_patch. The result is memoized in @imap, so the
# patch is applied only on the first call.
def imap
  return @imap if @imap

  connection = super
  apply_gmail_patch(connection)
  @imap = connection
end

def emails(uids, fields, opts = {})
fields[fields.index('LABELS')] = X_GM_LABELS

# gmail has a special header for labels
if fields.include?('LABELS')
fields[fields.index('LABELS')] = X_GM_LABELS
end

emails = super(uids, fields, opts)

Expand All @@ -22,7 +27,7 @@ def emails(uids, fields, opts = {})
email['LABELS'].flatten!
end

email['LABELS'] << '\\Inbox' if opts[:mailbox] == 'INBOX'
email['LABELS'] << '\\Inbox' if @open_mailbox_name == 'INBOX'

email['LABELS'].uniq!
end
Expand Down Expand Up @@ -57,6 +62,33 @@ def tag_to_label(tag)
super(tag)
end

# Archives the Gmail thread that contains the email with the given UID.
#
# Every email in the thread must be archived for Gmail to remove the thread
# from the inbox. Each email returned by emails_in_thread is therefore passed
# to `store` with its current labels and the same labels minus "\Inbox".
# A log line is written afterwards; the return value is whatever
# Imap::Sync::Logger.log returns.
def archive(uid)
  gmail_thread_id = thread_id_from_uid(uid)

  emails_in_thread(gmail_thread_id).each do |thread_email|
    current_labels = thread_email['LABELS']
    labels_without_inbox = current_labels.reject { |label| label == "\\Inbox" }
    store(thread_email["UID"], "LABELS", current_labels, labels_without_inbox)
  end

  log_line = "[IMAP] Thread ID #{gmail_thread_id} (UID #{uid}) archived in Gmail mailbox for #{@username}"
  Imap::Sync::Logger.log(log_line)
end

# Fetches the Gmail thread id (the X-GM-THRID attribute) of the email with
# the given UID.
#
# Returns the X-GM-THRID attribute of the last fetched item.
# Raises RuntimeError ("Thread not found for UID ...") when the fetch yields
# nothing at all.
def thread_id_from_uid(uid)
  fetched = imap.uid_fetch(uid, [X_GM_THRID])

  # A fetch that matches nothing may come back as nil or as an empty array
  # depending on the IMAP client version; both mean the thread is missing.
  # Without the emptiness check, `[].last.attr` would raise NoMethodError.
  raise "Thread not found for UID #{uid}!" if fetched.nil? || fetched.empty?

  fetched.last.attr[X_GM_THRID]
end

# Returns the emails belonging to the given Gmail thread.
#
# Runs imap.uid_search with the query "X-GM-THRID <thread_id>" and passes
# the resulting UIDs to #emails, requesting the "UID" and "LABELS" fields.
def emails_in_thread(thread_id)
  search_query = "#{X_GM_THRID} #{thread_id}"
  emails(imap.uid_search(search_query), ["UID", "LABELS"])
end

private

def apply_gmail_patch(imap)
Expand Down