diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 955b36215852..e9276ae54123 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -149,7 +149,7 @@ dependencies = [ "cfg_aliases", "chrono", "csv", - "flatbuffers", + "flatbuffers 0.8.3", "hex", "indexmap", "lazy_static", @@ -1086,10 +1086,12 @@ dependencies = [ "datafusion", "deadqueue", "enum_primitive", + "flatbuffers 0.7.0", "flexbuffers", "futures", "futures-timer 3.0.2", "hex", + "http-auth-basic", "ipc-channel", "itertools 0.9.0", "lazy_static", @@ -1291,12 +1293,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" -[[package]] -name = "dtoa" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d7ed2934d741c6b37e33e3832298e8850b53fd2d2bea03873375596c7cea4e" - [[package]] name = "either" version = "1.6.1" @@ -1399,6 +1395,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +[[package]] +name = "flatbuffers" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf606a9d6674ad82094652789cf69b0e6e598c2d3c9137c28970eec13f11644" +dependencies = [ + "bitflags", + "smallvec", +] + [[package]] name = "flatbuffers" version = "0.8.3" @@ -1851,6 +1857,15 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-auth-basic" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "887a45f9bf1e5e7023b8c02bcb9494f278dd7c0116a5a9cb6fb0f9952ffe8562" +dependencies = [ + "base64 0.12.3", +] + [[package]] name = "http-body" version = "0.3.1" @@ -1914,7 +1929,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.5", + "pin-project", "socket2", "tokio 0.2.25", "tower-service", @@ -1938,7 +1953,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.5", + "pin-project", "socket2", "tokio 1.2.0", "tower-service", @@ -2010,11 +2025,11 @@ dependencies = [ [[package]] name = "input_buffer" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" +checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" dependencies = [ - "bytes 0.5.6", + "bytes 1.0.1", ] [[package]] @@ -2929,33 +2944,13 @@ dependencies = [ "indexmap", ] -[[package]] -name = "pin-project" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffbc8e94b38ea3d2d8ba92aea2983b503cd75d0888d75b86bb37970b5698e15" -dependencies = [ - "pin-project-internal 0.4.27", -] - [[package]] name = "pin-project" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63" dependencies = [ - "pin-project-internal 1.0.5", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65ad2ae56b6abe3a1ee25f15ee605bacadb9a764edaba9c2bf4103800d4a1895" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -3465,7 +3460,7 @@ dependencies = [ "pin-project-lite 0.2.4", "serde", "serde_json", - "serde_urlencoded 0.7.0", + "serde_urlencoded", "tokio 0.2.25", "tokio-tls", "url", @@ -3502,7 +3497,7 @@ dependencies = [ 
"rustls", "serde", "serde_json", - "serde_urlencoded 0.7.0", + "serde_urlencoded", "tokio 1.2.0", "tokio-native-tls", "tokio-rustls", @@ -3765,18 +3760,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_urlencoded" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" -dependencies = [ - "dtoa", - "itoa", - "serde", - "url", -] - [[package]] name = "serde_urlencoded" version = "0.7.0" @@ -3994,7 +3977,7 @@ dependencies = [ "futures", "humantime 2.1.0", "log", - "pin-project 1.0.5", + "pin-project", "rand 0.7.3", "static_assertions", "tarpc-plugins", @@ -4274,14 +4257,14 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.11.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d9e878ad426ca286e4dcae09cbd4e1973a7f8987d97570e2469703dd7f5720c" +checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" dependencies = [ "futures-util", "log", - "pin-project 0.4.27", - "tokio 0.2.25", + "pin-project", + "tokio 1.2.0", "tungstenite", ] @@ -4340,7 +4323,7 @@ dependencies = [ "http-body 0.4.0", "hyper 0.14.4", "percent-encoding", - "pin-project 1.0.5", + "pin-project", "prost", "prost-derive", "tokio 1.2.0", @@ -4373,7 +4356,7 @@ dependencies = [ "futures-core", "futures-util", "indexmap", - "pin-project 1.0.5", + "pin-project", "rand 0.8.3", "slab", "tokio 1.2.0", @@ -4435,7 +4418,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 1.0.5", + "pin-project", "tracing", ] @@ -4453,18 +4436,18 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" [[package]] name = "tungstenite" -version = "0.11.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0308d80d86700c5878b9ef6321f020f29b1bb9d5ff3cab25e75e23f3a492a23" +checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" dependencies = [ - "base64 0.12.3", + "base64 0.13.0", "byteorder", - "bytes 0.5.6", + "bytes 1.0.1", "http", "httparse", "input_buffer", "log", - "rand 0.7.3", + "rand 0.8.3", "sha-1 0.9.4", "url", "utf-8", @@ -4559,12 +4542,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "urlencoding" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9232eb53352b4442e40d7900465dfc534e8cb2dc8f18656fcb2ac16112b5593" - [[package]] name = "utf-8" version = "0.7.5" @@ -4629,30 +4606,31 @@ dependencies = [ [[package]] name = "warp" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f41be6df54c97904af01aa23e613d4521eed7ab23537cede692d4058f6449407" +version = "0.3.0" +source = "git+https://github.com/seanmonstar/warp#553bd491d5a7327e470a65b0904bfc8942e7b926" dependencies = [ - "bytes 0.5.6", + "bytes 1.0.1", "futures", "headers", "http", - "hyper 0.13.10", + "hyper 0.14.4", "log", "mime", "mime_guess", "multipart", - "pin-project 0.4.27", + "percent-encoding", + "pin-project", "scoped-tls", "serde", "serde_json", - "serde_urlencoded 0.6.1", - "tokio 0.2.25", + "serde_urlencoded", + "tokio 1.2.0", + "tokio-stream", "tokio-tungstenite", + "tokio-util 0.6.3", "tower-service", "tracing", "tracing-futures", - "urlencoding", ] [[package]] diff --git a/rust/cubestore/Cargo.toml b/rust/cubestore/Cargo.toml index 
e524712720b6..7fdcd1a227b4 100644 --- a/rust/cubestore/Cargo.toml +++ b/rust/cubestore/Cargo.toml @@ -17,7 +17,7 @@ ipc-channel = { version = "0.14.1" } base64 = "0.13.0" bumpalo = "3.6.1" tokio = { version = "1.0", features = ["full", "rt"] } -warp = "0.2" +warp = { git = 'https://github.com/seanmonstar/warp', version = "0.3.0" } sqlparser = "0.7.0" serde_derive = "1.0.115" serde = "1.0.115" @@ -72,3 +72,5 @@ tarpc = { version = "0.24", features = ["tokio1"] } pin-project-lite = "0.2.4" paste = "1.0.4" mysql_common = "0.26.0" +flatbuffers = "0.7.0" +http-auth-basic = "0.1.2" diff --git a/rust/cubestore/flatbuffers-codegen.sh b/rust/cubestore/flatbuffers-codegen.sh new file mode 100755 index 000000000000..f0196a790e0e --- /dev/null +++ b/rust/cubestore/flatbuffers-codegen.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +cd src/codegen +flatc --rust http_message.fbs diff --git a/rust/cubestore/src/codegen/http_message.fbs b/rust/cubestore/src/codegen/http_message.fbs new file mode 100644 index 000000000000..6c55a1ff3690 --- /dev/null +++ b/rust/cubestore/src/codegen/http_message.fbs @@ -0,0 +1,34 @@ +union HttpCommand { + HttpQuery, + HttpResultSet, + HttpError +} + +table HttpMessage { + message_id: uint; + command: HttpCommand; +} + +table HttpQuery { + query: string; +} + +table HttpError { + error: string; +} + +table HttpResultSet { + columns: [string]; + rows: [HttpRow]; +} + +table HttpRow { + values: [HttpColumnValue]; +} + +table HttpColumnValue { + string_value: string; +} + + +root_type HttpMessage; diff --git a/rust/cubestore/src/codegen/http_message_generated.rs b/rust/cubestore/src/codegen/http_message_generated.rs new file mode 100644 index 000000000000..eab20a099de8 --- /dev/null +++ b/rust/cubestore/src/codegen/http_message_generated.rs @@ -0,0 +1,679 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +#[allow(unused_imports)] +use std::cmp::Ordering; +#[allow(unused_imports)] +use std::mem; + +extern crate flatbuffers; +#[allow(unused_imports)] +use self::flatbuffers::EndianScalar; + +#[allow(non_camel_case_types)] +#[repr(u8)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub enum HttpCommand { + NONE = 0, + HttpQuery = 1, + HttpResultSet = 2, + HttpError = 3, +} + +pub const ENUM_MIN_HTTP_COMMAND: u8 = 0; +pub const ENUM_MAX_HTTP_COMMAND: u8 = 3; + +impl<'a> flatbuffers::Follow<'a> for HttpCommand { + type Inner = Self; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + flatbuffers::read_scalar_at::(buf, loc) + } +} + +impl flatbuffers::EndianScalar for HttpCommand { + #[inline] + fn to_little_endian(self) -> Self { + let n = u8::to_le(self as u8); + let p = &n as *const u8 as *const HttpCommand; + unsafe { *p } + } + #[inline] + fn from_little_endian(self) -> Self { + let n = u8::from_le(self as u8); + let p = &n as *const u8 as *const HttpCommand; + unsafe { *p } + } +} + +impl flatbuffers::Push for HttpCommand { + type Output = HttpCommand; + #[inline] + fn push(&self, dst: &mut [u8], _rest: &[u8]) { + flatbuffers::emplace_scalar::(dst, *self); + } +} + +#[allow(non_camel_case_types)] +pub const ENUM_VALUES_HTTP_COMMAND: [HttpCommand; 4] = [ + HttpCommand::NONE, + HttpCommand::HttpQuery, + HttpCommand::HttpResultSet, + HttpCommand::HttpError, +]; + +#[allow(non_camel_case_types)] +pub const ENUM_NAMES_HTTP_COMMAND: [&'static str; 4] = + ["NONE", "HttpQuery", "HttpResultSet", "HttpError"]; + +pub fn enum_name_http_command(e: HttpCommand) -> &'static str { + let index = e as u8; + ENUM_NAMES_HTTP_COMMAND[index as 
usize] +} + +pub struct HttpCommandUnionTableOffset {} +pub enum HttpMessageOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct HttpMessage<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for HttpMessage<'a> { + type Inner = HttpMessage<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> HttpMessage<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + HttpMessage { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args HttpMessageArgs, + ) -> flatbuffers::WIPOffset> { + let mut builder = HttpMessageBuilder::new(_fbb); + if let Some(x) = args.command { + builder.add_command(x); + } + builder.add_message_id(args.message_id); + builder.add_command_type(args.command_type); + builder.finish() + } + + pub const VT_MESSAGE_ID: flatbuffers::VOffsetT = 4; + pub const VT_COMMAND_TYPE: flatbuffers::VOffsetT = 6; + pub const VT_COMMAND: flatbuffers::VOffsetT = 8; + + #[inline] + pub fn message_id(&self) -> u32 { + self._tab + .get::(HttpMessage::VT_MESSAGE_ID, Some(0)) + .unwrap() + } + #[inline] + pub fn command_type(&self) -> HttpCommand { + self._tab + .get::(HttpMessage::VT_COMMAND_TYPE, Some(HttpCommand::NONE)) + .unwrap() + } + #[inline] + pub fn command(&self) -> Option> { + self._tab + .get::>>( + HttpMessage::VT_COMMAND, + None, + ) + } + #[inline] + #[allow(non_snake_case)] + pub fn command_as_http_query(&self) -> Option> { + if self.command_type() == HttpCommand::HttpQuery { + self.command().map(|u| HttpQuery::init_from_table(u)) + } else { + None + } + } + + #[inline] + #[allow(non_snake_case)] + pub fn command_as_http_result_set(&self) -> Option> { + if self.command_type() == HttpCommand::HttpResultSet { + self.command().map(|u| HttpResultSet::init_from_table(u)) + } else { + None + } + } + + #[inline] + #[allow(non_snake_case)] + pub fn command_as_http_error(&self) -> Option> { + if self.command_type() == HttpCommand::HttpError { + self.command().map(|u| HttpError::init_from_table(u)) + } else { + None + } + } +} + +pub struct HttpMessageArgs { + pub message_id: u32, + pub command_type: HttpCommand, + pub command: Option>, +} +impl<'a> Default for HttpMessageArgs { + #[inline] + fn default() -> Self { + HttpMessageArgs { + message_id: 0, + command_type: HttpCommand::NONE, + command: None, + } + } +} +pub struct HttpMessageBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> HttpMessageBuilder<'a, 'b> { + #[inline] + pub fn add_message_id(&mut self, message_id: u32) { + self.fbb_ + .push_slot::(HttpMessage::VT_MESSAGE_ID, message_id, 0); + } + #[inline] + pub fn add_command_type(&mut self, command_type: HttpCommand) { + self.fbb_.push_slot::( + HttpMessage::VT_COMMAND_TYPE, + command_type, + HttpCommand::NONE, + ); + } + #[inline] + pub fn add_command(&mut self, command: flatbuffers::WIPOffset) { + self.fbb_ + .push_slot_always::>(HttpMessage::VT_COMMAND, command); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> HttpMessageBuilder<'a, 'b> { + let start = _fbb.start_table(); + HttpMessageBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) 
+ } +} + +pub enum HttpQueryOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct HttpQuery<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for HttpQuery<'a> { + type Inner = HttpQuery<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> HttpQuery<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + HttpQuery { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args HttpQueryArgs<'args>, + ) -> flatbuffers::WIPOffset> { + let mut builder = HttpQueryBuilder::new(_fbb); + if let Some(x) = args.query { + builder.add_query(x); + } + builder.finish() + } + + pub const VT_QUERY: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn query(&self) -> Option<&'a str> { + self._tab + .get::>(HttpQuery::VT_QUERY, None) + } +} + +pub struct HttpQueryArgs<'a> { + pub query: Option>, +} +impl<'a> Default for HttpQueryArgs<'a> { + #[inline] + fn default() -> Self { + HttpQueryArgs { query: None } + } +} +pub struct HttpQueryBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> HttpQueryBuilder<'a, 'b> { + #[inline] + pub fn add_query(&mut self, query: flatbuffers::WIPOffset<&'b str>) { + self.fbb_ + .push_slot_always::>(HttpQuery::VT_QUERY, query); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> HttpQueryBuilder<'a, 'b> { + let start = _fbb.start_table(); + HttpQueryBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum HttpErrorOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct HttpError<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for HttpError<'a> { + type Inner = HttpError<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> HttpError<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + HttpError { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args HttpErrorArgs<'args>, + ) -> flatbuffers::WIPOffset> { + let mut builder = HttpErrorBuilder::new(_fbb); + if let Some(x) = args.error { + builder.add_error(x); + } + builder.finish() + } + + pub const VT_ERROR: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn error(&self) -> Option<&'a str> { + self._tab + .get::>(HttpError::VT_ERROR, None) + } +} + +pub struct HttpErrorArgs<'a> { + pub error: Option>, +} +impl<'a> Default for HttpErrorArgs<'a> { + #[inline] + fn default() -> Self { + HttpErrorArgs { error: None } + } +} +pub struct HttpErrorBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> HttpErrorBuilder<'a, 'b> { + #[inline] + pub fn add_error(&mut self, error: flatbuffers::WIPOffset<&'b str>) { + self.fbb_ + .push_slot_always::>(HttpError::VT_ERROR, error); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> HttpErrorBuilder<'a, 'b> { + let start = _fbb.start_table(); + 
HttpErrorBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum HttpResultSetOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct HttpResultSet<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for HttpResultSet<'a> { + type Inner = HttpResultSet<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> HttpResultSet<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + HttpResultSet { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args HttpResultSetArgs<'args>, + ) -> flatbuffers::WIPOffset> { + let mut builder = HttpResultSetBuilder::new(_fbb); + if let Some(x) = args.rows { + builder.add_rows(x); + } + if let Some(x) = args.columns { + builder.add_columns(x); + } + builder.finish() + } + + pub const VT_COLUMNS: flatbuffers::VOffsetT = 4; + pub const VT_ROWS: flatbuffers::VOffsetT = 6; + + #[inline] + pub fn columns( + &self, + ) -> Option>> { + self._tab.get::>, + >>(HttpResultSet::VT_COLUMNS, None) + } + #[inline] + pub fn rows( + &self, + ) -> Option>>> { + self._tab.get::>>, + >>(HttpResultSet::VT_ROWS, None) + } +} + +pub struct HttpResultSetArgs<'a> { + pub columns: Option< + flatbuffers::WIPOffset>>, + >, + pub rows: Option< + flatbuffers::WIPOffset>>>, + >, +} +impl<'a> Default for HttpResultSetArgs<'a> { + #[inline] + fn default() -> Self { + HttpResultSetArgs { + columns: None, + rows: None, + } + } +} +pub struct HttpResultSetBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> HttpResultSetBuilder<'a, 'b> { + #[inline] + pub fn add_columns( + &mut self, + columns: flatbuffers::WIPOffset< + flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset<&'b str>>, + >, + ) { + self.fbb_ + .push_slot_always::>(HttpResultSet::VT_COLUMNS, columns); + } + #[inline] + pub fn add_rows( + &mut self, + rows: flatbuffers::WIPOffset< + flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset>>, + >, + ) { + self.fbb_ + .push_slot_always::>(HttpResultSet::VT_ROWS, rows); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> HttpResultSetBuilder<'a, 'b> { + let start = _fbb.start_table(); + HttpResultSetBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum HttpRowOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct HttpRow<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for HttpRow<'a> { + type Inner = HttpRow<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> HttpRow<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + HttpRow { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args HttpRowArgs<'args>, + ) -> flatbuffers::WIPOffset> { + let mut builder = HttpRowBuilder::new(_fbb); + if 
let Some(x) = args.values { + builder.add_values(x); + } + builder.finish() + } + + pub const VT_VALUES: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn values( + &self, + ) -> Option>>> { + self._tab.get::>>, + >>(HttpRow::VT_VALUES, None) + } +} + +pub struct HttpRowArgs<'a> { + pub values: Option< + flatbuffers::WIPOffset< + flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>>, + >, + >, +} +impl<'a> Default for HttpRowArgs<'a> { + #[inline] + fn default() -> Self { + HttpRowArgs { values: None } + } +} +pub struct HttpRowBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> HttpRowBuilder<'a, 'b> { + #[inline] + pub fn add_values( + &mut self, + values: flatbuffers::WIPOffset< + flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset>>, + >, + ) { + self.fbb_ + .push_slot_always::>(HttpRow::VT_VALUES, values); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> HttpRowBuilder<'a, 'b> { + let start = _fbb.start_table(); + HttpRowBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum HttpColumnValueOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct HttpColumnValue<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for HttpColumnValue<'a> { + type Inner = HttpColumnValue<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> HttpColumnValue<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + HttpColumnValue { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args HttpColumnValueArgs<'args>, + ) -> flatbuffers::WIPOffset> { + let mut builder = HttpColumnValueBuilder::new(_fbb); + if let Some(x) = args.string_value { + builder.add_string_value(x); + } + builder.finish() + } + + pub const VT_STRING_VALUE: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn string_value(&self) -> Option<&'a str> { + self._tab + .get::>(HttpColumnValue::VT_STRING_VALUE, None) + } +} + +pub struct HttpColumnValueArgs<'a> { + pub string_value: Option>, +} +impl<'a> Default for HttpColumnValueArgs<'a> { + #[inline] + fn default() -> Self { + HttpColumnValueArgs { string_value: None } + } +} +pub struct HttpColumnValueBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> HttpColumnValueBuilder<'a, 'b> { + #[inline] + pub fn add_string_value(&mut self, string_value: flatbuffers::WIPOffset<&'b str>) { + self.fbb_.push_slot_always::>( + HttpColumnValue::VT_STRING_VALUE, + string_value, + ); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> HttpColumnValueBuilder<'a, 'b> { + let start = _fbb.start_table(); + HttpColumnValueBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +#[inline] +pub fn get_root_as_http_message<'a>(buf: &'a [u8]) -> HttpMessage<'a> { + flatbuffers::get_root::>(buf) +} + +#[inline] +pub fn get_size_prefixed_root_as_http_message<'a>(buf: &'a [u8]) -> HttpMessage<'a> { + 
flatbuffers::get_size_prefixed_root::>(buf) +} + +#[inline] +pub fn finish_http_message_buffer<'a, 'b>( + fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>, + root: flatbuffers::WIPOffset>, +) { + fbb.finish(root, None); +} + +#[inline] +pub fn finish_size_prefixed_http_message_buffer<'a, 'b>( + fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>, + root: flatbuffers::WIPOffset>, +) { + fbb.finish_size_prefixed(root, None); +} diff --git a/rust/cubestore/src/codegen/mod.rs b/rust/cubestore/src/codegen/mod.rs new file mode 100644 index 000000000000..8f7e61233544 --- /dev/null +++ b/rust/cubestore/src/codegen/mod.rs @@ -0,0 +1 @@ +pub mod http_message_generated; diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index 5b7a51a10f4d..e5381e940cec 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -4,10 +4,11 @@ pub mod processing_loop; use crate::cluster::{Cluster, ClusterImpl, ClusterMetaStoreClient}; use crate::config::injection::{get_service, get_service_typed, DIService, Injector, InjectorRef}; use crate::config::processing_loop::ProcessingLoop; +use crate::http::HttpServer; use crate::import::limits::ConcurrencyLimits; use crate::import::{ImportService, ImportServiceImpl}; use crate::metastore::{MetaStore, MetaStoreRpcClient, RocksMetaStore}; -use crate::mysql::{MySqlAuth, MySqlAuthDefaultImpl, MySqlServer}; +use crate::mysql::{MySqlServer, SqlAuthDefaultImpl, SqlAuthService}; use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl}; use crate::queryplanner::{QueryPlanner, QueryPlannerImpl}; use crate::remotefs::gcs::GCSRemoteFs; @@ -90,6 +91,10 @@ impl CubeServices { async move { mysql_server.processing_loop().await }, )); } + if self.injector.has_service_typed::().await { + let http_server = self.injector.get_service_typed::().await; + futures.push(tokio::spawn(async move { http_server.run_server().await })); + } } else { let cluster = self.cluster.clone(); futures.push(tokio::spawn(async move { @@ -124,6 +129,13 @@ impl CubeServices { .stop_processing() .await?; } + if self.injector.has_service_typed::().await { + self.injector + .get_service_typed::() + .await + .stop_processing() + .await; + } self.scheduler.stop_processing_loops()?; stop_track_event_loop().await; Ok(()) @@ -168,6 +180,8 @@ pub trait ConfigObj: DIService { fn bind_address(&self) -> &Option; + fn http_bind_address(&self) -> &Option; + fn query_timeout(&self) -> u64; fn not_used_timeout(&self) -> u64; @@ -204,6 +218,7 @@ pub struct ConfigObjImpl { pub select_worker_pool_size: usize, pub job_runners_count: usize, pub bind_address: Option, + pub http_bind_address: Option, pub query_timeout: u64, pub select_workers: Vec, pub worker_bind_address: Option, @@ -248,6 +263,10 @@ impl ConfigObj for ConfigObjImpl { &self.bind_address } + fn http_bind_address(&self) -> &Option { + &self.http_bind_address + } + fn query_timeout(&self) -> u64 { self.query_timeout } @@ -351,6 +370,12 @@ impl Config { .map(|v| v.parse::().unwrap()) .unwrap_or(3306u16)), )), + http_bind_address: Some(env::var("CUBESTORE_HTTP_BIND_ADDR").ok().unwrap_or( + format!("0.0.0.0:{}", env::var("CUBESTORE_HTTP_PORT") + .ok() + .map(|v| v.parse::().unwrap()) + .unwrap_or(3030u16)), + )), query_timeout: env::var("CUBESTORE_QUERY_TIMEOUT") .ok() .map(|v| v.parse::().unwrap()) @@ -406,6 +431,7 @@ impl Config { select_worker_pool_size: 0, job_runners_count: 4, bind_address: None, + http_bind_address: None, query_timeout: 15, select_workers: Vec::new(), worker_bind_address: None, @@ 
-757,8 +783,8 @@ impl Config { if self.config_obj.bind_address().is_some() { self.injector - .register_typed::(async move |_| { - Arc::new(MySqlAuthDefaultImpl) + .register_typed::(async move |_| { + Arc::new(SqlAuthDefaultImpl) }) .await; @@ -776,6 +802,21 @@ impl Config { ) }) .await; + + self.injector + .register_typed::(async move |i| { + HttpServer::new( + i.get_service_typed::() + .await + .http_bind_address() + .as_ref() + .unwrap() + .to_string(), + i.get_service_typed().await, + i.get_service_typed().await, + ) + }) + .await; } } diff --git a/rust/cubestore/src/http/mod.rs b/rust/cubestore/src/http/mod.rs index 839f52a661a0..4bae4e79e043 100644 --- a/rust/cubestore/src/http/mod.rs +++ b/rust/cubestore/src/http/mod.rs @@ -1,45 +1,403 @@ use std::sync::Arc; -use log::debug; -use serde_derive::Deserialize; use warp::{Filter, Rejection}; -use crate::sql::SqlService; +use crate::codegen::http_message_generated::{ + get_root_as_http_message, HttpColumnValue, HttpColumnValueArgs, HttpError, HttpErrorArgs, + HttpMessageArgs, HttpQuery, HttpQueryArgs, HttpResultSet, HttpResultSetArgs, HttpRow, + HttpRowArgs, +}; +use crate::mysql::SqlAuthService; +use crate::sql::{SqlQueryContext, SqlService}; +use crate::store::DataFrame; +use crate::table::TableValue; +use crate::util::WorkerLoop; use crate::CubeError; +use futures::{SinkExt, StreamExt}; +use hex::ToHex; +use http_auth_basic::Credentials; +use log::error; +use log::info; +use std::net::SocketAddr; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use warp::filters::ws::{Message, Ws}; +use warp::http::StatusCode; +use warp::reject::Reject; -#[derive(Deserialize, Debug)] -pub struct SqlQueryBody { - query: String, +pub struct HttpServer { + bind_address: String, + sql_service: Arc, + auth: Arc, + worker_loop: WorkerLoop, + cancel_token: CancellationToken, } -pub async fn run_server(sql_service: Arc) -> Result<(), CubeError> { - let sql_service_filter = warp::any().map(move || sql_service.clone()); +crate::di_service!(HttpServer, []); + +#[derive(Debug)] +pub enum WsError { + NotAuthorized, +} + +impl Reject for WsError {} + +impl HttpServer { + pub fn new( + bind_address: String, + auth: Arc, + sql_service: Arc, + ) -> Arc { + Arc::new(Self { + bind_address, + auth, + sql_service, + worker_loop: WorkerLoop::new("HttpServer message processing"), + cancel_token: CancellationToken::new(), + }) + } + + pub async fn run_server(&self) -> Result<(), CubeError> { + let (tx, mut rx) = + mpsc::channel::<(mpsc::Sender, SqlQueryContext, HttpMessage)>(10000); + let auth_service = self.auth.clone(); + let tx_to_move_filter = warp::any().map(move || tx.clone()); + + let auth_filter = warp::any() + .and(warp::header::optional("authorization")) + .and_then(move |auth_header: Option| { + let auth_service = auth_service.clone(); + async move { + let res = HttpServer::authorize(auth_service, auth_header).await; + match res { + Ok(user) => Ok(SqlQueryContext { user }), + Err(_) => Err(warp::reject::custom(WsError::NotAuthorized)), + } + } + }); + let query_route = warp::path!("ws") + .and(tx_to_move_filter) + .and(auth_filter) + .and(warp::ws::ws()) + .and_then(|tx: mpsc::Sender<(mpsc::Sender, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move { + let tx_to_move = tx.clone(); + let sql_query_context = sql_query_context.clone(); + Result::<_, Rejection>::Ok(ws.on_upgrade(async move |mut web_socket| { + let (response_tx, mut response_rx) = mpsc::channel::(1000); + loop { + tokio::select! 
{ + Some(res) = response_rx.recv() => { + let send_res = web_socket.send(Message::binary(res.bytes())).await; + if let Err(e) = send_res { + error!("Websocket message send error: {:?}", e) + } + } + Some(msg) = web_socket.next() => { + match msg { + Err(e) => { + error!("Websocket error: {:?}", e); + break; + } + Ok(msg) => { + if msg.is_binary() { + match HttpMessage::read(msg.into_bytes()) { + Err(e) => error!("Websocket message read error: {:?}", e), + Ok(msg) => { + if let Err(e) = tx_to_move.send((response_tx.clone(), sql_query_context.clone(), msg)).await { + error!("Websocket channel error: {:?}", e); + break; + } + } + }; + } else if msg.is_ping() { + let send_res = web_socket.send(Message::pong(Vec::new())).await; + if let Err(e) = send_res { + error!("Websocket ping send error: {:?}", e) + } + } else if msg.is_close() { + break; + } else { + error!("Websocket received non binary msg: {:?}", msg); + break; + } + } + } + } + }; + }; + })) + }).recover(|err: Rejection| async move { + if let Some(ws_error) = err.find::() { + match ws_error { + WsError::NotAuthorized => Ok(warp::reply::with_status("Not authorized", StatusCode::FORBIDDEN)) + } + } else { + Err(err) + } + + }); + + let sql_service = self.sql_service.clone(); + + let addr: SocketAddr = self.bind_address.parse().unwrap(); + info!("Http Server is listening on {}", self.bind_address); + let process_loop = self.worker_loop.process_channel( + sql_service, + &mut rx, + async move |sql_service, + ( + sender, + sql_query_context, + HttpMessage { + message_id, + command, + }, + )| { + let res = + HttpServer::process_command(sql_service, sql_query_context, command).await; + let message = match res { + Ok(command) => HttpMessage { + message_id, + command, + }, + Err(e) => HttpMessage { + message_id, + command: HttpCommand::Error { + error: e.to_string(), + }, + }, + }; + sender + .send(message) + .await + .map_err(|e| CubeError::from_error(e)) + }, + ); + let cancel_token = self.cancel_token.clone(); + let (_, server_future) = warp::serve( + query_route, // .or(import_route) + ) + .bind_with_graceful_shutdown(addr, async move { cancel_token.cancelled().await }); + let _ = tokio::join!(process_loop, server_future); - let query_route = warp::path!("query") - .and(warp::body::json()) - .and(sql_service_filter.clone()) - .and_then(post_query); + Ok(()) + } - // let import_route = warp::path!("insert" / String) - // .and(warp::body::aggregate()) - // .and_then(post_insert); + pub async fn process_command( + sql_service: Arc, + sql_query_context: SqlQueryContext, + command: HttpCommand, + ) -> Result { + match command { + HttpCommand::Query { query } => Ok(HttpCommand::ResultSet { + data_frame: sql_service + .exec_query_with_context(sql_query_context, &query) + .await?, + }), + x => Err(CubeError::user(format!("Unexpected command: {:?}", x))), + } + } - warp::serve( - query_route, // .or(import_route) - ) - .run(([127, 0, 0, 1], 3030)) - .await; + pub async fn authorize( + auth: Arc, + auth_header: Option, + ) -> Result, CubeError> { + let credentials = auth_header + .map(|auth_header| Credentials::from_header(auth_header)) + .transpose() + .map_err(|e| CubeError::from_error(e))?; + if let Some(password) = auth + .authenticate(credentials.as_ref().map(|c| c.user_id.to_string())) + .await? 
+ { + if Some(password) != credentials.as_ref().map(|c| c.password.to_string()) { + Err(CubeError::user( + "User or password doesn't match".to_string(), + )) + } else { + Ok(credentials.as_ref().map(|c| c.user_id.to_string())) + } + } else { + Ok(credentials.as_ref().map(|c| c.user_id.to_string())) + } + } - Ok(()) + pub async fn stop_processing(&self) { + self.worker_loop.stop(); + self.cancel_token.cancel(); + } } -// curl -X POST -d '{"query":"create schema boo"}' -H "Content-Type: application/json" http://127.0.0.1:3030/query -pub async fn post_query( - query_body: SqlQueryBody, - sql_service: Arc, -) -> Result { - let res = sql_service.exec_query(&query_body.query).await?; - debug!("Query result is {:?}", res); - debug!("Post query: {:?}", query_body); - Ok(format!("{:?}", res)) +#[derive(Debug)] +pub struct HttpMessage { + message_id: u32, + command: HttpCommand, +} + +#[derive(Debug)] +pub enum HttpCommand { + Query { query: String }, + ResultSet { data_frame: DataFrame }, + Error { error: String }, +} + +impl HttpMessage { + pub fn bytes(&self) -> Vec { + let mut builder = flatbuffers::FlatBufferBuilder::new_with_capacity(1024); + let args = HttpMessageArgs { + message_id: self.message_id, + command_type: match self.command { + HttpCommand::Query { .. } => { + crate::codegen::http_message_generated::HttpCommand::HttpQuery + } + HttpCommand::ResultSet { .. } => { + crate::codegen::http_message_generated::HttpCommand::HttpResultSet + } + HttpCommand::Error { .. } => { + crate::codegen::http_message_generated::HttpCommand::HttpError + } + }, + command: match &self.command { + HttpCommand::Query { query } => { + let query_offset = builder.create_string(&query); + Some( + HttpQuery::create( + &mut builder, + &HttpQueryArgs { + query: Some(query_offset), + }, + ) + .as_union_value(), + ) + } + HttpCommand::Error { error } => { + let error_offset = builder.create_string(&error); + Some( + HttpError::create( + &mut builder, + &HttpErrorArgs { + error: Some(error_offset), + }, + ) + .as_union_value(), + ) + } + HttpCommand::ResultSet { data_frame } => { + let columns = data_frame + .get_columns() + .iter() + .map(|c| c.get_name().as_str()) + .collect::>(); + let columns_vec = builder.create_vector_of_strings(columns.as_slice()); + + let mut row_offsets = Vec::with_capacity(data_frame.get_rows().len()); + for row in data_frame.get_rows().iter() { + let mut value_offsets = Vec::with_capacity(row.values().len()); + for value in row.values().iter() { + let value = match value { + TableValue::Null => HttpColumnValue::create( + &mut builder, + &HttpColumnValueArgs { string_value: None }, + ), + TableValue::String(v) => { + let string_value = Some(builder.create_string(v)); + HttpColumnValue::create( + &mut builder, + &HttpColumnValueArgs { string_value }, + ) + } + TableValue::Int(v) => { + let string_value = Some(builder.create_string(&v.to_string())); + HttpColumnValue::create( + &mut builder, + &HttpColumnValueArgs { string_value }, + ) + } + TableValue::Decimal(v) => { + let string_value = Some(builder.create_string(&v.to_string())); + HttpColumnValue::create( + &mut builder, + &HttpColumnValueArgs { string_value }, + ) + } + TableValue::Float(v) => { + let string_value = Some(builder.create_string(&v.to_string())); + HttpColumnValue::create( + &mut builder, + &HttpColumnValueArgs { string_value }, + ) + } + TableValue::Bytes(v) => { + let string_value = Some(builder.create_string(&format!( + "0x{}", + v.encode_hex_upper::() + ))); + HttpColumnValue::create( + &mut builder, + 
&HttpColumnValueArgs { string_value }, + ) + } + TableValue::Timestamp(v) => { + let string_value = Some(builder.create_string(&v.to_string())); + HttpColumnValue::create( + &mut builder, + &HttpColumnValueArgs { string_value }, + ) + } + TableValue::Boolean(v) => { + let string_value = Some(builder.create_string(&v.to_string())); + HttpColumnValue::create( + &mut builder, + &HttpColumnValueArgs { string_value }, + ) + } + }; + value_offsets.push(value); + } + let values = Some(builder.create_vector(value_offsets.as_slice())); + let row = HttpRow::create(&mut builder, &HttpRowArgs { values }); + row_offsets.push(row); + } + + let rows = Some(builder.create_vector(row_offsets.as_slice())); + + Some( + HttpResultSet::create( + &mut builder, + &HttpResultSetArgs { + columns: Some(columns_vec), + rows, + }, + ) + .as_union_value(), + ) + } + }, + }; + let message = + crate::codegen::http_message_generated::HttpMessage::create(&mut builder, &args); + builder.finish(message, None); + builder.finished_data().to_vec() // TODO copy + } + + pub fn read(buffer: Vec) -> Result { + let http_message = get_root_as_http_message(buffer.as_slice()); + Ok(HttpMessage { + message_id: http_message.message_id(), + command: match http_message.command_type() { + crate::codegen::http_message_generated::HttpCommand::HttpQuery => { + let query = http_message.command_as_http_query().unwrap(); + HttpCommand::Query { + query: query.query().unwrap().to_string(), + } + } + command => { + return Err(CubeError::internal(format!( + "Unexpected command: {:?}", + command + ))); + } + }, + }) + } } diff --git a/rust/cubestore/src/lib.rs b/rust/cubestore/src/lib.rs index d1114fd09ce5..2aab7123851b 100644 --- a/rust/cubestore/src/lib.rs +++ b/rust/cubestore/src/lib.rs @@ -34,6 +34,7 @@ use tokio::sync::mpsc::error::SendError; use tokio::time::error::Elapsed; pub mod cluster; +pub mod codegen; pub mod config; pub mod http; pub mod import; diff --git a/rust/cubestore/src/mysql/mod.rs b/rust/cubestore/src/mysql/mod.rs index 10676033924f..1834c50756ca 100644 --- a/rust/cubestore/src/mysql/mod.rs +++ b/rust/cubestore/src/mysql/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::{watch, RwLock}; struct Backend { sql_service: Arc, - auth: Arc, + auth: Arc, user: Option, } @@ -120,12 +120,15 @@ impl AsyncMysqlShim for Backend { where W: 'async_trait, { - if !user.is_empty() { - self.user = Some(String::from_utf8_lossy(user.as_slice()).to_string()) - } + self.user = if !user.is_empty() { + Some(String::from_utf8_lossy(user.as_slice()).to_string()) + } else { + None + }; self.auth - .authenticate(user) + .authenticate(self.user.clone()) .await + .map(|p| p.map(|p| p.as_bytes().to_vec())) .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } } @@ -133,7 +136,7 @@ impl AsyncMysqlShim for Backend { pub struct MySqlServer { address: String, sql_service: Arc, - auth: Arc, + auth: Arc, close_socket_rx: RwLock>, close_socket_tx: watch::Sender, } @@ -197,7 +200,7 @@ impl MySqlServer { pub fn new( address: String, sql_service: Arc, - auth: Arc, + auth: Arc, ) -> Arc { let (close_socket_tx, close_socket_rx) = watch::channel(false); Arc::new(Self { @@ -211,17 +214,17 @@ impl MySqlServer { } #[async_trait] -pub trait MySqlAuth: Send + Sync { - async fn authenticate(&self, user: Vec) -> Result>, CubeError>; +pub trait SqlAuthService: Send + Sync { + async fn authenticate(&self, user: Option) -> Result, CubeError>; } -pub struct MySqlAuthDefaultImpl; +pub struct SqlAuthDefaultImpl; -crate::di_service!(MySqlAuthDefaultImpl, [MySqlAuth]); 
+crate::di_service!(SqlAuthDefaultImpl, [SqlAuthService]); #[async_trait] -impl MySqlAuth for MySqlAuthDefaultImpl { - async fn authenticate(&self, _user: Vec) -> Result>, CubeError> { +impl SqlAuthService for SqlAuthDefaultImpl { + async fn authenticate(&self, _user: Option) -> Result, CubeError> { Ok(None) } } diff --git a/rust/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/src/queryplanner/query_executor.rs index 9dee322f845c..b910c1004e71 100644 --- a/rust/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/src/queryplanner/query_executor.rs @@ -36,7 +36,7 @@ use datafusion::physical_plan::parquet::ParquetExec; use datafusion::physical_plan::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning, RecordBatchStream}; use itertools::Itertools; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, trace, warn}; use mockall::automock; use num::BigInt; use regex::Regex; diff --git a/rust/cubestore/src/sql/mod.rs b/rust/cubestore/src/sql/mod.rs index 75b711bf407d..f7687d136ac8 100644 --- a/rust/cubestore/src/sql/mod.rs +++ b/rust/cubestore/src/sql/mod.rs @@ -58,7 +58,7 @@ pub trait SqlService: DIService + Send + Sync { ) -> Result; } -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct SqlQueryContext { pub user: Option, } diff --git a/rust/cubestore/src/util/mod.rs b/rust/cubestore/src/util/mod.rs index d507a19be2a3..6186449ba4eb 100644 --- a/rust/cubestore/src/util/mod.rs +++ b/rust/cubestore/src/util/mod.rs @@ -5,6 +5,7 @@ use crate::CubeError; use log::error; use std::future::Future; use std::sync::Arc; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; pub struct WorkerLoop { @@ -57,6 +58,41 @@ impl WorkerLoop { } } + pub async fn process_channel( + &self, + service: Arc, + rx: &mut mpsc::Receiver, + loop_fn: impl Fn(Arc, T) -> FR + Send + Sync + 'static, + ) where + T: Send + Sync + 'static, + S: Send + Sync + 'static, + FR: Future> + Send + 'static, + { + let token = self.stopped_token.child_token(); + let loop_name = self.name.clone(); + loop { + let res = tokio::select! { + _ = token.cancelled() => { + return; + } + res = rx.recv() => { + res + } + }; + match res { + Some(r) => { + let loop_res = loop_fn(service.clone(), r).await; + if let Err(e) = loop_res { + error!("Error during {}: {:?}", loop_name, e); + } + } + None => { + return; + } + }; + } + } + pub fn stop(&self) { self.stopped_token.cancel() }
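
The wire format added in src/codegen/http_message.fbs is what travels over the new /ws WebSocket endpoint. Below is a minimal round-trip sketch against the checked-in generated bindings and the flatbuffers 0.7 dependency added to Cargo.toml; it mirrors what HttpMessage::bytes and HttpMessage::read do in src/http/mod.rs. The cubestore::codegen::http_message_generated path is assumed (from inside the crate it would be crate::codegen::http_message_generated), and the query text is a placeholder.

    use cubestore::codegen::http_message_generated::{
        get_root_as_http_message, HttpCommand, HttpMessage, HttpMessageArgs, HttpQuery,
        HttpQueryArgs,
    };

    fn main() {
        // Build a query request the same way HttpMessage::bytes does on the server side.
        let mut builder = flatbuffers::FlatBufferBuilder::new_with_capacity(1024);
        let query = builder.create_string("SELECT 1");
        let command = HttpQuery::create(&mut builder, &HttpQueryArgs { query: Some(query) });
        let message = HttpMessage::create(
            &mut builder,
            &HttpMessageArgs {
                message_id: 1,
                command_type: HttpCommand::HttpQuery,
                command: Some(command.as_union_value()),
            },
        );
        builder.finish(message, None);
        let bytes = builder.finished_data().to_vec();

        // Parse it back, as HttpMessage::read does for incoming binary frames.
        let parsed = get_root_as_http_message(&bytes);
        assert_eq!(parsed.message_id(), 1);
        assert_eq!(parsed.command_type(), HttpCommand::HttpQuery);
        let parsed_query = parsed.command_as_http_query().unwrap();
        assert_eq!(parsed_query.query(), Some("SELECT 1"));
    }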
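
Authentication on the /ws route is an optional HTTP Basic Authorization header: HttpServer::authorize parses it with http-auth-basic and compares the password returned by SqlAuthService::authenticate. A minimal sketch of building that header value on the client side, assuming the base64 0.13 crate already pinned for cubestore; the user and password are placeholders, and with the default SqlAuthDefaultImpl (authenticate returns Ok(None)) any credentials, or none at all, are accepted.

    // Builds the Authorization header value expected by the /ws auth filter.
    fn basic_auth_header(user: &str, password: &str) -> String {
        format!("Basic {}", base64::encode(format!("{}:{}", user, password)))
    }

    fn main() {
        // Prints "Basic Zm9vOnNlY3JldA==" for these placeholder credentials.
        println!("{}", basic_auth_header("foo", "secret"));
    }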
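
HttpServer::run_server drains its request channel through the new WorkerLoop::process_channel helper and shuts it down from stop_processing via WorkerLoop::stop plus the warp graceful-shutdown token. The following is a minimal usage sketch of process_channel in isolation; it assumes the crate exposes cubestore::util::WorkerLoop and cubestore::CubeError and runs inside a Tokio runtime, and the message type and handler are illustrative only.

    use std::sync::Arc;

    use cubestore::util::WorkerLoop;
    use cubestore::CubeError;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<String>(16);
        let worker_loop = WorkerLoop::new("example processing");

        let handle = tokio::spawn(async move {
            // Calls the handler once per received message; the loop ends when the
            // channel closes or WorkerLoop::stop() is called (as HttpServer's
            // stop_processing does).
            worker_loop
                .process_channel(Arc::new(()), &mut rx, |_service, msg: String| async move {
                    println!("processing {}", msg);
                    Ok::<(), CubeError>(())
                })
                .await;
        });

        tx.send("SELECT 1".to_string()).await.unwrap();
        drop(tx); // closing the sender lets the loop drain and exit
        handle.await.unwrap();
    }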