Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: pass lint #37

Merged
merged 3 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ impl FileParser {
match file_type {
FileType::Tsv => {
let reader = csv::ReaderBuilder::new()
.delimiter(b'\t')
.has_headers(true)
.delimiter(b'\t')
.has_headers(true)
.from_path(file_path)?;
Ok(FileParser::Tsv(reader))
}
Expand Down
4 changes: 1 addition & 3 deletions springql-core/src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ pub enum SpringError {
ThreadPoisoned(anyhow::Error),

#[error("invalid config")]
InvalidConfig {
source: anyhow::Error,
},
InvalidConfig { source: anyhow::Error },

#[error("invalid option (key `{key:?}`, value `{value:?}`)")]
InvalidOption {
Expand Down
14 changes: 8 additions & 6 deletions springql-core/src/stream_engine/autonomous_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ pub(in crate::stream_engine) struct AutonomousExecutor {
event_queue: Arc<EventQueue>,

task_executor: TaskExecutor,
memory_state_machine_worker: MemoryStateMachineWorker,
performance_monitor_worker: PerformanceMonitorWorker,
purger_worker: PurgerWorker,

// just holds these ownership
_memory_state_machine_worker: MemoryStateMachineWorker,
_performance_monitor_worker: PerformanceMonitorWorker,
_purger_worker: PurgerWorker,
}

impl AutonomousExecutor {
Expand Down Expand Up @@ -88,9 +90,9 @@ impl AutonomousExecutor {
Self {
event_queue,
task_executor,
memory_state_machine_worker,
performance_monitor_worker,
purger_worker,
_memory_state_machine_worker: memory_state_machine_worker,
_performance_monitor_worker: performance_monitor_worker,
_purger_worker: purger_worker,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::{

#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct MemoryStateMachineWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl MemoryStateMachineWorker {
Expand All @@ -47,6 +47,6 @@ impl MemoryStateMachineWorker {
memory_config.memory_state_transition_interval_msec,
),
);
Self { handle }
Self { _handle: handle }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::{
/// 2. Report the performance to [AutonomousExecutor](crate::stream_processor::autonomous_executor::AutonomousExecutor) and web-console.
#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct PerformanceMonitorWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl PerformanceMonitorWorker {
Expand All @@ -37,6 +37,6 @@ impl PerformanceMonitorWorker {
worker_stop_coordinate,
PerformanceMonitorWorkerThreadArg::from(config),
);
Self { handle }
Self { _handle: handle }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl WebConsoleReporter {
match resp.text() {
Ok(body) => log::warn!("error response from web-console: {:?} - {}", e_status, body),
Err(e_resp) => log::warn!("error response (status {}) from web-console but failed to read response body: {:?}", e_status.status().unwrap(), e_resp),
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use self::purger_worker_thread::{PurgerWorkerThread, PurgerWorkerThreadArg};
/// Worker to execute pump and sink tasks.
#[derive(Debug)]
pub(super) struct PurgerWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl PurgerWorker {
Expand All @@ -28,6 +28,6 @@ impl PurgerWorker {
worker_stop_coordinate,
thread_arg,
);
Self { handle }
Self { _handle: handle }
}
}
4 changes: 2 additions & 2 deletions springql-core/src/stream_engine/autonomous_executor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::{

#[derive(Debug)]
pub(crate) enum Task {
Pump(PumpTask),
Pump(Box<PumpTask>),
Source(SourceTask),
Sink(SinkTask),
}
Expand All @@ -33,7 +33,7 @@ impl Task {
pub(super) fn new(edge: &Edge, pipeline_graph: &PipelineGraph) -> Self {
match edge {
Edge::Pump { pump_model, .. } => {
Self::Pump(PumpTask::new(pump_model.as_ref(), pipeline_graph))
Self::Pump(Box::new(PumpTask::new(pump_model.as_ref(), pipeline_graph)))
}
Edge::Source(s) => Self::Source(SourceTask::new(s)),
Edge::Sink(s) => Self::Sink(SinkTask::new(s)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ pub(in crate::stream_engine) struct TaskExecutor {
task_executor_lock: Arc<TaskExecutorLock>,
repos: Arc<Repositories>,

generic_worker_pool: GenericWorkerPool,
source_worker_pool: SourceWorkerPool,
_generic_worker_pool: GenericWorkerPool,
_source_worker_pool: SourceWorkerPool,
}

impl TaskExecutor {
Expand All @@ -45,14 +45,14 @@ impl TaskExecutor {
task_executor_lock: task_executor_lock.clone(),
repos: repos.clone(),

generic_worker_pool: GenericWorkerPool::new(
_generic_worker_pool: GenericWorkerPool::new(
config.worker.n_generic_worker_threads,
event_queue.clone(),
worker_stop_coordinate.clone(),
task_executor_lock.clone(),
repos.clone(),
),
source_worker_pool: SourceWorkerPool::new(
_source_worker_pool: SourceWorkerPool::new(
config.worker.n_source_worker_threads,
event_queue,
worker_stop_coordinate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(super) struct GenericWorkerPool {
///
/// Mutation to workers only happens inside task executor lock like `PipelineUpdateLockGuard`,
/// so here uses RefCell instead of Mutex nor RwLock to avoid lock cost to workers.
workers: RefCell<Vec<GenericWorker>>,
_workers: RefCell<Vec<GenericWorker>>,
}

impl GenericWorkerPool {
Expand All @@ -46,7 +46,7 @@ impl GenericWorkerPool {
})
.collect();
Self {
workers: RefCell::new(workers),
_workers: RefCell::new(workers),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use self::generic_worker_thread::GenericWorkerThread;
/// Worker to execute pump and sink tasks.
#[derive(Debug)]
pub(super) struct GenericWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl GenericWorker {
Expand All @@ -29,6 +29,6 @@ impl GenericWorker {
worker_stop_coordinate,
thread_arg,
);
Self { handle }
Self { _handle: handle }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

//! Source Scheduler dedicating to schedule source tasks eagerly at Moderate and Severe state.

use std::{cell::RefCell, collections::HashSet};

use rand::prelude::ThreadRng;
use std::collections::HashSet;

use crate::stream_engine::autonomous_executor::{
performance_metrics::PerformanceMetrics,
Expand All @@ -19,9 +17,7 @@ struct SourceTask {
}

#[derive(Debug, Default)]
pub(in crate::stream_engine::autonomous_executor) struct SourceScheduler {
rng: RefCell<ThreadRng>,
}
pub(in crate::stream_engine::autonomous_executor) struct SourceScheduler {}

impl Scheduler for SourceScheduler {
/// TODO [prioritize source with lower source-miss rate](https://gh01.base.toyota-tokyo.tech/SpringQL-internal/SpringQL/issues/122)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(super) struct SourceWorkerPool {
///
/// Mutation to workers only happens inside task executor lock like `PipelineUpdateLockGuard`,
/// so here uses RefCell instead of Mutex nor RwLock to avoid lock cost to workers.
workers: RefCell<Vec<SourceWorker>>,
_workers: RefCell<Vec<SourceWorker>>,
}

impl SourceWorkerPool {
Expand All @@ -46,7 +46,7 @@ impl SourceWorkerPool {
})
.collect();
Self {
workers: RefCell::new(workers),
_workers: RefCell::new(workers),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use self::source_worker_thread::SourceWorkerThread;
/// Worker to execute pump and sink tasks.
#[derive(Debug)]
pub(super) struct SourceWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl SourceWorker {
Expand All @@ -29,6 +29,6 @@ impl SourceWorker {
worker_stop_coordinate,
thread_arg,
);
Self { handle }
Self { _handle: handle }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl TaskExecutorLock {
.0
.write()
.expect("another thread sharing the same TaskExecutorLock must not panic");
TaskExecutionBarrierGuard(write_lock)
TaskExecutionBarrierGuard(write_lock)
}

/// # Returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl QueueId {
pump: &PumpModel,
upstream: &StreamName,
) -> Self {
let name = format!("{}-{}", pump.name().to_string(), upstream);
let name = format!("{}-{}", pump.name(), upstream);
match pump.input_type() {
PumpInputType::Row => Self::Row(RowQueueId::new(name)),
PumpInputType::Window => Self::Window(WindowQueueId::new(name)),
Expand Down
2 changes: 1 addition & 1 deletion test-web-console-mock/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::sync::Arc;

use crate::{WebConsoleMock, request_body::PostTaskGraphBody};
use crate::{request_body::PostTaskGraphBody, WebConsoleMock};

#[derive(Default)]
pub struct WebConsoleMockBuilder {
Expand Down