Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 118 additions & 23 deletions stationapi/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,25 @@ fn download_gtfs() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Downloading GTFS data from ODPT API...");

// Download the ZIP file
let request_start = std::time::Instant::now();
let response = reqwest::blocking::get(TOEI_BUS_GTFS_URL)?;
info!(
"[gtfs-download] response received in {:?} (status={})",
request_start.elapsed(),
response.status()
);

if !response.status().is_success() {
return Err(format!("Failed to download GTFS: HTTP {}", response.status()).into());
}

let body_start = std::time::Instant::now();
let bytes = response.bytes()?;
info!("Downloaded {} bytes, extracting...", bytes.len());
info!(
"[gtfs-download] body read in {:?} ({} bytes), extracting...",
body_start.elapsed(),
bytes.len()
);

// Create the target directory
fs::create_dir_all(gtfs_path)?;
Expand Down Expand Up @@ -248,17 +259,26 @@ fn download_gtfs() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// Import GTFS data from ToeiBus-GTFS directory
/// All imports are wrapped in a transaction - if any step fails, all changes are rolled back
pub async fn import_gtfs() -> Result<(), Box<dyn std::error::Error>> {
let total_start = std::time::Instant::now();
info!("[gtfs] entering import_gtfs");

// Check if bus feature is disabled
if is_bus_feature_disabled() {
info!("Bus feature is disabled, skipping GTFS import.");
return Ok(());
}
info!("[gtfs] bus feature enabled, starting download/extract");

// Download GTFS data if not present (use spawn_blocking to avoid blocking async runtime)
let download_start = std::time::Instant::now();
tokio::task::spawn_blocking(download_gtfs)
.await
.map_err(|e| format!("Failed to spawn blocking task: {}", e))?
.map_err(|e| -> Box<dyn std::error::Error> { e })?;
info!(
"[gtfs] download/extract finished in {:?}",
download_start.elapsed()
);

let gtfs_path = Path::new("data/ToeiBus-GTFS");

Expand All @@ -268,10 +288,19 @@ pub async fn import_gtfs() -> Result<(), Box<dyn std::error::Error>> {
}

// Load translations for multi-language support (before transaction to avoid holding lock)
let translations_start = std::time::Instant::now();
let translations = load_gtfs_translations(gtfs_path)?;
info!(
"[gtfs] loaded {} translation entries in {:?}",
translations.len(),
translations_start.elapsed()
);

info!("[gtfs] connecting to database");
let connect_start = std::time::Instant::now();
let db_url = fetch_database_url();
let mut conn = PgConnection::connect(&db_url).await?;
info!("[gtfs] connected in {:?}", connect_start.elapsed());

info!(
"Starting GTFS import from {:?} (using transaction)...",
Expand All @@ -280,8 +309,10 @@ pub async fn import_gtfs() -> Result<(), Box<dyn std::error::Error>> {

// Begin transaction - all changes will be rolled back if any step fails
let mut tx = conn.begin().await?;
info!("[gtfs] transaction begun, clearing existing data");

// First, clear existing GTFS data (in reverse order of dependencies)
let clear_start = std::time::Instant::now();
sqlx::query("DELETE FROM gtfs_stop_times")
.execute(&mut *tx)
.await?;
Expand Down Expand Up @@ -309,38 +340,62 @@ pub async fn import_gtfs() -> Result<(), Box<dyn std::error::Error>> {
sqlx::query("DELETE FROM gtfs_feed_info")
.execute(&mut *tx)
.await?;
info!(
"[gtfs] cleared existing data in {:?}",
clear_start.elapsed()
);

// Import agencies
let step_start = std::time::Instant::now();
import_gtfs_agencies(&mut tx, gtfs_path).await?;
info!("[gtfs] agencies imported in {:?}", step_start.elapsed());

// Import routes
let step_start = std::time::Instant::now();
import_gtfs_routes(&mut tx, gtfs_path).await?;
info!("[gtfs] routes imported in {:?}", step_start.elapsed());

// Import stops with translations
let step_start = std::time::Instant::now();
import_gtfs_stops(&mut tx, gtfs_path, &translations).await?;
info!("[gtfs] stops imported in {:?}", step_start.elapsed());

// Import calendar
let step_start = std::time::Instant::now();
import_gtfs_calendar(&mut tx, gtfs_path).await?;
info!("[gtfs] calendar imported in {:?}", step_start.elapsed());

// Import calendar_dates
let step_start = std::time::Instant::now();
import_gtfs_calendar_dates(&mut tx, gtfs_path).await?;
info!(
"[gtfs] calendar_dates imported in {:?}",
step_start.elapsed()
);

// Import shapes
let step_start = std::time::Instant::now();
import_gtfs_shapes(&mut tx, gtfs_path).await?;
info!("[gtfs] shapes imported in {:?}", step_start.elapsed());

// Import trips
let step_start = std::time::Instant::now();
import_gtfs_trips(&mut tx, gtfs_path).await?;
info!("[gtfs] trips imported in {:?}", step_start.elapsed());

// Import stop_times (largest file, needs batch processing)
let step_start = std::time::Instant::now();
import_gtfs_stop_times(&mut tx, gtfs_path).await?;
info!("[gtfs] stop_times imported in {:?}", step_start.elapsed());

// Import feed_info
let step_start = std::time::Instant::now();
import_gtfs_feed_info(&mut tx, gtfs_path).await?;
info!("[gtfs] feed_info imported in {:?}", step_start.elapsed());

// Commit transaction - all changes are now permanent
info!("[gtfs] committing transaction");
let commit_start = std::time::Instant::now();
tx.commit().await?;
info!(
"[gtfs] transaction committed in {:?}",
commit_start.elapsed()
);

info!("GTFS import completed successfully (transaction committed).");
info!(
"GTFS import completed successfully (transaction committed). total={:?}",
total_start.elapsed()
);

Ok(())
}
Expand Down Expand Up @@ -1481,18 +1536,27 @@ struct GtfsStopRow {
/// This function wraps all integration operations in a single database transaction.
/// If any step fails, all changes are rolled back to maintain database consistency.
pub async fn integrate_gtfs_to_stations() -> Result<(), Box<dyn std::error::Error>> {
let total_start = std::time::Instant::now();
info!("[integrate] entering integrate_gtfs_to_stations");

if is_bus_feature_disabled() {
info!("Bus feature is disabled, skipping GTFS integration.");
return Ok(());
}

info!("[integrate] connecting to database");
let db_url = fetch_database_url();
let mut conn = PgConnection::connect(&db_url).await?;

// Check if GTFS data exists (outside transaction for quick exit)
let gtfs_route_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM gtfs_routes")
.fetch_one(&mut conn)
.await?;
info!(
"[integrate] gtfs_routes count = {}, total_elapsed = {:?}",
gtfs_route_count.0,
total_start.elapsed()
);

if gtfs_route_count.0 == 0 {
info!("No GTFS routes found, skipping integration.");
Expand All @@ -1503,10 +1567,12 @@ pub async fn integrate_gtfs_to_stations() -> Result<(), Box<dyn std::error::Erro

// Begin transaction - all changes will be rolled back if any step fails
let mut tx = conn.begin().await?;
info!("[integrate] transaction begun, clearing existing bus data");

// Step 1: Clear existing bus data from stations/lines/types/sst.
// station_station_types references both types (FK) and stations (FK), so delete
// bus sst rows before bus types and before stations.
let step_start = std::time::Instant::now();
sqlx::query(
"DELETE FROM station_station_types WHERE type_cd IN (SELECT type_cd FROM types WHERE kind = $1)",
)
Expand All @@ -1523,30 +1589,59 @@ pub async fn integrate_gtfs_to_stations() -> Result<(), Box<dyn std::error::Erro
sqlx::query("DELETE FROM lines WHERE transport_type = 1")
.execute(&mut *tx)
.await?;
info!("Cleared existing bus data from stations/lines/types/station_station_types tables.");
info!(
"[integrate] cleared existing bus data in {:?}",
step_start.elapsed()
);

// Step 2: Insert bus routes as lines
let step_start = std::time::Instant::now();
integrate_gtfs_routes_to_lines(&mut tx).await?;
info!(
"[integrate] routes_to_lines done in {:?}",
step_start.elapsed()
);

// Step 3: Build stop-route mapping from stop_times
let step_start = std::time::Instant::now();
let stop_route_map = build_stop_route_mapping(&mut tx).await?;
info!(
"[integrate] build_stop_route_mapping done in {:?} ({} entries)",
step_start.elapsed(),
stop_route_map.len()
);

// Step 4: Insert bus stops as stations
let step_start = std::time::Instant::now();
integrate_gtfs_stops_to_stations(&mut tx, &stop_route_map).await?;
info!(
"[integrate] stops_to_stations done in {:?}",
step_start.elapsed()
);

// Step 5: Update cross-references in GTFS tables
let step_start = std::time::Instant::now();
update_gtfs_crossreferences(&mut tx, &stop_route_map).await?;
info!(
"[integrate] crossreferences done in {:?}",
step_start.elapsed()
);

// Step 6: Register each (route_id, shape_id) trip variation as a TrainType
// (kind=BusRoute) so clients can switch between bus operation patterns
// (e.g. 池86 のフルループ / サンシャインシティ経由 / 短ターン).
let step_start = std::time::Instant::now();
integrate_gtfs_trip_variations_to_types(&mut tx).await?;
info!(
"[integrate] trip_variations_to_types done in {:?}",
step_start.elapsed()
);

// Commit the transaction - all changes are now permanent
// ANALYZE is run separately in main.rs after all GTFS imports complete
info!("[integrate] committing transaction");
let commit_start = std::time::Instant::now();
tx.commit().await?;
info!(
"[integrate] transaction committed in {:?}",
commit_start.elapsed()
);

info!("GTFS integration completed successfully (transaction committed).");
info!(
"GTFS integration completed successfully (transaction committed). total={:?}",
total_start.elapsed()
);
Ok(())
}

Expand Down
46 changes: 45 additions & 1 deletion stationapi/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,15 @@ async fn run() -> std::result::Result<(), anyhow::Error> {
// After CSV completes, wait for GTFS in background and run integration.
// This task lives past server startup; health check passes as soon as CSV is done.
tokio::spawn(async move {
info!("[post-csv] awaiting GTFS import handle...");
let gtfs_wait_start = std::time::Instant::now();
match gtfs_handle.await {
Ok(Ok(())) => {}
Ok(Ok(())) => {
info!(
"[post-csv] GTFS import completed, waited {:?}",
gtfs_wait_start.elapsed()
);
}
Ok(Err(e)) => {
warn!(
"Failed to import GTFS data: {}. Continuing without GTFS data.",
Expand All @@ -118,8 +125,45 @@ async fn run() -> std::result::Result<(), anyhow::Error> {
}
}

// Refresh statistics on freshly-populated gtfs_* tables before running
// the heavy CTE query in integrate_gtfs_to_stations. Without this,
// PostgreSQL plans against empty-table statistics (from the CSV-side
// ANALYZE that ran while GTFS was still importing) and picks a plan
// that effectively hangs on build_stop_route_mapping.
info!("[post-csv] running ANALYZE on gtfs_* tables before integration");
let analyze_start = std::time::Instant::now();
let pre_db_url = fetch_database_url();
match sqlx::PgConnection::connect(&pre_db_url).await {
Ok(mut conn) => {
for table in [
"gtfs_agencies",
"gtfs_routes",
"gtfs_stops",
"gtfs_calendar",
"gtfs_calendar_dates",
"gtfs_shapes",
"gtfs_trips",
"gtfs_stop_times",
"gtfs_feed_info",
] {
let sql = format!("ANALYZE {table}");
if let Err(e) = sqlx::query(&sql).execute(&mut conn).await {
warn!("Failed to ANALYZE {}: {}", table, e);
}
}
info!(
"[post-csv] gtfs_* ANALYZE done in {:?}",
analyze_start.elapsed()
);
}
Err(e) => {
warn!("Failed to connect for pre-integrate ANALYZE: {}", e);
}
}

// Integrate GTFS data into stations/lines tables
// This is wrapped in a transaction - if any step fails, all changes are rolled back
info!("[post-csv] starting GTFS integration");
if let Err(e) = import::integrate_gtfs_to_stations().await {
tracing::error!(
"Failed to integrate GTFS to stations (transaction rolled back): {}",
Expand Down
Loading