Skip to content

Commit

Permalink
feat: add new workload pod and fixed orphan tcp connections
Browse files Browse the repository at this point in the history
  • Loading branch information
hcavarsan committed Jun 19, 2024
1 parent 17406ea commit 32d0dd6
Show file tree
Hide file tree
Showing 15 changed files with 568 additions and 292 deletions.
1 change: 0 additions & 1 deletion a

This file was deleted.

51 changes: 34 additions & 17 deletions crates/kftray-tauri/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use rusqlite::{
};
use serde_json::{
json,
to_value,
Value as JsonValue,
};

Expand All @@ -20,12 +19,21 @@ fn is_value_blank(value: &JsonValue) -> bool {
}
}

fn remove_blank_fields(value: &mut JsonValue) {
fn is_value_default(value: &serde_json::Value, default_config: &serde_json::Value) -> bool {
*value == *default_config
}

fn remove_blank_or_default_fields(value: &mut JsonValue, default_config: &JsonValue) {
match value {
JsonValue::Object(map) => {
let keys_to_remove: Vec<String> = map
.iter()
.filter(|(_, v)| is_value_blank(v))
.filter(|(k, v)| {
let default_v = &default_config[k];
is_value_blank(v)
|| (default_v != &JsonValue::Array(vec![])
&& is_value_default(v, default_v))
})
.map(|(k, _)| k.clone())
.collect();

Expand All @@ -34,12 +42,12 @@ fn remove_blank_fields(value: &mut JsonValue) {
}

for value in map.values_mut() {
remove_blank_fields(value);
remove_blank_or_default_fields(value, default_config);
}
}
JsonValue::Array(arr) => {
for value in arr {
remove_blank_fields(value);
remove_blank_or_default_fields(value, default_config);
}
}
_ => (),
Expand Down Expand Up @@ -278,17 +286,16 @@ pub fn update_config(config: Config) -> Result<(), String> {

// function to export configs to a json file
#[tauri::command]

pub async fn export_configs() -> Result<String, String> {
let mut configs = read_configs().map_err(|e| e.to_string())?;

for config in &mut configs {
config.id = None; // Ensure that the id is None before exporting
}

let mut json_config = to_value(configs).map_err(|e| e.to_string())?;

remove_blank_fields(&mut json_config);
let mut json_config = serde_json::to_value(configs).map_err(|e| e.to_string())?;
let default_config = serde_json::to_value(Config::default()).map_err(|e| e.to_string())?;
remove_blank_or_default_fields(&mut json_config, &default_config);

let json = serde_json::to_string(&json_config).map_err(|e| e.to_string())?;

Expand Down Expand Up @@ -415,8 +422,7 @@ mod tests {

// Test `remove_blank_fields` function
#[test]

fn test_remove_blank_fields() {
fn test_remove_blank_or_default_fields() {
let mut obj = json!({
"name": "Test",
"empty_string": " ",
Expand All @@ -431,7 +437,22 @@ mod tests {
]
});

remove_blank_fields(&mut obj);
// Define the default configuration for comparison
let default_config = json!({
"name": "",
"empty_string": "",
"nested": {
"blank": "",
"non_blank": ""
},
"array": [
{
"blank_field": ""
}
]
});

remove_blank_or_default_fields(&mut obj, &default_config);

assert!(obj.get("empty_string").is_none());

Expand All @@ -442,12 +463,8 @@ mod tests {
Some(&json!("value"))
);

assert_eq!(
obj.get("array").unwrap().as_array().unwrap()[0]["blank_field"],
json!(null)
);
assert!(obj.get("array").unwrap()[0].get("blank_field").is_none());
}

#[test]

fn test_merge_json_values() {
Expand Down
153 changes: 95 additions & 58 deletions crates/kftray-tauri/src/kubeforward/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use rand::{
distributions::Alphanumeric,
Rng,
};
use tokio::sync::Notify;
use tokio::task::JoinHandle;

use crate::kubeforward::kubecontext::create_client_with_specific_context;
Expand All @@ -50,6 +51,7 @@ use crate::{
lazy_static! {
pub static ref CHILD_PROCESSES: Arc<Mutex<HashMap<String, JoinHandle<()>>>> =
Arc::new(Mutex::new(HashMap::new()));
pub static ref CANCEL_NOTIFIER: Arc<Notify> = Arc::new(Notify::new());
}

pub async fn start_port_forward_udp(
Expand All @@ -73,16 +75,32 @@ async fn start_port_forward(
let mut child_handles = Vec::new();

for config in configs.iter() {
let selector = TargetSelector::ServiceName(config.service.clone().unwrap());
// Determine the selector based on the workload type
let selector = match config.workload_type.as_str() {
"pod_label" => {
// Using `target` field as the pod label
TargetSelector::PodLabel(config.target.clone().unwrap_or_default())
}
_ => TargetSelector::ServiceName(config.service.clone().unwrap_or_default()),
};

let remote_port = Port::from(config.remote_port as i32);
let context_name = Some(config.context.clone());
let kubeconfig = Some(config.kubeconfig.clone());
let namespace = config.namespace.clone();
let target = Target::new(selector, remote_port, namespace);
let target = Target::new(selector, remote_port, namespace.clone());

log::info!("Remote Port: {}", config.remote_port);
log::info!("Local Port: {}", config.local_port);
log::debug!("Attempting to forward to service: {:?}", &config.service);
log::debug!(
"Attempting to forward to {}: {:?}",
if config.workload_type.as_str() == "pod_label" {
"pod label"
} else {
"service"
},
&config.service
);

let local_address_clone = config.local_address.clone();

Expand All @@ -108,9 +126,14 @@ async fn start_port_forward(
match forward_result {
Ok((actual_local_port, handle)) => {
log::info!(
"{} port forwarding is set up on local port: {:?} for service: {:?}",
"{} port forwarding is set up on local port: {:?} for {}: {:?}",
protocol.to_uppercase(),
actual_local_port,
if config.workload_type.as_str() == "pod_label" {
"pod label"
} else {
"service"
},
&config.service
);

Expand Down Expand Up @@ -176,7 +199,7 @@ async fn start_port_forward(
responses.push(CustomResponse {
id: config.id,
service: config.service.clone().unwrap(),
namespace: config.namespace.clone(),
namespace: namespace.clone(),
local_port: actual_local_port,
remote_port: config.remote_port,
context: config.context.clone(),
Expand All @@ -194,8 +217,13 @@ async fn start_port_forward(
}
Err(e) => {
let error_message = format!(
"Failed to start {} port forwarding for service {}: {}",
"Failed to start {} port forwarding for {} {}: {}",
protocol.to_uppercase(),
if config.workload_type.as_str() == "pod_label" {
"pod label"
} else {
"service"
},
config.service.clone().unwrap_or_default(),
e
);
Expand All @@ -206,7 +234,12 @@ async fn start_port_forward(
}
Err(e) => {
let error_message = format!(
"Failed to create PortForward for service {}: {}",
"Failed to create PortForward for {} {}: {}",
if config.workload_type.as_str() == "pod_label" {
"pod label"
} else {
"service"
},
config.service.clone().unwrap_or_default(),
e
);
Expand Down Expand Up @@ -234,7 +267,6 @@ async fn start_port_forward(

Ok(responses)
}

#[tauri::command]
pub async fn stop_all_port_forward() -> Result<Vec<CustomResponse>, String> {
log::info!("Attempting to stop all port forwards");
Expand All @@ -246,6 +278,9 @@ pub async fn stop_all_port_forward() -> Result<Vec<CustomResponse>, String> {
e.to_string()
})?;

// Notify all port forwarding tasks to cancel
CANCEL_NOTIFIER.notify_waiters();

let handle_map: HashMap<String, tokio::task::JoinHandle<()>> =
CHILD_PROCESSES.lock().unwrap().drain().collect();

Expand Down Expand Up @@ -305,6 +340,8 @@ pub async fn stop_all_port_forward() -> Result<Vec<CustomResponse>, String> {
"Aborting port forwarding task for config_id: {}",
config_id_str
);

// Abort the handle, which should naturally be cancelled now
handle.abort();

let client_clone = client.clone();
Expand Down Expand Up @@ -387,6 +424,10 @@ pub async fn stop_all_port_forward() -> Result<Vec<CustomResponse>, String> {
pub async fn stop_port_forward(
_service_name: String, config_id: String,
) -> Result<CustomResponse, String> {
let cancellation_notifier = CANCEL_NOTIFIER.clone();
cancellation_notifier.notify_waiters();

// Retrieve composite key representing the child process
let composite_key = {
let child_processes = CHILD_PROCESSES.lock().unwrap();
child_processes
Expand All @@ -396,67 +437,64 @@ pub async fn stop_port_forward(
};

if let Some(composite_key) = composite_key {
let handle = {
// Remove and retrieve child process handle
let join_handle = {
let mut child_processes = CHILD_PROCESSES.lock().unwrap();
println!("child_processes: {:?}", child_processes);
child_processes.remove(&composite_key)
};

if let Some(handle) = handle {
handle.abort();

let (config_id_str, service_name) = composite_key.split_once('_').unwrap_or(("", ""));
let config_id_parsed = config_id_str.parse::<i64>().unwrap_or_default();

let configs_result = config::get_configs().await;
match configs_result {
Ok(configs) => {
let config = configs
.iter()
.find(|c| c.id.map_or(false, |id| id == config_id_parsed));
if let Some(join_handle) = join_handle {
println!("Join handle: {:?}", join_handle);
join_handle.abort();
}

if let Some(config) = config {
if config.domain_enabled.unwrap_or_default() {
let hostfile_comment = format!(
"kftray custom host for {} - {}",
service_name, config_id_str
);
// Split the composite key to get config_id and service_name
let (config_id_str, service_name) = composite_key.split_once('_').unwrap_or(("", ""));
let config_id_parsed = config_id_str.parse::<i64>().unwrap_or_default();

let hosts_builder = HostsBuilder::new(hostfile_comment);
// Retrieve configs and perform necessary updates
match config::get_configs().await {
Ok(configs) => {
if let Some(config) = configs
.iter()
.find(|c| c.id.map_or(false, |id| id == config_id_parsed))
{
if config.domain_enabled.unwrap_or_default() {
let hostfile_comment = format!(
"kftray custom host for {} - {}",
service_name, config_id_str
);

hosts_builder.write().map_err(|e| {
log::error!(
"Failed to remove from the hostfile for {}: {}",
service_name,
e
);
let hosts_builder = HostsBuilder::new(hostfile_comment);

e.to_string()
})?;
if let Err(e) = hosts_builder.write() {
log::error!(
"Failed to remove from the hostfile for {}: {}",
service_name,
e
);
return Err(e.to_string());
}
} else {
log::warn!("Config with id '{}' not found.", config_id_str);
}

Ok(CustomResponse {
id: None,
service: service_name.to_string(),
namespace: String::new(),
local_port: 0,
remote_port: 0,
context: String::new(),
protocol: String::new(),
stdout: String::from("Service port forwarding has been stopped"),
stderr: String::new(),
status: 0,
})
} else {
log::warn!("Config with id '{}' not found.", config_id_str);
}
Err(e) => Err(format!("Failed to retrieve configs: {}", e)),

Ok(CustomResponse {
id: None,
service: service_name.to_string(),
namespace: String::new(),
local_port: 0,
remote_port: 0,
context: String::new(),
protocol: String::new(),
stdout: String::from("Service port forwarding has been stopped"),
stderr: String::new(),
status: 0,
})
}
} else {
Err(format!(
"Failed to stop port forwarding process for config_id '{}'",
config_id
))
Err(e) => Err(format!("Failed to retrieve configs: {}", e)),
}
} else {
Err(format!(
Expand All @@ -465,7 +503,6 @@ pub async fn stop_port_forward(
))
}
}

fn get_pod_manifest_path() -> PathBuf {
let home_dir = dirs::home_dir().expect("Failed to resolve home directory");

Expand Down
Loading

0 comments on commit 32d0dd6

Please sign in to comment.