Skip to content

Commit

Permalink
feat: Implement schema support for postgres (#1070)
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg authored Mar 13, 2023
1 parent cfbd9bc commit 8c70609
Show file tree
Hide file tree
Showing 30 changed files with 212 additions and 70 deletions.
1 change: 1 addition & 0 deletions dozer-ingestion/examples/postgres/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fn main() {
let (ingestor, mut iterator) = Ingestor::initialize_channel(IngestionConfig::default());
let tables = vec![TableInfo {
name: "users".to_string(),
schema: Some("public".to_string()),
columns: None,
}];
let postgres_config = PostgresConfig {
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/src/connectors/delta_lake/schema_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ impl SchemaHelper {
.into();
let schema_mapper = SchemaMapper::new(self.config.clone());
let schema = schema_mapper.map_schema(id as u32, arrow_schema, table)?;
Ok(SourceSchema::new(table.name.clone(), schema, Nothing))
Ok(SourceSchema::new(table.name.clone(), None, schema, Nothing))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn get_schema_from_deltalake() {
let connector = DeltaLakeConnector::new(1, config);
let table_info = TableInfo {
name: table_name.to_string(),
schema: Some("public".to_string()),
columns: None,
};
let field = connector.get_schemas(Some(&vec![table_info])).unwrap()[0]
Expand Down Expand Up @@ -53,6 +54,7 @@ fn read_deltalake() {
let (ingestor, iterator) = Ingestor::initialize_channel(config);
let table = TableInfo {
name: "test_table".to_string(),
schema: Some("public".to_string()),
columns: None,
};
thread::spawn(move || {
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/connectors/ethereum/log/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl Connector for EthLogConnector {
) -> Result<Vec<SourceSchema>, ConnectorError> {
let mut schemas = vec![SourceSchema::new(
ETH_LOGS_TABLE.to_string(),
None,
helper::get_eth_schema(),
ReplicationChangesTrackingType::Nothing,
)];
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/connectors/ethereum/log/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub fn get_contract_event_schemas(

schemas.push(SourceSchema::new(
get_table_name(&contract_tuple, &event.name),
None,
Schema {
identifier: Some(SchemaIdentifier {
id: schema_id as u32,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub fn get_eth_producer(
let schemas = eth_connector.get_schemas(None)?;
for SourceSchema {
name,
schema_name: _,
schema,
replication_type: _,
} in schemas
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/connectors/ethereum/trace/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Connector for EthTraceConnector {
) -> Result<Vec<SourceSchema>, ConnectorError> {
Ok(vec![SourceSchema::new(
ETH_TRACE_TABLE.to_string(),
None,
helper::get_trace_schema(),
ReplicationChangesTrackingType::Nothing,
)])
Expand Down
9 changes: 5 additions & 4 deletions dozer-ingestion/src/connectors/grpc/adapter/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ impl ArrowAdapter {

arrow_schemas.insert(id as u32, grpc_schema.schema);

schemas.push(SourceSchema {
name: grpc_schema.name,
schemas.push(SourceSchema::new(
grpc_schema.name,
None,
schema,
replication_type: grpc_schema.replication_type.clone(),
});
grpc_schema.replication_type.clone(),
));
}
Ok((schemas, arrow_schemas))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl NoSchemaRegistry {

schemas.push(SourceSchema::new(
table.name.clone(),
None,
mapped_schema,
ReplicationChangesTrackingType::FullChanges,
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl SchemaRegistry {

schema_data = Some(Ok(vec![SourceSchema::new(
table.name.clone(),
None,
schema,
ReplicationChangesTrackingType::FullChanges,
)]));
Expand Down
2 changes: 2 additions & 0 deletions dozer-ingestion/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub trait Connector: Send + Sync + Debug {
})
.collect(),
),
schema: source_schema.schema_name,
})
.collect::<Vec<TableInfo>>())
}
Expand All @@ -79,6 +80,7 @@ pub trait Connector: Send + Sync + Debug {
#[serde(crate = "self::serde")]
/// Identifies a source table that a connector should read, optionally
/// qualified by a schema and narrowed to a set of columns.
pub struct TableInfo {
/// Name of the table, without any schema qualifier.
pub name: String,
/// Schema (namespace) the table belongs to, when the source has such a concept.
/// The Postgres connector falls back to `"public"` when this is `None`;
/// connectors without schemas set it to `None`.
pub schema: Option<String>,
/// Optional list of columns for this table.
// NOTE(review): `None` presumably means "all columns" — confirm against the connectors.
pub columns: Option<Vec<ColumnInfo>>,
}

Expand Down
3 changes: 2 additions & 1 deletion dozer-ingestion/src/connectors/object_store/schema_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl<T: DozerObjectStore> Mapper<T> for SchemaMapper<T> {
.map(|t| TableInfo {
name: t.name.clone(),
columns: None,
schema: None,
})
.collect()
});
Expand Down Expand Up @@ -107,7 +108,7 @@ impl<T: DozerObjectStore> Mapper<T> for SchemaMapper<T> {

let schema = self.map_schema(id as u32, resolved_schema, table)?;

Ok(SourceSchema::new(table_name, schema, Nothing))
Ok(SourceSchema::new(table_name, None, schema, Nothing))
})
.collect()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ fn test_read_parquet_file() {

let table = TableInfo {
name: "all_types_parquet".to_string(),
schema: None,
columns: None,
};
thread::spawn(move || {
Expand Down Expand Up @@ -125,6 +126,7 @@ fn test_csv_read() {

let table = TableInfo {
name: "all_types_csv".to_string(),
schema: None,
columns: None,
};

Expand Down Expand Up @@ -200,6 +202,7 @@ fn test_missing_directory() {
&ingestor,
vec![TableInfo {
name: table.name,
schema: None,
columns: None,
}],
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ mod tests {

let tables = vec![TableInfo {
name: "not_existing".to_string(),
schema: Some("public".to_string()),
columns: None,
}];
let result = validate_connection("pg_test_conn", config, Some(&tables), None);
Expand Down Expand Up @@ -476,6 +477,7 @@ mod tests {

let tables = vec![TableInfo {
name: "existing".to_string(),
schema: Some("public".to_string()),
columns: Some(columns),
}];

Expand Down Expand Up @@ -654,6 +656,7 @@ mod tests {
for (table_name, expected_result) in tables_with_result {
let res = validate_tables_names(&vec![TableInfo {
name: table_name.to_string(),
schema: Some("public".to_string()),
columns: None,
}]);

Expand All @@ -673,6 +676,7 @@ mod tests {
for (column_name, expected_result) in columns_names_with_result {
let res = validate_columns_names(&vec![TableInfo {
name: "column_test_table".to_string(),
schema: Some("public".to_string()),
columns: Some(vec![ColumnInfo {
name: column_name.to_string(),
data_type: None,
Expand Down Expand Up @@ -707,6 +711,7 @@ mod tests {
&mut pg_client,
&vec![TableInfo {
name: table_name,
schema: Some("public".to_string()),
columns: None,
}],
);
Expand All @@ -717,6 +722,7 @@ mod tests {
&mut pg_client,
&vec![TableInfo {
name: view_name,
schema: Some("public".to_string()),
columns: None,
}],
);
Expand Down
17 changes: 15 additions & 2 deletions dozer-ingestion/src/connectors/postgres/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl PostgresConnector {
let mut replication_conn_config = config.config.clone();
replication_conn_config.replication_mode(ReplicationMode::Logical);

let helper = SchemaHelper::new(config.config.clone(), None);
let helper = SchemaHelper::new(config.config.clone());

// conn_str       - replication_conn_config (logical replication mode enabled)
// conn_str_plain - conn_config
Expand Down Expand Up @@ -101,6 +101,7 @@ impl Connector for PostgresConnector {

let lsn = PostgresConnector::get_lsn_with_offset_from_seq(self.name.clone(), from_seq);

info!("TABLES iN STRT {:?}", tables);
let iterator = PostgresIterator::new(
self.id,
self.name.clone(),
Expand Down Expand Up @@ -137,6 +138,7 @@ impl Connector for PostgresConnector {
.map(|table_info| TableInfo {
name: table_info.name,
columns: Some(table_info.columns),
schema: Some(table_info.schema),
})
.collect())
}
Expand Down Expand Up @@ -169,7 +171,18 @@ impl PostgresConnector {
let table_str: String = match self.tables.as_ref() {
None => "ALL TABLES".to_string(),
Some(arr) => {
let table_names: Vec<String> = arr.iter().map(|t| t.name.clone()).collect();
let table_names: Vec<String> = arr
.iter()
.map(|t| {
format!(
"{}.{}",
t.schema
.as_ref()
.map_or("public".to_string(), |s| s.clone()),
t.name.clone()
)
})
.collect();
format!("TABLE {}", table_names.join(" , "))
}
};
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/connectors/postgres/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl<'a> PostgresIteratorHandler<'a> {
.map(|table_info| TableInfo {
name: table_info.name.clone(),
columns: Some(table_info.columns.clone()),
schema: Some(table_info.schema.clone()),
})
.collect::<Vec<_>>();
snapshotter.sync_tables(&tables)?;
Expand Down
Loading

0 comments on commit 8c70609

Please sign in to comment.