-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
plumber.rs
166 lines (138 loc) · 4.38 KB
/
plumber.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
use crate::start::generate_random_port;
// for manager struct
use async_trait::async_trait;
use deadpool::managed;
use std::{
io::{BufRead, BufReader},
process::{Command, Stdio},
time::Duration,
};
use axum::{
body::Body,
extract::{Extension, State},
http::Request,
response::{IntoResponse, Response},
};
use hyper::{client::HttpConnector, Uri};
/// HTTP client type used to forward requests: hyper's `Client` over an
/// `HttpConnector`, with axum's `Body` as the request body type.
type Client = hyper::client::Client<HttpConnector, Body>;
/// A spawned plumber API: the address it was started with plus the handle of
/// the child process that runs it.
pub struct Plumber {
/// Host the API was started with; `proxy_request` uses it as the target host.
pub host: String,
/// Port the API was started with; `proxy_request` uses it as the target port.
pub port: u16,
/// Handle to the child process returned by `spawn_plumber`.
pub process: std::process::Child,
}
// Plumber methods for spawning, checking alive status and killing
impl Plumber {
pub fn spawn(host: &str, filepath: &str) -> Self {
let port = generate_random_port(host);
#[cfg(debug_assertions)]
println!("about to spawn plumber");
let process = spawn_plumber(host, port, filepath);
// #[cfg(debug_assertions)]
println!("Spawning plumber API at {host}:{port}");
Self {
host: host.to_string(),
port,
process,
}
}
pub fn is_alive(&mut self) -> bool {
let status = self.process.try_wait();
match status {
Ok(Some(_)) => true,
Ok(None) => false,
Err(_) => false,
}
}
pub async fn proxy_request(&mut self, client: Client, mut req: Request<Body>) -> Response {
let mut uri = req.uri().clone().into_parts(); // get the URI
//let host = self.host.as_str();
uri.authority = Some(
format!("{}:{}", self.host, self.port)
.as_str()
.parse()
.unwrap(),
);
#[cfg(debug_assertions)]
println!("about to proxy");
// TODO enable https or other schemes
uri.scheme = Some("http".parse().unwrap());
*req.uri_mut() = Uri::from_parts(uri).unwrap();
client.request(req).await.unwrap().into_response()
}
}
/// Pool manager for plumber processes.
///
/// It holds the settings needed to spawn a `Plumber`. An iterator cycling
/// through ports (see the commented-out field) was considered, but may not be
/// necessary because each `Plumber` carries its own port. The `Plumber` is
/// what the pool returns, and it can be used in the axum route directly.
pub struct PrManager {
// ports: Arc<Mutex<Cycle<std::vec::IntoIter<u16>>>>
/// Host passed to `Plumber::spawn` for every process this manager creates.
pub host: String,
/// Path of the plumber file passed to `Plumber::spawn`.
pub pr_file: String,
}
/// Error type used by `PrManager` as its `managed::Manager::Error`.
#[derive(Debug)]
pub enum Error {
/// Generic failure; carries no further detail.
Fail,
}
#[async_trait]
impl managed::Manager for PrManager {
type Type = Plumber;
type Error = Error;
async fn create(&self) -> Result<Plumber, Error> {
let host = self.host.as_str();
let filepath = self.pr_file.as_str();
Ok(Plumber::spawn(host, filepath))
}
async fn recycle(&self, _conn: &mut Plumber) -> managed::RecycleResult<Error> {
Ok(())
}
fn detach(&self, obj: &mut Plumber) {
let _killed_process = obj.process.kill();
}
}
// spawn plumber
use std::process::Child;
/// Starts an R process that serves the plumber file at `filepath` on
/// `host:port` and returns its handle.
///
/// The function blocks while reading the child's stderr until a line
/// containing `Running swagger` or `Running rapidoc` appears, then sleeps for
/// 1/10th of a second to let the API finish starting. If stderr ends or fails
/// before such a line is seen, the child is returned anyway.
///
/// # Panics
///
/// Panics if the `R` executable cannot be started.
pub fn spawn_plumber(host: &str, port: u16, filepath: &str) -> Child {
    // The R expression that plumbs the file and runs the API.
    // NOTE(review): `filepath` and `host` are interpolated verbatim into R
    // source, so a value containing `'` yields a broken (or different) R
    // expression; confirm callers only pass trusted values.
    let r_expression =
        format!("plumber::plumb('{filepath}')$run(host = '{host}', port = {port})");

    // start the R process
    // NOTE(review): stdout is piped but nothing in this function reads it, and
    // the stderr reader below is dropped on return; confirm the child copes
    // with a full stdout pipe and a closed stderr pipe.
    let mut pr_child = Command::new("R")
        .arg("-e")
        .arg(r_expression)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("Failed to start R process");

    #[cfg(debug_assertions)]
    println!("theoretically have spawned plumber");

    // capture stderr
    let stderr = pr_child
        .stderr
        .take()
        .expect("stderr was configured as piped");
    let reader = BufReader::new(stderr);

    // Read raw lines and decode them lossily so that a line that is not valid
    // UTF-8 cannot hide the readiness message. Stop at the first I/O error
    // instead of retrying it forever.
    for raw_line in reader.split(b'\n').map_while(Result::ok) {
        let line = String::from_utf8_lossy(&raw_line);
        if line.contains("Running swagger") || line.contains("Running rapidoc") {
            std::thread::sleep(Duration::from_millis(100));
            break;
        }
    }

    pr_child
}
/// Pool of `Plumber` processes managed by `PrManager`.
type Pool = managed::Pool<PrManager>;
pub async fn plumber_handler(
State(client): State<Client>,
Extension(pr_pool): Extension<Pool>,
req: Request<Body>,
) -> Response {
#[cfg(debug_assertions)]
println!("accessing handler");
pr_pool
.get()
.await
.unwrap()
.proxy_request(client, req)
.await
}