Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into add_wrapping_crate
Browse files Browse the repository at this point in the history
  • Loading branch information
kazuk committed Jun 20, 2022
2 parents 92a8456 + 80b79c4 commit 6e789c0
Show file tree
Hide file tree
Showing 17 changed files with 244 additions and 249 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ All other sections are for end-users.
- Refactor : Hide detail module structure ([#177](https://github.com/SpringQL/SpringQL/pull/177))
- Make private to internal modules
- When publishing members outside the module, we recommend re-export(`pub use`) rather than `pub(crate)`
- Refactor : refactor test for web-console ([#200](https://github.com/SpringQL/SpringQL/pull/200))
- relates security advisory [RUSTSEC-2020-0071](https://rustsec.org/advisories/RUSTSEC-2020-0071)
- remove test-web-console-mock crate and dependent simple-server
- add `stub_web_console` feature flag : for development and test only

## [v0.12.0]

Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ members = [
"springql-core",
"foreign-service",
"test-logger",
"test-web-console-mock",
]
6 changes: 1 addition & 5 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ notice = "warn"
ignore = [
#"RUSTSEC-0000-0000",

# temporary turn off : [Potential segfault in the time crate](https://rustsec.org/advisories/RUSTSEC-2020-0071)
# tracking issue : https://github.com/SpringQL/SpringQL/issues/173
"RUSTSEC-2020-0071",

]
# Threshold for security vulnerabilities, any vulnerability with a CVSS score
# lower than the range specified will be ignored. Note that ignored advisories
Expand Down Expand Up @@ -200,7 +196,7 @@ unknown-git = "warn"
# if not specified. If it is specified but empty, no registries are allowed.
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
# List of URLs for allowed Git repositories
allow-git = ["https://github.com/laysakura/simple-server"]
allow-git = []

[sources.allow-org]
# 1 or more github.com organizations to allow git sources for
Expand Down
16 changes: 6 additions & 10 deletions springql-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,27 @@ keywords = ["springql", "stream-processing"] # up to 5 keywords, each keyword sh
readme = "../README.md"
repository = "https://github.com/SpringQL/SpringQL"

[features]
stub_web_console=[]

[dependencies]
anyhow = "1.0"
thiserror = "1.0"
serde = {version = "1.0", features = ["derive"], default-features = false}
serde_json = "1.0"

config = {version = "0.13", features = ["toml"], default-features = false}

derive-new = "0.5"

ordered-float = "3.0"

fastrand = "1.5"
rand = {version = "0.8", features = ["small_rng"]}

env_logger = "0.9"
log = "0.4"
log-panics = {version = "2.0", features = ["with-backtrace"]}

petgraph = "0.6"

pest = "2.1"
pest_derive = "2.1"

reqwest = {version = "0.11", features = ["json", "blocking"], default-features = false}

once_cell = "1.8"

parking_lot = "0.12"
time = {version="0.3.9", features = ["formatting", "parsing", "macros"]}

Expand All @@ -52,4 +45,7 @@ socketcan = "1.7"
springql-foreign-service = {path = "../foreign-service"}
springql-test-logger = {path = "../test-logger"}
pretty_assertions = "1.0"
regex = "1.5"
float-cmp = "0.9"
tempfile = "3.3"
serde_derive = "1.0"
83 changes: 83 additions & 0 deletions springql-core/src/http_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use std::time::Duration;

use reqwest::StatusCode;
use serde_json::Value;

#[derive(Debug)]
pub struct ReqwestClient(reqwest::blocking::Client);
#[derive(Debug)]
pub struct Response(reqwest::blocking::Response);

#[derive(Debug)]
pub struct ReqwestError(reqwest::Error);

#[derive(Debug)]
pub struct RequestBuilder {
raw_builder: reqwest::blocking::RequestBuilder,
json_value: Option<serde_json::Value>,
}

impl ReqwestClient {
pub fn with_timeout(timeout: Duration) -> Self {
Self(
reqwest::blocking::Client::builder()
.timeout(Some(timeout))
.build()
.expect("failed to build a reqwest client"),
)
}

pub fn post(&self, url: &str) -> RequestBuilder {
RequestBuilder::new(self.0.post(url))
}
}

impl RequestBuilder {
fn new(builder: reqwest::blocking::RequestBuilder) -> Self {
Self {
raw_builder: builder,
json_value: None,
}
}
pub fn json(self, json: Value) -> Self {
Self {
raw_builder: self.raw_builder,
json_value: Some(json),
}
}

pub fn send(self) -> Result<Response, ReqwestError> {
if let Some(json_value) = self.json_value {
Ok(Response(self.raw_builder.json(&json_value).send()?))
} else {
unreachable!(".json() is not called yet")
}
}
}

impl Response {
pub fn error_for_status_ref(&self) -> Result<&Response, ReqwestError> {
match self.0.error_for_status_ref() {
Ok(_) => Ok(self),
Err(e) => Err(ReqwestError::from(e)),
}
}

pub fn text(self) -> Result<String, ReqwestError> {
Ok(self.0.text()?)
}
}

impl ReqwestError {
pub fn status(&self) -> Option<StatusCode> {
self.0.status()
}
}

impl From<reqwest::Error> for ReqwestError {
fn from(e: reqwest::Error) -> Self {
Self(e)
}
}
9 changes: 9 additions & 0 deletions springql-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,12 @@ mod time;

/// public API for SpringQL
pub mod api;

#[cfg(not(feature = "stub_web_console"))]
mod http_blocking;

#[cfg(feature = "stub_web_console")]
mod stub_http_blocking;

#[cfg(feature = "stub_web_console")]
pub use stub_http_blocking::stubed_requests;
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ pub struct PerformanceMonitorWorkerThread;
#[derive(Debug)]
pub struct PerformanceMonitorWorkerThreadArg {
config: SpringConfig,
web_console_reporter: WebConsoleReporter,
web_console_reporter: Option<WebConsoleReporter>,
}

impl From<&SpringConfig> for PerformanceMonitorWorkerThreadArg {
fn from(config: &SpringConfig) -> Self {
let web_console_reporter = WebConsoleReporter::new(
&config.web_console.host,
config.web_console.port,
WallClockDuration::from_millis(config.web_console.timeout_msec as u64),
);
let web_console_reporter = if config.web_console.enable_report_post {
Some(WebConsoleReporter::new(
&config.web_console.host,
config.web_console.port,
WallClockDuration::from_millis(config.web_console.timeout_msec as u64),
))
} else {
None
};
Self {
config: config.clone(),
web_console_reporter,
Expand Down Expand Up @@ -121,12 +125,12 @@ impl WorkerThread for PerformanceMonitorWorkerThread {
.performance_metrics_summary_report_interval_msec as i32,
);

if thread_arg.config.web_console.enable_report_post {
if let Some(web_console_reporter) = &thread_arg.web_console_reporter {
state = Self::post_web_console(
state,
pipeline_derivatives.as_ref(),
metrics.as_ref(),
&thread_arg.web_console_reporter,
web_console_reporter,
thread_arg.config.web_console.report_interval_msec as i32,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

mod web_console_request;

#[cfg(not(feature = "stub_web_console"))]
use crate::http_blocking::{ReqwestClient, Response};
#[cfg(feature = "stub_web_console")]
use crate::stub_http_blocking::{ReqwestClient, Response};

use crate::stream_engine::{
autonomous_executor::{
performance_metrics::PerformanceMetrics,
Expand All @@ -15,15 +20,12 @@ use crate::stream_engine::{
#[derive(Debug)]
pub struct WebConsoleReporter {
url: String,
client: reqwest::blocking::Client,
client: ReqwestClient,
}

impl WebConsoleReporter {
pub fn new(host: &str, port: u16, timeout: WallClockDuration) -> Self {
let client = reqwest::blocking::Client::builder()
.timeout(Some(*timeout.as_std()))
.build()
.expect("failed to build a reqwest client");
let client = ReqwestClient::with_timeout(*timeout.as_std());

let url = format!("http://{}:{}/task-graph", host, port);

Expand All @@ -33,15 +35,15 @@ impl WebConsoleReporter {
pub fn report(&self, metrics: &PerformanceMetrics, graph: &TaskGraph) {
let request = WebConsoleRequest::from_metrics(metrics, graph);

let res = self.client.post(&self.url).json(&request.to_json()).send();
let res = self.client.post(&self.url).json(request.to_json()).send();

match res {
Ok(resp) => self.handle_response(resp),
Err(e) => log::warn!("failed to POST metrics to web-console: {:?}", e),
}
}

fn handle_response(&self, resp: reqwest::blocking::Response) {
fn handle_response(&self, resp: Response) {
let res_status = resp.error_for_status_ref();
match res_status {
Ok(_) => log::debug!("successfully POSTed metrics to web-console"),
Expand Down
86 changes: 86 additions & 0 deletions springql-core/src/stub_http_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use std::{sync::RwLock, time::Duration};

use once_cell::sync::OnceCell;
use reqwest::StatusCode;
use serde_json::Value;

#[derive(Debug)]
pub struct ReqwestClient;
#[derive(Debug)]
pub struct Response;

#[derive(Debug)]
pub struct ReqwestError;

#[derive(Debug)]
pub struct RequestBuilder {
json_value: Option<serde_json::Value>,
}

impl ReqwestClient {
pub fn with_timeout(_timeout: Duration) -> Self {
Self
}

pub fn post(&self, _url: &str) -> RequestBuilder {
RequestBuilder::new()
}
}

static REQUESTS: OnceCell<RwLock<Vec<Value>>> = OnceCell::new();

/// gets stubed request JsonValues
pub fn stubed_requests() -> Vec<Value> {
let req_store = REQUESTS.get_or_init(|| RwLock::new(Vec::new()));
let mut store = req_store.write().unwrap();
let result = store.clone();
store.clear();
result
}

impl RequestBuilder {
fn new() -> Self {
Self { json_value: None }
}
pub fn json(self, json: Value) -> Self {
Self {
json_value: Some(json),
}
}

pub fn send(self) -> Result<Response, ReqwestError> {
if let Some(json_value) = self.json_value {
let req_store = REQUESTS.get_or_init(|| RwLock::new(Vec::new()));
let mut store = req_store.write().unwrap();
store.push(json_value);

Ok(Response)
} else {
unreachable!()
}
}
}

impl Response {
pub fn error_for_status_ref(&self) -> Result<&Response, ReqwestError> {
Ok(self)
}

pub fn text(self) -> Result<String, ReqwestError> {
Ok("stubbed request".to_string())
}
}

impl ReqwestError {
pub fn status(&self) -> Option<StatusCode> {
Some(StatusCode::INTERNAL_SERVER_ERROR)
}
}

impl From<reqwest::Error> for ReqwestError {
fn from(_e: reqwest::Error) -> Self {
Self
}
}
1 change: 0 additions & 1 deletion springql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ springql-core = {path="../springql-core"}
[dev-dependencies]
springql-foreign-service = {path = "../foreign-service"}
springql-test-logger = {path = "../test-logger"}
springql-test-web-console-mock = {path = "../test-web-console-mock"}
pretty_assertions = "1.0"
regex = "1.5"
float-cmp = "0.9"
Expand Down

0 comments on commit 6e789c0

Please sign in to comment.