Skip to content

Commit

Permalink
Merge pull request #37 from SpringQL/ci/pass
Browse files Browse the repository at this point in the history
ci: pass lint
  • Loading branch information
laysakura committed Feb 25, 2022
2 parents 1a54979 + 9358506 commit 690f22e
Show file tree
Hide file tree
Showing 17 changed files with 37 additions and 41 deletions.
Expand Up @@ -19,8 +19,8 @@ impl FileParser {
match file_type {
FileType::Tsv => {
let reader = csv::ReaderBuilder::new()
.delimiter(b'\t')
.has_headers(true)
.delimiter(b'\t')
.has_headers(true)
.from_path(file_path)?;
Ok(FileParser::Tsv(reader))
}
Expand Down
4 changes: 1 addition & 3 deletions springql-core/src/api/error.rs
Expand Up @@ -40,9 +40,7 @@ pub enum SpringError {
ThreadPoisoned(anyhow::Error),

#[error("invalid config")]
InvalidConfig {
source: anyhow::Error,
},
InvalidConfig { source: anyhow::Error },

#[error("invalid option (key `{key:?}`, value `{value:?}`)")]
InvalidOption {
Expand Down
14 changes: 8 additions & 6 deletions springql-core/src/stream_engine/autonomous_executor.rs
Expand Up @@ -51,9 +51,11 @@ pub(in crate::stream_engine) struct AutonomousExecutor {
event_queue: Arc<EventQueue>,

task_executor: TaskExecutor,
memory_state_machine_worker: MemoryStateMachineWorker,
performance_monitor_worker: PerformanceMonitorWorker,
purger_worker: PurgerWorker,

// just holds these ownership
_memory_state_machine_worker: MemoryStateMachineWorker,
_performance_monitor_worker: PerformanceMonitorWorker,
_purger_worker: PurgerWorker,
}

impl AutonomousExecutor {
Expand Down Expand Up @@ -88,9 +90,9 @@ impl AutonomousExecutor {
Self {
event_queue,
task_executor,
memory_state_machine_worker,
performance_monitor_worker,
purger_worker,
_memory_state_machine_worker: memory_state_machine_worker,
_performance_monitor_worker: performance_monitor_worker,
_purger_worker: purger_worker,
}
}

Expand Down
Expand Up @@ -28,7 +28,7 @@ use super::{

#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct MemoryStateMachineWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl MemoryStateMachineWorker {
Expand All @@ -47,6 +47,6 @@ impl MemoryStateMachineWorker {
memory_config.memory_state_transition_interval_msec,
),
);
Self { handle }
Self { _handle: handle }
}
}
Expand Up @@ -23,7 +23,7 @@ use super::{
/// 2. Report the performance to [AutonomousExecutor](crate::stream_processor::autonomous_executor::AutonomousExecutor) and web-console.
#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct PerformanceMonitorWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl PerformanceMonitorWorker {
Expand All @@ -37,6 +37,6 @@ impl PerformanceMonitorWorker {
worker_stop_coordinate,
PerformanceMonitorWorkerThreadArg::from(config),
);
Self { handle }
Self { _handle: handle }
}
}
Expand Up @@ -47,7 +47,7 @@ impl WebConsoleReporter {
match resp.text() {
Ok(body) => log::warn!("error response from web-console: {:?} - {}", e_status, body),
Err(e_resp) => log::warn!("error response (status {}) from web-console but failed to read response body: {:?}", e_status.status().unwrap(), e_resp),
}
}
}
}
}
Expand Down
Expand Up @@ -14,7 +14,7 @@ use self::purger_worker_thread::{PurgerWorkerThread, PurgerWorkerThreadArg};
/// Worker to execute pump and sink tasks.
#[derive(Debug)]
pub(super) struct PurgerWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl PurgerWorker {
Expand All @@ -28,6 +28,6 @@ impl PurgerWorker {
worker_stop_coordinate,
thread_arg,
);
Self { handle }
Self { _handle: handle }
}
}
4 changes: 2 additions & 2 deletions springql-core/src/stream_engine/autonomous_executor/task.rs
Expand Up @@ -24,7 +24,7 @@ use super::{

#[derive(Debug)]
pub(crate) enum Task {
Pump(PumpTask),
Pump(Box<PumpTask>),
Source(SourceTask),
Sink(SinkTask),
}
Expand All @@ -33,7 +33,7 @@ impl Task {
pub(super) fn new(edge: &Edge, pipeline_graph: &PipelineGraph) -> Self {
match edge {
Edge::Pump { pump_model, .. } => {
Self::Pump(PumpTask::new(pump_model.as_ref(), pipeline_graph))
Self::Pump(Box::new(PumpTask::new(pump_model.as_ref(), pipeline_graph)))
}
Edge::Source(s) => Self::Source(SourceTask::new(s)),
Edge::Sink(s) => Self::Sink(SinkTask::new(s)),
Expand Down
Expand Up @@ -29,8 +29,8 @@ pub(in crate::stream_engine) struct TaskExecutor {
task_executor_lock: Arc<TaskExecutorLock>,
repos: Arc<Repositories>,

generic_worker_pool: GenericWorkerPool,
source_worker_pool: SourceWorkerPool,
_generic_worker_pool: GenericWorkerPool,
_source_worker_pool: SourceWorkerPool,
}

impl TaskExecutor {
Expand All @@ -45,14 +45,14 @@ impl TaskExecutor {
task_executor_lock: task_executor_lock.clone(),
repos: repos.clone(),

generic_worker_pool: GenericWorkerPool::new(
_generic_worker_pool: GenericWorkerPool::new(
config.worker.n_generic_worker_threads,
event_queue.clone(),
worker_stop_coordinate.clone(),
task_executor_lock.clone(),
repos.clone(),
),
source_worker_pool: SourceWorkerPool::new(
_source_worker_pool: SourceWorkerPool::new(
config.worker.n_source_worker_threads,
event_queue,
worker_stop_coordinate,
Expand Down
Expand Up @@ -24,7 +24,7 @@ pub(super) struct GenericWorkerPool {
///
/// Mutation to workers only happens inside task executor lock like `PipelineUpdateLockGuard`,
/// so here uses RefCell instead of Mutex nor RwLock to avoid lock cost to workers.
workers: RefCell<Vec<GenericWorker>>,
_workers: RefCell<Vec<GenericWorker>>,
}

impl GenericWorkerPool {
Expand All @@ -46,7 +46,7 @@ impl GenericWorkerPool {
})
.collect();
Self {
workers: RefCell::new(workers),
_workers: RefCell::new(workers),
}
}
}
Expand Up @@ -15,7 +15,7 @@ use self::generic_worker_thread::GenericWorkerThread;
/// Worker to execute pump and sink tasks.
#[derive(Debug)]
pub(super) struct GenericWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl GenericWorker {
Expand All @@ -29,6 +29,6 @@ impl GenericWorker {
worker_stop_coordinate,
thread_arg,
);
Self { handle }
Self { _handle: handle }
}
}
Expand Up @@ -2,9 +2,7 @@

//! Source Scheduler dedicating to schedule source tasks eagerly at Moderate and Severe state.

use std::{cell::RefCell, collections::HashSet};

use rand::prelude::ThreadRng;
use std::collections::HashSet;

use crate::stream_engine::autonomous_executor::{
performance_metrics::PerformanceMetrics,
Expand All @@ -19,9 +17,7 @@ struct SourceTask {
}

#[derive(Debug, Default)]
pub(in crate::stream_engine::autonomous_executor) struct SourceScheduler {
rng: RefCell<ThreadRng>,
}
pub(in crate::stream_engine::autonomous_executor) struct SourceScheduler {}

impl Scheduler for SourceScheduler {
/// TODO [prioritize source with lower source-miss rate](https://gh01.base.toyota-tokyo.tech/SpringQL-internal/SpringQL/issues/122)
Expand Down
Expand Up @@ -24,7 +24,7 @@ pub(super) struct SourceWorkerPool {
///
/// Mutation to workers only happens inside task executor lock like `PipelineUpdateLockGuard`,
/// so here uses RefCell instead of Mutex nor RwLock to avoid lock cost to workers.
workers: RefCell<Vec<SourceWorker>>,
_workers: RefCell<Vec<SourceWorker>>,
}

impl SourceWorkerPool {
Expand All @@ -46,7 +46,7 @@ impl SourceWorkerPool {
})
.collect();
Self {
workers: RefCell::new(workers),
_workers: RefCell::new(workers),
}
}
}
Expand Up @@ -15,7 +15,7 @@ use self::source_worker_thread::SourceWorkerThread;
/// Worker to execute pump and sink tasks.
#[derive(Debug)]
pub(super) struct SourceWorker {
handle: WorkerHandle,
_handle: WorkerHandle,
}

impl SourceWorker {
Expand All @@ -29,6 +29,6 @@ impl SourceWorker {
worker_stop_coordinate,
thread_arg,
);
Self { handle }
Self { _handle: handle }
}
}
Expand Up @@ -20,7 +20,7 @@ impl TaskExecutorLock {
.0
.write()
.expect("another thread sharing the same TaskExecutorLock must not panic");
TaskExecutionBarrierGuard(write_lock)
TaskExecutionBarrierGuard(write_lock)
}

/// # Returns
Expand Down
Expand Up @@ -25,7 +25,7 @@ impl QueueId {
pump: &PumpModel,
upstream: &StreamName,
) -> Self {
let name = format!("{}-{}", pump.name().to_string(), upstream);
let name = format!("{}-{}", pump.name(), upstream);
match pump.input_type() {
PumpInputType::Row => Self::Row(RowQueueId::new(name)),
PumpInputType::Window => Self::Window(WindowQueueId::new(name)),
Expand Down
2 changes: 1 addition & 1 deletion test-web-console-mock/src/builder.rs
Expand Up @@ -2,7 +2,7 @@

use std::sync::Arc;

use crate::{WebConsoleMock, request_body::PostTaskGraphBody};
use crate::{request_body::PostTaskGraphBody, WebConsoleMock};

#[derive(Default)]
pub struct WebConsoleMockBuilder {
Expand Down

0 comments on commit 690f22e

Please sign in to comment.