
Fitness function issues with Multiprocessing #145

Closed
Aleksy9 opened this issue Nov 17, 2022 · 7 comments
Labels
bug Something isn't working

Comments

@Aleksy9

Aleksy9 commented Nov 17, 2022

Hi everyone,

I am using the parallel processing feature and am encountering an issue: if I define the fitness function using a closure inside a class (to pass additional data whilst respecting the number of arguments the function can take) and use parallel processing with the "process" flag, no errors are raised and the program just stays stuck indefinitely. This does not happen when using "thread" or when not using parallel processing.

I have put together a simple example below to show how I define my fitness function.

import pygad

class Test:
    def __init__(self):
        self.data = 5
        """ Input data needed for fitness function here """

    def outer_fitness_func(self):
        
        def fitness_func(solution, sol_idx):
            """calculations"""
            fitness_value = self.data
            return fitness_value

        return fitness_func

test = Test()
ga_instance = pygad.GA(num_generations=10,
                       num_parents_mating=3,
                       sol_per_pop=5,
                       num_genes=10,
                       fitness_func=test.outer_fitness_func(),
                       parallel_processing=["process", 2])

if __name__ == '__main__':

    ga_instance.run()

Not using "process" is not feasible for my application, as it significantly improves performance. My current fix is to allow an extra argument to be passed to the fitness function and pass all my data through that. However, this is not very clean, and I still cannot include my fitness function in a class. Are there any solutions to this?

@borisarloff

Please note that in your example you are setting fitness_func to a call rather than to a reference of the fitness function.
Change fitness_func=test.outer_fitness_func() to fitness_func=test.outer_fitness_func.
In your case, neither "thread", "process", nor "None" should work.

Please try this change and post the outcome if it does not solve your problem.

@Aleksy9
Author

Aleksy9 commented Dec 1, 2022

Thank you for your response Boris!

I do believe, though, that using test.outer_fitness_func() is the correct way to proceed, as I am using a closure to wrap the fitness function. If I set fitness_func=test.outer_fitness_func, I would be passing the outer function as the fitness function, which is not the intended behaviour: during the calculations it would return a function instead of the fitness value.

@borisarloff

Sorry Alexis, I did not pay attention to your fitness function. You are correct.

Looking at the pygad.py source code, lines 1259-1278 are where parallel processing is executed. Whether "thread" or "process" is used, the code is the same, except that ExecutorClass refers to ThreadPoolExecutor or ProcessPoolExecutor, respectively. So it should work the same way.

However, you indicated that "This does not happen when using "thread" or when not using parallel processing". If that is the case, I would (logically) surmise that it is working with "process" too, but the returned fitness_value ends up in a separate process space. It probably looks like it is hanging because it is: the PyGAD method is still waiting for the fitness value to be returned, while it has already been returned in a different memory space.

I am somewhat curious and will try to replicate this over the weekend. Perhaps changing executor.map() to executor.submit() would at least give more control over the process and allow querying its status. executor.map() is simply a multiprocess version of Python's map(), with not much control over it.
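
To illustrate the difference (a generic concurrent.futures sketch, not PyGAD code; the fitness_func and solutions here are just placeholders), submit() returns one Future per task that can be inspected individually:

import concurrent.futures

def fitness_func(solution, sol_idx):
    # placeholder fitness calculation
    return sum(solution)

if __name__ == "__main__":
    solutions = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # submit() returns one Future per task; each can be polled, cancelled,
        # or waited on individually, unlike the results iterator from map().
        futures = [executor.submit(fitness_func, sol, idx)
                   for idx, sol in enumerate(solutions)]
        for future in concurrent.futures.as_completed(futures):
            print(future.done(), future.result())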

BTW, I use multiprocessing for speed-up too, but I wrap the entire pygad.GA() call instead of using its parallel_processing option.
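
For reference, a minimal sketch of that pattern (hypothetical run_ga helper and toy fitness function, not Boris's actual code): each worker process builds and runs its own GA instance, and only the final results are sent back to the parent process.

import concurrent.futures
import pygad

def fitness_func(solution, sol_idx):
    # toy fitness: just sum the genes
    return sum(solution)

def run_ga(run_idx):
    # every worker process builds and runs its own, independent GA instance
    ga = pygad.GA(num_generations=10,
                  num_parents_mating=3,
                  sol_per_pop=5,
                  num_genes=10,
                  fitness_func=fitness_func)
    ga.run()
    return ga.best_solution()

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        best_per_run = list(executor.map(run_ga, range(2)))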

@borisarloff

borisarloff commented Dec 3, 2022

Alexis, you need to add a global declaration to the closure method, as shown below. As suspected, it was a memory space issue; not exactly in the way I thought, but still a namespace issue. Under multiprocessing your local namespace is not visible to the worker processes, while under threading everything lives in the same memory space.

def outer_fitness_func(self):

    # Expose the inner function globally so worker processes can see it.
    global fitness_func

    def fitness_func(solution, sol_idx):
        fitness_value = self.data
        return fitness_value

    return fitness_func

@BenoitMiquey

BenoitMiquey commented Dec 14, 2022

Hi @Aleksy9 ,

I also ran into some problems using multithreading, but I found this piece of code, which was written a few months before the parallel_processing feature was added.

Try this.

This solution fixed the problems in my issue.

@ahmedfgad added the bug label Feb 25, 2023
@ahmedfgad
Owner

This is not an issue in PyGAD itself. I used Keras with parallel processing directly (without PyGAD) and the issue still exists. Here is a reproducible example. I set a random seed for NumPy to make sure that there is no randomness across the different runs.

I also posted a question at StackOverflow: https://stackoverflow.com/questions/75606326/python-parallel-processing-with-threadpoolexecutor-gives-wrong-results-with-kera

import tensorflow.keras
import numpy
import concurrent.futures

numpy.random.seed(1)
tensorflow.random.set_seed(1)

def create_rand_weights(model, num_models):
    random_model_weights = []
    for model_idx in range(num_models):
        random_weights = []
        for layer_idx in range(len(model.weights)):
            layer_shape = model.weights[layer_idx].shape
            if len(layer_shape) > 1:
                layer_weights = numpy.random.rand(layer_shape[0], layer_shape[1])
            else:
                layer_weights = numpy.random.rand(layer_shape[0])
            random_weights.append(layer_weights)
        random_weights = numpy.array(random_weights, dtype=object)
        random_model_weights.append(random_weights)
    
    random_model_weights = numpy.array(random_model_weights)
    return random_model_weights

def model_error(model_weights):
    global data_inputs, data_outputs, model
    model.set_weights(model_weights)
    predictions = model.predict(data_inputs)
    mae = tensorflow.keras.losses.MeanAbsoluteError()
    abs_error = mae(data_outputs, predictions).numpy() + 0.00000001
    return abs_error

input_layer  = tensorflow.keras.layers.Input(3)
dense_layer1 = tensorflow.keras.layers.Dense(5, activation="relu")(input_layer)
output_layer = tensorflow.keras.layers.Dense(1, activation="linear")(dense_layer1)
model = tensorflow.keras.Model(inputs=input_layer, outputs=output_layer)

data_inputs = numpy.array([[0.02, 0.1, 0.15],
                           [0.7, 0.6, 0.8],
                           [1.5, 1.2, 1.7],
                           [3.2, 2.9, 3.1]])    
data_outputs = numpy.array([[0.1],
                            [0.6],
                            [1.3],
                            [2.5]])

num_models = 10
random_model_weights = create_rand_weights(model, num_models)

ExecutorClass = concurrent.futures.ThreadPoolExecutor
thread_output = []
with ExecutorClass(max_workers=2) as executor:
    output = executor.map(model_error, random_model_weights)
for out in output:
    thread_output.append(out)
thread_output=numpy.array(thread_output)
print("Wrong Outputs using Threads")
print(thread_output)

print("\n\n")

correct_output = []
for idx in range(num_models):
    error = model_error(random_model_weights[idx])
    correct_output.append(error)
correct_output=numpy.array(correct_output)
print("Correct Outputs without Threads")
print(correct_output)

These are the correct model outputs without using parallel processing:

[6.78012372 3.42922212 4.96738673 6.64474774 6.83102609 4.41165734 3.34482099 7.6132908  7.97145654 6.98378612]

These are the wrong model outputs when using parallel processing (threads):

[3.42922212 3.42922212 6.90911246 6.64474774 4.41165734 3.34482099 7.6132908  7.97145654 6.98378612 6.98378612]

Even though I set a random seed for NumPy, the outputs using parallel processing still vary across different runs.

@ahmedfgad
Owner

Thanks to https://stackoverflow.com/a/75606666/5426539.

Keras is not thread-safe. The issue can be simply solved by cloning the model before calling the predict() method.

model = ...
_model = tensorflow.keras.models.clone_model(model)
predictions = _model.predict(...)
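
Applied to the model_error() function from the reproducible example above, the fix could look roughly like this (a sketch; the candidate weights are set on the clone rather than on the shared model):

def model_error(model_weights):
    global data_inputs, data_outputs, model
    # Clone the shared model so each call predicts on its own instance,
    # then assign the candidate weights to the clone.
    _model = tensorflow.keras.models.clone_model(model)
    _model.set_weights(model_weights)
    predictions = _model.predict(data_inputs)
    mae = tensorflow.keras.losses.MeanAbsoluteError()
    return mae(data_outputs, predictions).numpy() + 0.00000001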

The next release will solve the issue.

ahmedfgad added a commit that referenced this issue Apr 8, 2023
PyGAD 3.0.0 Release Notes
1. The structure of the library is changed and some methods defined in the `pygad.py` module are moved to the `pygad.utils`, `pygad.helper`, and `pygad.visualize` submodules.
  2. The `pygad.utils.parent_selection` module has a class named `ParentSelection` where all the parent selection operators exist. The `pygad.GA` class extends this class.
  3. The `pygad.utils.crossover` module has a class named `Crossover` where all the crossover operators exist. The `pygad.GA` class extends this class.
  4. The `pygad.utils.mutation` module has a class named `Mutation` where all the mutation operators exist. The `pygad.GA` class extends this class.
  5. The `pygad.helper.unique` module has a class named `Unique` where some helper methods exist to solve duplicate genes and make sure every gene is unique. The `pygad.GA` class extends this class.
  6. The `pygad.visualize.plot` module has a class named `Plot` where all the methods that create plots exist. The `pygad.GA` class extends this class.

```python
...
class GA(utils.parent_selection.ParentSelection,
         utils.crossover.Crossover,
         utils.mutation.Mutation,
         helper.unique.Unique,
         visualize.plot.Plot):
...
```

2. Support of using the `logging` module to log the outputs to both the console and text file instead of using the `print()` function. This is by assigning the `logging.Logger` to the new `logger` parameter. Check the [Logging Outputs](https://pygad.readthedocs.io/en/latest/README_pygad_ReadTheDocs.html#logging-outputs) for more information.
3. A new instance attribute called `logger` to save the logger.
4. The function/method passed to the `fitness_func` parameter accepts a new parameter that refers to the instance of the `pygad.GA` class. Check this for an example: [Use Functions and Methods to Build Fitness Function and Callbacks](https://pygad.readthedocs.io/en/latest/README_pygad_ReadTheDocs.html#use-functions-and-methods-to-build-fitness-and-callbacks). #163
5. Update the documentation to include an example of using functions and methods to calculate the fitness and build callbacks. Check this for more details: [Use Functions and Methods to Build Fitness Function and Callbacks](https://pygad.readthedocs.io/en/latest/README_pygad_ReadTheDocs.html#use-functions-and-methods-to-build-fitness-and-callbacks). #92 (comment)
6. Validate the value passed to the `initial_population` parameter.
7. Validate the type and length of the `pop_fitness` parameter of the `best_solution()` method.
8. Some edits in the documentation. #106
9. Fix an issue when building the initial population as (some) genes have their value taken from the mutation range (defined by the parameters `random_mutation_min_val` and `random_mutation_max_val`) instead of using the parameters `init_range_low` and `init_range_high`.
10. The `summary()` method returns the summary as a single-line string. Just log/print the returned string to see it properly.
11. The `callback_generation` parameter is removed. Use the `on_generation` parameter instead.
12. There was an issue when using the `parallel_processing` parameter with Keras and PyTorch. As Keras/PyTorch are not thread-safe, the `predict()` method gives incorrect and weird results when more than 1 thread is used. #145 ahmedfgad/TorchGA#5 ahmedfgad/KerasGA#6. Thanks to this [StackOverflow answer](https://stackoverflow.com/a/75606666/5426539).
13. Replace `numpy.float` by `float` in the 2 parent selection operators roulette wheel and stochastic universal. #168
ahmedfgad added a commit to ahmedfgad/KerasGA that referenced this issue Apr 8, 2023
There was an issue when using the `parallel_processing` parameter with Keras and PyTorch. As Keras/PyTorch are not thread-safe, the `predict()` method gives incorrect and weird results when more than 1 thread is used. ahmedfgad/GeneticAlgorithmPython#145 ahmedfgad/TorchGA#5 #6. Thanks to this StackOverflow answer https://stackoverflow.com/a/75606666/5426539.
ahmedfgad added a commit to ahmedfgad/TorchGA that referenced this issue Apr 8, 2023
There was an issue when using the `parallel_processing` parameter with Keras and PyTorch. As Keras/PyTorch are not thread-safe, the `predict()` method gives incorrect and weird results when more than 1 thread is used. ahmedfgad/GeneticAlgorithmPython#145 #5 ahmedfgad/KerasGA#6. Thanks to this StackOverflow answer https://stackoverflow.com/a/75606666/5426539.