Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ class AppDelegate: NSObject, NSApplicationDelegate {
override init() {
vpn = CoderVPNService()
state = AppState(onChange: vpn.configureTunnelProviderProtocol)
fileSyncDaemon = MutagenDaemon()
if state.startVPNOnLaunch {
vpn.startWhenReady = true
}
vpn.installSystemExtension()
fileSyncDaemon = MutagenDaemon(
mutagenPath: Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be CPU arch-specific?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True!

)
}

func applicationDidFinishLaunching(_: Notification) {
Expand Down Expand Up @@ -73,10 +75,6 @@ class AppDelegate: NSObject, NSApplicationDelegate {
state.reconfigure()
}
}
// TODO: Start the daemon only once a file sync is configured
Task {
await fileSyncDaemon.start()
}
}

deinit {
Expand Down
190 changes: 165 additions & 25 deletions Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,26 @@ import Foundation
import GRPC
import NIO
import os
import Semaphore
import Subprocess
import SwiftUI

@MainActor
public protocol FileSyncDaemon: ObservableObject {
var state: DaemonState { get }
func start() async
func start() async throws(DaemonError)
func stop() async
func listSessions() async throws -> [FileSyncSession]
func createSession(with: FileSyncSession) async throws
}

public struct FileSyncSession {
public let id: String
public let name: String
public let localPath: URL
public let workspace: String
public let agent: String
public let remotePath: URL
}

@MainActor
Expand All @@ -17,7 +30,14 @@ public class MutagenDaemon: FileSyncDaemon {

@Published public var state: DaemonState = .stopped {
didSet {
logger.info("daemon state changed: \(self.state.description, privacy: .public)")
logger.info("daemon state set: \(self.state.description, privacy: .public)")
if case .failed = state {
Task {
try? await cleanupGRPC()
}
mutagenProcess?.kill()
mutagenProcess = nil
}
}
}

Expand All @@ -26,46 +46,61 @@ public class MutagenDaemon: FileSyncDaemon {
private let mutagenDataDirectory: URL
private let mutagenDaemonSocket: URL

// Non-nil when the daemon is running
private var group: MultiThreadedEventLoopGroup?
private var channel: GRPCChannel?
private var client: Daemon_DaemonAsyncClient?

public init() {
#if arch(arm64)
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil)
#elseif arch(x86_64)
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil)
#else
fatalError("unknown architecture")
#endif
mutagenDataDirectory = FileManager.default.urls(
for: .applicationSupportDirectory,
in: .userDomainMask
).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")
private var client: DaemonClient?

// Protect start & stop transitions against re-entrancy
private let transition = AsyncSemaphore(value: 1)

public init(mutagenPath: URL? = nil,
mutagenDataDirectory: URL = FileManager.default.urls(
for: .applicationSupportDirectory,
in: .userDomainMask
).first!.appending(path: "Coder Desktop").appending(path: "Mutagen"))
{
self.mutagenPath = mutagenPath
self.mutagenDataDirectory = mutagenDataDirectory
mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock")
// It shouldn't be fatal if the app was built without Mutagen embedded,
// but file sync will be unavailable.
if mutagenPath == nil {
logger.warning("Mutagen not embedded in app, file sync will be unavailable")
state = .unavailable
return
}

// If there are sync sessions, the daemon should be running
Task {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
return
}
await stopIfNoSessions()
}
}

public func start() async {
public func start() async throws(DaemonError) {
if case .unavailable = state { return }

// Stop an orphaned daemon, if there is one
try? await connect()
await stop()

await transition.wait()
defer { transition.signal() }
logger.info("starting mutagen daemon")

mutagenProcess = createMutagenProcess()
// swiftlint:disable:next large_tuple
let (standardOutput, standardError, waitForExit): (Pipe.AsyncBytes, Pipe.AsyncBytes, @Sendable () async -> Void)
do {
(standardOutput, standardError, waitForExit) = try mutagenProcess!.run()
} catch {
state = .failed(DaemonError.daemonStartFailure(error))
return
throw .daemonStartFailure(error)
}

Task {
Expand All @@ -85,10 +120,11 @@ public class MutagenDaemon: FileSyncDaemon {
do {
try await connect()
} catch {
state = .failed(DaemonError.daemonStartFailure(error))
return
throw .daemonStartFailure(error)
}

try await waitForDaemonStart()

state = .running
logger.info(
"""
Expand All @@ -98,6 +134,34 @@ public class MutagenDaemon: FileSyncDaemon {
)
}

// The daemon takes a moment to open the socket, and we don't want to hog the main actor
// so poll for it on a background thread
private func waitForDaemonStart(
maxAttempts: Int = 5,
attemptInterval: Duration = .milliseconds(100)
) async throws(DaemonError) {
do {
try await Task.detached(priority: .background) {
for attempt in 0 ... maxAttempts {
do {
_ = try await self.client!.mgmt.version(
Daemon_VersionRequest(),
callOptions: .init(timeLimit: .timeout(.milliseconds(500)))
)
return
} catch {
if attempt == maxAttempts {
throw error
}
try? await Task.sleep(for: attemptInterval)
}
}
}.value
} catch {
throw .daemonStartFailure(error)
}
}

private func connect() async throws(DaemonError) {
guard client == nil else {
// Already connected
Expand All @@ -110,14 +174,17 @@ public class MutagenDaemon: FileSyncDaemon {
transportSecurity: .plaintext,
eventLoopGroup: group!
)
client = Daemon_DaemonAsyncClient(channel: channel!)
client = DaemonClient(
mgmt: Daemon_DaemonAsyncClient(channel: channel!),
sync: Synchronization_SynchronizationAsyncClient(channel: channel!)
)
logger.info(
"Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)"
)
} catch {
logger.error("Failed to connect to gRPC: \(error)")
try? await cleanupGRPC()
throw DaemonError.connectionFailure(error)
throw .connectionFailure(error)
}
}

Expand All @@ -132,6 +199,10 @@ public class MutagenDaemon: FileSyncDaemon {

public func stop() async {
if case .unavailable = state { return }
await transition.wait()
defer { transition.signal() }
logger.info("stopping mutagen daemon")

state = .stopped
guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else {
// Already stopped
Expand All @@ -140,7 +211,7 @@ public class MutagenDaemon: FileSyncDaemon {

// "We don't check the response or error, because the daemon
// may terminate before it has a chance to send the response."
_ = try? await client?.terminate(
_ = try? await client?.mgmt.terminate(
Daemon_TerminateRequest(),
callOptions: .init(timeLimit: .timeout(.milliseconds(500)))
)
Expand Down Expand Up @@ -175,6 +246,7 @@ public class MutagenDaemon: FileSyncDaemon {
"""
)
state = .failed(.terminatedUnexpectedly)
return
}
}

Expand All @@ -183,6 +255,55 @@ public class MutagenDaemon: FileSyncDaemon {
logger.info("\(line, privacy: .public)")
}
}

public func listSessions() async throws -> [FileSyncSession] {
guard case .running = state else {
return []
}
// TODO: Implement
return []
}

public func createSession(with _: FileSyncSession) async throws {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
return
}
}
// TODO: Add Session
}

public func deleteSession() async throws {
// TODO: Delete session
await stopIfNoSessions()
}

private func stopIfNoSessions() async {
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.daemonStartFailure(error))
return
}
// If there's no configured sessions, the daemon doesn't need to be running
if sessions.sessionStates.isEmpty {
logger.info("No sync sessions found")
await stop()
}
}
}

struct DaemonClient {
let mgmt: Daemon_DaemonAsyncClient
let sync: Synchronization_SynchronizationAsyncClient
}

public enum DaemonState {
Expand All @@ -191,7 +312,7 @@ public enum DaemonState {
case failed(DaemonError)
case unavailable

var description: String {
public var description: String {
switch self {
case .running:
"Running"
Expand All @@ -203,12 +324,27 @@ public enum DaemonState {
"Unavailable"
}
}

public var color: Color {
switch self {
case .running:
.green
case .stopped:
.gray
case .failed:
.red
case .unavailable:
.gray
}
}
}

public enum DaemonError: Error {
case daemonNotRunning
case daemonStartFailure(Error)
case connectionFailure(Error)
case terminatedUnexpectedly
case grpcFailure(Error)

var description: String {
switch self {
Expand All @@ -218,6 +354,10 @@ public enum DaemonError: Error {
"Connection failure: \(error)"
case .terminatedUnexpectedly:
"Daemon terminated unexpectedly"
case .daemonNotRunning:
"The daemon must be started first"
case let .grpcFailure(error):
"Failed to communicate with daemon: \(error)"
}
}

Expand Down
6 changes: 5 additions & 1 deletion Coder-Desktop/project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ packages:
exactVersion: 1.24.2
Subprocess:
url: https://github.com/jamf/Subprocess
revision: 9d67b79
revision: 9d67b79
Semaphore:
url: https://github.com/groue/Semaphore/
exactVersion: 0.1.0

targets:
Coder Desktop:
Expand Down Expand Up @@ -276,6 +279,7 @@ targets:
product: SwiftProtobufPluginLibrary
- package: GRPC
- package: Subprocess
- package: Semaphore
- target: CoderSDK
embed: false

Expand Down
Loading