Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Aerospike ingestion #2366

Merged
merged 4 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 86 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions dozer-api/src/rest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,9 @@ async fn list_endpoint_paths(endpoints: web::Data<Vec<String>>) -> web::Json<Vec
web::Json(endpoints.get_ref().clone())
}

// docker run \
//   -v ~/aerospike_esp/aerospike-esp-outbound.yml:/etc/aerospike-esp-outbound/aerospike-esp-outbound.yml \
//   aerospike/aerospike-esp-outbound:2.3.0

#[cfg(test)]
mod tests;
22 changes: 20 additions & 2 deletions dozer-cli/src/pipeline/dummy_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ impl SinkFactory for DummySinkFactory {
.map(|(index, _)| index);
Ok(Box::new(DummySink {
inserted_at_index,
..Default::default()
previous_started: Instant::now(),
count: 0,
snapshotting_started_instant: HashMap::new(),
}))
}

Expand All @@ -53,10 +55,12 @@ impl SinkFactory for DummySinkFactory {
}
}

#[derive(Debug, Default)]
#[derive(Debug)]
struct DummySink {
snapshotting_started_instant: HashMap<String, Instant>,
inserted_at_index: Option<usize>,
count: usize,
previous_started: Instant,
}

impl Sink for DummySink {
Expand All @@ -66,8 +70,22 @@ impl Sink for DummySink {
_record_store: &ProcessorRecordStore,
op: OperationWithId,
) -> Result<(), BoxedError> {
if self.count % 1000 == 0 {
if self.count > 0 {
info!(
"Rate: {:.0} op/s, Processed {} records. Elapsed {:?}",
1000.0 / self.previous_started.elapsed().as_secs_f64(),
self.count,
self.previous_started.elapsed(),
);
}
self.previous_started = Instant::now();
}

self.count += 1;
if let Some(inserted_at_index) = self.inserted_at_index {
if let Operation::Insert { new } = op.op {
info!("Received record: {:?}", new);
let value = &new.values[inserted_at_index];
if let Some(inserted_at) = value.to_timestamp() {
let latency = Local::now().naive_utc() - inserted_at.naive_utc();
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dozer-ingestion-mysql = { path = "./mysql" }
dozer-ingestion-object-store = { path = "./object-store" }
dozer-ingestion-postgres = { path = "./postgres" }
dozer-ingestion-snowflake = { path = "./snowflake", optional = true }
dozer-ingestion-aerospike = { path = "./aerospike" }
dozer-ingestion-webhook = { path = "./webhook" }
dozer-ingestion-oracle = { path = "./oracle" }

Expand Down
11 changes: 11 additions & 0 deletions dozer-ingestion/aerospike/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "dozer-ingestion-aerospike"
version = "0.3.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dozer-ingestion-connector = { path = "../connector" }
h2 = "0.4.2"
http = "1"
261 changes: 261 additions & 0 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
use dozer_ingestion_connector::dozer_types::log::{error, info};
use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnection;
use dozer_ingestion_connector::dozer_types::models::ingestion_types::IngestionMessage;
use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
use dozer_ingestion_connector::dozer_types::types::Operation::Insert;
use dozer_ingestion_connector::dozer_types::types::{Field, FieldDefinition, FieldType, Schema};
use dozer_ingestion_connector::tokio::net::TcpListener;
use dozer_ingestion_connector::{
async_trait, dozer_types, Connector, Ingestor, SourceSchema, SourceSchemaResult,
TableIdentifier, TableInfo,
};
use h2::server::handshake;
use http::StatusCode;
use std::collections::HashMap;

use dozer_ingestion_connector::dozer_types::serde::Deserialize;
use dozer_ingestion_connector::tokio;

/// A single change event pushed to us over HTTP/2 by the Aerospike ESP
/// (outbound) connector. Only the fields we consume are deserialized; the
/// rest of the payload is kept commented out for reference.
#[derive(Deserialize, Debug)]
#[serde(crate = "dozer_types::serde")]
pub struct AerospikeEvent {
    // msg: String,
    // Record key tuple. Downstream code reads index 1 as the set (table)
    // name and the last element as the user key — assumes the ESP payload
    // layout; TODO confirm against Aerospike ESP docs.
    key: Vec<Option<String>>,
    // gen: u32,
    // exp: u32,
    // lut: u64,
    // Named values (bins) carried by the record.
    bins: Vec<Bin>,
}

/// One Aerospike bin (named value) within an event.
#[derive(Deserialize, Debug)]
#[serde(crate = "dozer_types::serde")]
pub struct Bin {
    // Bin (column) name.
    name: String,
    // Raw JSON value; `None` maps to a NULL field downstream.
    value: Option<dozer_types::serde_json::Value>,
    // Aerospike type tag as sent by ESP; consumers match on "str", "int",
    // and "float" (see `Connector::start`).
    r#type: String,
}

/// Push-based ingestion connector for Aerospike: rather than pulling data,
/// it listens for HTTP/2 change events emitted by the Aerospike ESP
/// outbound connector (see `Connector::start`).
#[derive(Debug)]
pub struct AerospikeConnector {
    pub config: AerospikeConnection,
}

impl AerospikeConnector {
pub fn new(config: AerospikeConnection) -> Self {
Self { config }
}
}

#[async_trait]
impl Connector for AerospikeConnector {
    /// Static mapping from source types to Dozer field types.
    /// Not implemented yet; calling this panics via `todo!()`.
    fn types_mapping() -> Vec<(String, Option<FieldType>)>
    where
        Self: Sized,
    {
        todo!()
    }

    /// No connection check is performed: this connector is push-based
    /// (Aerospike ESP connects to us), so there is nothing to dial here.
    async fn validate_connection(&mut self) -> Result<(), BoxedError> {
        Ok(())
    }

    /// Exposes one table per configured set, all under the configured
    /// namespace. Taken straight from the config; the server is not queried.
    async fn list_tables(&mut self) -> Result<Vec<TableIdentifier>, BoxedError> {
        Ok(self
            .config
            .sets
            .iter()
            .map(|set| TableIdentifier {
                schema: Some(self.config.namespace.clone()),
                name: set.to_string(),
            })
            .collect())
    }

    /// Tables are accepted on faith from the config; no validation.
    async fn validate_tables(&mut self, _tables: &[TableIdentifier]) -> Result<(), BoxedError> {
        Ok(())
    }

    /// Returns no columns: Aerospike is schemaless, so callers must list
    /// column names explicitly in their table configuration.
    async fn list_columns(
        &mut self,
        _tables: Vec<TableIdentifier>,
    ) -> Result<Vec<TableInfo>, BoxedError> {
        Ok(vec![])
    }

    /// Builds one schema per requested table: every user-listed column is a
    /// nullable String, and the column literally named "PK" becomes the
    /// single-field primary key.
    ///
    /// NOTE(review): the `.unwrap()` on `position(|n| n == "PK")` panics if a
    /// table has no "PK" column — consider surfacing this as an error.
    async fn get_schemas(
        &mut self,
        table_infos: &[TableInfo],
    ) -> Result<Vec<SourceSchemaResult>, BoxedError> {
        info!("Table infos: {:?}", table_infos);
        let schemas = table_infos
            .iter()
            .map(|s| {
                Ok(SourceSchema {
                    schema: Schema {
                        fields: s
                            .column_names
                            .iter()
                            .map(|name| FieldDefinition {
                                name: name.clone(),
                                typ: FieldType::String,
                                nullable: true,
                                source: Default::default(),
                            })
                            .collect(),
                        primary_index: vec![s.column_names.iter().position(|n| n == "PK").unwrap()],
                    },
                    cdc_type: Default::default(),
                })
            })
            .collect();

        info!("Schemas {:?}", schemas);
        Ok(schemas)
    }

    /// No resumable state: an empty checkpoint is serialized.
    async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError> {
        Ok(vec![])
    }

    /// Runs the ingestion loop: binds an HTTP/2 server and converts each
    /// event pushed by Aerospike ESP into an `Insert` operation.
    ///
    /// There is no snapshot phase, so SnapshottingStarted/Done are emitted
    /// immediately before streaming begins. `_last_checkpoint` is ignored —
    /// resuming is not supported.
    ///
    /// NOTE(review): the bind address is hard-coded to 127.0.0.1:5928 —
    /// presumably this should come from `self.config`; confirm.
    /// NOTE(review): pervasive `.unwrap()`/`expect()` on I/O and parsing will
    /// crash the task on malformed input or handshake failure.
    async fn start(
        &mut self,
        ingestor: &Ingestor,
        tables: Vec<TableInfo>,
        _last_checkpoint: Option<OpIdentifier>,
    ) -> Result<(), BoxedError> {
        ingestor
            .handle_message(IngestionMessage::SnapshottingStarted)
            .await
            .unwrap();
        ingestor
            .handle_message(IngestionMessage::SnapshottingDone { id: None })
            .await
            .unwrap();

        // Index the configured tables: table name -> (column name -> field
        // position), so events can be mapped to records in O(1) per bin.
        let mut tables_map = HashMap::new();
        tables.iter().for_each(|table| {
            let mut columns_map = HashMap::new();
            table.column_names.iter().enumerate().for_each(|(i, name)| {
                columns_map.insert(name.clone(), i);
            });
            tables_map.insert(table.name.clone(), columns_map);
        });

        let listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();

        // Accept loop: each inbound TCP connection gets its own task running
        // an HTTP/2 server handshake and request loop.
        loop {
            if let Ok((socket, _peer_addr)) = listener.accept().await {
                // Clone per-connection copies so the spawned task is 'static.
                let t = tables_map.clone();
                let i = ingestor.clone();
                tokio::spawn(async move {
                    let mut h2 = handshake(socket).await.expect("Connection");
                    // Accept all inbound HTTP/2 streams sent over the
                    // connection.

                    'looop: while let Some(request) = h2.accept().await {
                        match request {
                            Ok((request, mut respond)) => {
                                let mut body = request.into_body();

                                // let mut byes = Vec::new();
                                // Each DATA chunk is treated as one complete
                                // JSON-encoded AerospikeEvent — assumes ESP
                                // never splits an event across chunks; TODO
                                // confirm.
                                while let Some(chunk) = body.data().await.transpose().unwrap() {
                                    if !chunk.is_empty() {
                                        let event_string = String::from_utf8(Vec::from(chunk));

                                        // Malformed events are logged and
                                        // skipped rather than aborting the
                                        // stream.
                                        let event = match dozer_types::serde_json::from_str::<
                                            AerospikeEvent,
                                        >(
                                            &event_string.clone().unwrap()
                                        ) {
                                            Ok(event) => event,
                                            Err(e) => {
                                                error!("Error : {:?}", e);
                                                continue;
                                            }
                                        };

                                        // key[1] is assumed to be the set
                                        // (table) name; panics if absent.
                                        let table_name = event.key.get(1).unwrap().clone().unwrap();
                                        if let Some(columns_map) = t.get(table_name.as_str()) {
                                            // Start with all-NULL fields; fill
                                            // in whatever the event provides.
                                            let mut fields = vec![Field::Null; columns_map.len()];
                                            // The record's user key (last key
                                            // element) populates the "PK"
                                            // column, when configured.
                                            if let Some(pk) = columns_map.get("PK") {
                                                fields[*pk] = match event.key.last().unwrap() {
                                                    None => Field::Null,
                                                    Some(s) => Field::String(s.to_string()),
                                                };
                                            }

                                            // Map each bin to its column slot.
                                            // All values are stored as Strings
                                            // to match the all-String schema
                                            // from get_schemas. Unknown bins
                                            // are silently ignored; unknown
                                            // type tags are logged.
                                            for bin in event.bins {
                                                if let Some(i) = columns_map.get(bin.name.as_str())
                                                {
                                                    match bin.value {
                                                        Some(value) => {
                                                            match bin.r#type.as_str() {
                                                                "str" => {
                                                                    if let dozer_types::serde_json::Value::String(s) = value {
                                                                        fields[*i] = Field::String(s);
                                                                    }
                                                                }
                                                                "int" => {
                                                                    if let dozer_types::serde_json::Value::Number(n) = value {
                                                                        fields[*i] = Field::String(n.to_string());
                                                                    }
                                                                }
                                                                "float" => {
                                                                    if let dozer_types::serde_json::Value::Number(n) = value {
                                                                        fields[*i] = Field::String(n.to_string());
                                                                    }
                                                                }
                                                                unexpected => {
                                                                    error!(
                                                                        "Unexpected type: {}",
                                                                        unexpected
                                                                    );
                                                                }
                                                            }
                                                        }
                                                        None => {
                                                            fields[*i] = Field::Null;
                                                        }
                                                    }
                                                }
                                            }

                                            // Every event becomes an Insert;
                                            // updates/deletes are not
                                            // distinguished. table_index is
                                            // always 0 — NOTE(review): looks
                                            // wrong for multi-table configs;
                                            // confirm.
                                            i.handle_message(IngestionMessage::OperationEvent {
                                                table_index: 0,
                                                op: Insert {
                                                    new: dozer_types::types::Record::new(fields),
                                                },
                                                id: None,
                                            })
                                            .await
                                            .unwrap();
                                        } else {
                                            // info!("Not found table: {}", table_name);
                                        }
                                    } else {
                                        info!("empty");
                                    }
                                }

                                // info!("Got HTTP/2 request");
                                // Build a response with no body
                                let response = http::response::Response::builder()
                                    .status(StatusCode::OK)
                                    .body(())
                                    .unwrap();

                                // Send the response back to the client
                                respond.send_response(response, true).unwrap();
                            }
                            Err(e) => {
                                // NOTE(review): "Listiner" is a typo in this
                                // log message ("Listener").
                                error!("Listiner Error: {:?}", e);
                                break 'looop;
                            }
                        }
                    }
                });
            }
        }
    }
}
1 change: 1 addition & 0 deletions dozer-ingestion/aerospike/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod connector;
3 changes: 2 additions & 1 deletion dozer-ingestion/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use dozer_ingestion_aerospike::connector::AerospikeConnector;
#[cfg(feature = "ethereum")]
use dozer_ingestion_connector::dozer_types::models::ingestion_types::EthProviderConfig;
use dozer_ingestion_connector::dozer_types::{
Expand Down Expand Up @@ -141,11 +142,11 @@ pub fn get_connector(
runtime,
javascript_config,
))),
ConnectionConfig::Aerospike(config) => Ok(Box::new(AerospikeConnector::new(config))),
ConnectionConfig::Oracle(oracle_config) => Ok(Box::new(OracleConnector::new(
connection.name,
oracle_config,
))),
ConnectionConfig::Aerospike(_) => Err(ConnectorError::Unsupported("aerospike".to_owned())),
}
}

Expand Down
2 changes: 2 additions & 0 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,8 @@ mod tests {
.field(f("json", FieldType::Json), false);
let connection_config = AerospikeConnection {
hosts: "localhost:3000".into(),
namespace: "test".into(),
sets: vec![set.to_owned()],
};
let factory = AerospikeSinkFactory::new(
connection_config,
Expand Down
2 changes: 1 addition & 1 deletion dozer-tests/src/e2e_tests/runner/running_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,11 @@ fn write_dozer_config_for_running_in_docker_compose(
}
ConnectionConfig::JavaScript(_) => {}
ConnectionConfig::Webhook(_) => {}
ConnectionConfig::Aerospike(_) => {}
ConnectionConfig::Oracle(oracle) => {
oracle.host = connection.name.clone();
oracle.port = map_port(oracle.port);
}
ConnectionConfig::Aerospike(_) => {}
}
}

Expand Down
2 changes: 2 additions & 0 deletions dozer-types/src/models/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ fn get_sslmode(mode: String) -> Result<SslMode, DeserializationError> {
#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone, Hash)]
pub struct AerospikeConnection {
pub hosts: String,
pub namespace: String,
pub sets: Vec<String>,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
Expand Down