Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support environment variable substitution #433

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 13 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 20 additions & 10 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use anyhow::{anyhow, bail};
use arroyo_rpc::OperatorConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use typify::import_types;

use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::{var_str::VarStr, OperatorConfig};
use axum::response::sse::Event;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
ClientConfig, Offset, TopicPartitionList,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tonic::Status;
use tracing::{error, info, warn};
use typify::import_types;

use crate::{pull_opt, Connection, ConnectionType};

Expand All @@ -25,7 +24,13 @@ const CONFIG_SCHEMA: &str = include_str!("../../connector-schemas/kafka/connecti
const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/kafka/table.json");
const ICON: &str = include_str!("../resources/kafka.svg");

import_types!(schema = "../connector-schemas/kafka/connection.json",);
import_types!(
schema = "../connector-schemas/kafka/connection.json",
convert = {
{type = "string", format = "var-str"} = VarStr,
{type = "string", format = "var-str", isSensitive = true} = VarStr
}
);
import_types!(schema = "../connector-schemas/kafka/table.json");

pub struct KafkaConnector {}
Expand Down Expand Up @@ -150,14 +155,16 @@ impl Connector for KafkaConnector {
mechanism: pull_opt("auth.mechanism", options)?,
protocol: pull_opt("auth.protocol", options)?,
username: pull_opt("auth.username", options)?,
password: pull_opt("auth.password", options)?,
password: VarStr::new(pull_opt("auth.password", options)?),
},
Some(other) => bail!("unknown auth type '{}'", other),
};

let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| {
let api_key = options.remove("schema_registry.api_key");
let api_secret = options.remove("schema_registry.api_secret");
let api_secret = options
.remove("schema_registry.api_secret")
.map(|secret| VarStr::new(secret));
SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
Expand Down Expand Up @@ -251,7 +258,10 @@ impl KafkaTester {
client_config.set("sasl.mechanism", mechanism);
client_config.set("security.protocol", protocol);
client_config.set("sasl.username", username);
client_config.set("sasl.password", password);
client_config.set(
"sasl.password",
password.sub_env_vars().map_err(|e| e.to_string())?,
);
}
};

Expand Down
2 changes: 1 addition & 1 deletion arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF
}
}

fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result<Client> {
fn construct_http_client(endpoint: &str, headers: Option<String>) -> anyhow::Result<Client> {
if let Err(e) = reqwest::Url::parse(&endpoint) {
bail!("invalid endpoint '{}': {:?}", endpoint, e)
};
Expand Down
22 changes: 15 additions & 7 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::convert::Infallible;

use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{var_str::VarStr, OperatorConfig};
use arroyo_types::string_to_map;
use axum::response::sse::Event;
use reqwest::{Client, Request};
Expand All @@ -20,7 +20,10 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json");

import_types!(schema = "../connector-schemas/polling_http/table.json");
import_types!(
schema = "../connector-schemas/polling_http/table.json",
convert = { {type = "string", format = "var-str"} = VarStr }
);
const ICON: &str = include_str!("../resources/http.svg");

pub struct PollingHTTPConnector {}
Expand Down Expand Up @@ -55,8 +58,13 @@ impl PollingHTTPConnector {
config: &PollingHttpTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let headers = config
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()?;

let client = construct_http_client(&config.endpoint, headers)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -126,7 +134,7 @@ impl Connector for PollingHTTPConnector {
Err(err) => TestSourceMessage {
error: true,
done: true,
message: format!("{:?}", err),
message: format!("{}", err.root_cause()),
},
};

Expand Down Expand Up @@ -166,7 +174,7 @@ impl Connector for PollingHTTPConnector {
EmptyConfig {},
PollingHttpTable {
endpoint,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr::new(s)),
method,
body,
poll_interval_ms: interval,
Expand All @@ -187,7 +195,7 @@ impl Connector for PollingHTTPConnector {
let description = format!("PollingHTTPSource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars()?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down
16 changes: 9 additions & 7 deletions arroyo-connectors/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::Infallible;
use std::time::Duration;

use anyhow::{anyhow, bail};
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{var_str::VarStr, OperatorConfig};
use arroyo_types::string_to_map;
use axum::response::sse::Event;
use eventsource_client::Client;
Expand All @@ -22,7 +22,7 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/sse/table.json");

import_types!(schema = "../connector-schemas/sse/table.json");
import_types!(schema = "../connector-schemas/sse/table.json", convert = { {type = "string", format = "var-str"} = VarStr });
const ICON: &str = include_str!("../resources/sse.svg");

pub struct SSEConnector {}
Expand Down Expand Up @@ -79,7 +79,7 @@ impl Connector for SSEConnector {
let description = format!("SSESource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars()?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down Expand Up @@ -134,7 +134,7 @@ impl Connector for SSEConnector {
SseTable {
endpoint,
events,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr::new(s)),
},
schema,
)
Expand Down Expand Up @@ -174,11 +174,13 @@ impl SseTester {
.map_err(|_| anyhow!("Endpoint URL is invalid"))?;

let headers = string_to_map(
self.config
&self
.config
.headers
.as_ref()
.map(|t| t.0.as_str())
.unwrap_or(""),
.map(|s| s.sub_env_vars())
.transpose()?
.unwrap_or("".to_string()),
)
.ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))?;

Expand Down
37 changes: 23 additions & 14 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ use std::convert::Infallible;
use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;

use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::var_str::VarStr;
use axum::response::sse::Event;
use reqwest::{Client, Request};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::sync::mpsc::Sender;
use typify::import_types;

use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use serde::{Deserialize, Serialize};

use crate::{construct_http_client, pull_opt, Connection, EmptyConfig};

use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/webhook/table.json");

import_types!(schema = "../connector-schemas/webhook/table.json");
import_types!(schema = "../connector-schemas/webhook/table.json", convert = { {type = "string", format = "var-str"} = VarStr });
const ICON: &str = include_str!("../resources/webhook.svg");

pub struct WebhookConnector {}
Expand All @@ -47,8 +47,13 @@ impl WebhookConnector {
config: &WebhookTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let headers = config
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()?;

let client = construct_http_client(&config.endpoint, headers)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -176,14 +181,18 @@ impl Connector for WebhookConnector {
) -> anyhow::Result<Connection> {
let endpoint = pull_opt("endpoint", options)?;

let headers = options
.remove("headers")
.map(|s| s.try_into())
.transpose()
.map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?;
let headers = options.remove("headers").map(|s| VarStr::new(s));

let table = WebhookTable { endpoint, headers };
let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?;

let client = construct_http_client(
&table.endpoint,
table
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()?,
)?;
let _ = Self::construct_test_request(&client, &table)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
Expand Down
34 changes: 21 additions & 13 deletions arroyo-connectors/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::anyhow;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::var_str::VarStr;
use arroyo_rpc::OperatorConfig;
use arroyo_types::string_to_map;
use axum::response::sse::Event;
Expand All @@ -25,7 +26,7 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/websocket/table.json");

import_types!(schema = "../connector-schemas/websocket/table.json");
import_types!(schema = "../connector-schemas/websocket/table.json", convert = { {type = "string", format = "var-str"} = VarStr });
const ICON: &str = include_str!("../resources/websocket.svg");

pub struct WebsocketConnector {}
Expand Down Expand Up @@ -79,16 +80,23 @@ impl Connector for WebsocketConnector {
}
};

let headers =
match string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or(""))
.ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))
{
Ok(headers) => headers,
Err(e) => {
send(true, true, format!("Failed to parse headers: {:?}", e)).await;
return;
}
};
let headers_str = match table.headers.as_ref().map(|s| s.sub_env_vars()).transpose() {
Ok(headers) => headers,
Err(e) => {
send(true, true, format!("{}", e.root_cause())).await;
return;
}
};

let headers = match string_to_map(&headers_str.unwrap_or("".to_string()))
.ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))
{
Ok(headers) => headers,
Err(e) => {
send(true, true, format!("Failed to parse headers: {:?}", e)).await;
return;
}
};

let uri = match Uri::from_str(&table.endpoint.to_string()) {
Ok(uri) => uri,
Expand Down Expand Up @@ -215,7 +223,7 @@ impl Connector for WebsocketConnector {
let description = format!("WebsocketSource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars()?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down Expand Up @@ -289,7 +297,7 @@ impl Connector for WebsocketConnector {
EmptyConfig {},
WebsocketTable {
endpoint,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr::new(s)),
subscription_message: None,
subscription_messages,
},
Expand Down
1 change: 1 addition & 0 deletions arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ export function JsonForm({
}) {
let ajv = new Ajv();
ajv.addKeyword('isSensitive');
ajv.addFormat('var-str', { validate: () => true });
const memoAjv = useMemo(() => addFormats(ajv), [schema]);

const formik = useFormik({
Expand Down