Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ kotlin {

compilerOptions {
optIn.add("kotlin.uuid.ExperimentalUuidApi")
freeCompilerArgs.add("-Xopt-in=kotlin.time.ExperimentalTime")
optIn.add("kotlin.time.ExperimentalTime")
freeCompilerArgs.add("-Xnested-type-aliases")
freeCompilerArgs.add("-Xcontext-parameters")
}
Expand Down
45 changes: 38 additions & 7 deletions src/main/kotlin/mail/daemon/DaemonManagerPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import dev.babies.overmail.data.model.ImapConfig
import dev.babies.overmail.data.model.ImapConfigs
import io.ktor.server.application.*
import io.ktor.util.*
import io.ktor.util.logging.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
Expand All @@ -17,6 +18,7 @@ typealias ImapConfigId = Int
object DaemonManagerPlugin : BaseApplicationPlugin<Application, Unit, Unit> {
override val key: AttributeKey<Unit> = io.ktor.util.AttributeKey("DaemonManagerPlugin")

private val logger = KtorSimpleLogger("DaemonManagerPlugin")
private val imapDaemons = mutableMapOf<ImapConfigId, ImapDaemon>()
private lateinit var coroutineScope: CoroutineScope

Expand All @@ -29,30 +31,59 @@ object DaemonManagerPlugin : BaseApplicationPlugin<Application, Unit, Unit> {

pipeline.monitor.subscribe(ApplicationStarted) {
coroutineScope.launch {
val imapConfigs = Database.query { ImapConfigs.select(ImapConfigs.id).map { it[ImapConfigs.id].value } }
imapConfigs.forEach { imapConfigId ->
coroutineScope.launch {
initImapConfig(imapConfigId)
try {
val imapConfigs = Database.query { ImapConfigs.select(ImapConfigs.id).map { it[ImapConfigs.id].value } }
logger.info("Starting ${imapConfigs.size} IMAP daemon(s)")
imapConfigs.forEach { imapConfigId ->
coroutineScope.launch {
try {
initImapConfig(imapConfigId)
} catch (e: Exception) {
logger.error("Failed to initialize IMAP config $imapConfigId: ${e.message}")
}
}
}
} catch (e: Exception) {
logger.error("Failed to load IMAP configs: ${e.message}")
}
}
}

pipeline.monitor.subscribe(ApplicationStopped) {
logger.info("Stopping all IMAP daemons")
imapDaemons.values.forEach { daemon ->
try {
daemon.stop()
} catch (e: Exception) {
logger.error("Error stopping daemon: ${e.message}")
}
}
imapDaemons.clear()
}
}

private val imapDaemonChangeMutex = Mutex()
suspend fun initImapConfig(imapConfigId: Int) {
imapDaemonChangeMutex.withLock {
if (imapConfigId in imapDaemons) {
logger.info("Stopping existing daemon for IMAP config $imapConfigId")
val daemon = imapDaemons[imapConfigId]!!
daemon.stop()
try {
daemon.stop()
} catch (e: Exception) {
logger.error("Error stopping daemon: ${e.message}")
}
imapDaemons.remove(imapConfigId)
}

val imapConfig = Database.query { ImapConfig[imapConfigId] }
imapDaemons[imapConfigId] = ImapDaemon(imapConfig, coroutineScope)
try {
val imapConfig = Database.query { ImapConfig[imapConfigId] }
logger.info("Starting daemon for IMAP config $imapConfigId (${imapConfig.email})")
imapDaemons[imapConfigId] = ImapDaemon(imapConfig, coroutineScope)
} catch (e: Exception) {
logger.error("Failed to initialize IMAP config $imapConfigId: ${e.message}")
throw e
}
}
}
}
215 changes: 187 additions & 28 deletions src/main/kotlin/mail/daemon/FolderDaemon.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import dev.babies.overmail.data.Database
import dev.babies.overmail.data.model.ImapConfig
import dev.babies.overmail.data.model.ImapFolder
import dev.babies.overmail.data.model.ImapFolders
import io.ktor.util.logging.*
import jakarta.mail.MessagingException
import jakarta.mail.Store
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
Expand All @@ -24,37 +26,102 @@ class FolderDaemon(
private val imapDaemon: ImapDaemon
) {

private val logger = KtorSimpleLogger("FolderDaemon@${imapConfig.host}")

var defaultFolder: JakartaFolder? = null
private val store: Store = imapDaemon.session.getStore("imap")
private var store: Store? = null

private val idleDaemons = mutableMapOf<ImapConfigId, IdleDaemon>()
val storeInstances = mutableMapOf<FolderId, StoreInstance>()
private val importInstances = mutableMapOf<FolderId, StoreInstance>()
val mailsDaemons = mutableMapOf<FolderId, MailsDaemon>()

val folderImporterMutex = Mutex()

@Volatile
private var isRunning = true
private var connectionJob: Job? = null

init {
coroutineScope.launch {
store.connect(imapConfig.username, imapConfig.password)
defaultFolder = store.defaultFolder
connectionJob = coroutineScope.launch {
connectWithRetry()
}
}

private suspend fun connectWithRetry() {
var reconnectAttempt = 0
val maxReconnectDelay = 300 // 5 minutes max

while (isRunning && coroutineScope.isActive) {
try {
connectAndLoadFolders()
reconnectAttempt = 0 // Reset on successful connection
} catch (e: CancellationException) {
logger.info("Connection job cancelled")
throw e
} catch (e: Exception) {
if (!isRunning || !coroutineScope.isActive) break

val delaySeconds = minOf(5 * (1 shl reconnectAttempt), maxReconnectDelay)
logger.error("Failed to connect to IMAP store (attempt ${reconnectAttempt + 1}), retrying in ${delaySeconds}s: ${e.message}")
reconnectAttempt++

delay(delaySeconds.seconds)
}
}
}

private suspend fun connectAndLoadFolders() {
try {
store = imapDaemon.session.getStore("imap")
store?.connect(imapConfig.username, imapConfig.password)
defaultFolder = store?.defaultFolder
logger.info("Connected to IMAP store and loaded default folder")

suspend fun loadAllFolders() {
store.defaultFolder
.list("*")
.sortedBy { it.fullName }
.forEach { folder ->
upsertFolder(folder)
}
try {
store?.defaultFolder
?.list("*")
?.sortedBy { it.fullName }
?.forEach { folder ->
upsertFolder(folder)
}
} catch (e: MessagingException) {
logger.error("Error loading folders: ${e.message}")
throw e
}
}

while (isActive) {
folderImporterMutex.withLock {
loadAllFolders()
while (isRunning && coroutineScope.isActive && store?.isConnected == true) {
try {
folderImporterMutex.withLock {
loadAllFolders()
}
delay(1.minutes)
} catch (e: MessagingException) {
logger.error("Error during folder sync: ${e.message}")
throw e
}
delay(1.minutes)
}
} finally {
cleanupConnection()
}
}

private fun cleanupConnection() {
try {
defaultFolder?.close(false)
} catch (e: Exception) {
logger.error("Error closing default folder: ${e.message}")
}
defaultFolder = null

try {
store?.close()
} catch (e: Exception) {
logger.error("Error closing store: ${e.message}")
}
store = null
}

private suspend fun upsertFolder(folder: JakartaFolder) {
Expand Down Expand Up @@ -103,12 +170,47 @@ class FolderDaemon(
}

fun stop() {
store.close()
defaultFolder?.close(false)
defaultFolder = null
storeInstances.values.forEach { it.close() }
idleDaemons.values.forEach { it.stop() }
mailsDaemons.values.forEach { it.stop() }
logger.info("Stopping FolderDaemon")
isRunning = false
connectionJob?.cancel()

cleanupConnection()

storeInstances.values.forEach {
try {
it.close()
} catch (e: Exception) {
logger.error("Error closing store instance: ${e.message}")
}
}
storeInstances.clear()

importInstances.values.forEach {
try {
it.close()
} catch (e: Exception) {
logger.error("Error closing import instance: ${e.message}")
}
}
importInstances.clear()

idleDaemons.values.forEach {
try {
it.stop()
} catch (e: Exception) {
logger.error("Error stopping idle daemon: ${e.message}")
}
}
idleDaemons.clear()

mailsDaemons.values.forEach {
try {
it.stop()
} catch (e: Exception) {
logger.error("Error stopping mails daemon: ${e.message}")
}
}
mailsDaemons.clear()
}

private fun startMailsDaemon(separator: Char, dbFolder: ImapFolder) {
Expand Down Expand Up @@ -150,9 +252,17 @@ class FolderDaemon(
coroutineScope = coroutineScope,
onNewMessage = { newMessageUid ->
coroutineScope.launch {
println("New message: $newMessageUid in ${dbFolder.folderPath}")
val mailsDaemon = mailsDaemons[dbFolderId]!!
mailsDaemon.upsertMessage(newMessageUid)
logger.info("New message: $newMessageUid in ${dbFolder.folderPath}")
val mailsDaemon = mailsDaemons[dbFolderId]
if (mailsDaemon != null) {
try {
mailsDaemon.upsertMessage(newMessageUid)
} catch (e: Exception) {
logger.error("Error upserting message: ${e.message}")
}
} else {
logger.warn("MailsDaemon not found for folder $dbFolderId")
}
}
}
)
Expand Down Expand Up @@ -182,6 +292,7 @@ class StoreInstance(
private val coroutineScope: CoroutineScope,
private val defaultFolderName: String? = null
) {
private val logger = KtorSimpleLogger("StoreInstance@${defaultFolderName ?: "default"}")
private var store: Store? = null
private var closeJob: Job? = null
private val mutex = Mutex()
Expand All @@ -191,12 +302,47 @@ class StoreInstance(
suspend fun <T> withStore(block: suspend (store: Store) -> T): T {
return mutex.withLock {
closeJob?.cancel()
if (store?.isConnected != true) store = getStore()
val result = block(store!!)

// Check if store is connected, if not, create new one
if (store?.isConnected != true) {
try {
store?.close()
} catch (e: Exception) {
logger.debug("Error closing old store: ${e.message}")
}

try {
store = getStore()
logger.debug("Created new store connection")
} catch (e: Exception) {
logger.error("Failed to create store: ${e.message}")
throw e
}
}

val result = try {
block(store!!)
} catch (e: MessagingException) {
logger.error("Messaging exception in withStore: ${e.message}")
// Mark store as invalid for next use
try {
store?.close()
} catch (closeEx: Exception) {
logger.debug("Error closing store after exception: ${closeEx.message}")
}
store = null
throw e
}

closeJob = coroutineScope.launch {
delay(30.seconds)
mutex.withLock {
store?.close()
try {
store?.close()
logger.debug("Closed idle store connection")
} catch (e: Exception) {
logger.debug("Error closing store: ${e.message}")
}
store = null
}
}.apply {
Expand All @@ -221,13 +367,26 @@ class StoreInstance(
FolderMode.ReadWrite -> folder.open(JakartaFolder.READ_WRITE)
}
return@withStore block(folder as IMAPFolder)
} catch (e: Exception) {
logger.error("Error in withFolder: ${e.message}")
throw e
} finally {
folder.close(false)
try {
folder.close(false)
} catch (e: Exception) {
logger.debug("Error closing folder: ${e.message}")
}
}
}

fun close() {
closeJob?.cancel()
store?.close()
try {
store?.close()
logger.debug("Closed store in close()")
} catch (e: Exception) {
logger.debug("Error in close(): ${e.message}")
}
store = null
}
}
Loading