Skip to content

Commit

Permalink
Fix library usage
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Jan 19, 2023
1 parent 24b5281 commit 600f387
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions dozer-ingestion/src/connectors/kafka/debezium/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::errors::DebeziumSchemaError::{
BinaryDecodeError, DecimalConvertError, FieldNotFound, ScaleIsInvalid, ScaleNotFound,
TypeNotSupported,
};
use base64::STANDARD;
use base64::{engine, Engine};
use dozer_types::chrono::{NaiveDate, NaiveDateTime};

use crate::connectors::kafka::debezium::stream_consumer::DebeziumSchemaStruct;
Expand All @@ -13,7 +13,8 @@ use dozer_types::types::{Field, Schema};
use std::collections::HashMap;

fn convert_decimal(value: &str, scale: u32) -> Result<Field, DebeziumSchemaError> {
let decoded_value = base64::decode_config(value, STANDARD)
let decoded_value = engine::general_purpose::STANDARD
.decode(value)
.map_err(BinaryDecodeError)
.unwrap();

Expand Down Expand Up @@ -45,7 +46,9 @@ fn convert_value(
.map_or(Ok(Field::Null), |s| Ok(Field::from(s.to_string()))),
"bytes" => value.as_str().map_or(Ok(Field::Null), |s| {
Ok(Field::Binary(
base64::decode_config(s, STANDARD).map_err(BinaryDecodeError)?,
engine::general_purpose::STANDARD
.decode(s)
.map_err(BinaryDecodeError)?,
))
}),
"float32" | "float64" | "double" => value
Expand Down Expand Up @@ -142,6 +145,7 @@ mod tests {
use crate::connectors::kafka::debezium::stream_consumer::DebeziumSchemaParameters;
use crate::connectors::kafka::debezium::stream_consumer::DebeziumSchemaStruct;
use crate::errors::DebeziumSchemaError::TypeNotSupported;
use base64::{engine, Engine};
use dozer_types::chrono::{NaiveDate, NaiveDateTime};
use dozer_types::rust_decimal;
use dozer_types::serde_json::{Map, Value};
Expand Down Expand Up @@ -224,7 +228,7 @@ mod tests {

// 4 x 256 + 210 = 1234
test_conversion_debezium!(
base64::encode(vec![4, 210]),
engine::general_purpose::STANDARD.encode(vec![4, 210]),
"-",
Some("org.apache.kafka.connect.data.Decimal".to_string()),
Field::from(rust_decimal::Decimal::new(1234, 2)),
Expand All @@ -237,7 +241,7 @@ mod tests {
let mut v: Map<String, Value> = Map::new();
v.insert(
"value".to_string(),
Value::from(base64::encode(vec![4, 211])),
Value::from(engine::general_purpose::STANDARD.encode(vec![4, 211])),
);
v.insert("scale".to_string(), Value::from(2_u64));
test_conversion_debezium!(
Expand Down

0 comments on commit 600f387

Please sign in to comment.