Skip to content

Commit

Permalink
feat: improve port forward connections and adjust some http logs actions
Browse files Browse the repository at this point in the history
  • Loading branch information
hcavarsan committed Jun 11, 2024
1 parent 858fd93 commit b6aee68
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 81 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/kftray-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ flate2 = "1.0"
image = "0.24.9"
image_ascii = "0.1.1"
httparse = "1.9.2"
uuid = { version = "1.8.0", features = ["v4"] }
bytes = "1.6.0"

[dev-dependencies]
tempfile = "3.9"
Expand Down
199 changes: 124 additions & 75 deletions crates/kftray-tauri/src/kubeforward/logging.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use std::fs::{
self,
File,
OpenOptions,
};
use std::collections::HashMap;
use std::io::Read;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use flate2::read::GzDecoder;
use httparse::{
Request,
Expand All @@ -18,37 +15,72 @@ use image::DynamicImage;
use image_ascii::TextGenerator;
use k8s_openapi::chrono::Utc;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::fs::{
self,
File,
OpenOptions,
};
use tokio::io::AsyncWriteExt;
use tokio::sync::{
mpsc,
RwLock,
};
use tokio::task::JoinHandle;
use uuid::Uuid;

// Asynchronous HTTP traffic logger. Callers enqueue raw request/response
// buffers; a background tokio task (spawned in `Logger::new`) drains the
// channel and writes formatted entries to the log file.
#[derive(Clone)]
pub struct Logger {
// Producer side of the mpsc channel consumed by the writer task.
sender: mpsc::Sender<LogEntry>,
// Handle of the spawned writer task; Arc-wrapped so Logger stays Clone.
handle: Arc<JoinHandle<()>>,
#[allow(dead_code)]
// request ID -> trace ID, used to correlate a response with its request.
trace_map: Arc<RwLock<HashMap<String, String>>>,
}

enum LogEntry {
Request(Vec<u8>),
Response(Vec<u8>),
Request(Bytes, String),
Response(Bytes, String),
}

impl Logger {
pub fn new(log_file_path: PathBuf) -> anyhow::Result<Self> {
pub async fn new(log_file_path: PathBuf) -> anyhow::Result<Self> {
let (sender, mut receiver) = mpsc::channel(100);
let log_file_path = Arc::new(log_file_path);

let handle = tokio::spawn(async move {
while let Some(entry) = receiver.recv().await {
let log_file_path = log_file_path.clone();
match entry {
LogEntry::Request(buffer) => {
if let Err(e) = log_request(&buffer, &log_file_path).await {
eprintln!("Failed to log request: {:?}", e);
let log_file = Arc::new(RwLock::new(
OpenOptions::new()
.append(true)
.create(true)
.open(&log_file_path)
.await?,
));
let trace_map = Arc::new(RwLock::new(HashMap::new()));

let handle = tokio::spawn({
let log_file = log_file.clone();
let trace_map = trace_map.clone();
async move {
while let Some(entry) = receiver.recv().await {
let log_file = log_file.clone();
let trace_map = trace_map.clone();
match entry {
LogEntry::Request(buffer, request_id) => {
let trace_id = Uuid::new_v4().to_string();
trace_map
.write()
.await
.insert(request_id.clone(), trace_id.clone());
if let Err(e) = log_request(&buffer, &log_file, &trace_id).await {
eprintln!("Failed to log request: {:?}", e);
}
}
}
LogEntry::Response(buffer) => {
if let Err(e) = log_response(&buffer, &log_file_path).await {
eprintln!("Failed to log response: {:?}", e);
LogEntry::Response(buffer, request_id) => {
if let Some(trace_id) = trace_map.read().await.get(&request_id).cloned()
{
if let Err(e) = log_response(&buffer, &log_file, &trace_id).await {
eprintln!("Failed to log response: {:?}", e);
}
trace_map.write().await.remove(&request_id);
} else {
eprintln!("Trace ID not found for request ID: {}", request_id);
}
}
}
}
Expand All @@ -58,15 +90,24 @@ impl Logger {
Ok(Self {
sender,
handle: Arc::new(handle),
trace_map,
})
}

pub async fn log_request(&self, buffer: Vec<u8>) {
let _ = self.sender.send(LogEntry::Request(buffer)).await;
pub async fn log_request(&self, buffer: Bytes) -> String {
let request_id = Uuid::new_v4().to_string();
let _ = self
.sender
.send(LogEntry::Request(buffer, request_id.clone()))
.await;
request_id
}

pub async fn log_response(&self, buffer: Vec<u8>) {
let _ = self.sender.send(LogEntry::Response(buffer)).await;
pub async fn log_response(&self, buffer: Bytes, request_id: String) {
let _ = self
.sender
.send(LogEntry::Response(buffer, request_id))
.await;
}

pub async fn join_handle(&self) {
Expand All @@ -76,79 +117,81 @@ impl Logger {
join_handle.await.unwrap();
}
Err(_) => {
eprintln!("Failed to join handle: multiple references to Arc");
eprintln!("Failed to unwrap join handle");
}
}
}
}

pub fn create_log_file_path(config_id: i64, local_port: u16) -> anyhow::Result<PathBuf> {
let mut path = dirs::home_dir().unwrap();
pub async fn create_log_file_path(config_id: i64, local_port: u16) -> anyhow::Result<PathBuf> {
let mut path = dirs::home_dir().context("Failed to get home directory")?;

path.push(".kftray/http_logs");
fs::create_dir_all(&path)?;
fs::create_dir_all(&path).await?;
path.push(format!("{}_{}.log", config_id, local_port));
Ok(path)
}

async fn log_request(buffer: &[u8], log_file_path: &PathBuf) -> anyhow::Result<()> {
let mut log_file = OpenOptions::new()
.append(true)
.create(true)
.open(log_file_path)?;
writeln!(log_file, "\n----------------------------------------")?;
writeln!(log_file, "Request at: {}", Utc::now().to_rfc3339())?;
async fn log_request(
buffer: &[u8], log_file: &Arc<RwLock<File>>, trace_id: &str,
) -> anyhow::Result<()> {
let mut log_entry = String::new();
log_entry.push_str("\n----------------------------------------\n");
log_entry.push_str(&format!("Trace ID: {}\n", trace_id));
log_entry.push_str(&format!("Request at: {}\n", Utc::now().to_rfc3339()));

let mut headers = [httparse::EMPTY_HEADER; 64];
let mut req = Request::new(&mut headers);
match req.parse(buffer) {
Ok(Status::Complete(_)) => {
writeln!(log_file, "Method: {}", req.method.unwrap_or(""))?;
writeln!(log_file, "Path: {}", req.path.unwrap_or(""))?;
writeln!(log_file, "Version: {}", req.version.unwrap_or(0))?;
writeln!(log_file, "\nHeaders:")?;
log_entry.push_str(&format!("Method: {}\n", req.method.unwrap_or("")));
log_entry.push_str(&format!("Path: {}\n", req.path.unwrap_or("")));
log_entry.push_str(&format!("Version: {}\n", req.version.unwrap_or(0)));
log_entry.push_str("\n\nHeaders:\n");
for header in req.headers.iter() {
writeln!(
log_file,
"{}: {}",
log_entry.push_str(&format!(
"{}: {}\n",
header.name,
std::str::from_utf8(header.value).unwrap_or("")
)?;
));
}
}
Ok(Status::Partial) => {
writeln!(log_file, "Incomplete request")?;
// Do nothing if the request is incomplete
return Ok(());
}
Err(e) => {
writeln!(log_file, "Failed to parse request: {:?}", e)?;
Err(_) => {
// Do nothing if there's an error parsing the request
return Ok(());
}
}

log_file.flush()?;
let mut log_file = log_file.write().await;
log_file.write_all(log_entry.as_bytes()).await?;
log_file.flush().await?;
Ok(())
}

async fn log_response(buffer: &[u8], log_file_path: &PathBuf) -> anyhow::Result<()> {
let mut log_file = OpenOptions::new()
.append(true)
.create(true)
.open(log_file_path)?;
async fn log_response(
buffer: &[u8], log_file: &Arc<RwLock<File>>, trace_id: &str,
) -> anyhow::Result<()> {
let mut log_entry = String::new();
log_entry.push_str("\n----------------------------------------\n");
log_entry.push_str(&format!("Trace ID: {}\n", trace_id));
log_entry.push_str(&format!("Response at: {}\n", Utc::now().to_rfc3339()));

let mut headers = [httparse::EMPTY_HEADER; 64];
let mut res = Response::new(&mut headers);
match res.parse(buffer) {
Ok(Status::Complete(_)) => {
writeln!(log_file, "\n----------------------------------------")?;
writeln!(log_file, "Response at: {}", Utc::now().to_rfc3339())?;
writeln!(log_file, "Status: {}", res.code.unwrap_or(0))?;
writeln!(log_file, "\nHeaders:")?;
log_entry.push_str(&format!("Status: {}\n", res.code.unwrap_or(0)));
log_entry.push_str("\n\nHeaders:\n");
for header in res.headers.iter() {
writeln!(
log_file,
"{}: {}",
log_entry.push_str(&format!(
"{}: {}\n",
header.name,
std::str::from_utf8(header.value).unwrap_or("")
)?;
));
}

let headers_len = buffer
Expand All @@ -163,50 +206,56 @@ async fn log_response(buffer: &[u8], log_file_path: &PathBuf) -> anyhow::Result<
}) {
match decompress_gzip(body) {
Ok(decompressed_body) => {
log_body(&decompressed_body, &mut log_file, res.headers).await?;
log_body(&decompressed_body, &mut log_entry, res.headers).await?;
}
Err(e) => {
log_entry.push_str(&format!("Failed to decompress body: {:?}\n", e))
}
Err(e) => writeln!(log_file, "Failed to decompress body: {:?}", e)?,
}
} else {
log_body(body, &mut log_file, res.headers).await?;
log_body(body, &mut log_entry, res.headers).await?;
}
}
}
Ok(Status::Partial) => {
// Do nothing if the response is incomplete
return Ok(());
}
Err(_) => {
// Do nothing if there's an error parsing the response
return Ok(());
}
}

log_file.flush()?;
let mut log_file = log_file.write().await;
log_file.write_all(log_entry.as_bytes()).await?;
log_file.flush().await?;
Ok(())
}

/// Appends a human-readable rendering of a response body to `log_entry`.
///
/// Rendering strategy, in order:
/// - valid UTF-8 JSON is pretty-printed;
/// - other valid UTF-8 text is written as-is (trailing whitespace trimmed);
/// - image content (per `is_image(headers)`) is rendered as ASCII art via
///   `convert_image_to_ascii`;
/// - anything else is dumped as a raw byte debug string;
/// - an empty body is noted explicitly.
///
/// # Errors
/// Returns an error if JSON pretty-printing or ASCII conversion fails.
async fn log_body<'a>(
    body: &[u8], log_entry: &mut String, headers: &[httparse::Header<'a>],
) -> anyhow::Result<()> {
    if !body.is_empty() {
        log_entry.push_str("\nBody:\n");
        if let Ok(body_str) = std::str::from_utf8(body) {
            if let Ok(json_value) = serde_json::from_str::<Value>(body_str) {
                log_entry.push_str(&format!("{}\n", serde_json::to_string_pretty(&json_value)?));
            } else {
                log_entry.push_str(&format!("{}\n", body_str.trim_end()));
            }
        } else if is_image(headers) {
            if let Ok(image) = image::load_from_memory(body) {
                let ascii_art = convert_image_to_ascii(&image)?;
                log_entry.push_str(&format!("{}\n", ascii_art));
            } else {
                log_entry.push_str("Failed to convert image to ascii\n");
            }
        } else {
            log_entry.push_str(&format!("Body (as bytes): {:?}\n", body));
        }
    } else {
        log_entry.push_str("Body is empty\n");
    }
    Ok(())
}
Expand Down
Loading

0 comments on commit b6aee68

Please sign in to comment.