diff --git a/Cargo.lock b/Cargo.lock index e2c9df9a9b..1380d48fab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,8 +88,8 @@ dependencies = [ "encoding_rs", "flate2", "futures-core", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "httparse", "httpdate", "itoa", @@ -124,7 +124,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66ff4d247d2b160861fa2866457e85706833527840e4133f8f49aa423a38799" dependencies = [ "bytestring", - "http", + "http 0.2.11", "regex", "serde", "tracing", @@ -967,7 +967,7 @@ dependencies = [ "bytes", "fastrand", "hex", - "http", + "http 0.2.11", "hyper 0.14.27", "ring 0.16.20", "time 0.3.30", @@ -1002,7 +1002,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", + "http 0.2.11", "http-body 0.4.5", "lazy_static", "percent-encoding", @@ -1026,7 +1026,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "fastrand", - "http", + "http 0.2.11", "percent-encoding", "tracing", "uuid", @@ -1054,7 +1054,7 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "http", + "http 0.2.11", "http-body 0.4.5", "once_cell", "percent-encoding", @@ -1082,7 +1082,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", + "http 0.2.11", "regex", "tokio-stream", "tracing", @@ -1107,7 +1107,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "http", + "http 0.2.11", "regex", "tracing", ] @@ -1124,7 +1124,7 @@ dependencies = [ "form_urlencoded", "hex", "hmac", - "http", + "http 0.2.11", "once_cell", "percent-encoding", "regex", @@ -1157,7 +1157,7 @@ dependencies = [ "crc32c", "crc32fast", "hex", - "http", + "http 0.2.11", "http-body 0.4.5", "md-5", "pin-project-lite", @@ -1178,7 +1178,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "http", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-rustls", @@ -1212,7 +1212,7 @@ dependencies = [ "bytes", "bytes-utils", "futures-core", - "http", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "once_cell", @@ -1233,7 +1233,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "bytes", - "http", + "http 0.2.11", "http-body 0.4.5", "pin-project-lite", "tower", @@ -1272,7 +1272,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "http", + "http 0.2.11", "http-body 0.4.5", "once_cell", "pin-project-lite", @@ -1291,7 +1291,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "bytes", - "http", + "http 0.2.11", "tokio", "tracing", ] @@ -1330,7 +1330,7 @@ dependencies = [ "aws-smithy-client", "aws-smithy-http", "aws-smithy-types", - "http", + "http 0.2.11", "rustc_version 0.4.0", "tracing", ] @@ -1346,7 +1346,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "itoa", @@ -1372,7 +1372,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", + "http 0.2.11", "http-body 0.4.5", "mime", "rustversion", @@ -3136,7 +3136,7 @@ dependencies = [ "deno_core", "deno_tls", "dyn-clone", - "http", + "http 0.2.11", "reqwest", "serde", "tokio", @@ -3198,7 +3198,7 @@ dependencies = [ "deno_websocket", "flate2", "fly-accept-encoding", - "http", + "http 0.2.11", "httparse", "hyper 0.14.27", "hyper 1.0.0-rc.4", @@ -3323,10 +3323,10 @@ dependencies = [ "ecb", "elliptic-curve 0.13.8", "errno 0.2.8", - "h2", + "h2 0.3.22", "hex", "hkdf", - "http", + "http 0.2.11", "idna 0.3.0", "indexmap 2.1.0", "lazy-regex", @@ -3419,7 +3419,7 @@ dependencies = [ "filetime", "fs3", "fwdansi", - "http", + "http 0.2.11", "hyper 0.14.27", "libc", "log", @@ -3525,7 +3525,7 @@ dependencies = [ "deno_net", "deno_tls", "fastwebsockets", - "http", + "http 0.2.11", "hyper 0.14.27", "once_cell", "serde", @@ -3767,7 +3767,7 @@ dependencies = [ "genawaiter", "gethostname", "handlebars", - "http", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "jsonwebtoken", @@ -3899,6 +3899,7 @@ dependencies = [ "chrono", "criterion", "dozer-cli", + "dozer-ingestion-aerospike", "dozer-ingestion-connector", "dozer-ingestion-deltalake", "dozer-ingestion-dozer", @@ -3927,6 +3928,15 @@ dependencies = [ "url", ] +[[package]] +name = "dozer-ingestion-aerospike" +version = "0.3.0" +dependencies = [ + "dozer-ingestion-connector", + "h2 0.4.2", + "http 1.0.0", +] + [[package]] name = "dozer-ingestion-connector" version = "0.3.0" @@ -4879,7 +4889,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3afa7516fdcfd8e5e93a938f8fec857785ced190a1f62d842d1fe1ffbe22ba8" dependencies = [ - "http", + "http 0.2.11", "itertools 0.10.5", "thiserror", ] @@ -5373,7 +5383,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util 0.7.10", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", "indexmap 2.1.0", "slab", "tokio", @@ -5480,7 +5509,7 @@ dependencies = [ "base64 0.21.5", "bytes", "headers-core", - "http", + "http 0.2.11", "httpdate", "mime", "sha1", @@ -5492,7 +5521,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.11", ] [[package]] @@ -5590,6 +5619,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -5597,7 +5637,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] @@ -5608,7 +5648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d" dependencies = [ "bytes", - "http", + "http 0.2.11", ] [[package]] @@ -5651,8 +5691,8 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "http-body 0.4.5", "httparse", "httpdate", @@ -5674,8 +5714,8 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "http-body 1.0.0-rc.2", "httparse", "httpdate", @@ -5693,7 +5733,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", + "http 0.2.11", "hyper 0.14.27", "log", "rustls 0.21.9", @@ -7562,7 +7602,7 @@ checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275" dependencies = [ "async-trait", "futures-core", - "http", + "http 0.2.11", "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry_api", @@ -9178,8 +9218,8 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-rustls", @@ -11455,8 +11495,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-timeout", @@ -11482,8 +11522,8 @@ dependencies = [ "axum", "base64 0.21.5", "bytes", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-timeout", @@ -11536,7 +11576,7 @@ checksum = "0fddb2a37b247e6adcb9f239f4e5cefdcc5ed526141a416b943929f13aea2cce" dependencies = [ "base64 0.21.5", "bytes", - "http", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "pin-project", @@ -11580,7 +11620,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", + "http 0.2.11", "http-body 0.4.5", "http-range-header", "httpdate", diff --git a/dozer-api/src/rest/mod.rs b/dozer-api/src/rest/mod.rs index 04e0956976..14e4bf0c6b 100644 --- a/dozer-api/src/rest/mod.rs +++ b/dozer-api/src/rest/mod.rs @@ -235,5 +235,9 @@ async fn list_endpoint_paths(endpoints: web::Data>) -> web::Json, inserted_at_index: Option, + count: usize, + previous_started: Instant, } impl Sink for DummySink { @@ -66,8 +70,22 @@ impl Sink for DummySink { _record_store: &ProcessorRecordStore, op: OperationWithId, ) -> Result<(), BoxedError> { + if self.count % 1000 == 0 { + if self.count > 0 { + info!( + "Rate: {:.0} op/s, Processed {} records. Elapsed {:?}", + 1000.0 / self.previous_started.elapsed().as_secs_f64(), + self.count, + self.previous_started.elapsed(), + ); + } + self.previous_started = Instant::now(); + } + + self.count += 1; if let Some(inserted_at_index) = self.inserted_at_index { if let Operation::Insert { new } = op.op { + info!("Received record: {:?}", new); let value = &new.values[inserted_at_index]; if let Some(inserted_at) = value.to_timestamp() { let latency = Local::now().naive_utc() - inserted_at.naive_utc(); diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index 0fa3a31994..f4df11043a 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -19,6 +19,7 @@ dozer-ingestion-mysql = { path = "./mysql" } dozer-ingestion-object-store = { path = "./object-store" } dozer-ingestion-postgres = { path = "./postgres" } dozer-ingestion-snowflake = { path = "./snowflake", optional = true } +dozer-ingestion-aerospike = { path = "./aerospike" } dozer-ingestion-webhook = { path = "./webhook" } dozer-ingestion-oracle = { path = "./oracle" } diff --git a/dozer-ingestion/aerospike/Cargo.toml b/dozer-ingestion/aerospike/Cargo.toml new file mode 100644 index 0000000000..19ab34620c --- /dev/null +++ b/dozer-ingestion/aerospike/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "dozer-ingestion-aerospike" +version = "0.3.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dozer-ingestion-connector = { path = "../connector" } +h2 = "0.4.2" +http = "1" diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs new file mode 100644 index 0000000000..dee468cd18 --- /dev/null +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -0,0 +1,261 @@ +use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError; +use dozer_ingestion_connector::dozer_types::log::{error, info}; +use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnection; +use dozer_ingestion_connector::dozer_types::models::ingestion_types::IngestionMessage; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; +use dozer_ingestion_connector::dozer_types::types::Operation::Insert; +use dozer_ingestion_connector::dozer_types::types::{Field, FieldDefinition, FieldType, Schema}; +use dozer_ingestion_connector::tokio::net::TcpListener; +use dozer_ingestion_connector::{ + async_trait, dozer_types, Connector, Ingestor, SourceSchema, SourceSchemaResult, + TableIdentifier, TableInfo, +}; +use h2::server::handshake; +use http::StatusCode; +use std::collections::HashMap; + +use dozer_ingestion_connector::dozer_types::serde::Deserialize; +use dozer_ingestion_connector::tokio; + +#[derive(Deserialize, Debug)] +#[serde(crate = "dozer_types::serde")] +pub struct AerospikeEvent { + // msg: String, + key: Vec>, + // gen: u32, + // exp: u32, + // lut: u64, + bins: Vec, +} + +#[derive(Deserialize, Debug)] +#[serde(crate = "dozer_types::serde")] +pub struct Bin { + name: String, + value: Option, + r#type: String, +} + +#[derive(Debug)] +pub struct AerospikeConnector { + pub config: AerospikeConnection, +} + +impl AerospikeConnector { + pub fn new(config: AerospikeConnection) -> Self { + Self { config } + } +} + +#[async_trait] +impl Connector for AerospikeConnector { + fn types_mapping() -> Vec<(String, Option)> + where + Self: Sized, + { + todo!() + } + + async fn validate_connection(&mut self) -> Result<(), BoxedError> { + Ok(()) + } + + async fn list_tables(&mut self) -> Result, BoxedError> { + Ok(self + .config + .sets + .iter() + .map(|set| TableIdentifier { + schema: Some(self.config.namespace.clone()), + name: set.to_string(), + }) + .collect()) + } + + async fn validate_tables(&mut self, _tables: &[TableIdentifier]) -> Result<(), BoxedError> { + Ok(()) + } + + async fn list_columns( + &mut self, + _tables: Vec, + ) -> Result, BoxedError> { + Ok(vec![]) + } + + async fn get_schemas( + &mut self, + table_infos: &[TableInfo], + ) -> Result, BoxedError> { + info!("Table infos: {:?}", table_infos); + let schemas = table_infos + .iter() + .map(|s| { + Ok(SourceSchema { + schema: Schema { + fields: s + .column_names + .iter() + .map(|name| FieldDefinition { + name: name.clone(), + typ: FieldType::String, + nullable: true, + source: Default::default(), + }) + .collect(), + primary_index: vec![s.column_names.iter().position(|n| n == "PK").unwrap()], + }, + cdc_type: Default::default(), + }) + }) + .collect(); + + info!("Schemas {:?}", schemas); + Ok(schemas) + } + + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + + async fn start( + &mut self, + ingestor: &Ingestor, + tables: Vec, + _last_checkpoint: Option, + ) -> Result<(), BoxedError> { + ingestor + .handle_message(IngestionMessage::SnapshottingStarted) + .await + .unwrap(); + ingestor + .handle_message(IngestionMessage::SnapshottingDone { id: None }) + .await + .unwrap(); + + let mut tables_map = HashMap::new(); + tables.iter().for_each(|table| { + let mut columns_map = HashMap::new(); + table.column_names.iter().enumerate().for_each(|(i, name)| { + columns_map.insert(name.clone(), i); + }); + tables_map.insert(table.name.clone(), columns_map); + }); + + let listener = TcpListener::bind("127.0.0.1:5928").await.unwrap(); + + loop { + if let Ok((socket, _peer_addr)) = listener.accept().await { + let t = tables_map.clone(); + let i = ingestor.clone(); + tokio::spawn(async move { + let mut h2 = handshake(socket).await.expect("Connection"); + // Accept all inbound HTTP/2 streams sent over the + // connection. + + 'looop: while let Some(request) = h2.accept().await { + match request { + Ok((request, mut respond)) => { + let mut body = request.into_body(); + + // let mut byes = Vec::new(); + while let Some(chunk) = body.data().await.transpose().unwrap() { + if !chunk.is_empty() { + let event_string = String::from_utf8(Vec::from(chunk)); + + let event = match dozer_types::serde_json::from_str::< + AerospikeEvent, + >( + &event_string.clone().unwrap() + ) { + Ok(event) => event, + Err(e) => { + error!("Error : {:?}", e); + continue; + } + }; + + let table_name = event.key.get(1).unwrap().clone().unwrap(); + if let Some(columns_map) = t.get(table_name.as_str()) { + let mut fields = vec![Field::Null; columns_map.len()]; + if let Some(pk) = columns_map.get("PK") { + fields[*pk] = match event.key.last().unwrap() { + None => Field::Null, + Some(s) => Field::String(s.to_string()), + }; + } + + for bin in event.bins { + if let Some(i) = columns_map.get(bin.name.as_str()) + { + match bin.value { + Some(value) => { + match bin.r#type.as_str() { + "str" => { + if let dozer_types::serde_json::Value::String(s) = value { + fields[*i] = Field::String(s); + } + } + "int" => { + if let dozer_types::serde_json::Value::Number(n) = value { + fields[*i] = Field::String(n.to_string()); + } + } + "float" => { + if let dozer_types::serde_json::Value::Number(n) = value { + fields[*i] = Field::String(n.to_string()); + } + } + unexpected => { + error!( + "Unexpected type: {}", + unexpected + ); + } + } + } + None => { + fields[*i] = Field::Null; + } + } + } + } + + i.handle_message(IngestionMessage::OperationEvent { + table_index: 0, + op: Insert { + new: dozer_types::types::Record::new(fields), + }, + id: None, + }) + .await + .unwrap(); + } else { + // info!("Not found table: {}", table_name); + } + } else { + info!("empty"); + } + } + + // info!("Got HTTP/2 request"); + // Build a response with no body + let response = http::response::Response::builder() + .status(StatusCode::OK) + .body(()) + .unwrap(); + + // Send the response back to the client + respond.send_response(response, true).unwrap(); + } + Err(e) => { + error!("Listiner Error: {:?}", e); + break 'looop; + } + } + } + }); + } + } + } +} diff --git a/dozer-ingestion/aerospike/src/lib.rs b/dozer-ingestion/aerospike/src/lib.rs new file mode 100644 index 0000000000..77bba250b1 --- /dev/null +++ b/dozer-ingestion/aerospike/src/lib.rs @@ -0,0 +1 @@ +pub mod connector; diff --git a/dozer-ingestion/src/lib.rs b/dozer-ingestion/src/lib.rs index c9a5a078a3..f0d2fcf44b 100644 --- a/dozer-ingestion/src/lib.rs +++ b/dozer-ingestion/src/lib.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use dozer_ingestion_aerospike::connector::AerospikeConnector; #[cfg(feature = "ethereum")] use dozer_ingestion_connector::dozer_types::models::ingestion_types::EthProviderConfig; use dozer_ingestion_connector::dozer_types::{ @@ -141,11 +142,11 @@ pub fn get_connector( runtime, javascript_config, ))), + ConnectionConfig::Aerospike(config) => Ok(Box::new(AerospikeConnector::new(config))), ConnectionConfig::Oracle(oracle_config) => Ok(Box::new(OracleConnector::new( connection.name, oracle_config, ))), - ConnectionConfig::Aerospike(_) => Err(ConnectorError::Unsupported("aerospike".to_owned())), } } diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index 05ff4f79a9..7729ea128b 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -1316,6 +1316,8 @@ mod tests { .field(f("json", FieldType::Json), false); let connection_config = AerospikeConnection { hosts: "localhost:3000".into(), + namespace: "test".into(), + sets: vec![set.to_owned()], }; let factory = AerospikeSinkFactory::new( connection_config, diff --git a/dozer-tests/src/e2e_tests/runner/running_env.rs b/dozer-tests/src/e2e_tests/runner/running_env.rs index 169bfb300c..6c231b0507 100644 --- a/dozer-tests/src/e2e_tests/runner/running_env.rs +++ b/dozer-tests/src/e2e_tests/runner/running_env.rs @@ -369,11 +369,11 @@ fn write_dozer_config_for_running_in_docker_compose( } ConnectionConfig::JavaScript(_) => {} ConnectionConfig::Webhook(_) => {} + ConnectionConfig::Aerospike(_) => {} ConnectionConfig::Oracle(oracle) => { oracle.host = connection.name.clone(); oracle.port = map_port(oracle.port); } - ConnectionConfig::Aerospike(_) => {} } } diff --git a/dozer-types/src/models/connection.rs b/dozer-types/src/models/connection.rs index 01b5042697..517fa67da5 100644 --- a/dozer-types/src/models/connection.rs +++ b/dozer-types/src/models/connection.rs @@ -211,6 +211,8 @@ fn get_sslmode(mode: String) -> Result { #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone, Hash)] pub struct AerospikeConnection { pub hosts: String, + pub namespace: String, + pub sets: Vec, } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index 20514968b6..d0d9337dfb 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -626,6 +626,12 @@ impl SchemaExample for WebhookVerb { } } +#[derive(Debug, JsonSchema, Clone, Deserialize, Serialize, Hash, Eq, PartialEq)] +pub struct AerospikeConfig { + pub namespace: String, + pub sets: Vec, +} + #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)] pub struct OracleConfig { pub user: String, diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 9a17b7bbd5..5c57e404ca 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -128,11 +128,22 @@ "AerospikeConnection": { "type": "object", "required": [ - "hosts" + "hosts", + "namespace", + "sets" ], "properties": { "hosts": { "type": "string" + }, + "namespace": { + "type": "string" + }, + "sets": { + "type": "array", + "items": { + "type": "string" + } } } },