
Ability to add and remove backends to a LoadBalancer #291

Open
JosiahParry opened this issue Jun 17, 2024 · 10 comments
Labels
question Further information is requested


@JosiahParry

What is the problem your feature solves, or the need it fulfills?

A LoadBalancer contains (private) backends. These backends are static and cannot be updated through the LoadBalancer struct.

Request:

Provide an interface for adding and removing backends from a LoadBalancer.

Reasoning:

I have a service that serves an R application, e.g. on localhost:9000. When demand increases, I want to spawn a new instance on localhost:9001 and add that instance to the LoadBalancer. When demand subsides, I would like to shut down the localhost:9001 instance and remove it from the LoadBalancer.

Describe the solution you'd like

A method or trait that allows me to add a Backend to a LoadBalancer struct.

Describe alternatives you've considered

I've tried implementing the ServiceDiscovery trait on a custom struct based on the Static struct, without success. I ran into Send issues, and the versions I did get working made the reverse proxy hang. So my Rust and Pingora-fu have failed me.

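use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::process::Child; // assumed; tokio::process::Child would fit the same way
use std::sync::{Arc, RwLock};

use pingora::lb::{discovery::ServiceDiscovery, Backend};
use pingora::Result;

#[derive(Default, Debug)]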
pub struct ShinyListeners {
    backends: Arc<RwLock<BTreeMap<Backend, Arc<Child>>>>,
}

impl ShinyListeners {
    /// return the collection to backends
    pub fn get(&self) -> BTreeMap<Backend, Arc<Child>> {
        let backends = self.backends.read().unwrap();
        backends.clone()
    }
}

#[async_trait::async_trait]
impl ServiceDiscovery for ShinyListeners {
    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        let health = HashMap::new();
        let mut backends = BTreeSet::new();
        for (backend, _) in self.get().iter() {
            backends.insert(backend.clone());
        }
        Ok((backends, health))
    }
}
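On the Send issues: one common source is that `#[async_trait]` boxes each `discover()` future as `Send` by default, while a `std::sync` lock guard is not `Send`, so holding one across an `.await` inside `discover()` fails to compile. The `get()` helper above sidesteps this by cloning under the lock and releasing the guard before returning. A std-only sketch of the same pattern follows; `Registry`, `other_async_work`, and the tiny `block_on` executor are hypothetical stand-ins, not pingora APIs, and addresses are plain strings rather than `Backend` values.

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical shared registry: backend address -> child process id.
type Registry = Arc<RwLock<BTreeMap<String, u32>>>;

// Synchronous helper, like ShinyListeners::get: the read guard is a
// temporary that is dropped before this function returns.
fn snapshot(registry: &Registry) -> BTreeSet<String> {
    registry.read().unwrap().keys().cloned().collect()
}

// Placeholder for any other async work done inside discover().
async fn other_async_work() {}

// No guard is alive across the `.await`, so this future is `Send`.
// Keeping a `registry.read().unwrap()` guard alive past the `.await`
// would make the future `!Send`.
async fn discover_keys(registry: Registry) -> BTreeSet<String> {
    let keys = snapshot(&registry);
    other_async_work().await;
    keys
}

// Compiles only if `T: Send`, which `#[async_trait]` requires by default.
fn assert_send<T: Send>(_: &T) {}

// Minimal executor, only suitable for futures that never actually wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> Vec<String> {
    let registry: Registry = Arc::new(RwLock::new(BTreeMap::new()));
    registry.write().unwrap().insert("127.0.0.1:9001".to_string(), 2);
    registry.write().unwrap().insert("127.0.0.1:9000".to_string(), 1);
    let fut = discover_keys(registry.clone());
    assert_send(&fut);
    block_on(fut).into_iter().collect()
}
```

Calling `demo()` returns the two addresses in sorted order, and the `assert_send` call would stop compiling if a guard were kept across the `.await`.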


@luizfonseca

Interestingly enough, the Static struct has pub(crate) functions to add/remove backends:

pub(crate) fn set(&self, backends: BTreeSet<Backend>) {

So in theory you could copy this implementation (mind the data race) and create your own struct that is able to add and remove them:

let backends = ArcSwap::new(Arc::new(Backend {...}));
let discovery = YourStaticStruct::new(Backends { backends });
let your_load_balancer = LoadBalancer::from_backends(discovery.backends);

// Your struct has a function to swap the backend on removal:
discovery.remove_backend(backend_to_remove); // this is probably a .store() on an ArcSwap

discovery.add_backend(backend_to_add); // same here

your_load_balancer.update().now_or_never();

But you will need either ArcSwap or DashMap (or an RwLock, if you can) so that additions and removals cascade to the load balancer.

This is mostly theoretical; I haven't tested any of the above. Maybe someone directly from CF has more details.
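A std-only sketch of the copy-your-own-Static idea, under stated assumptions: `DynamicStatic` is a hypothetical struct, `RwLock<Arc<BTreeSet<_>>>` stands in for `ArcSwap` so that no extra crates are needed (with the real arc-swap crate, its `rcu` method plays the role of `update_with`), and backends are plain address strings instead of pingora `Backend` values. Holding the write lock for the whole read-modify-write avoids the lost-update race that a separate get-then-set sequence would have.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};

/// Hypothetical `Static`-like discovery with public add/remove.
/// `RwLock<Arc<..>>` stands in for `ArcSwap<..>`.
#[derive(Default)]
pub struct DynamicStatic {
    backends: RwLock<Arc<BTreeSet<String>>>,
}

impl DynamicStatic {
    /// Cheap snapshot: readers clone the Arc, not the set.
    pub fn get(&self) -> Arc<BTreeSet<String>> {
        self.backends.read().unwrap().clone()
    }

    /// Copy-on-write update done entirely under the write lock, so two
    /// concurrent add/remove calls cannot overwrite each other's change.
    fn update_with(&self, f: impl FnOnce(&mut BTreeSet<String>)) {
        let mut slot = self.backends.write().unwrap();
        let mut next = (**slot).clone();
        f(&mut next);
        *slot = Arc::new(next);
    }

    pub fn add(&self, addr: &str) {
        self.update_with(|set| {
            set.insert(addr.to_string());
        });
    }

    pub fn remove(&self, addr: &str) {
        self.update_with(|set| {
            set.remove(addr);
        });
    }
}

pub fn demo() -> (Vec<String>, Vec<String>) {
    let discovery = DynamicStatic::default();
    discovery.add("127.0.0.1:9000");
    let before = discovery.get(); // snapshot taken before the next changes
    discovery.add("127.0.0.1:9001");
    discovery.remove("127.0.0.1:9000");
    let after = discovery.get();
    (before.iter().cloned().collect(), after.iter().cloned().collect())
}
```

Snapshots are immutable: `before` still lists only `127.0.0.1:9000` after the later add/remove, which is the property a load balancer relies on while it iterates over a set that may be swapped underneath it.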

@JosiahParry
Author

The suggestion of a DashMap is quite helpful; I'm getting somewhere with it. I've tried implementing a Dynamic struct like so:

use dashmap::DashSet;
use pingora::lb::Backend;

#[derive(Clone, Debug)]
pub struct Dynamic {
    pub backends: DashSet<Backend>,
}

impl Dynamic {
    /// Create a new boxed [Dynamic] service discovery with the given backends.
    pub fn new(backends: DashSet<Backend>) -> Box<Self> {
        Box::new(Dynamic { backends })
    }

    /// Return a copy of the current backends (cloning a DashSet copies its contents).
    pub fn get(&self) -> DashSet<Backend> {
        self.backends.clone()
    }

    pub fn add(&self, backend: Backend) {
        self.backends.insert(backend);
    }

    pub fn remove(&self, backend: &Backend) {
        self.backends.remove(backend);
    }
}

which I wanted to use alongside my own wrapper struct:

pub struct MyService {
    pub backends: Dynamic,
    pub load_balancer: LoadBalancer<RoundRobin>,
}

But the challenge is that the LoadBalancer (through Backends::new) takes ownership of the Box<dyn ServiceDiscovery>, so keeping a struct that holds the same Dynamic that was consumed by the LoadBalancer doesn't seem easily feasible from my POV.

For example, I tried creating it like this:

impl MyService {
    pub fn new(entrypoint: &str) -> Self {
        // use Arc for cloning the Dynamic to retain it after
        // it is consumed by the LoadBalancer
        let arc_dynamic = Arc::new(Dynamic {
            backends: DashSet::new(),
        });
        // note: Dynamic derives Clone and cloning a DashSet copies its
        // contents, so `xx` and `dynamic` below are independent sets
        let xx = arc_dynamic.as_ref().clone();

        let dynamic = Box::new(arc_dynamic.as_ref().clone());
        let backends = Backends::new(dynamic);
        let lb = LoadBalancer::from_backends(backends);

        Self {
            backends: xx,
            load_balancer: lb,
        }
    }
}

But the issue here is that the LoadBalancer doesn't get updated when the Dynamic does, e.g.:

// from reverse proxy trait impl
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
    println!("Dynamic: {:?}", self.0.backends);
    println!(
        "Load Balancer: {:?}",
        self.0.load_balancer.backends().get_backend()
    );
    // .... truncated
// outputs: 
#> Dynamic: Dynamic { backends: {Backend { addr: Inet(0.0.0.0:35462), weight: 1 }: (), Backend { addr: Inet(0.0.0.0:42913), weight: 1 }: ()} }
#> Load Balancer: {}

For the time being, though, I can keep developing by mimicking a random load-balancing algorithm: I grab a random backend straight from the Dynamic, which otherwise works!
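One likely reason the two sets diverge in the MyService constructor: `Dynamic` derives `Clone`, and cloning a `DashSet` copies its contents, so each `arc_dynamic.as_ref().clone()` produces a fresh, independent `Dynamic`; the `Arc` itself is never shared. The std-only sketch below (a `BTreeSet` stands in for `DashSet`, and both structs are hypothetical) contrasts a struct that owns its set with one that holds an `Arc` to a shared set. This does not rule out other causes: even with a shared set, the load balancer still has to re-run discovery before it sees a change.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};

// Owns its set, like `Dynamic { backends: DashSet<Backend> }`:
// the derived Clone copies the contents.
#[derive(Clone, Default)]
struct OwnedSet {
    backends: BTreeSet<String>,
}

// Holds a pointer to a shared set: the derived Clone copies only the Arc.
#[derive(Clone, Default)]
struct SharedSet {
    backends: Arc<Mutex<BTreeSet<String>>>,
}

/// Returns how many backends the second handle sees after one backend
/// is inserted through the first handle, for each layout.
pub fn demo() -> (usize, usize) {
    let mut owned_a = OwnedSet::default();
    let owned_b = owned_a.clone();
    owned_a.backends.insert("0.0.0.0:35462".to_string());

    let shared_a = SharedSet::default();
    let shared_b = shared_a.clone();
    shared_a.backends.lock().unwrap().insert("0.0.0.0:35462".to_string());

    let owned_len = owned_b.backends.len();
    let shared_len = shared_b.backends.lock().unwrap().len();
    (owned_len, shared_len)
}
```

`demo()` yields `(0, 1)`: only the Arc-based layout lets a handle kept outside the LoadBalancer observe inserts made through another handle.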

@eaufavor eaufavor added the question Further information is requested label Jun 20, 2024
@eaufavor
Member

eaufavor commented Jun 20, 2024

Sounds like you (almost) got it.

I would do something like this

struct Discovery(Arc<DashSet<Backend>>);
struct UpdateHandler(Arc<DashSet<Backend>>);

fn create() -> (Discovery, UpdateHandler) {
    let backends = Arc::new(DashSet::new());
    (Discovery(backends.clone()), UpdateHandler(backends))
}

impl ServiceDiscovery for Discovery {...}

Here Discovery and UpdateHandler share the same DashSet, so you can add/remove backends through the UpdateHandler. That update will be reflected in the Discovery and propagate to the LoadBalancer.
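To make the data flow concrete, here is a std-only sketch of that split. `Discover` and `MiniBalancer` are hypothetical, simplified stand-ins for `ServiceDiscovery` and `LoadBalancer` (synchronous, no enabled/disabled map), and a `Mutex<BTreeSet<String>>` replaces the `DashSet`. Assuming, as in pingora, that the balancer keeps its own copy of the last discovered set, the boxed `Discovery` sees every change made through the `UpdateHandler` immediately, while the balancer's view only changes when `update()` runs.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};

// Hypothetical, simplified stand-in for pingora's async ServiceDiscovery.
trait Discover {
    fn discover(&self) -> BTreeSet<String>;
}

type Shared = Arc<Mutex<BTreeSet<String>>>;

struct Discovery(Shared);
struct UpdateHandler(Shared);

impl Discover for Discovery {
    fn discover(&self) -> BTreeSet<String> {
        self.0.lock().unwrap().clone()
    }
}

impl UpdateHandler {
    fn add(&self, addr: &str) {
        self.0.lock().unwrap().insert(addr.to_string());
    }
    fn remove(&self, addr: &str) {
        self.0.lock().unwrap().remove(addr);
    }
}

fn create() -> (Discovery, UpdateHandler) {
    let backends: Shared = Arc::new(Mutex::new(BTreeSet::new()));
    (Discovery(backends.clone()), UpdateHandler(backends))
}

/// Hypothetical stand-in for LoadBalancer/Backends: it caches the last
/// discovered set and refreshes it only when `update()` is called.
struct MiniBalancer {
    discovery: Box<dyn Discover>,
    snapshot: BTreeSet<String>,
}

impl MiniBalancer {
    fn new(discovery: Box<dyn Discover>) -> Self {
        Self { discovery, snapshot: BTreeSet::new() }
    }
    fn update(&mut self) {
        self.snapshot = self.discovery.discover();
    }
}

pub fn demo() -> (usize, usize, usize) {
    let (discovery, handler) = create();
    // `discovery` is moved into the balancer; `handler` stays with us.
    let mut lb = MiniBalancer::new(Box::new(discovery));
    handler.add("127.0.0.1:9000");
    handler.add("127.0.0.1:9001");
    let before_update = lb.snapshot.len(); // nothing re-discovered yet
    lb.update();
    let after_update = lb.snapshot.len();
    handler.remove("127.0.0.1:9001");
    lb.update();
    (before_update, after_update, lb.snapshot.len())
}
```

`demo()` returns `(0, 2, 1)`: moving the boxed discovery into the balancer is fine as long as it holds the shared `Arc`, but each change only becomes visible to selection after an update pass.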

@JosiahParry
Author

Thanks for the suggestion! I've adapted this approach and find that the UpdateHandler still does not update the LoadBalancer itself even after calling lb.update().await.

pub struct Discovery(Arc<DashMap<Backend, Child>>);
pub struct UpdateHandler(Arc<DashMap<Backend, Child>>);

fn create_handlers() -> (Discovery, UpdateHandler) {
    let instances = Arc::new(DashMap::new());
    let discovery = Discovery(instances.clone());
    let update_handler = UpdateHandler(instances);

    (discovery, update_handler)
}

#[async_trait::async_trait]
impl ServiceDiscovery for Discovery {
    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        // no explicit enable/disable flags: backends absent from this map count as enabled
        let health = HashMap::new();

        // initialize an empty BTreeSet
        let mut res = BTreeSet::new();

        // populate the BTreeSet
        for backend in self.0.iter() {
            res.insert(backend.key().clone());
        }

        Ok((res, health))
    }
}

The load balancer is created as:

pub struct TestApp {
    pub loadbalancer: LoadBalancer<RoundRobin>,
    pub lb_handler: UpdateHandler,
}

impl TestApp {
    pub fn new() -> Self {
        let (discovery, lb_handler) = create_handlers();
        let loadbalancer = LoadBalancer::from_backends(Backends::new(Box::new(discovery)));

        Self {
            loadbalancer,
            lb_handler,
        }
    }
}

As far as I can tell, nothing is missing here 🤔


This question has been stale for a week. It will be closed in an additional day if not updated.

@github-actions github-actions bot added the stale label Jun 28, 2024
@JosiahParry
Author

As far as I know this is still not possible.

@github-actions github-actions bot removed the stale label Jun 29, 2024

github-actions bot commented Jul 7, 2024

This question has been stale for a week. It will be closed in an additional day if not updated.

@github-actions github-actions bot added the stale label Jul 7, 2024
@JosiahParry
Author

Still relevant.

@github-actions github-actions bot removed the stale label Jul 8, 2024
@dalton-oliveira

dalton-oliveira commented Jul 11, 2024

@JosiahParry Have you set update_frequency on the load balancer?

let mut loadbalancer = LoadBalancer::from_backends(Backends::new(Box::new(discovery)));
loadbalancer.update_frequency = Some(Duration::from_secs(1));

I have tested it and it seems to work. In this example, the first backend is added on a separate thread, and the second one is added 10s later:

use std::{
    collections::{BTreeSet, HashMap},
    sync::Arc,
    thread,
    time::Duration,
};

use async_trait::async_trait;
use dashmap::DashSet;
use pingora::{
    lb::{discovery::ServiceDiscovery, Backend, Backends, LoadBalancer},
    prelude::RoundRobin,
    protocols::l4::socket::SocketAddr,
    proxy::{http_proxy_service, ProxyHttp, Session},
    server::Server,
    services::background::background_service,
    upstreams::peer::HttpPeer,
    Result,
};

pub struct Discovery(Arc<DashSet<Backend>>);

#[async_trait]
impl ServiceDiscovery for Discovery {
    async fn discover(&self) -> pingora::Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        let mut res = BTreeSet::new();
        for backend in self.0.iter() {
            res.insert(backend.clone());
        }
        Ok((res, Default::default()))
    }
}

pub struct TestApp {
    pub loadbalancer: Arc<LoadBalancer<RoundRobin>>,
}

#[async_trait]
impl ProxyHttp for TestApp {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self
            .loadbalancer
            .select(b"", 256) // hash doesn't matter for round robin
            .unwrap();

        println!("upstream peer is: {upstream:?}");
        // Set SNI to one.one.one.one
        let peer = Box::new(HttpPeer::new(
            upstream,
            false,
            "one.one.one.one".to_string(),
        ));
        Ok(peer)
    }
}

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    let backends = Arc::new(DashSet::new());
    let discovery = Discovery(backends.clone());
    let mut loadbalancer = LoadBalancer::from_backends(Backends::new(Box::new(discovery)));
    loadbalancer.update_frequency = Some(Duration::from_secs(1));

    let bg_service = background_service("lb service", loadbalancer);

    let app = TestApp {
        loadbalancer: bg_service.task(),
    };

    let mut router_service = http_proxy_service(&my_server.configuration, app);
    router_service.add_tcp("0.0.0.0:6188");

    my_server.add_service(router_service);
    my_server.add_service(bg_service);

    // inserts the first backend right away and a second one 10s later
    thread::spawn(move || {
        let addrs: [SocketAddr; 2] = ["[::1]:4174".parse().unwrap(), "[::1]:4173".parse().unwrap()];
        for addr in addrs.iter().cloned() {
            backends.insert(Backend { addr, weight: 1 });
            thread::sleep(Duration::from_secs(10));
        }
    });
    my_server.run_forever();
}
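What `update_frequency` adds here is a periodic re-run of discovery from the background service, so changes made to the shared `DashSet` reach the proxy within roughly one interval. The std-only sketch below imitates that loop with a plain thread. `spawn_refresher` and `wait_for` are hypothetical helpers, the 10 ms interval is arbitrary, and a `Mutex`/`RwLock` pair stands in for the `DashSet` and the balancer's internal snapshot. It also makes a trade-off visible: a backend added right before a request may not be selectable until the next tick.

```rust
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};

type Source = Arc<Mutex<BTreeSet<String>>>; // what the update handler mutates
type Snapshot = Arc<RwLock<BTreeSet<String>>>; // what request handling selects from

/// Hypothetical stand-in for `background_service` + `update_frequency`:
/// re-runs "discovery" every `every` until `stop` is set.
fn spawn_refresher(
    source: Source,
    snapshot: Snapshot,
    every: Duration,
    stop: Arc<AtomicBool>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while !stop.load(Ordering::Relaxed) {
            // "discover": copy the current source set...
            let discovered = source.lock().unwrap().clone();
            // ...and publish it as the new snapshot
            *snapshot.write().unwrap() = discovered;
            thread::sleep(every);
        }
    })
}

/// Polls the snapshot until it contains `addr` or `timeout` elapses.
fn wait_for(snapshot: &Snapshot, addr: &str, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if snapshot.read().unwrap().contains(addr) {
            return true;
        }
        thread::sleep(Duration::from_millis(5));
    }
    false
}

pub fn demo() -> bool {
    let source: Source = Arc::default();
    let snapshot: Snapshot = Arc::default();
    let stop = Arc::new(AtomicBool::new(false));
    let worker = spawn_refresher(
        source.clone(),
        snapshot.clone(),
        Duration::from_millis(10),
        stop.clone(),
    );

    // "Provision" a backend; it becomes selectable after the next refresh.
    source.lock().unwrap().insert("[::1]:4174".to_string());
    let seen = wait_for(&snapshot, "[::1]:4174", Duration::from_secs(2));

    stop.store(true, Ordering::Relaxed);
    worker.join().unwrap();
    seen
}
```

If a freshly provisioned backend must be usable on the very next request, waiting for it to appear (as `wait_for` does) or triggering an update explicitly after adding it are the two obvious options in this model.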

@JosiahParry
Author

For me, the issue was that calling discovery() directly did not work, because I needed to provision the backend and then use it immediately.


4 participants