# Autoscaling and concurrency in Ray Serve example
This notebook explains
1. How to expose a Python function as a Ray Serve API.
2. What parameters are available to tweak the performance of the Ray Serve application
3. How the Ray serve application scales when invoked concurrently

In [12]:
RAY_SERVE_URL = "http://ray-cluster-serve.mlnow.frenoid.com:30080"
ROUTE_PREFIX = "/mprimes"

In [1]:
import ray
import os

os.environ["RAY_ADDRESS"] = "ray://192.168.3.201:30101"

ray.init()

2025-03-01 17:38:00,985	INFO worker.py:1514 -- Using address ray://192.168.3.201:30101 set in the environment variable RAY_ADDRESS
2025-03-01 17:38:00,991	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.


0,1
Python version:,3.11.11
Ray version:,2.41.0
Dashboard:,http://10.244.2.9:8265


[33m(raylet, ip=10.244.1.10)[0m [2025-03-01 09:41:07,515 C 66 66] worker_pool.cc:1436:  Check failed: worker->GetAssignedJobId().IsNil() || worker->GetAssignedJobId() == pop_worker_request->job_id 
[33m(raylet, ip=10.244.1.10)[0m *** StackTrace Information ***
[33m(raylet, ip=10.244.1.10)[0m /home/ray/anaconda3/lib/python3.11/site-packages/ray/core/src/ray/raylet/raylet(+0xc17c7a) [0x5603a8830c7a] ray::operator<<()
[33m(raylet, ip=10.244.1.10)[0m /home/ray/anaconda3/lib/python3.11/site-packages/ray/core/src/ray/raylet/raylet(+0xc1a0c1) [0x5603a88330c1] ray::RayLog::~RayLog()
[33m(raylet, ip=10.244.1.10)[0m /home/ray/anaconda3/lib/python3.11/site-packages/ray/core/src/ray/raylet/raylet(+0x34afe0) [0x5603a7f63fe0] ray::raylet::WorkerPool::PopWorker()
[33m(raylet, ip=10.244.1.10)[0m /home/ray/anaconda3/lib/python3.11/site-packages/ray/core/src/ray/raylet/raylet(+0x34b6ef) [0x5603a7f646ef] ray::raylet::WorkerPool::PopWorker()
[33m(raylet, ip=10.244.1.10)[0m /home/ray/anaconda

## Mersenne Primes calculation function

In [4]:
@ray.remote
def generate_mersenne_primes(start, end):
    m_primes = []
    def lucas_lehmer(p):
        s = 4
        m = 2 ** p - 1
        for _ in range(p - 2):
            s = ((s * s) - 2) % m
        return s == 0

    def is_prime(number):
        """
        the efficiency of this doesn't matter much as we're
        only using it to test the primeness of the exponents
        not the mersenne primes themselves
        """

        if number % 2 == 0:
            return number == 2

        i = 3
        while i * i <= number:
            if number % i == 0:
                return False
            i += 2

        return True
    
    for i in range(start, end, 2):  # generate up to M20, found in 1961
        if is_prime(i) and lucas_lehmer(i):
            m_primes.append(2 ** i - 1)
    
    return m_primes

In [5]:
ref = generate_mersenne_primes.remote(1, 500)
ray.get(ref)

[7,
 31,
 127,
 8191,
 131071,
 524287,
 2147483647,
 2305843009213693951,
 618970019642690137449562111,
 162259276829213363391578010288127,
 170141183460469231731687303715884105727]

### Calling the function concurrently
You can do so using ray.remote and the futures package

In [8]:
import concurrent.futures

CONCURRENCY = 3
TOTAL_CALLS = 30

START = 1
STOP = 1000

def call_mprime(start: int, stop: int) -> list[int]:
    ref = generate_mersenne_primes.remote(start, stop)
    return ray.get(ref)

pool = concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY)

futures = []
for i in range(TOTAL_CALLS):
    futures.append(pool.submit(call_mprime, START, STOP))
    
responses, response_count = 0, 0
for f in futures:
    res = f.result()
    response_count += 1
    print(f"responses: {response_count}")
    #print(res)

responses: 1
responses: 2
responses: 3
responses: 4
responses: 5
responses: 6
responses: 7
responses: 8
responses: 9
responses: 10
responses: 11
responses: 12
responses: 13
responses: 14
responses: 15
responses: 16
responses: 17
responses: 18
responses: 19
responses: 20
responses: 21
responses: 22
responses: 23
responses: 24
responses: 25
responses: 26
responses: 27
responses: 28
responses: 29
responses: 30


## Deploying Mersenne Primes function Ray Serve application

### First startup  Ray Serve

In [9]:
from ray import serve

serve.start(http_options={"location": "EveryNode",
                          "host": "0.0.0.0",
                          "port": 8000,
                          "request_timeout_s": 180.0})

INFO 2025-03-01 17:39:07,295 serve 2116265 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.


### Declare and deploy the Ray Serve application

In [21]:
from ray import serve
from ray.serve.handle import DeploymentHandle
from starlette.requests import Request

@serve.deployment(
    ray_actor_options={"num_cpus": 1.0, "num_gpus": 0},
    autoscaling_config={
        "target_num_ongoing_requests_per_replica": 2,
        "min_replicas": 1,
        "initial_replicas": 1,
        "max_replicas": 4
    },
)
class MersennePrimes:
    def __init__(self):
        print("MersennePrimes is ready to serve")
    
    def generate_mersenne_primes(self, start, end) -> list[int]:    
        def lucas_lehmer(p):
            s = 4
            m = 2 ** p - 1
            for _ in range(p - 2):
                s = ((s * s) - 2) % m
            return s == 0

        def is_prime(number):
            """
            the efficiency of this doesn't matter much as we're
            only using it to test the primeness of the exponents
            not the mersenne primes themselves
            """

            if number % 2 == 0:
                return number == 2

            i = 3
            while i * i <= number:
                if number % i == 0:
                    return False
                i += 2

            return True
        
        m_primes = []
        for i in range(start, end, 2):  # generate up to M20, found in 1961
            if is_prime(i) and lucas_lehmer(i):
                m_primes.append(2 ** i - 1)

        return m_primes

    async def __call__(self, request: Request) -> list[str]:
        start = (await request.json())["start"]
        stop = (await request.json())["stop"]
        print(f"MersennePrimes called with {start} {stop}")
        m_primes = self.generate_mersenne_primes(start, stop)
        
        return m_primes

In [22]:
app = MersennePrimes.bind()
handle: DeploymentHandle = serve.run(app, name="MersennePrimes", route_prefix=ROUTE_PREFIX)

INFO 2025-03-01 17:41:07,301 serve 2116265 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
INFO 2025-03-01 17:41:21,365 serve 2116265 -- Application 'MersennePrimes' is ready at http://0.0.0.0:8000/mprimes.
INFO 2025-03-01 17:41:21,365 serve 2116265 -- Deployed app 'MersennePrimes' successfully.


### Declare and deploy the Ray Serve application

In [23]:
app_url = f"{RAY_SERVE_URL}{ROUTE_PREFIX}"

print(app_url)

http://ray-cluster-serve.mlnow.frenoid.com:30080/mprimes


In [24]:
import requests

start = 3
stop = 1000

res = requests.get(app_url,
                   json={"start":start, "stop": stop})
res

<Response [200]>

In [29]:
res.json()

[7,
 31,
 127,
 8191,
 131071,
 524287,
 2147483647,
 2305843009213693951,
 618970019642690137449562111,
 162259276829213363391578010288127,
 170141183460469231731687303715884105727,
 6864797660130609714981900799081393217269435300143305409394463459185543183397656052122559640661454554977296311391480858037121987999716643812574028291115057151,
 531137992816767098689588206552468627329593117727031923199444138200403559860852242739162502265229285668889329486246501015346579337652707239409519978766587351943831270835393219031728127]

### Call the Ray Serve application concurrently

Observe the replica scaling

In [38]:
import concurrent.futures
import random
from time import sleep

CONCURRENCY = 5
TOTAL_CALLS_PER_ITERATION = 15
ITERATIONS = 3

def invoke_mprime_application(start: int, stop: int) -> list[int]:
    res = requests.get(app_url,
                   json={"start":start, "stop": stop})
    
    return res.json()

for i in range(ITERATIONS):
    print(f"Begin iteration {i}")
    sleep_seconds = random.randint(1,60)
    start = random.randint(3, 100)
    stop = random.randint(240,4000)
    
    print(f"Params: sleep_Seconds {sleep_seconds}, start {start}, stop {stop}")
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY)
    
    futures = []
    for _ in range(TOTAL_CALLS_PER_ITERATION):
        futures.append(pool.submit(invoke_mprime_application, start, stop))
        
    response_count = 0
    for f in futures:
        res = f.result() # Blocks until result is returned
        response_count += 1
        
    print(f"Responses received: {response_count}")
    print(f"Iteration {i} complete")
    sleep(sleep_seconds)

Begin iteration 0
Params: sleep_Seconds 39, start 12, stop 2981
Responses received: 15
Iteration 0 complete
Begin iteration 1
Params: sleep_Seconds 32, start 31, stop 2736
Responses received: 15
Iteration 1 complete
Begin iteration 2
Params: sleep_Seconds 22, start 69, stop 3084
Responses received: 15
Iteration 2 complete


## Cleanup

In [39]:
from ray import serve

serve.delete("MersennePrimes")

INFO 2025-03-01 17:55:04,600 serve 2116265 -- Deleting app ['MersennePrimes']


In [40]:
ray.shutdown()