Skip to content

Commit

Permalink
Support environment variable substitution
Browse files Browse the repository at this point in the history
In the Kafka, Polling HTTP, WebSocket, and SSE sources, support
environment variables in the passwords, secrets, and headers fields.

This is done by setting the format of the field in the JSON schema to
'var-str', which tells typify to replace the type with the VarStr
struct, which has a function for substituting environment variables.
The VarStr struct serializes/deserializes from a JSON string.
  • Loading branch information
jbeisen committed Dec 6, 2023
1 parent 4d3ff50 commit 20efd7e
Show file tree
Hide file tree
Showing 21 changed files with 335 additions and 116 deletions.
21 changes: 14 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 18 additions & 10 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use anyhow::{anyhow, bail};
use arroyo_rpc::OperatorConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use typify::import_types;

use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::OperatorConfig;
use arroyo_types::VarStr;
use axum::response::sse::Event;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
ClientConfig, Offset, TopicPartitionList,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tonic::Status;
use tracing::{error, info, warn};
use typify::import_types;

use crate::{pull_opt, Connection, ConnectionType};

Expand All @@ -25,7 +25,13 @@ const CONFIG_SCHEMA: &str = include_str!("../../connector-schemas/kafka/connecti
const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/kafka/table.json");
const ICON: &str = include_str!("../resources/kafka.svg");

import_types!(schema = "../connector-schemas/kafka/connection.json",);
import_types!(
schema = "../connector-schemas/kafka/connection.json",
convert = {
{type = "string", format = "var-str"} = VarStr,
{type = "string", format = "var-str", isSensitive = true} = VarStr
}
);
import_types!(schema = "../connector-schemas/kafka/table.json");

pub struct KafkaConnector {}
Expand Down Expand Up @@ -150,14 +156,16 @@ impl Connector for KafkaConnector {
mechanism: pull_opt("auth.mechanism", options)?,
protocol: pull_opt("auth.protocol", options)?,
username: pull_opt("auth.username", options)?,
password: pull_opt("auth.password", options)?,
password: VarStr::new(pull_opt("auth.password", options)?),
},
Some(other) => bail!("unknown auth type '{}'", other),
};

let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| {
let api_key = options.remove("schema_registry.api_key");
let api_secret = options.remove("schema_registry.api_secret");
let api_secret = options
.remove("schema_registry.api_secret")
.map(|secret| VarStr::new(secret));
SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
Expand Down Expand Up @@ -251,7 +259,7 @@ impl KafkaTester {
client_config.set("sasl.mechanism", mechanism);
client_config.set("security.protocol", protocol);
client_config.set("sasl.username", username);
client_config.set("sasl.password", password);
client_config.set("sasl.password", password.sub_env_vars()?);
}
};

Expand Down
2 changes: 1 addition & 1 deletion arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF
}
}

fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result<Client> {
fn construct_http_client(endpoint: &str, headers: Option<String>) -> anyhow::Result<Client> {
if let Err(e) = reqwest::Url::parse(&endpoint) {
bail!("invalid endpoint '{}': {:?}", endpoint, e)
};
Expand Down
23 changes: 16 additions & 7 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::Infallible;

use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;
use arroyo_types::string_to_map;
use arroyo_types::{string_to_map, VarStr};
use axum::response::sse::Event;
use reqwest::{Client, Request};
use tokio::sync::mpsc::Sender;
Expand All @@ -20,7 +20,10 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json");

import_types!(schema = "../connector-schemas/polling_http/table.json");
import_types!(
schema = "../connector-schemas/polling_http/table.json",
convert = { {type = "string", format = "var-str"} = VarStr }
);
const ICON: &str = include_str!("../resources/http.svg");

pub struct PollingHTTPConnector {}
Expand Down Expand Up @@ -55,8 +58,14 @@ impl PollingHTTPConnector {
config: &PollingHttpTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let headers = config
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()
.map_err(|e| anyhow!(e))?;

let client = construct_http_client(&config.endpoint, headers)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -126,7 +135,7 @@ impl Connector for PollingHTTPConnector {
Err(err) => TestSourceMessage {
error: true,
done: true,
message: format!("{:?}", err),
message: format!("{:?}", err.root_cause()),
},
};

Expand Down Expand Up @@ -166,7 +175,7 @@ impl Connector for PollingHTTPConnector {
EmptyConfig {},
PollingHttpTable {
endpoint,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr::new(s)),
method,
body,
poll_interval_ms: interval,
Expand All @@ -187,7 +196,7 @@ impl Connector for PollingHTTPConnector {
let description = format!("PollingHTTPSource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars().map_err(|e| anyhow!(e))?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down
17 changes: 10 additions & 7 deletions arroyo-connectors/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;

use anyhow::{anyhow, bail};
use arroyo_rpc::OperatorConfig;
use arroyo_types::string_to_map;
use arroyo_types::{string_to_map, VarStr};
use axum::response::sse::Event;
use eventsource_client::Client;
use futures::StreamExt;
Expand All @@ -22,7 +22,7 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/sse/table.json");

import_types!(schema = "../connector-schemas/sse/table.json");
import_types!(schema = "../connector-schemas/sse/table.json", convert = { {type = "string", format = "var-str"} = VarStr });
const ICON: &str = include_str!("../resources/sse.svg");

pub struct SSEConnector {}
Expand Down Expand Up @@ -79,7 +79,7 @@ impl Connector for SSEConnector {
let description = format!("SSESource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars().map_err(|e| anyhow!(e))?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down Expand Up @@ -134,7 +134,7 @@ impl Connector for SSEConnector {
SseTable {
endpoint,
events,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr::new(s)),
},
schema,
)
Expand Down Expand Up @@ -174,11 +174,14 @@ impl SseTester {
.map_err(|_| anyhow!("Endpoint URL is invalid"))?;

let headers = string_to_map(
self.config
&self
.config
.headers
.as_ref()
.map(|t| t.0.as_str())
.unwrap_or(""),
.map(|s| s.sub_env_vars())
.transpose()
.map_err(|e| anyhow!(e))?
.unwrap_or("".to_string()),
)
.ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))?;

Expand Down
30 changes: 21 additions & 9 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::convert::Infallible;
use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;

use arroyo_types::VarStr;
use axum::response::sse::Event;
use reqwest::{Client, Request};
use serde_json::json;
Expand All @@ -21,7 +22,7 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/webhook/table.json");

import_types!(schema = "../connector-schemas/webhook/table.json");
import_types!(schema = "../connector-schemas/webhook/table.json", convert = { {type = "string", format = "var-str"} = VarStr });
const ICON: &str = include_str!("../resources/webhook.svg");

pub struct WebhookConnector {}
Expand All @@ -47,8 +48,14 @@ impl WebhookConnector {
config: &WebhookTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let headers = config
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()
.map_err(|e| anyhow!(e))?;

let client = construct_http_client(&config.endpoint, headers)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -176,14 +183,19 @@ impl Connector for WebhookConnector {
) -> anyhow::Result<Connection> {
let endpoint = pull_opt("endpoint", options)?;

let headers = options
.remove("headers")
.map(|s| s.try_into())
.transpose()
.map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?;
let headers = options.remove("headers").map(|s| VarStr::new(s));

let table = WebhookTable { endpoint, headers };
let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?;

let client = construct_http_client(
&table.endpoint,
table
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()
.map_err(|e| anyhow!(e))?,
)?;
let _ = Self::construct_test_request(&client, &table)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
Expand Down
41 changes: 27 additions & 14 deletions arroyo-connectors/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::OperatorConfig;
use arroyo_types::string_to_map;
use arroyo_types::{string_to_map, VarStr};
use axum::response::sse::Event;
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
Expand All @@ -25,7 +25,7 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/websocket/table.json");

import_types!(schema = "../connector-schemas/websocket/table.json");
import_types!(schema = "../connector-schemas/websocket/table.json", convert = { {type = "string", format = "var-str"} = VarStr });
const ICON: &str = include_str!("../resources/websocket.svg");

pub struct WebsocketConnector {}
Expand Down Expand Up @@ -79,16 +79,29 @@ impl Connector for WebsocketConnector {
}
};

let headers =
match string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or(""))
.ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))
{
Ok(headers) => headers,
Err(e) => {
send(true, true, format!("Failed to parse headers: {:?}", e)).await;
return;
}
};
let headers_str = match table
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()
.map_err(|e| anyhow!(e))
{
Ok(headers) => headers,
Err(e) => {
send(true, true, format!("{}", e)).await;
return;
}
};

let headers = match string_to_map(&headers_str.unwrap_or("".to_string()))
.ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))
{
Ok(headers) => headers,
Err(e) => {
send(true, true, format!("Failed to parse headers: {:?}", e)).await;
return;
}
};

let uri = match Uri::from_str(&table.endpoint.to_string()) {
Ok(uri) => uri,
Expand Down Expand Up @@ -215,7 +228,7 @@ impl Connector for WebsocketConnector {
let description = format!("WebsocketSource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars().map_err(|e| anyhow!(e))?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down Expand Up @@ -289,7 +302,7 @@ impl Connector for WebsocketConnector {
EmptyConfig {},
WebsocketTable {
endpoint,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr::new(s)),
subscription_message: None,
subscription_messages,
},
Expand Down
1 change: 1 addition & 0 deletions arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ export function JsonForm({
}) {
let ajv = new Ajv();
ajv.addKeyword('isSensitive');
ajv.addFormat('var-str', { validate: () => true });
const memoAjv = useMemo(() => addFormats(ajv), [schema]);

const formik = useFormik({
Expand Down

0 comments on commit 20efd7e

Please sign in to comment.