# Tutorial de Ray

Ray provee una API para Python con funcionalidades para desarrollar y ejecutar aplicaciones distribuidas. Permite a los usuarios agilizar la ejecución de un codigo mediante la paralelización, es decir, que se encarga de que ciertos procesos se puedan ejecutar al mismo tiempo, y así evitar que los tiempos de espera sean mas extendidos. 

A continuación, veremos detalladamente algunos de los puntos mas esenciales de Ray.

## - Paralelización (Funciones remotas)

Con Ray las funciones se convierten en "funciones remotas", y son llamadas `Tareas` (o `Task` por su nombre en ingles). Estas funciones pueden ser ejecutadas de forma paralela. Para ello añadiremos "@ray.remote" sobre aquellas funciones que queramos transformar en funciones remotas. Este es un ejemplo de como convertiriamos una funcion regular de Python en una "función remota".

```python
# Función regular de Python.
def funcion_regular():
    return 1

# Función remota de Ray.
@ray.remote
def funcion_remota():
    return 1
```

A pesar de ser casi identicas a simple vista, una función remota tiene ciertas diferencias en comparación a una funcion regular de Python.

1. **Llamada a la función:** La función regular se llama con `funcion_regular()`, mientras que la funcion remota es invocada con `funcion_remota.remote()`.

2. **Retornar valores:** `funcion_regular` se ejecuta y retorna el valor de la función (`1`), mientras que `funcion_remota` retorna inmediatamente un `IdObjeto` (un "Futuro") y ejecuta la tarea en segundo plano. El resultado del Futuro retornado se puede obtener llamando `ray.get` on the `ObjectID`.

    ```python
    >>> funcion_regular()
    1  # La funcion regular retorna directamente el resultado de la función
    
    >>> funcion_remota.remote()
    IdObjeto(1c80d6937802cd7786ad25e50caf2f023c95e350) # La funcion remota devuelve el ID del resultado futuro
    
    >>> ray.get(funcion_remota.remote()) # Obtenemos el resultado haciendo ray.get sobre el ID retornado por la funcion remota
    1
    ```
    
3. **Paralelismo:** Las ejecuciones de `funcion_regular` son **en serie**, es decir se ejecutan de una en una:

    ```python
    result = 0
    for _ in range(4): # En este loop se llama a funcion_regular una vez por iteración.
        result += funcion_regular()
    assert result == 4
    ```
    Mientras que las ejecuciones de `funcion_remota` ocurren en **paralelo**:
    
    ```python
    results = []
    for _ in range(4): # En este loop funcion_remota se ejecuta las 4 veces de forma simultanea.
        results.append(funcion_remota.remote())
    assert sum(ray.get(results)) == 4 # obtenemos el resultado con ray.get
     ```
  
  
**Ejemplo:** A continuación veremos un ejemplo de código para ver como transformaríamos una funcion regular, en una funcion remota, y como esto mejora notoriamente los tiempos de ejecución.

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import time

print('Ray importado con exito!')

Por defecto, Ray no ejecuta mas tareas al mismo tiempo que el numero de CPUs, pero este ejemplo requiere que se ejecuten cuatro tareas al mismo tiempo, por lo que le decimos a Ray que hay 4 CPUs.

In [None]:
ray.init(num_cpus=4, ignore_reinit_error=True)

# Aplicamos la funcion Sleep para mejorar la precision de las mediciones de tiempo que se utilizarán, porque es posible
# que algunos procesos todavia se esten iniciando en segundo plano. Esto se aplicará en todos los ejercicios del tutorial.
time.sleep(2.0)

In [None]:
# Esta es una función que simula un trabajo mas "pesado", que tomaría 1 segundo en ejecutar
def funcion_reg(i):
    time.sleep(1)
    return i

start_time = time.time()

resultados = []
for i in range(4):
  resultados.append(funcion_reg(i)) # Llamamos 4 veces a funcion regular y retornamos el valor de i. Esto debería tardar
                                    # unos 4 segundos (1 segundo por llamado)
  
duration = time.time() - start_time # Calculamos el tiempo que tomó en ejecutar esta tarea
print('Ejecutar esta iteración tomó {:.3f} segundos.'.format(duration))
print('Los resultados son:', resultados)

**Comparación:** Ahora veremos como se comporta la misma función, pero si le aplicamos Ray:

In [None]:
@ray.remote  # Agregamos el @ray.remote para indicar que es una funcion remota
def funcion_rem(i):
    time.sleep(1)
    return i

start_time = time.time()

resultados = []
for i in range(4):
  resultados.append(funcion_rem.remote(i)) # En esta linea estamos almacenando en el arreglo "resultados" los ID de los objetos
                                           # (futuros) retornados por la funcion remota (la cual es llamada con .remote).
                                           # Acá se realizarán los 4 llamados a funcion_rem de forma paralela.

resultados = ray.get(resultados) # Obtenemos el valor de los futuros aplicando el ray.get a los id oobtenidos previamente
duration = time.time() - start_time # Calculamos el tiempo de ejecución

print('Ejecutar esta iteracion tomó {:.3f} segundos.'.format(duration))
print('Los resultados son:', resultados)

**Resultado:** Se puede apreciar como con unos pocos cambios de codigo, la ejecución de la función remota fue mas rápida que la función regular, ya que al momento de ejecutar el loop "for", Ray realizó las 4 iteraciones en paralelo.

## - Procesamiento de datos en paralelo

La siguiente función remota, retorna el mismo argumento que se le pasa. Si le pasamos un objeto, el resultado obtenido por ray.get debería ser el mismo objeto, como se puede apreciar en el siguiente ejemplo:

```python
@ray.remote
def f(x): # Le pasamos un objeto por parametro
    return x # Retornamos el mismo objeto

>>> x1_id = f.remote(1) # El valor retornado por la función es el ID del objeto 
>>> ray.get(x1_id) # obtenemos su valor utilizando el ray.get
1

>>> x2_id = f.remote([1, 2, 3]) # Le pasamos un arreglo como argumento a la función, y esta retorna un id
                                # que referencia a ese arreglo
>>> ray.get(x2_id)
[1, 2, 3]
```

Tambien podemos pasar el ID de algun objeto por parametro en una funcion remota. Cuando se ejecute la funcion, Ray automaticamente convertira ese ID en el objeto al que esta referenciando.

```python
>>> y1_id = f.remote(x1_id) # Le pasamos el id del objeto por parametro y lo almacenamos dentro de una variable 
>>> ray.get(y1_id) # Al hacer ray.get obetenemos el objeto referenciado
1

>>> y2_id = f.remote(x2_id)
>>> ray.get(y2_id)
[1, 2, 3]
```

En este ultimo ejemplo, la tarea que crea `y1_id` no se ejecutará hasta que finalice de ejecutarse la tarea que crea `x1_id`.

**Ejemplo:** Ahora veremos un ejemplo de como  

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import time

import ray

In [None]:
ray.init(num_cpus=4, ignore_reinit_error=True)

time.sleep(2.0)

In [None]:
# Estas funciones simulan procesos mas complicados, usando el time.sleep para que tarden mas en ejecutar.
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

def compute_loss(features):
    num_data, dim = features.shape # retorna cantidad de elementos en cada dimension de la matriz
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2) 

In [None]:
start_time = time.time()
losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    inner_start = time.time()

    data = load_data(filename)
    normalized_data = normalize_data(data)
    features = extract_features(normalized_data)
    loss = compute_loss(features)
    losses.append(loss)
    
    inner_end = time.time()
    

print('Las perdidas son {}.'.format(losses) + '\n')
loss = sum(losses)

duration = time.time() - start_time

print('La perdida total es {}. Esto tardó {:.3f} segundos.'.format(loss, duration))

**Comparación:** Ahora probemos el mismo ejemplo, pero esta vez utilizando Ray, y veamos como afecta en el tiempo de ejecución

In [None]:
@ray.remote # Aplicamos el ray.remote sobre todas las funciones para convertirlas en funciones remotas
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

@ray.remote
def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

@ray.remote
def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

@ray.remote
def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

In [None]:
start_time = time.time()
losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    inner_start = time.time()

    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses.append(loss) 
    
    inner_end = time.time()
    
        
losses = ray.get(losses) # Obtenemos los objetos haciendo ray.get sobre los IDs
print('Las perdidas son {}.'.format(losses) + '\n')
loss = sum(losses)

duration = time.time() - start_time

print('La perdida total es {}. Esto tardó {:.3f} segundos.'.format(loss, duration))

## - Paralelismo anidado

En este punto veremos como crear tareas anidadas, llamando una funcion remota dentro de otra funcion remota. También veremos unos ejemplos para apreciar mejor como se reducen los tiempos de ejecución.

Al igual que las funciones regulares, las funciones remotas pueden llamar a otras funciones remotas, como se muestra en el siguiente ejemplo:

```python
@ray.remote
def f():
    return 1

@ray.remote
def g():
    # Llama a la funcion f 4 veces y retorna el arreglo con los IDs de los objetos.
    results = []
    for _ in range(4):
      results.append(f.remote())
    return results

@ray.remote
def h():
    # Llama a f 4 veces y antes de retornar el resultado debe esperar a que finalicen las 4 tareas,
    # entonces retornará un arreglo con los objetos.
    results = []
    for _ in range(4):
      results.append(f.remote())
    return ray.get(results)
```

Las funciones g() y h() retornarán los siguientes valores

```python

>>> ray.get(g.remote()) # g retorna un arreglo con los IDs de los objetos, ya que al llamar a f no se aplica ray.remote
[ObjectID(b1457ba0911ae84989aae86f89409e953dd9a80e),
 ObjectID(7c14a1d13a56d8dc01e800761a66f09201104275),
 ObjectID(99763728ffc1a2c0766a2000ebabded52514e9a6),
 ObjectID(9c2f372e1933b04b2936bb6f58161285829b9914)]

>>> ray.get(h.remote()) # h retorna el arreglo con los objetos que retorna f
[1, 1, 1, 1]
```

**Ejemplo:** Ahora veremos un codigo ejecutable para apreciar de mejor manera como se paraleliza un codigo con funciones anidadas.

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import time

import ray

In [None]:
ray.init(num_cpus=9, ignore_reinit_error=True)

time.sleep(2.0)

In [None]:
def compute_gradient(data, current_model): # Simula una funcion que tardaría 0.03 segundos en ejecutar
    time.sleep(0.03)
    return 1

def train_model(hyperparameters):
    current_model = 0
    for _ in range(10):
        gradients = []
        for j in range(2):
            gradients.append(compute_gradient(j, current_model)) # Añade un 1 por iteración en el arreglo gradients
        current_model += sum(gradients) # Suma los valores dentro del arreglo gradients y los almacena en una variable

    return current_model # Retorna la suma total de 


In [None]:
# Aplicamos time.sleep para mejorar la precision en las mediciones de tiempo.
time.sleep(2.0)
start_time = time.time()


results = []
for hyperparameters in [{'learning_rate': 1e-1, 'batch_size': 100},
                        {'learning_rate': 1e-2, 'batch_size': 100},
                        {'learning_rate': 1e-3, 'batch_size': 100}]:
    results.append(train_model(hyperparameters))

end_time = time.time()
duration = end_time - start_time

print('El resultado es {}. Esto tardó {:.3f} segundos.'.format(results, duration))

**Comparación:** Veamos ahora como se comportaría este mismo codigo si le aplicamos Ray.

In [None]:
@ray.remote
def compute_gradient(data, current_model):
    time.sleep(0.03)
    return 1

@ray.remote
def train_model(hyperparameters):
    current_model = 0
    for _ in range(10):
        gradients = []
        for j in range(2):
            gradients.append(compute_gradient.remote(j, current_model))
        current_model += sum(ray.get(gradients))


    return current_model

In [None]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Run some hyperparaameter experiments.
results = []
for hyperparameters in [{'learning_rate': 1e-1, 'batch_size': 100},
                        {'learning_rate': 1e-2, 'batch_size': 100},
                        {'learning_rate': 1e-3, 'batch_size': 100}]:
    results.append(train_model.remote(hyperparameters))


results = ray.get(results)
end_time = time.time()
duration = end_time - start_time

print('El resultado es {}. Esto tardó {:.3f} segundos.'.format(results, duration))

## - Actores

En Ray, un Actor es muy parecido a lo que es una clase normal de Python. Se inicializa de la misma manera (con la función `__init__` ) y puede contener funciones internas, a las que se deben acceder con los metodos remotos.

Para crear un Actor solo debemos colocar el `@ray.remote` sobre una clase, como se muestra en el siguiente ejemplo:

```python
@ray.remote
class Example(object):
    def __init__(self, x):
        self.x = x
    
    def set(self, x):
        self.x = x
    
    def get(self):
        return self.x
```
Un Actor se diferencia de una clase regular de Python en algunos puntos muy especificos que se presentan a continuación:

1. **Instanciación:** Una clase regular de Python es instanciada con: `e = Example(1)` 

    Mientras que un Actor se instancia con el metodo remoto:  `e = Example.remote(1)`
    

2. **Invocación de metodos:** Los metodos de una clase regular serían invocados con `e.set(2)` o `e.get()`, mientras que los metodos de un Actor serían invocados de la siguiente manera:

    ```python
        >>> e.set.remote(2)
        ObjectID(d966aa9b6486331dc2257522734a69ff603e5a1c)
    
        >>> e.get.remote()
        ObjectID(7c432c085864ed4c7c18cf112377a608676afbc3)
    ```

3. **Retornar valores:** Los metodos de un Actor retornar de manera inmediata el ID del objeto resultante. Este resultado se puede obtener mediante el `ray.get`.

    ```python
        >>> ray.get(e.set.remote(2))
        None
    
        >>> ray.get(e.get.remote())
        2
    ```

**Ejemplo:** Ahora veremos un ejemplo de como aplicar los Actores de Ray:

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from collections import defaultdict
import numpy as np
import time

import ray

print('Ray importado con exito!')

In [None]:
ray.init(num_cpus=4, ignore_reinit_error=True)

In [None]:
class Foo(object):
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    def increment(self):
        time.sleep(0.5)
        self.counter += 1
        return self.counter

In [None]:
# Creamos dos objetos de la clase Foo
f1 = Foo()
f2 = Foo()

In [None]:
start_time = time.time()

f1.reset()
f2.reset()

results = []
for _ in range(5): # Incrementaremos 1 en 1 el estado de los objetos hasta llegar a 5.
    results.append(f1.increment())
    results.append(f2.increment())

duration = time.time() - start_time # Calculamos el tiempo de ejecución de este ejemplo.

print('Los resultados son: {}. Esto tardó {:.3f} segundos'.format(results, duration))

**Comparación:** Ahora veremos como funciona si usamos actores en lugar de clases

In [None]:
@ray.remote
class Foo(object):
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    def increment(self):
        time.sleep(0.5)
        self.counter += 1
        return self.counter

In [None]:
# Creamos dos actores
f1 = Foo.remote()
f2 = Foo.remote()

In [None]:
start_time = time.time()

# Reiniciamos el estado del actor y asi podemos correr esta celda multiples veces sin cambiar los resultados.
f1.reset.remote()
f2.reset.remote()

results = []
for _ in range(5):
    results.append(f1.increment.remote()) # Llenamos el arreglo results con los ID retornados por los metodos del actor
    results.append(f2.increment.remote())    
    
results = ray.get(results) # Obtenemos el valor de los resultados retornados por los metodos del Actor
                           # usando ray.get

duration = time.time() - start_time

print('Los resultados son: {}. Esto tardó {:.3f} segundos'.format(results, duration))

## - Compartir referencias a un Actor

Supongamos que en algun momento queramos tener varias tareas remotas que invocan metodos en el mismo actor. Para esto, podemos pasar el identificador del Actor como un argumento para las tareas.

Primero creamos el actor de la siguiente manera:

```python
@ray.remote
class Actor(object):
    def method(self):
        pass

# Creamos el actor
actor = Actor.remote()
```
Luego, definimos una funcion remota que reciba un identificador del actor como argumento


```python
@ray.remote
def f(actor):
    # Podemos invocar un metodo en el actor y esperar sus resultados
    return ray.get(actor.method.remote())
```

Despues podemos invocar la funcion remota varias veces y pasarle el identificador del actor como argumento

```python
# Cada una de estas funciones remotas invocará metodos en el mismo actor.
f.remote(actor)
f.remote(actor)
f.remote(actor)
```

**Ejemplo:** Ahora veremos un ejemplo de como compartir referencias usando actores

In [None]:
@ray.remote
class LoggingActor(object):
    def __init__(self):
        self.logs = defaultdict(lambda: [])
    
    def log(self, index, message):
        self.logs[index].append(message)
    
    def get_logs(self):
        return dict(self.logs)


In [None]:
logging_actor = LoggingActor.remote()

In [None]:
@ray.remote
def run_experiment(experiment_index, logging_actor):
    for i in range(60):
        time.sleep(1)
        # Push a logging message to the actor.
        logging_actor.log.remote(experiment_index, 'Iteración numero: {}'.format(i))

In [None]:
experiment_ids = [run_experiment.remote(i, logging_actor) for i in range(3)]

In [None]:
logs = logging_actor.get_logs.remote()
logs = ray.get(logs)

logs

**Resultado:** Prueba ejecutando la ultima instruccion reiteradas veces y mira como cambian los resultados de salida.

## - Ray.wait

Luego de ejecutar un numero de tareas, en ciertos casos querrás saber cuales fueron las que terminaron primero en ejecutar. Esto se puede resolver usando `ray.wait`. Esta funcion se utiliza de la siguiente manera:

    ids_listos, ids_restantes = ray.wait(ids_totales, num_retornos = 1, timeout = None)
    
Donde

**Argumentos:**
- ids_totales = Es una lista de ids de objetos.
- num_retornos = Este es el numero maximo de IDs a esperar que terminen.
- timeout = Esta es la cantidad de tiempo maxima (en milisegundos) a esperar. Ray.wait esperará hasta que los num_retornos objetos esten listos, o hasta que el tiempo estipulado en timeout haya pasado.

**Valores retornados:**
- ids_listos = Esta es una lista de objetos que estan disponibles.
- ids_restantes = Es la lista de objetos que que restan de los ids_totales.

En pocas palabras, a este metodo le debemos pasar como argumento una lista de ids, y ray.wait guardará en ids_listos todos aquellos objetos que ya esten disponibles, y en ids_restantes todos los que van quedando.

**Ejemplo:** Ahora veremos un ejemplo de como aplicar ray.wait para cambiar los resultados a obtener.

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

print('Ray importado con exito!')

In [None]:
ray.init(num_cpus=6, ignore_reinit_error=True)

time.sleep(2.0)

In [None]:
@ray.remote
def f(i):
    np.random.seed(5 + i)
    x = np.random.uniform(0, 4)
    time.sleep(x)
    return i, time.time()

In [None]:
start_time = time.time()

# Esto ejecuta la funcion f 6 veces, lo que le da a cada iteración un resultado aleatorio de tiempo de ejecución

result_ids = [f.remote(i) for i in range(6)]

initial_results = ray.get(result_ids[:3]) # Se guardan en initial_results los resultados de las primeras 3 iteraciones

print(initial_results)

end_time = time.time()
duration = end_time - start_time

In [None]:
remaining_results = ray.get(result_ids[3:]) # Se guardan en remaining_results los resultados de las ultimas 3 iteraciones
print(remaining_results)

In [None]:
initial_indices = [result[0] for result in initial_results]
initial_times = [result[1] for result in initial_results]
remaining_indices = [result[0] for result in remaining_results]
remaining_times = [result[1] for result in remaining_results]

print('Este ejemplo tardó {} segundos.'.format(duration))

Ahora el mismo codigo, pero utilizando la funcion ray.wait

In [None]:
@ray.remote
def f(i):
    np.random.seed(5 + i)
    x = np.random.uniform(0, 4)
    time.sleep(x)
    return i, time.time()

In [None]:
start_time = time.time()

result_ids = [f.remote(i) for i in range(6)]

# Ahora con ray.wait, guardaremos en initial_results el id de los primeros 3 resultados en finalizar, y en
# remaining_results, los id de los ultimos 3
initial_results, remaining_results = ray.wait(result_ids, num_returns = 3, timeout = None)
initial_results = ray.get(initial_results) # Obtenemos los valores de initial_results usando ray.get

print(initial_results)

end_time = time.time()
duration = end_time - start_time

In [None]:
remaining_results = ray.get(remaining_results) # Obtenemos los valores de remaining_results usando ray.get
print(remaining_results)

In [None]:
initial_indices = [result[0] for result in initial_results]
initial_times = [result[1] for result in initial_results]
remaining_indices = [result[0] for result in remaining_results]
remaining_times = [result[1] for result in remaining_results]

print('Este ejemplo tardó {} segundos.'.format(duration))

## - Procesar tareas en orden de termino con ray.wait

Ahora veremos un ejemplo de como utilizar ray.wait para procesar tareas en el orden en que terminan.
Primero veamos un codigo sin ray.wait y veamos como procesa sus tareas:

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [None]:
ray.init(num_cpus=5,  ignore_reinit_error=True)

In [None]:
@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5)) # Esta funcion se pausará un cantidad de tiempo aleatorio, entre 0 y 5 segundos.
    return time.time()

In [None]:
start_time = time.time()

remaining_result_ids = [f.remote() for _ in range(10)]

# Get the results.
results = []
while len(remaining_result_ids) > 0: 
    
    #Con esta iteración mostraremos los resultados por orden de indice, es decir primero el i=0, luego i=1, y asi sucesivamente
    
    result_id = remaining_result_ids[0]
    remaining_result_ids = remaining_result_ids[1:]
    result = ray.get(result_id)
    results.append(result)
    print('Este procesamiento terminó en {} segundos.'
          .format(result - start_time))    

end_time = time.time()
duration = end_time - start_time

In [None]:
print('Este ejemplo tardó {} segundos.'.format(duration))

En este ejemplo de arriba podemos ver como los resultados fueron procesados en el orden que son ejecutados, y no en el orden en que terminaron. Esto se puede cambiar utilizando ray.wait de la siguiente forma:

In [None]:
@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5))
    return time.time()

In [None]:
start_time = time.time()

remaining_result_ids = [f.remote() for _ in range(10)]

results = []
while len(remaining_result_ids) > 0:
    
    # Ahora usando ray.wait mostraremos los procesos en el orden en que se terminan de ejecutar.
    
    result_id, remaining_result_ids = ray.wait(remaining_result_ids, num_returns = 1, timeout = None)
    result = ray.get(result_id)[0]
    results.append(result)
    print('Este procesamiento terminó en {} segundos.'
          .format(result - start_time))    

end_time = time.time()
duration = end_time - start_time

In [None]:
print('Este ejemplo tardó {} segundos.'.format(duration))

Podemos ver ahora como los resutaldos fueron procesados en el orden en que finalizaron.