Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Extension {
.map(|s| s.config().clone())
.unwrap_or_else(default_config_producer);

let scheduler_url = format!("http://localhost:{}", addr.port());
let scheduler_url = format!("http://127.0.0.1:{}", addr.port());

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Expand Down
133 changes: 110 additions & 23 deletions ballista/client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::error::Error;
use std::path::PathBuf;

use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::{
BallistaCodec, protobuf::scheduler_grpc_client::SchedulerGrpcClient,
};
Expand Down Expand Up @@ -99,25 +100,36 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
}
}

/// starts a ballista cluster for integration tests
/// starts a ballista cluster for integration tests (pull-staged scheduling).
#[allow(dead_code)]
pub async fn setup_test_cluster() -> (String, u16) {
setup_test_cluster_with_scheduling(TaskSchedulingPolicy::PullStaged).await
}

/// starts a ballista cluster using the given [`TaskSchedulingPolicy`].
#[allow(dead_code)]
pub async fn setup_test_cluster_with_scheduling(
scheduling_policy: TaskSchedulingPolicy,
) -> (String, u16) {
let config = SessionConfig::new_with_ballista();
let default_codec = BallistaCodec::default();

let addr = ballista_scheduler::standalone::new_standalone_scheduler()
.await
.expect("scheduler to be created");
let addr = ballista_scheduler::standalone::new_standalone_scheduler_with_scheduling(
scheduling_policy,
)
.await
.expect("scheduler to be created");

let host = "localhost".to_string();
let host = "127.0.0.1".to_string();

let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor(
ballista_executor::new_standalone_executor_with_scheduling_policy(
scheduler,
config.ballista_standalone_parallelism(),
default_codec,
scheduling_policy,
)
.await
.expect("executor to be created");
Expand All @@ -127,26 +139,48 @@ pub async fn setup_test_cluster() -> (String, u16) {
(host, addr.port())
}

/// starts a ballista cluster for integration tests
/// starts a ballista cluster using push-staged scheduling (default executor policy).
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (String, u16) {
let config = SessionConfig::new_with_ballista();
pub async fn setup_test_cluster_push_scheduling() -> (String, u16) {
setup_test_cluster_with_scheduling(TaskSchedulingPolicy::PushStaged).await
}

let addr = ballista_scheduler::standalone::new_standalone_scheduler_from_state(
&session_state,
/// starts a cluster with [`SessionState`] (pull scheduling).
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (String, u16) {
setup_test_cluster_with_state_and_scheduling(
session_state,
TaskSchedulingPolicy::PullStaged,
)
.await
.expect("scheduler to be created");
}

let host = "localhost".to_string();
/// starts a ballista cluster with selectable [`TaskSchedulingPolicy`].
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state_and_scheduling(
session_state: SessionState,
scheduling_policy: TaskSchedulingPolicy,
) -> (String, u16) {
let config = SessionConfig::new_with_ballista();

let addr =
ballista_scheduler::standalone::new_standalone_scheduler_from_state_with_scheduling_policy(
&session_state,
scheduling_policy,
)
.await
.expect("scheduler to be created");

let host = "127.0.0.1".to_string();

let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor_from_state(
ballista_executor::new_standalone_executor_from_state_with_scheduling_policy(
scheduler,
config.ballista_standalone_parallelism(),
&session_state,
scheduling_policy,
)
.await
.expect("executor to be created");
Expand All @@ -156,11 +190,39 @@ pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (Stri
(host, addr.port())
}

/// starts a cluster with push-staged scheduling and a custom session state.
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state_push_scheduling(
session_state: SessionState,
) -> (String, u16) {
setup_test_cluster_with_state_and_scheduling(
session_state,
TaskSchedulingPolicy::PushStaged,
)
.await
}

#[allow(dead_code)]
pub async fn setup_test_cluster_with_builders(
config_producer: ConfigProducer,
runtime_producer: RuntimeProducer,
session_builder: SessionBuilder,
) -> (String, u16) {
setup_test_cluster_with_builders_and_scheduling(
config_producer,
runtime_producer,
session_builder,
TaskSchedulingPolicy::PullStaged,
)
.await
}

#[allow(dead_code)]
pub async fn setup_test_cluster_with_builders_and_scheduling(
config_producer: ConfigProducer,
runtime_producer: RuntimeProducer,
session_builder: SessionBuilder,
scheduling_policy: TaskSchedulingPolicy,
) -> (String, u16) {
let config = config_producer();

Expand All @@ -171,26 +233,29 @@ pub async fn setup_test_cluster_with_builders(
datafusion_proto::protobuf::PhysicalPlanNode,
> = BallistaCodec::new(logical, physical);

let addr = ballista_scheduler::standalone::new_standalone_scheduler_with_builder(
session_builder,
config_producer.clone(),
codec.clone(),
)
.await
.expect("scheduler to be created");
let addr =
ballista_scheduler::standalone::new_standalone_scheduler_with_builder_and_policy(
session_builder,
config_producer.clone(),
codec.clone(),
scheduling_policy,
)
.await
.expect("scheduler to be created");

let host = "localhost".to_string();
let host = "127.0.0.1".to_string();

let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor_from_builder(
ballista_executor::new_standalone_executor_from_builder_with_scheduling_policy(
scheduler,
config.ballista_standalone_parallelism(),
config_producer,
runtime_producer,
codec,
Default::default(),
scheduling_policy,
)
.await
.expect("executor to be created");
Expand Down Expand Up @@ -234,6 +299,15 @@ pub async fn remote_context() -> SessionContext {
.unwrap()
}

/// Remote [`SessionContext`] against a throwaway cluster using push-staged scheduling.
#[allow(dead_code)]
pub async fn remote_context_push_scheduling() -> SessionContext {
let (host, port) = setup_test_cluster_push_scheduling().await;
SessionContext::remote(&format!("df://{host}:{port}"))
.await
.unwrap()
}

#[allow(dead_code)]
pub async fn standalone_context_with_state() -> SessionContext {
let config = SessionConfig::new_with_ballista();
Expand All @@ -257,6 +331,19 @@ pub async fn remote_context_with_state() -> SessionContext {
.unwrap()
}

#[allow(dead_code)]
pub async fn remote_context_with_state_push_scheduling() -> SessionContext {
let config = SessionConfig::new_with_ballista();
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
let (host, port) = setup_test_cluster_with_state_push_scheduling(state.clone()).await;
SessionContext::remote_with_state(&format!("df://{host}:{port}"), state)
.await
.unwrap()
}

#[ctor::ctor(unsafe)]
fn init() {
// Enable RUST_LOG logging configuration for test
Expand Down
Loading
Loading