Skip to content

Commit

Permalink
feat: inference_fn changed from asynchronous to synchronous (#7)
Browse files Browse the repository at this point in the history
* feat: inference_fn changed from asynchronous to synchronous

- fix bugs
- complete unit test
- remove the directory parameter in TaskConfig

Resolve #1

* feat: add parameter `available_status` in inference

* perf: using taos new api `query_one`

* perf: Modify the data type of parameter for status to `&[&str]`

* fix: add error message

* test: finish test_concurrent_inference()

* test: get connections from the pool

* refactor: modify the order of SQL statements

* fix(inference): delete target_type in fn inference()
  • Loading branch information
lazyky committed Jul 26, 2023
1 parent dfe6c05 commit 8dd10b3
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 58 deletions.
11 changes: 5 additions & 6 deletions cml-core/src/core/inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::metadata::MetaData;
use anyhow::Result;
use deadpool::managed::{Manager, Pool};
use derive_getters::Getters;
use std::{future::Future, path::PathBuf};
use std::path::PathBuf;

#[derive(Builder, Getters)]
pub struct NewSample<F> {
Expand All @@ -15,23 +15,22 @@ pub struct NewSample<F> {
optional_tags: Option<Vec<F>>,
}

pub trait Inference<M, F, C: Manager> {
pub trait Inference<M, F, T, C: Manager> {
async fn init_inference(
&self,
target_type: M,
optional_fields: Option<Vec<M>>,
optional_tags: Option<Vec<M>>,
) -> Result<()>;

async fn inference<FN, R>(
async fn inference<FN>(
&self,
metadata: MetaData<F>,
target_type: M,
available_status: &[&str],
data: &mut Vec<NewSample<F>>,
pool: &Pool<C>,
inference_fn: FN,
) -> Result<()>
where
FN: FnOnce(&mut Vec<NewSample<F>>, &Pool<C>) -> R,
R: Future<Output = Result<Vec<NewSample<F>>>>;
FN: FnOnce(&mut Vec<NewSample<F>>, &str, T) -> Vec<NewSample<F>>;
}
9 changes: 3 additions & 6 deletions cml-core/src/core/task.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use anyhow::Result;
use chrono::Duration;
use derive_getters::Getters;
use std::path::PathBuf;

#[derive(Builder, Getters, Clone)]
pub struct TaskConfig {
pub struct TaskConfig<'a> {
min_start_count: usize,
min_update_count: usize,
work_dir: PathBuf,
local_dir: Option<PathBuf>,
working_status: Vec<String>,
working_status: Vec<&'a str>,
limit_time: Duration,
}

Expand All @@ -27,5 +24,5 @@ pub trait Task<M> {
fining_build_fn: FN,
) -> Result<()>
where
FN: Fn(&TaskConfig, &str) -> Result<()> + Send + Sync;
FN: Fn(&str) -> Result<()> + Send + Sync;
}
243 changes: 229 additions & 14 deletions cml-tdengine/src/core/inference.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::{models::stables::STable, TDengine};
use anyhow::Result;
use cml_core::{
core::inference::{Inference, NewSample},
handler::Handler,
metadata::MetaData,
};
use std::future::Future;
use std::time::{Duration, SystemTime};
use taos::{taos_query::Manager, *};

use crate::{models::stables::STable, TDengine};

impl<D: IntoDsn + Clone> Inference<Field, Value, Manager<TaosBuilder>> for TDengine<D> {
impl<D: IntoDsn + Clone> Inference<Field, Value, i64, Manager<TaosBuilder>> for TDengine<D> {
async fn init_inference(
&self,
target_type: Field,
Expand Down Expand Up @@ -39,24 +37,34 @@ impl<D: IntoDsn + Clone> Inference<Field, Value, Manager<TaosBuilder>> for TDeng
Ok(())
}

async fn inference<FN, R>(
async fn inference<FN>(
&self,
metadata: MetaData<Value>,
target_type: Field,
available_status: &[&str],
data: &mut Vec<NewSample<Value>>,
pool: &Pool<TaosBuilder>,
inference_fn: FN,
) -> Result<()>
where
FN: FnOnce(&mut Vec<NewSample<Value>>, &Pool<TaosBuilder>) -> R,
R: Future<Output = Result<Vec<NewSample<Value>>>>,
FN: FnOnce(&mut Vec<NewSample<Value>>, &str, i64) -> Vec<NewSample<Value>>,
{
let samples_with_res = inference_fn(data, pool).await?;

let taos = pool.get().await?;
taos.use_database("training_data").await?;
let mut stmt = Stmt::init(&taos)?;

taos.use_database("task").await?;
let last_task_time = taos
.query_one(format!(
"SELECT LAST(ts) FROM task.`{}` WHERE status IN ({})",
metadata.batch(),
available_status
.iter()
.map(|s| format!("'{}'", s))
.collect::<Vec<String>>()
.join(", ")
))
.await?
.unwrap_or_else(|| panic!("There is no task in batch: {}", metadata.batch()));
let samples_with_res = inference_fn(data, metadata.batch(), last_task_time);
taos.use_database("inference").await?;
let (tag_placeholder, field_placeholder) = metadata.get_placeholders();

stmt.prepare(format!(
Expand All @@ -78,7 +86,15 @@ impl<D: IntoDsn + Clone> Inference<Field, Value, Manager<TaosBuilder>> for TDeng
for sample in &samples_with_res {
let output = match sample.output() {
Some(value) => ColumnView::from(value.clone()),
None => ColumnView::null(1, target_type.ty()),
None => ColumnView::null(
1,
taos.query("SELECT * FROM inference LIMIT 0")
.await?
.fields()
.get(2)
.unwrap()
.ty(),
),
};

let mut values = vec![
Expand All @@ -98,8 +114,8 @@ impl<D: IntoDsn + Clone> Inference<Field, Value, Manager<TaosBuilder>> for TDeng
stmt.bind(&values)?;
current_ts += Duration::from_nanos(1).as_nanos() as i64;
}

stmt.add_batch()?;

stmt.execute()?;
Ok(())
}
Expand All @@ -112,6 +128,11 @@ mod tests {
options::{CacheModel, ReplicaNum, SingleSTable},
DatabaseBuilder,
};
use cml_core::{
core::inference::NewSampleBuilder, handler::Handler, metadata::MetaDataBuilder,
};
use std::fs;
use std::time::{Duration, SystemTime};

#[tokio::test]
async fn test_inference_init() -> Result<()> {
Expand Down Expand Up @@ -149,6 +170,200 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn test_concurrent_inference() -> Result<()> {
let cml = TDengine::from_dsn("taos://");
let pool = cml.build_pool();
let taos = pool.get().await?;

taos.exec("DROP DATABASE IF EXISTS inference").await?;
taos.exec("DROP DATABASE IF EXISTS task").await?;
taos.exec(
"CREATE DATABASE IF NOT EXISTS inference
PRECISION 'ns'",
)
.await?;
taos.exec("CREATE DATABASE IF NOT EXISTS task PRECISION 'ns'")
.await?;
taos.exec(
"CREATE STABLE IF NOT EXISTS inference.inference
(ts TIMESTAMP, data_path NCHAR(255), output FLOAT)
TAGS (model_update_time TIMESTAMP)",
)
.await?;
taos.exec(
"CREATE STABLE IF NOT EXISTS task.task
(ts TIMESTAMP, status BINARY(8))
TAGS (model_update_time TIMESTAMP)",
)
.await?;
taos.exec(
"INSERT INTO task.`FUCK`
USING task.task
TAGS ('2022-08-08 18:18:18.518')
VALUES (NOW, 'TRAIN')",
)
.await?;
taos.exec(
"INSERT INTO task.`FUCK`
USING task.task
TAGS ('2022-08-08 18:18:18.518')
VALUES (NOW-2s, 'SUCCESS')",
)
.await?;
taos.exec(
"INSERT INTO task.`FUCK8`
USING task.task
TAGS ('2022-08-08 18:18:18.518')
VALUES (NOW, 'SUCCESS')",
)
.await?;

let model_update_time = (SystemTime::now() - Duration::from_secs(86400))
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
let batch_meta_1: MetaData<Value> = MetaDataBuilder::default()
.model_update_time(model_update_time)
.batch("FUCK".to_owned())
.inherent_field_num(3)
.inherent_tag_num(1)
.optional_field_num(0)
.build()?;
let batch_meta_2: MetaData<Value> = MetaDataBuilder::default()
.model_update_time(model_update_time)
.batch("FUCK8".to_owned())
.inherent_field_num(3)
.inherent_tag_num(1)
.optional_field_num(0)
.build()?;

fs::create_dir_all("/tmp/inference_dir/")?;
fs::write("/tmp/inference_dir/inference_data1.txt", b"8.8")?;
fs::write("/tmp/inference_dir/inference_data2.txt", b"98.8")?;
let mut batch_data_1 = vec![
NewSampleBuilder::default()
.data_path("/tmp/inference_dir/inference_data1.txt".into())
.build()?,
NewSampleBuilder::default()
.data_path("/tmp/inference_dir/inference_data2.txt".into())
.build()?,
];
let mut batch_data_2 = vec![NewSampleBuilder::default()
.data_path("/tmp/inference_dir/inference_data1.txt".into())
.build()?];

let available_status = vec!["SUCCESS"];
let last_batch_time_1: i64 = taos
.query_one(format!(
"SELECT LAST(ts) FROM task.`{}` WHERE status IN ({}) ",
batch_meta_1.batch(),
available_status
.iter()
.map(|s| format!("'{}'", s))
.collect::<Vec<String>>()
.join(", ")
))
.await?
.unwrap();
let last_batch_time_2: i64 = taos
.query_one(format!(
"SELECT LAST(ts) FROM task.`{}` WHERE status IN ({}) ",
batch_meta_2.batch(),
available_status
.iter()
.map(|s| format!("'{}'", s))
.collect::<Vec<String>>()
.join(", ")
))
.await?
.unwrap();
fs::write(
"/tmp/inference_dir/".to_string()
+ batch_meta_1.batch()
+ &last_batch_time_1.to_string()
+ ".txt",
b"10",
)?;
fs::write(
"/tmp/inference_dir/".to_string()
+ batch_meta_2.batch()
+ &last_batch_time_2.to_string()
+ ".txt",
b"20",
)?;
let inference_fn = |vec_data: &mut Vec<NewSample<Value>>,
batch: &str,
task_time: i64|
-> Vec<NewSample<Value>> {
let mut result: Vec<NewSample<Value>> = Vec::new();
let working_dir = "/tmp/inference_dir/".to_string();
let model_inference =
fs::read_to_string(working_dir + batch + &task_time.to_string() + ".txt")
.unwrap()
.parse::<f32>()
.unwrap();
for inference_data in vec_data.iter() {
// inference
let inference_result = fs::read_to_string(inference_data.data_path())
.unwrap()
.parse::<f32>()
.unwrap()
+ model_inference;
let output = if inference_result > 25.0 {
Some(Value::Float(inference_result))
} else {
None
};
result.push(
NewSampleBuilder::default()
.data_path(inference_data.data_path().to_path_buf())
.output(output)
.build()
.unwrap(),
);
}
result
};

tokio::spawn({
async move {
cml.inference(
batch_meta_1,
&available_status,
&mut batch_data_1,
&pool,
inference_fn,
)
.await
.unwrap();
cml.inference(
batch_meta_2,
&available_status,
&mut batch_data_2,
&pool,
inference_fn,
)
.await
.unwrap();
}
})
.await?;

let mut result = taos
.query("SELECT output FROM inference.inference ORDER BY output ASC")
.await?;
let records = result.to_records().await?;
assert_eq!(
vec![
vec![Value::Null(Ty::Float)],
vec![Value::Float(28.8)],
vec![Value::Float(108.8)]
],
records
);

fs::remove_dir_all("/tmp/inference_dir/")?;
taos.exec("DROP DATABASE IF EXISTS inference").await?;
taos.exec("DROP DATABASE IF EXISTS task").await?;
Ok(())
}
}

0 comments on commit 8dd10b3

Please sign in to comment.