diff --git a/Cargo.toml b/Cargo.toml index acab7c5ac..b5a60fbd3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,11 +10,12 @@ members = [ "libra", "vault", "neptune", + "aries", "saturn", "lunar/src-tauri", "atlas", ] -default-members = ["mega", "libra"] +default-members = ["mega", "libra","aries"] exclude = ["craft"] resolver = "1" diff --git a/aries/Cargo.toml b/aries/Cargo.toml new file mode 100644 index 000000000..9adf07b86 --- /dev/null +++ b/aries/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "aries" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "aries" +path = "src/main.rs" + +[dependencies] +common = { workspace = true } +callisto = { workspace = true } +gemini = { workspace = true } +jupiter = { workspace = true } +tokio = { workspace = true } +clap = { workspace = true, features = ["derive"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tracing-appender = { workspace = true } +axum = { workspace = true } +serde_json = { workspace = true } +tower = { workspace = true } +tower-http = { workspace = true, features = [ + "cors", + "trace", + "decompression-full", +] } +ctrlc = "3.4.4" +regex = "1.10.4" \ No newline at end of file diff --git a/aries/src/main.rs b/aries/src/main.rs new file mode 100644 index 000000000..e521bc524 --- /dev/null +++ b/aries/src/main.rs @@ -0,0 +1,83 @@ +use clap::Parser; +use common::config::{Config, LogConfig}; +use gemini::ztm::hub::LocalZTMHub; +use service::{ + ca_server::run_ca_server, + relay_server::{run_relay_server, RelayOptions}, +}; +use std::{env, thread, time}; +use tracing_subscriber::fmt::writer::MakeWriterExt; + +pub mod service; + +#[tokio::main] +async fn main() { + // Get the current directory + let current_dir = env::current_dir().unwrap(); + // Get the path to the config file in the current directory + let config_path = current_dir.join("config.toml"); + + let config = if config_path.exists() { + Config::new(config_path.to_str().unwrap()).unwrap() + } else { + eprintln!("can't find config.toml under {:?}, you can manually set config.toml path with --config parameter", env::current_dir().unwrap()); + Config::default() + }; + + init_log(&config.log); + + ctrlc::set_handler(move || { + tracing::info!("Received Ctrl-C signal, exiting..."); + std::process::exit(0); + }) + .unwrap(); + + let option = RelayOptions::parse(); + tracing::info!("{:?}", option); + + //Start a sub thread to ca server + let config_clone = config.clone(); + let ca_port = option.ca_port; + tokio::spawn(async move { run_ca_server(config_clone, ca_port).await }); + thread::sleep(time::Duration::from_secs(5)); + + //Start a sub thread to run ztm-hub + let ca = format!("127.0.0.1:{ca_port}"); + let ztm_hub: LocalZTMHub = LocalZTMHub { + hub_port: option.ztm_hub_port, + ca, + name: vec!["relay".to_string()], + }; + ztm_hub.clone().start_ztm_hub(); + thread::sleep(time::Duration::from_secs(5)); + + //Start relay server + run_relay_server(config, option).await; +} + +fn init_log(config: &LogConfig) { + let log_level = match config.level.as_str() { + "trace" => tracing::Level::TRACE, + "debug" => tracing::Level::DEBUG, + "info" => tracing::Level::INFO, + "warn" => tracing::Level::WARN, + "error" => tracing::Level::ERROR, + _ => tracing::Level::INFO, + }; + + let file_appender = + tracing_appender::rolling::hourly(config.log_path.clone(), "mega-relay-logs"); + + if config.print_std { + let stdout = std::io::stdout; + tracing_subscriber::fmt() + .with_writer(stdout.and(file_appender)) + .with_max_level(log_level) + .init(); + } else { + tracing_subscriber::fmt() + .with_writer(file_appender) + .with_max_level(log_level) + .init(); + } +} diff --git a/gateway/src/ca_server.rs b/aries/src/service/ca_server.rs similarity index 88% rename from gateway/src/ca_server.rs rename to aries/src/service/ca_server.rs index 82277bbc1..1c9158f53 100755 --- a/gateway/src/ca_server.rs +++ b/aries/src/service/ca_server.rs @@ -15,10 +15,7 @@ use tower_http::cors::{Any, CorsLayer}; use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; -use crate::api::api_router::{self}; -use crate::api::ApiServiceState; - -pub async fn run_ca_server(config: Config, _host: String, port: u16) { +pub async fn run_ca_server(config: Config, port: u16) { let host = "127.0.0.1".to_string(); let app = app(config.clone(), host.clone(), port).await; @@ -45,15 +42,7 @@ pub async fn app(config: Config, host: String, port: u16) -> Router { context: Context::new(config.clone()).await, }; - let api_state = ApiServiceState { - context: Context::new(config).await, - }; - - // add RequestDecompressionLayer for handle gzip encode - // add TraceLayer for log record - // add CorsLayer to add cors header Router::new() - .nest("/api/", api_router::routers().with_state(api_state)) .route( "/*path", get(get_method_router) @@ -93,11 +82,10 @@ async fn get_method_router( } async fn post_method_router( - state: State, + _state: State, uri: Uri, req: Request, ) -> Result, (StatusCode, String)> { - let _ztm_config = state.context.config.ztm.clone(); if Regex::new(r"/certificates/[a-zA-Z0-9]+$") .unwrap() .is_match(uri.path()) diff --git a/aries/src/service/mod.rs b/aries/src/service/mod.rs new file mode 100644 index 000000000..eda55b33b --- /dev/null +++ b/aries/src/service/mod.rs @@ -0,0 +1,5 @@ +pub mod ca_server; +pub mod relay_server; + +#[cfg(test)] +mod tests {} diff --git a/gateway/src/relay_server.rs b/aries/src/service/relay_server.rs similarity index 50% rename from gateway/src/relay_server.rs rename to aries/src/service/relay_server.rs index bb276d9b8..4914309d7 100644 --- a/gateway/src/relay_server.rs +++ b/aries/src/service/relay_server.rs @@ -2,140 +2,128 @@ use std::net::SocketAddr; use std::str::FromStr; use std::time::{Duration, SystemTime}; -use axum::body::Body; -use axum::extract::{FromRequest, Query, State}; -use axum::http::{Request, Response, StatusCode, Uri}; -use axum::routing::get; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::routing::{get, post}; use axum::{Json, Router}; use callisto::{ztm_node, ztm_repo_info}; +use clap::Parser; use common::config::Config; -use common::model::CommonOptions; -use gemini::ztm::hub::{LocalHub, ZTMCA}; +use gemini::ztm::hub::{LocalHub, ZTMUserPermit, ZTMCA}; use gemini::{Node, RelayGetParams, RelayResultRes, RepoInfo}; use jupiter::context::Context; -use regex::Regex; use tower::ServiceBuilder; use tower_http::cors::{Any, CorsLayer}; use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; -use crate::api::api_router::{self}; -use crate::api::ApiServiceState; +#[derive(Clone, Debug, Parser)] +pub struct RelayOptions { + #[arg(long, default_value_t = String::from("127.0.0.1"))] + pub host: String, -pub async fn run_relay_server(config: Config, common: CommonOptions) { - let host = common.host.clone(); - let relay_port = common.relay_port; - let hub_port = common.ztm_hub_port; - let ca_port = common.ca_port; - let app = app(config.clone(), host.clone(), relay_port, hub_port, ca_port).await; + #[arg(long, default_value_t = String::from("127.0.0.1"))] + pub hub_host: String, - let server_url = format!("{}:{}", host, relay_port); - tracing::info!("start relay server: {server_url}"); - let addr = SocketAddr::from_str(&server_url).unwrap(); - let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - axum::serve(listener, app.into_make_service()) - .await - .unwrap(); + #[arg(long, default_value_t = 8001)] + pub relay_port: u16, + + #[arg(long, default_value_t = 8888)] + pub ztm_hub_port: u16, + + #[arg(long, default_value_t = 9999)] + pub ca_port: u16, } #[derive(Clone)] pub struct AppState { pub context: Context, pub host: String, + pub hub_host: String, pub relay_port: u16, pub hub_port: u16, pub ca_port: u16, } +pub async fn run_relay_server(config: Config, option: RelayOptions) { + let host = option.host.clone(); + let relay_port = option.relay_port; + let hub_host = option.hub_host; + let hub_port = option.ztm_hub_port; + let ca_port = option.ca_port; + let app = app( + config.clone(), + host.clone(), + hub_host, + relay_port, + hub_port, + ca_port, + ) + .await; + + let server_url = format!("{}:{}", host, relay_port); + tracing::info!("start relay server: {server_url}"); + let addr = SocketAddr::from_str(&server_url).unwrap(); + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + axum::serve(listener, app.into_make_service()) + .await + .unwrap(); +} + pub async fn app( config: Config, host: String, + hub_host: String, relay_port: u16, hub_port: u16, ca_port: u16, ) -> Router { let state = AppState { host, - relay_port, + hub_host, hub_port, + relay_port, ca_port, context: Context::new(config.clone()).await, }; - let api_state = ApiServiceState { - context: Context::new(config).await, - }; - - let context = api_state.context.clone(); + let context = Context::new(config.clone()).await; tokio::spawn(async move { loop_running(context).await }); - // add RequestDecompressionLayer for handle gzip encode - // add TraceLayer for log record - // add CorsLayer to add cors header Router::new() - .nest("/api/v1", api_router::routers().with_state(api_state)) - .route( - "/*path", - get(get_method_router).post(post_method_router), - // .put(put_method_router), - ) + .nest("/api/v1", routers().with_state(state)) .layer(ServiceBuilder::new().layer(CorsLayer::new().allow_origin(Any))) .layer(TraceLayer::new_for_http()) .layer(RequestDecompressionLayer::new()) - .with_state(state) } -async fn get_method_router( - state: State, - Query(params): Query, - uri: Uri, -) -> Result, (StatusCode, String)> { - if Regex::new(r"/hello$").unwrap().is_match(uri.path()) { - return hello_relay(params).await; - } else if Regex::new(r"/certificate$").unwrap().is_match(uri.path()) { - return certificate(state, params).await; - } else if Regex::new(r"/ping$").unwrap().is_match(uri.path()) { - return ping(state, params).await; - } else if Regex::new(r"/node_list$").unwrap().is_match(uri.path()) { - return node_list(state, params).await; - } else if Regex::new(r"/repo_list$").unwrap().is_match(uri.path()) { - return repo_list(state, params).await; - } - Err(( - StatusCode::NOT_FOUND, - String::from("Operation not supported\n"), - )) -} +pub fn routers() -> Router { + let router = Router::new() + .route("/hello", get(hello)) + .route("/certificate", get(certificate)) + .route("/ping", get(ping)) + .route("/node_list", get(node_list)) + .route("/repo_provide", post(repo_provide)) + .route("/repo_list", get(repo_list)); -async fn post_method_router( - state: State, - uri: Uri, - req: Request, -) -> Result, (StatusCode, String)> { - let _ztm_config = state.context.config.ztm.clone(); - if Regex::new(r"/repo_provide$").unwrap().is_match(uri.path()) { - return repo_provide(state, req).await; - } - Err(( - StatusCode::NOT_FOUND, - String::from("Operation not supported\n"), - )) + Router::new().merge(router) } -pub async fn hello_relay(_params: RelayGetParams) -> Result, (StatusCode, String)> { - Ok(Response::builder().body(Body::from("hello relay")).unwrap()) +async fn hello() -> Result { + Ok(Json("hello relay")) } -pub async fn certificate( +async fn certificate( + Query(query): Query, state: State, - params: RelayGetParams, -) -> Result, (StatusCode, String)> { - if params.name.is_none() { +) -> Result, (StatusCode, String)> { + if query.name.is_none() { return Err((StatusCode::BAD_REQUEST, "not enough paras".to_string())); } - let name = params.name.unwrap(); + let name = query.name.unwrap(); let ztm: LocalHub = LocalHub { - host: state.host.clone(), + hub_host: state.hub_host.clone(), hub_port: state.hub_port, ca_port: state.ca_port, }; @@ -145,38 +133,22 @@ pub async fn certificate( return Err((StatusCode::INTERNAL_SERVER_ERROR, e)); } }; - - let permit_json = serde_json::to_string(&permit).unwrap(); - tracing::info!("new permit [{name}]: {permit_json}"); - - Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(permit_json)) - .unwrap()) + Ok(Json(permit)) } pub async fn ping( + Query(query): Query, state: State, - params: RelayGetParams, -) -> Result, (StatusCode, String)> { +) -> Result, (StatusCode, String)> { let storage = state.context.services.ztm_storage.clone(); - let node: ztm_node::Model = match params.try_into() { + let node: ztm_node::Model = match query.try_into() { Ok(n) => n, Err(_) => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "invalid paras".to_string(), - )); + return Err((StatusCode::BAD_REQUEST, "invalid paras".to_string())); } }; match storage.insert_or_update_node(node).await { - Ok(_) => { - let res = serde_json::to_string(&RelayResultRes { success: true }).unwrap(); - Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(res)) - .unwrap()) - } + Ok(_) => Ok(Json(RelayResultRes { success: true })), Err(_) => Err(( StatusCode::INTERNAL_SERVER_ERROR, "invalid paras".to_string(), @@ -185,9 +157,9 @@ pub async fn ping( } pub async fn node_list( + Query(_query): Query, state: State, - _params: RelayGetParams, -) -> Result, (StatusCode, String)> { +) -> Result>, (StatusCode, String)> { let storage = state.context.services.ztm_storage.clone(); let nodelist: Vec = storage .get_all_node() @@ -196,34 +168,20 @@ pub async fn node_list( .into_iter() .map(|x| x.into()) .collect(); - let json_string = serde_json::to_string(&nodelist).unwrap(); - Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(json_string)) - .unwrap()) + Ok(Json(nodelist)) } pub async fn repo_provide( state: State, - req: Request, -) -> Result, (StatusCode, String)> { - let storage = state.context.services.ztm_storage.clone(); - let request = Json::from_request(req, &state) - .await - .unwrap_or_else(|_| Json(RepoInfo::default())); - let repo_info: RepoInfo = request.0; + Json(repo_info): Json, +) -> Result, (StatusCode, String)> { if repo_info.identifier.is_empty() { return Err((StatusCode::BAD_REQUEST, "paras invalid".to_string())); } let repo_info_model: ztm_repo_info::Model = repo_info.into(); + let storage = state.context.services.ztm_storage.clone(); match storage.insert_or_update_repo_info(repo_info_model).await { - Ok(_) => { - let res = serde_json::to_string(&RelayResultRes { success: true }).unwrap(); - Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(res)) - .unwrap()) - } + Ok(_) => Ok(Json(RelayResultRes { success: true })), Err(_) => Err(( StatusCode::INTERNAL_SERVER_ERROR, "invalid paras".to_string(), @@ -232,9 +190,9 @@ pub async fn repo_provide( } pub async fn repo_list( + Query(_query): Query, state: State, - _params: RelayGetParams, -) -> Result, (StatusCode, String)> { +) -> Result>, (StatusCode, String)> { let storage = state.context.services.ztm_storage.clone(); let repo_info_list: Vec = storage .get_all_repo_info() @@ -259,11 +217,7 @@ pub async fn repo_list( } repo_info_list_result.push(repo.clone()); } - let json_string = serde_json::to_string(&repo_info_list_result).unwrap(); - Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(json_string)) - .unwrap()) + Ok(Json(repo_info_list_result)) } async fn loop_running(context: Context) { diff --git a/common/src/config.rs b/common/src/config.rs index e1d6215cb..6424b46f9 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -17,7 +17,6 @@ pub struct Config { pub storage: StorageConfig, pub monorepo: MonoConfig, pub pack: PackConfig, - pub ztm: ZTMConfig, pub lfs: LFSConfig, pub oauth: OauthConfig, } @@ -227,23 +226,6 @@ impl Default for PackConfig { } } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ZTMConfig { - pub ca: String, - pub hub: String, - pub agent: String, -} - -impl Default for ZTMConfig { - fn default() -> Self { - Self { - ca: String::from("127.0.0.1:9999"), - hub: String::from("127.0.0.1:8888"), - agent: String::from("127.0.0.1:7777"), - } - } -} - #[derive(Serialize, Deserialize, Debug, Clone)] pub struct LFSConfig { pub enable_split: bool, diff --git a/common/src/enums.rs b/common/src/enums.rs index 7e15bf2d9..640389c00 100644 --- a/common/src/enums.rs +++ b/common/src/enums.rs @@ -9,27 +9,6 @@ use std::str::FromStr; -use clap::ValueEnum; - -/// An enum representing different ZTM types. -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)] -pub enum ZtmType { - Agent, - Relay, -} - -impl FromStr for ZtmType { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.to_lowercase().as_str() { - "agent" => Ok(ZtmType::Agent), - "relay" => Ok(ZtmType::Relay), - _ => Err(format!("'{}' is not a valid ztm type", s)), - } - } -} - /// An enum representing different oauth types. #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] pub enum SupportOauthType { diff --git a/common/src/model.rs b/common/src/model.rs index c5aba44be..58180b561 100644 --- a/common/src/model.rs +++ b/common/src/model.rs @@ -1,28 +1,14 @@ use clap::Args; use serde::{Deserialize, Serialize}; -use crate::enums::ZtmType; - #[derive(Args, Clone, Debug)] pub struct CommonOptions { #[arg(long, default_value_t = String::from("127.0.0.1"))] pub host: String, - #[arg(long)] - pub ztm: Option, - - #[arg(long, default_value_t = 8001)] - pub relay_port: u16, - #[arg(long, default_value_t = 7777)] pub ztm_agent_port: u16, - #[arg(long, default_value_t = 8888)] - pub ztm_hub_port: u16, - - #[arg(long, default_value_t = 9999)] - pub ca_port: u16, - #[arg(long)] pub bootstrap_node: Option, } diff --git a/gateway/src/api/api_router.rs b/gateway/src/api/api_router.rs index cb16c26eb..c5f91c29e 100644 --- a/gateway/src/api/api_router.rs +++ b/gateway/src/api/api_router.rs @@ -17,6 +17,8 @@ use common::model::CommonResult; use crate::api::mr_router; use crate::api::ApiServiceState; +use super::ztm_router; + pub fn routers() -> Router { let router = Router::new() .route("/status", get(life_cycle_check)) @@ -27,7 +29,10 @@ pub fn routers() -> Router { .route("/blob", get(get_blob_object)) .route("/publish", post(publish_path_to_repo)); - Router::new().merge(router).merge(mr_router::routers()) + Router::new() + .merge(router) + .merge(mr_router::routers()) + .merge(ztm_router::routers()) } async fn get_blob_object( diff --git a/gateway/src/api/mod.rs b/gateway/src/api/mod.rs index a47fa161e..070abfda8 100644 --- a/gateway/src/api/mod.rs +++ b/gateway/src/api/mod.rs @@ -3,16 +3,20 @@ use std::path::PathBuf; use ceres::api_service::{ import_api_service::ImportApiService, mono_api_service::MonoApiService, ApiHandler, }; +use common::model::CommonOptions; use jupiter::context::Context; use venus::import_repo::repo::Repo; pub mod api_router; pub mod mr_router; pub mod oauth; +pub mod ztm_router; #[derive(Clone)] pub struct ApiServiceState { pub context: Context, + pub port: u16, + pub common: CommonOptions, } impl ApiServiceState { diff --git a/gateway/src/api/ztm_router.rs b/gateway/src/api/ztm_router.rs new file mode 100644 index 000000000..d933c4641 --- /dev/null +++ b/gateway/src/api/ztm_router.rs @@ -0,0 +1,90 @@ +use std::collections::HashMap; + +use axum::{ + extract::{Query, State}, + http::StatusCode, + routing::get, + Json, Router, +}; + +use common::model::CommonResult; + +use crate::api::ApiServiceState; + +pub fn routers() -> Router { + Router::new() + .route("/ztm/repo_provide", get(repo_provide)) + .route("/ztm/repo_folk", get(repo_folk)) +} + +async fn repo_provide( + Query(query): Query>, + state: State, +) -> Result>, (StatusCode, String)> { + let path = match query.get("path") { + Some(p) => p, + None => { + return Err((StatusCode::BAD_REQUEST, String::from("Path not provide\n"))); + } + }; + let bootstrap_node = match state.common.bootstrap_node.clone() { + Some(b) => b.clone(), + None => { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + String::from("Bootstrap node not provide\n"), + )); + } + }; + let res = match gemini::http::handler::repo_provide( + state.port, + bootstrap_node, + state.context.clone(), + path.to_string(), + ) + .await + { + Ok(s) => CommonResult::success(Some(s)), + Err(err) => CommonResult::failed(err.as_str()), + }; + Ok(Json(res)) +} + +async fn repo_folk( + Query(query): Query>, + state: State, +) -> Result>, (StatusCode, String)> { + let identifier = match query.get("identifier") { + Some(i) => i, + None => { + return Err(( + StatusCode::BAD_REQUEST, + String::from("Identifier not provide\n"), + )); + } + }; + let local_port = match query.get("port") { + Some(i) => i, + None => { + return Err((StatusCode::BAD_REQUEST, String::from("Port not provide\n"))); + } + }; + let local_port = match local_port.parse::() { + Ok(i) => i, + Err(_) => { + return Err((StatusCode::BAD_REQUEST, String::from("Port not valid\n"))); + } + }; + + let res = gemini::http::handler::repo_folk( + state.common.ztm_agent_port, + identifier.to_string(), + local_port, + ) + .await; + let res = match res { + Ok(data) => CommonResult::success(Some(data)), + Err(err) => CommonResult::failed(&err.to_string()), + }; + Ok(Json(res)) +} diff --git a/gateway/src/https_server.rs b/gateway/src/https_server.rs index 6d0cad362..adaaba025 100644 --- a/gateway/src/https_server.rs +++ b/gateway/src/https_server.rs @@ -25,19 +25,15 @@ use tower_http::trace::TraceLayer; use ceres::lfs::LfsConfig; use ceres::protocol::{ServiceType, SmartProtocol, TransportProtocol}; use common::config::Config; -use common::enums::ZtmType; use common::model::{CommonOptions, GetParams}; use gemini::ztm::agent::{run_ztm_client, LocalZTMAgent}; -use gemini::ztm::hub::LocalZTMHub; use jupiter::context::Context; use jupiter::raw_storage::local_storage::LocalStorage; use crate::api::api_router::{self}; use crate::api::oauth::{self, OauthServiceState}; use crate::api::ApiServiceState; -use crate::ca_server::run_ca_server; use crate::lfs; -use crate::relay_server::run_relay_server; #[derive(Args, Clone, Debug)] pub struct HttpOptions { @@ -145,6 +141,8 @@ pub async fn app(config: Config, host: String, port: u16, common: CommonOptions) let api_state = ApiServiceState { context: context.clone(), + port, + common: common.clone(), }; // add RequestDecompressionLayer for handle gzip encode @@ -200,19 +198,6 @@ async fn get_method_router( TransportProtocol::Http, ); return crate::git_protocol::http::git_info_refs(params, pack_protocol).await; - } else if Regex::new(r"/ztm/repo_provide$") - .unwrap() - .is_match(uri.path()) - { - return gemini::http::handler::repo_provide( - state.port, - state.common.bootstrap_node.clone(), - state.context.clone(), - params, - ) - .await; - } else if Regex::new(r"/ztm/repo_folk$").unwrap().is_match(uri.path()) { - return gemini::http::handler::repo_folk(state.common.ztm_agent_port, params).await; } else { return Err(( StatusCode::NOT_FOUND, @@ -293,22 +278,13 @@ async fn put_method_router( } pub fn check_run_with_ztm(config: Config, common: CommonOptions) { - let ztm_type = match common.ztm { - Some(z) => z, - None => { - return; - } - }; - match ztm_type { - ZtmType::Agent => { - //Mega server join a ztm mesh - let bootstrap_node = match common.bootstrap_node { - Some(n) => n, - None => { - tracing::error!("bootstrap node is not provide"); - return; - } - }; + //Mega server join a ztm mesh + match common.bootstrap_node { + Some(bootstrap_node) => { + tracing::info!( + "The bootstrap node is {}, prepare to join ztm network", + bootstrap_node + ); let (peer_id, _) = vault::init(); let ztm_agent: LocalZTMAgent = LocalZTMAgent { agent_port: common.ztm_agent_port, @@ -319,30 +295,10 @@ pub fn check_run_with_ztm(config: Config, common: CommonOptions) { run_ztm_client(bootstrap_node, config, peer_id, ztm_agent).await }); } - ZtmType::Relay => { - //Start a sub thread to ca server - let config_clone = config.clone(); - let host_clone = common.host.clone(); - let ca_port = common.ca_port; - tokio::spawn(async move { run_ca_server(config_clone, host_clone, ca_port).await }); - thread::sleep(time::Duration::from_secs(5)); - - //Start a sub thread to run ztm-hub - let ca = format!("localhost:{ca_port}"); - let ztm_hub: LocalZTMHub = LocalZTMHub { - hub_port: common.ztm_hub_port, - ca, - name: vec!["relay".to_string()], - }; - ztm_hub.clone().start_ztm_hub(); - thread::sleep(time::Duration::from_secs(5)); - - //Start a sub thread to run relay server - let config_clone = config.clone(); - let common: CommonOptions = common.clone(); - tokio::spawn(async move { run_relay_server(config_clone, common).await }); + None => { + tracing::info!("The bootstrap node is not set, prepare to start mega sever locally"); } - } + }; } #[cfg(test)] diff --git a/gateway/src/lib.rs b/gateway/src/lib.rs index d267a80d6..f3c066a51 100644 --- a/gateway/src/lib.rs +++ b/gateway/src/lib.rs @@ -1,10 +1,8 @@ pub mod api; -pub mod ca_server; mod git_protocol; pub mod https_server; pub mod init; mod lfs; -pub mod relay_server; pub mod ssh_server; #[cfg(test)] diff --git a/gemini/src/http/handler.rs b/gemini/src/http/handler.rs index 35e033410..9e14b4336 100644 --- a/gemini/src/http/handler.rs +++ b/gemini/src/http/handler.rs @@ -1,7 +1,4 @@ -use axum::{body::Body, http::Response}; -use common::model::{CommonResult, GetParams}; use jupiter::context::Context; -use reqwest::StatusCode; use venus::import_repo::repo::Repo; use crate::{ @@ -11,26 +8,11 @@ use crate::{ pub async fn repo_provide( port: u16, - bootstrap_node: Option, + bootstrap_node: String, context: Context, - params: GetParams, -) -> Result, (StatusCode, String)> { - let bootstrap_node_clone = match bootstrap_node { - Some(b) => b.clone(), - None => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - String::from("Bootstrap node not provide\n"), - )); - } - }; - let path = match params.path.clone() { - Some(p) => p, - None => { - return Err((StatusCode::BAD_REQUEST, String::from("Path not provide\n"))); - } - }; - let url = format!("{bootstrap_node_clone}/api/v1/repo_provide"); + path: String, +) -> Result { + let url = format!("{bootstrap_node}/api/v1/repo_provide"); let git_model = context .services .git_db_storage @@ -42,10 +24,10 @@ pub async fn repo_provide( if let Some(m) = r { m } else { - return Err((StatusCode::BAD_REQUEST, String::from("Repo not found"))); + return Err(String::from("Repo not found")); } } - Err(_) => return Err((StatusCode::BAD_REQUEST, String::from("Repo not found"))), + Err(_) => return Err(String::from("Repo not found")), }; let repo: Repo = git_model.clone().into(); let git_ref = context @@ -70,40 +52,25 @@ pub async fn repo_provide( peer_online: true, }; share_repo(url.clone(), repo_info).await; - Ok(Response::builder().body(Body::from("success")).unwrap()) + Ok("success".to_string()) } pub async fn repo_folk( ztm_agent_port: u16, - params: GetParams, -) -> Result, (StatusCode, String)> { - tracing::info!("params:{:?}", params); - let identifier = match params.identifier.clone() { - Some(i) => i, - None => { - return Err(( - StatusCode::BAD_REQUEST, - String::from("Identifier not provide\n"), - )); - } - }; - let port = match params.port { - Some(i) => i, - None => { - return Err((StatusCode::BAD_REQUEST, String::from("Port not provide\n"))); - } - }; + identifier: String, + local_port: u16, +) -> Result { let remote_peer_id = match get_peer_id_from_identifier(identifier.clone()) { Ok(p) => p, - Err(e) => return Err((StatusCode::BAD_REQUEST, e)), + Err(e) => return Err(e), }; let remote_port = match get_remote_port_from_identifier(identifier.clone()) { Ok(p) => p, - Err(e) => return Err((StatusCode::BAD_REQUEST, e)), + Err(e) => return Err(e), }; let git_path = match get_git_path_from_identifier(identifier) { Ok(p) => p, - Err(e) => return Err((StatusCode::BAD_REQUEST, e)), + Err(e) => return Err(e), }; let agent: LocalZTMAgent = LocalZTMAgent { @@ -111,12 +78,12 @@ pub async fn repo_folk( }; let local_ep = match agent.get_ztm_local_endpoint().await { Ok(ep) => ep, - Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + Err(e) => return Err(e), }; let remote_ep = match agent.get_ztm_remote_endpoint(remote_peer_id.clone()).await { Ok(ep) => ep, - Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + Err(e) => return Err(e), }; let (peer_id, _) = vault::init(); @@ -132,14 +99,14 @@ pub async fn repo_folk( ZTM_APP_PROVIDER.to_string(), "tunnel".to_string(), bound_name.clone(), - port, + local_port, ) .await { Ok(_) => (), Err(s) => { tracing::error!("create app inbound, {s}"); - return Err((StatusCode::INTERNAL_SERVER_ERROR, s)); + return Err(s); } } tracing::info!("create app inbound successfully"); @@ -160,15 +127,11 @@ pub async fn repo_folk( } Err(s) => { tracing::error!("create app outbound, {s}"); - return Err((StatusCode::INTERNAL_SERVER_ERROR, s)); + return Err(s); } } - let msg = format!("git clone http://localhost:{port}/{git_path}"); - let json_string = serde_json::to_string(&CommonResult::success(Some(msg))).unwrap(); - Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(json_string)) - .unwrap()) + let msg = format!("git clone http://localhost:{local_port}/{git_path}"); + Ok(msg) } pub fn get_peer_id_from_identifier(identifier: String) -> Result { diff --git a/gemini/src/ztm/hub.rs b/gemini/src/ztm/hub.rs index 7f5c9a97d..c131a5dd4 100644 --- a/gemini/src/ztm/hub.rs +++ b/gemini/src/ztm/hub.rs @@ -54,7 +54,7 @@ pub trait ZTMCA { } pub struct LocalHub { - pub host: String, + pub hub_host: String, pub hub_port: u16, pub ca_port: u16, } @@ -103,7 +103,7 @@ impl ZTMCA for LocalHub { private_key: user_key.clone(), }; - let hub_address = format!("{}:{}", self.host, self.hub_port); + let hub_address = format!("{}:{}", self.hub_host, self.hub_port); let permit = ZTMUserPermit { ca: ca_certificate.clone(), agent, diff --git a/mega/config.toml b/mega/config.toml index 8c1b6429f..3c3060ddf 100644 --- a/mega/config.toml +++ b/mega/config.toml @@ -79,12 +79,6 @@ clean_cache_after_decode = true # The maximum meesage size in channel buffer while decode channel_message_size = 1_000_000 - -[ztm] -ca = "http://127.0.0.1:9999" -hub = "http://127.0.0.1:8888" -agent = "http://127.0.0.1:7777" - [lfs] ## IMPORTANT: The 'enable_split' feature can only be enabled for new databases. Existing databases do not support this feature. # Enable or disable splitting large files into smaller chunks diff --git a/mega/src/commands/service/relay.rs b/mega/src/commands/service/relay.rs deleted file mode 100644 index 21c7a8beb..000000000 --- a/mega/src/commands/service/relay.rs +++ /dev/null @@ -1,21 +0,0 @@ -// use clap::{ArgMatches, Args, Command, FromArgMatches}; - -// use common::{config::Config, errors::MegaResult}; -// use gateway::relay_server::{self, RelayOptions}; - -// pub fn cli() -> Command { -// RelayOptions::augment_args_for_update(Command::new("relay").about("Start Mega RELAY server")) -// } - -// pub(crate) async fn exec(config: Config, args: &ArgMatches) -> MegaResult { -// let relay_matchers = RelayOptions::from_arg_matches(args) -// .map_err(|err| err.exit()) -// .unwrap(); - -// tracing::info!("{relay_matchers:#?}"); -// relay_server::http_server(config, relay_matchers).await; -// Ok(()) -// } - -// #[cfg(test)] -// mod tests {}