diff --git a/Cargo.lock b/Cargo.lock index 7651d42..ceb6391 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1503,9 +1503,11 @@ dependencies = [ name = "harmonia-host" version = "0.1.0" dependencies = [ + "aitesis", "axum", "clap", "epignosis", + "ergasia", "exousia", "harmonia-common", "harmonia-db", @@ -1513,14 +1515,20 @@ dependencies = [ "komide", "kritike", "paroche", + "prostheke", "rand 0.10.0", "reqwest", "rstest", "snafu", + "sqlx", + "syndesmos", + "syntaxis", "taxis", "tokio", + "tokio-util", "tracing", "tracing-subscriber", + "zetesis", ] [[package]] diff --git a/crates/harmonia-host/Cargo.toml b/crates/harmonia-host/Cargo.toml index 673032d..a35097a 100644 --- a/crates/harmonia-host/Cargo.toml +++ b/crates/harmonia-host/Cargo.toml @@ -19,10 +19,18 @@ taxis.workspace = true epignosis.workspace = true kritike.workspace = true komide.workspace = true +zetesis.workspace = true +ergasia.workspace = true +syntaxis.workspace = true +aitesis.workspace = true +syndesmos.workspace = true +prostheke.workspace = true clap.workspace = true tokio.workspace = true +tokio-util.workspace = true axum.workspace = true reqwest.workspace = true +sqlx.workspace = true snafu.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/crates/harmonia-host/src/error.rs b/crates/harmonia-host/src/error.rs index 34b7577..0937195 100644 --- a/crates/harmonia-host/src/error.rs +++ b/crates/harmonia-host/src/error.rs @@ -2,6 +2,7 @@ use snafu::Snafu; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] +#[expect(dead_code, reason = "variants used as subsystems gain route handlers")] pub enum HostError { #[snafu(display("configuration error: {source}"))] Config { @@ -54,4 +55,52 @@ pub enum HostError { #[snafu(implicit)] location: snafu::Location, }, + + #[snafu(display("indexer error: {source}"))] + Indexer { + #[snafu(source(from(zetesis::ZetesisError, Box::new)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + }, + + #[snafu(display("download engine error: {source}"))] + DownloadEngine { + #[snafu(source(from(ergasia::ErgasiaError, Box::new)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + }, + + #[snafu(display("download queue error: {source}"))] + DownloadQueue { + #[snafu(source(from(syntaxis::SyntaxisError, Box::new)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + }, + + #[snafu(display("request service error: {source}"))] + RequestService { + #[snafu(source(from(aitesis::AitesisError, Box::new)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + }, + + #[snafu(display("external integration error: {source}"))] + ExternalIntegration { + #[snafu(source(from(syndesmos::SyndesmodError, Box::new)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + }, + + #[snafu(display("subtitle service error: {source}"))] + SubtitleService { + #[snafu(source(from(prostheke::ProsthekeError, Box::new)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + }, } diff --git a/crates/harmonia-host/src/serve.rs b/crates/harmonia-host/src/serve.rs index 5d12624..b873285 100644 --- a/crates/harmonia-host/src/serve.rs +++ b/crates/harmonia-host/src/serve.rs @@ -1,32 +1,158 @@ +use std::pin::Pin; use std::sync::Arc; use snafu::ResultExt; use tokio::signal::unix::SignalKind; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use tracing::{Instrument, info}; use epignosis::{EpignosisService, resolver::ProviderCredentials}; +use ergasia::ErgasiaSession; use exousia::ExousiaServiceImpl; use harmonia_common::create_event_bus; use harmonia_db::init_pools; use horismos::ConfigManager; use komide::{KomideService, scheduler::FeedScheduler}; use kritike::DefaultCurationService; -use paroche::state::{AppState, DynCurationService, DynMetadataResolver}; +use paroche::state::{ + AppState, DynCurationService, DynDownloadEngine, DynExternalIntegration, DynMetadataResolver, + DynQueueManager, DynRequestService, DynSearchService, DynSubtitleService, +}; +use prostheke::ProsthekeService; +use prostheke::providers::Provider; +use syndesmos::{SyndesmosService, SyndesmosServiceBuilder}; +use syntaxis::{CompletedDownload, SyntaxisService}; use taxis::ScannerManager; +use zetesis::ZetesisService; +use zetesis::cf_bypass::noop::NoProxy; use crate::cli::ServeArgs; use crate::error::{ - ConfigSnafu, DatabaseSnafu, FeedSchedulerSnafu, HostError, ScannerSnafu, ServerSnafu, + ConfigSnafu, DatabaseSnafu, DownloadEngineSnafu, DownloadQueueSnafu, FeedSchedulerSnafu, + HostError, ScannerSnafu, ServerSnafu, }; use crate::shutdown::shutdown_signal; use crate::startup::{ensure_admin_user, init_tracing}; +// ── Dyn-trait adapters ────────────────────────────────────────────────────── + struct NullCuration; impl DynCurationService for NullCuration {} struct NullMetadata; impl DynMetadataResolver for NullMetadata {} +// WHY: Adapter structs hold Arc handles to keep acquisition subsystems alive +// for the lifetime of AppState. The inner fields are read once route handlers +// are wired in prompt 102. +struct SearchAdapter(#[expect(dead_code)] Arc); +impl DynSearchService for SearchAdapter {} + +struct EngineAdapter(#[expect(dead_code)] Arc); +impl DynDownloadEngine for EngineAdapter {} + +struct QueueAdapter; +impl DynQueueManager for QueueAdapter {} + +struct RequestAdapter; +impl DynRequestService for RequestAdapter {} + +struct ExternalAdapter(#[expect(dead_code)] Arc); +impl DynExternalIntegration for ExternalAdapter {} + +struct SubtitleAdapter; +impl DynSubtitleService for SubtitleAdapter {} + +// ── DownloadEngine adapter ────────────────────────────────────────────────── + +/// Bridges `ErgasiaSession` (torrent client) to the `DownloadEngine` trait +/// that Syntaxis expects for dispatching downloads. +struct SessionEngine { + session: Arc, +} + +impl ergasia::DownloadEngine for SessionEngine { + async fn start_download( + &self, + request: ergasia::DownloadRequest, + ) -> Result { + self.session + .add_torrent_from_magnet(request.download_id, &request.download_url) + .await?; + Ok(request.download_id) + } + + async fn cancel_download( + &self, + download_id: harmonia_common::ids::DownloadId, + ) -> Result<(), ergasia::ErgasiaError> { + self.session.delete_torrent(download_id).await + } + + async fn get_progress( + &self, + download_id: harmonia_common::ids::DownloadId, + ) -> Result { + let stats = self.session.get_stats(download_id)?; + let total = stats.total_bytes; + let downloaded = stats.progress_bytes; + let pct = if total > 0 { + ((downloaded as f64 / total as f64) * 100.0) as u8 + } else { + 0 + }; + let (dl_speed, ul_speed) = match &stats.live { + Some(live) => ( + live.download_speed.mbps * 125_000.0, + live.upload_speed.mbps * 125_000.0, + ), + None => (0.0, 0.0), + }; + Ok(ergasia::DownloadProgress { + download_id, + state: ergasia::DownloadState::Downloading, + percent_complete: pct, + download_speed_bps: dl_speed as u64, + upload_speed_bps: ul_speed as u64, + peers_connected: 0, + seeders: 0, + eta_seconds: None, + }) + } + + fn extract( + &self, + download_path: &std::path::Path, + output_dir: &std::path::Path, + ) -> Result, ergasia::ErgasiaError> { + ergasia::extract_archives(download_path, output_dir, 3) + } +} + +// ── ImportService stub ────────────────────────────────────────────────────── + +/// Stub ImportService for Syntaxis. The real import pipeline wiring is done +/// in a follow-up prompt; this stub accepts completed downloads and logs them. +struct StubImportService; + +impl syntaxis::ImportService for StubImportService { + fn import( + &self, + completed: CompletedDownload, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + tracing::info!( + download_id = %completed.download_id, + "import stub: download completed, import pipeline not yet wired" + ); + Ok(()) + }) + } +} + +// ── Serve entry point ─────────────────────────────────────────────────────── + pub async fn run_serve(args: ServeArgs) -> Result<(), HostError> { // 1. Load config let (mut config, warnings) = @@ -135,6 +261,71 @@ pub async fn run_serve(args: ServeArgs) -> Result<(), HostError> { .await .context(FeedSchedulerSnafu)?; + // ── Pre-flight: acquisition config validation ───────────────────────── + validate_download_dir(&config)?; + + // ── Acquisition subsystem startup ─────────────────────────────────────── + + let shutdown_token = CancellationToken::new(); + + // Layer 0: Zetesis (indexer protocol) + let zetesis = Arc::new(ZetesisService::new( + db.read.clone(), + db.write.clone(), + Arc::new(NoProxy), + config.zetesis.clone(), + event_tx.clone(), + )); + info!("zetesis (indexer search) initialized"); + + // Layer 1: Ergasia (download execution) + let ergasia_session = Arc::new( + ErgasiaSession::new(&config.ergasia) + .await + .context(DownloadEngineSnafu)?, + ); + ergasia_session.reconcile_persisted_torrents(); + info!("ergasia (download engine) initialized"); + + // Layer 2: Syntaxis (queue orchestration, depends on ergasia) + let engine_adapter = Arc::new(SessionEngine { + session: Arc::clone(&ergasia_session), + }); + let syntaxis_svc = Arc::new( + SyntaxisService::new( + db.write.clone(), + engine_adapter, + Arc::new(StubImportService), + config.syntaxis.clone(), + ) + .await + .context(DownloadQueueSnafu)?, + ); + syntaxis_svc.start(event_tx.subscribe(), shutdown_token.child_token()); + info!("syntaxis (download queue) initialized — event listener started"); + + // Layer 4: Syndesmos (external integrations — Plex, Last.fm, Tidal) + let syndesmos_svc = Arc::new(build_syndesmos(&config, &event_tx)); + let syndesmos_handle = spawn_syndesmos_handler( + Arc::clone(&syndesmos_svc), + event_tx.subscribe(), + shutdown_token.child_token(), + ); + info!("syndesmos (external integrations) initialized — event listener started"); + + // Layer 4: Prostheke (subtitle management) + let providers = Provider::default_providers(config.prostheke.opensubtitles.clone()); + let _prostheke_svc = ProsthekeService::new( + db.read.clone(), + db.write.clone(), + config.prostheke.clone(), + providers, + event_tx.clone(), + ); + info!("prostheke (subtitles) initialized"); + + // ── End acquisition startup ───────────────────────────────────────────── + // 12. Build import service adapter for paroche let import = paroche::state::make_import_service(|| async { Ok(vec![]) }); @@ -147,6 +338,12 @@ pub async fn run_serve(args: ServeArgs) -> Result<(), HostError> { import, metadata: Arc::new(NullMetadata), curation: Arc::new(NullCuration), + search: Arc::new(SearchAdapter(zetesis)), + download_engine: Arc::new(EngineAdapter(ergasia_session)), + queue: Arc::new(QueueAdapter), + requests: Arc::new(RequestAdapter), + external: Arc::new(ExternalAdapter(syndesmos_svc)), + subtitles: Arc::new(SubtitleAdapter), }; let router = paroche::build_router(state); @@ -163,11 +360,95 @@ pub async fn run_serve(args: ServeArgs) -> Result<(), HostError> { .await .context(ServerSnafu)?; - // 16. Cleanup — reverse order + // 16. Cleanup — reverse startup order info!("shutting down subsystems"); + + // Cancel all acquisition background tasks (syndesmos event handler, syntaxis listener) + shutdown_token.cancel(); + + // Wait for syndesmos event handler to drain + if let Err(e) = syndesmos_handle.await { + tracing::warn!(error = %e, "syndesmos event handler panicked during shutdown"); + } + + // Shutdown core subsystems (reverse of startup) feed_scheduler.shutdown(); scanner.shutdown().await; info!("shutdown complete"); Ok(()) } + +// ── Syndesmos construction ────────────────────────────────────────────────── + +fn build_syndesmos( + config: &horismos::Config, + event_tx: &harmonia_common::EventSender, +) -> SyndesmosService { + let mut builder = SyndesmosServiceBuilder::new(event_tx.clone()) + .circuit_break_minutes(config.syndesmos.circuit_break_minutes); + + if let Some(ref plex_config) = config.syndesmos.plex { + let client = syndesmos::plex::PlexClient::new(plex_config.clone()); + builder = builder.with_plex(client); + } + + if let Some(ref lastfm_config) = config.syndesmos.lastfm { + let client = syndesmos::lastfm::LastfmClient::new(lastfm_config.clone()); + builder = builder.with_lastfm(client); + } + + if let Some(ref tidal_config) = config.syndesmos.tidal { + let client = syndesmos::tidal::TidalClient::new(tidal_config.clone()); + builder = builder.with_tidal(client); + } + + builder.build() +} + +fn spawn_syndesmos_handler( + service: Arc, + event_rx: harmonia_common::EventReceiver, + ct: CancellationToken, +) -> JoinHandle<()> { + let span = tracing::info_span!("syndesmos_event_handler"); + tokio::spawn( + async move { + syndesmos::events::run_event_handler(service, event_rx, ct).await; + } + .instrument(span), + ) +} + +// ── Config pre-flight ─────────────────────────────────────────────────────── + +fn validate_download_dir(config: &horismos::Config) -> Result<(), HostError> { + let dir = &config.ergasia.download_dir; + if !dir.exists() { + return Err(HostError::Config { + source: horismos::HorismosError::Validation { + message: format!( + "ergasia.download_dir '{}' does not exist — create it before starting", + dir.display() + ), + location: snafu::location!(), + }, + location: snafu::location!(), + }); + } + let test_file = dir.join(".harmonia-write-test"); + if let Err(e) = std::fs::write(&test_file, b"") { + return Err(HostError::Config { + source: horismos::HorismosError::Validation { + message: format!( + "ergasia.download_dir '{}' is not writable: {e}", + dir.display() + ), + location: snafu::location!(), + }, + location: snafu::location!(), + }); + } + let _ = std::fs::remove_file(&test_file); + Ok(()) +} diff --git a/crates/paroche/src/state.rs b/crates/paroche/src/state.rs index 7e00f4f..75f58b2 100644 --- a/crates/paroche/src/state.rs +++ b/crates/paroche/src/state.rs @@ -23,6 +23,16 @@ pub trait DynCurationService: Send + Sync {} pub trait DynMetadataResolver: Send + Sync {} +/// Dyn-compatible placeholders for acquisition subsystems. Route handlers +/// will add methods in a follow-up prompt; for now these carry the handles +/// through AppState so serve.rs can wire startup and shutdown. +pub trait DynSearchService: Send + Sync {} +pub trait DynDownloadEngine: Send + Sync {} +pub trait DynQueueManager: Send + Sync {} +pub trait DynRequestService: Send + Sync {} +pub trait DynExternalIntegration: Send + Sync {} +pub trait DynSubtitleService: Send + Sync {} + /// Adapter around a closure for import queue retrieval. pub struct ImportQueueFn(pub Arc ImportQueueFut + Send + Sync>); @@ -49,6 +59,24 @@ impl DynCurationService for NullCuration {} struct NullMetadata; impl DynMetadataResolver for NullMetadata {} +struct NullSearch; +impl DynSearchService for NullSearch {} + +struct NullDownloadEngine; +impl DynDownloadEngine for NullDownloadEngine {} + +struct NullQueueManager; +impl DynQueueManager for NullQueueManager {} + +struct NullRequestService; +impl DynRequestService for NullRequestService {} + +struct NullExternalIntegration; +impl DynExternalIntegration for NullExternalIntegration {} + +struct NullSubtitleService; +impl DynSubtitleService for NullSubtitleService {} + #[derive(Clone)] pub struct AppState { pub db: Arc, @@ -58,6 +86,12 @@ pub struct AppState { pub import: Arc, pub metadata: Arc, pub curation: Arc, + pub search: Arc, + pub download_engine: Arc, + pub queue: Arc, + pub requests: Arc, + pub external: Arc, + pub subtitles: Arc, } impl AppState { @@ -83,6 +117,12 @@ impl AppState { import, metadata: Arc::new(NullMetadata), curation: Arc::new(NullCuration), + search: Arc::new(NullSearch), + download_engine: Arc::new(NullDownloadEngine), + queue: Arc::new(NullQueueManager), + requests: Arc::new(NullRequestService), + external: Arc::new(NullExternalIntegration), + subtitles: Arc::new(NullSubtitleService), } } }